Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TransactionId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
32    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, EmptyOperator,
33    EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep, ExpressionPredicate,
34    FactorizedAggregate, FactorizedAggregateOperator, FilterExpression, FilterOperator,
35    HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
36    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
37    LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
38    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
39    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
40    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
41    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnwindOperator,
42    VariableLengthExpandOperator,
43};
44use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
45use std::collections::HashMap;
46use std::sync::Arc;
47
48use crate::query::planner::common;
49use crate::query::planner::common::expression_to_string;
50use crate::query::planner::{
51    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
52    convert_unary_op, value_to_logical_type,
53};
54use crate::transaction::TransactionManager;
55
56/// Range bounds for property-based range queries.
57struct RangeBounds<'a> {
58    min: Option<&'a Value>,
59    max: Option<&'a Value>,
60    min_inclusive: bool,
61    max_inclusive: bool,
62}
63
64/// Converts a logical plan to a physical operator tree for LPG stores.
65pub struct Planner {
66    /// The graph store (supports both read and write operations).
67    pub(super) store: Arc<dyn GraphStoreMut>,
68    /// Transaction manager for MVCC operations.
69    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
70    /// Current transaction ID (if in a transaction).
71    pub(super) transaction_id: Option<TransactionId>,
72    /// Epoch to use for visibility checks.
73    pub(super) viewing_epoch: EpochId,
74    /// Counter for generating unique anonymous edge column names.
75    pub(super) anon_edge_counter: std::cell::Cell<u32>,
76    /// Whether to use factorized execution for multi-hop queries.
77    pub(super) factorized_execution: bool,
78    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
79    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
80    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
81    /// Variables that hold edge IDs (from MATCH edge patterns).
82    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
83    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
84    /// Optional constraint validator for schema enforcement during mutations.
85    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
86    /// Catalog for user-defined procedure lookup.
87    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
88    /// Shared parameter state for the currently planning correlated Apply.
89    /// Set by `plan_apply` before planning the inner operator, consumed by
90    /// `plan_operator` when encountering `ParameterScan`.
91    pub(super) correlated_param_state:
92        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
93    /// Variables from variable-length expand patterns (group-list variables).
94    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
95    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
96    /// When true, each physical operator is wrapped in `ProfiledOperator`.
97    profiling: std::cell::Cell<bool>,
98    /// Profile entries collected during planning (post-order).
99    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
100    /// Optional write tracker for recording writes during mutations.
101    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
102}
103
104impl Planner {
105    /// Creates a new planner with the given store.
106    ///
107    /// This creates a planner without transaction context, using the current
108    /// epoch from the store for visibility.
109    #[must_use]
110    pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
111        let epoch = store.current_epoch();
112        Self {
113            store,
114            transaction_manager: None,
115            transaction_id: None,
116            viewing_epoch: epoch,
117            anon_edge_counter: std::cell::Cell::new(0),
118            factorized_execution: true,
119            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
120            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
121            validator: None,
122            catalog: None,
123            correlated_param_state: std::cell::RefCell::new(None),
124            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
125            profiling: std::cell::Cell::new(false),
126            profile_entries: std::cell::RefCell::new(Vec::new()),
127            write_tracker: None,
128        }
129    }
130
131    /// Creates a new planner with transaction context for MVCC-aware planning.
132    #[must_use]
133    pub fn with_context(
134        store: Arc<dyn GraphStoreMut>,
135        transaction_manager: Arc<TransactionManager>,
136        transaction_id: Option<TransactionId>,
137        viewing_epoch: EpochId,
138    ) -> Self {
139        use crate::transaction::TransactionWriteTracker;
140
141        // Create write tracker when there's an active transaction
142        let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
143            if transaction_id.is_some() {
144                Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
145                    &transaction_manager,
146                ))))
147            } else {
148                None
149            };
150
151        Self {
152            store,
153            transaction_manager: Some(transaction_manager),
154            transaction_id,
155            viewing_epoch,
156            anon_edge_counter: std::cell::Cell::new(0),
157            factorized_execution: true,
158            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
159            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
160            validator: None,
161            catalog: None,
162            correlated_param_state: std::cell::RefCell::new(None),
163            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
164            profiling: std::cell::Cell::new(false),
165            profile_entries: std::cell::RefCell::new(Vec::new()),
166            write_tracker,
167        }
168    }
169
170    /// Returns the viewing epoch for this planner.
171    #[must_use]
172    pub fn viewing_epoch(&self) -> EpochId {
173        self.viewing_epoch
174    }
175
176    /// Returns the transaction ID for this planner, if any.
177    #[must_use]
178    pub fn transaction_id(&self) -> Option<TransactionId> {
179        self.transaction_id
180    }
181
182    /// Returns a reference to the transaction manager, if available.
183    #[must_use]
184    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
185        self.transaction_manager.as_ref()
186    }
187
188    /// Enables or disables factorized execution for multi-hop queries.
189    #[must_use]
190    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
191        self.factorized_execution = enabled;
192        self
193    }
194
195    /// Sets the constraint validator for schema enforcement during mutations.
196    #[must_use]
197    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
198        self.validator = Some(validator);
199        self
200    }
201
202    /// Sets the catalog for user-defined procedure lookup.
203    #[must_use]
204    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
205        self.catalog = Some(catalog);
206        self
207    }
208
209    /// Counts consecutive single-hop expand operations.
210    ///
211    /// Returns the count and the deepest non-expand operator (the base of the chain).
212    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
213        match op {
214            LogicalOperator::Expand(expand) => {
215                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
216
217                if is_single_hop {
218                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
219                    (inner_count + 1, base)
220                } else {
221                    (0, op)
222                }
223            }
224            _ => (0, op),
225        }
226    }
227
228    /// Collects expand operations from the outermost down to the base.
229    ///
230    /// Returns expands in order from innermost (base) to outermost.
231    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
232        let mut chain = Vec::new();
233        let mut current = op;
234
235        while let LogicalOperator::Expand(expand) = current {
236            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
237            if !is_single_hop {
238                break;
239            }
240            chain.push(expand);
241            current = &expand.input;
242        }
243
244        chain.reverse();
245        chain
246    }
247
248    /// Plans a logical plan into a physical operator.
249    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
250        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
251        Ok(PhysicalPlan {
252            operator,
253            columns,
254            adaptive_context: None,
255        })
256    }
257
258    /// Plans a logical plan with profiling: each physical operator is wrapped
259    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
260    /// collect row counts and timing. Returns the physical plan together with
261    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
262    /// items in post-order (children before parents).
263    pub fn plan_profiled(
264        &self,
265        logical_plan: &LogicalPlan,
266    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
267        self.profiling.set(true);
268        self.profile_entries.borrow_mut().clear();
269
270        let result = self.plan_operator(&logical_plan.root);
271
272        self.profiling.set(false);
273        let (operator, columns) = result?;
274        let entries = self.profile_entries.borrow_mut().drain(..).collect();
275
276        Ok((
277            PhysicalPlan {
278                operator,
279                columns,
280                adaptive_context: None,
281            },
282            entries,
283        ))
284    }
285
286    /// Plans a logical plan with adaptive execution support.
287    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
288        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
289
290        let mut adaptive_context = AdaptiveContext::new();
291        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
292
293        Ok(PhysicalPlan {
294            operator,
295            columns,
296            adaptive_context: Some(adaptive_context),
297        })
298    }
299
300    /// Collects cardinality estimates from the logical plan into an adaptive context.
301    fn collect_cardinality_estimates(
302        &self,
303        op: &LogicalOperator,
304        ctx: &mut AdaptiveContext,
305        depth: usize,
306    ) {
307        match op {
308            LogicalOperator::NodeScan(scan) => {
309                let estimate = if let Some(label) = &scan.label {
310                    self.store.nodes_by_label(label).len() as f64
311                } else {
312                    self.store.node_count() as f64
313                };
314                let id = format!("scan_{}", scan.variable);
315                ctx.set_estimate(&id, estimate);
316
317                if let Some(input) = &scan.input {
318                    self.collect_cardinality_estimates(input, ctx, depth + 1);
319                }
320            }
321            LogicalOperator::Filter(filter) => {
322                let input_estimate = self.estimate_cardinality(&filter.input);
323                let estimate = input_estimate * 0.3;
324                let id = format!("filter_{depth}");
325                ctx.set_estimate(&id, estimate);
326
327                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
328            }
329            LogicalOperator::Expand(expand) => {
330                let input_estimate = self.estimate_cardinality(&expand.input);
331                let stats = self.store.statistics();
332                let avg_degree = self.estimate_expand_degree(&stats, expand);
333                let estimate = input_estimate * avg_degree;
334                let id = format!("expand_{}", expand.to_variable);
335                ctx.set_estimate(&id, estimate);
336
337                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
338            }
339            LogicalOperator::Join(join) => {
340                let left_est = self.estimate_cardinality(&join.left);
341                let right_est = self.estimate_cardinality(&join.right);
342                let estimate = (left_est * right_est).sqrt();
343                let id = format!("join_{depth}");
344                ctx.set_estimate(&id, estimate);
345
346                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
347                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
348            }
349            LogicalOperator::Aggregate(agg) => {
350                let input_estimate = self.estimate_cardinality(&agg.input);
351                let estimate = if agg.group_by.is_empty() {
352                    1.0
353                } else {
354                    (input_estimate * 0.1).max(1.0)
355                };
356                let id = format!("aggregate_{depth}");
357                ctx.set_estimate(&id, estimate);
358
359                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
360            }
361            LogicalOperator::Distinct(distinct) => {
362                let input_estimate = self.estimate_cardinality(&distinct.input);
363                let estimate = (input_estimate * 0.5).max(1.0);
364                let id = format!("distinct_{depth}");
365                ctx.set_estimate(&id, estimate);
366
367                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
368            }
369            LogicalOperator::Return(ret) => {
370                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
371            }
372            LogicalOperator::Limit(limit) => {
373                let input_estimate = self.estimate_cardinality(&limit.input);
374                let estimate = (input_estimate).min(limit.count.estimate());
375                let id = format!("limit_{depth}");
376                ctx.set_estimate(&id, estimate);
377
378                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
379            }
380            LogicalOperator::Skip(skip) => {
381                let input_estimate = self.estimate_cardinality(&skip.input);
382                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
383                let id = format!("skip_{depth}");
384                ctx.set_estimate(&id, estimate);
385
386                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
387            }
388            LogicalOperator::Sort(sort) => {
389                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
390            }
391            LogicalOperator::Union(union) => {
392                let estimate: f64 = union
393                    .inputs
394                    .iter()
395                    .map(|input| self.estimate_cardinality(input))
396                    .sum();
397                let id = format!("union_{depth}");
398                ctx.set_estimate(&id, estimate);
399
400                for input in &union.inputs {
401                    self.collect_cardinality_estimates(input, ctx, depth + 1);
402                }
403            }
404            _ => {
405                // For other operators, try to recurse into known input patterns
406            }
407        }
408    }
409
410    /// Estimates cardinality for a logical operator subtree.
411    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
412        match op {
413            LogicalOperator::NodeScan(scan) => {
414                if let Some(label) = &scan.label {
415                    self.store.nodes_by_label(label).len() as f64
416                } else {
417                    self.store.node_count() as f64
418                }
419            }
420            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
421            LogicalOperator::Expand(expand) => {
422                let stats = self.store.statistics();
423                let avg_degree = self.estimate_expand_degree(&stats, expand);
424                self.estimate_cardinality(&expand.input) * avg_degree
425            }
426            LogicalOperator::Join(join) => {
427                let left = self.estimate_cardinality(&join.left);
428                let right = self.estimate_cardinality(&join.right);
429                (left * right).sqrt()
430            }
431            LogicalOperator::Aggregate(agg) => {
432                if agg.group_by.is_empty() {
433                    1.0
434                } else {
435                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
436                }
437            }
438            LogicalOperator::Distinct(distinct) => {
439                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
440            }
441            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
442            LogicalOperator::Limit(limit) => self
443                .estimate_cardinality(&limit.input)
444                .min(limit.count.estimate()),
445            LogicalOperator::Skip(skip) => {
446                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
447            }
448            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
449            LogicalOperator::Union(union) => union
450                .inputs
451                .iter()
452                .map(|input| self.estimate_cardinality(input))
453                .sum(),
454            LogicalOperator::Except(except) => {
455                let left = self.estimate_cardinality(&except.left);
456                let right = self.estimate_cardinality(&except.right);
457                (left - right).max(0.0)
458            }
459            LogicalOperator::Intersect(intersect) => {
460                let left = self.estimate_cardinality(&intersect.left);
461                let right = self.estimate_cardinality(&intersect.right);
462                left.min(right)
463            }
464            LogicalOperator::Otherwise(otherwise) => self
465                .estimate_cardinality(&otherwise.left)
466                .max(self.estimate_cardinality(&otherwise.right)),
467            _ => 1000.0,
468        }
469    }
470
471    /// Estimates the average edge degree for an expand operation using store statistics.
472    fn estimate_expand_degree(
473        &self,
474        stats: &grafeo_core::statistics::Statistics,
475        expand: &ExpandOp,
476    ) -> f64 {
477        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
478        if expand.edge_types.len() == 1 {
479            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
480        } else if stats.total_nodes > 0 {
481            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
482        } else {
483            10.0
484        }
485    }
486
487    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
488    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
489    fn maybe_profile(
490        &self,
491        result: Result<(Box<dyn Operator>, Vec<String>)>,
492        op: &LogicalOperator,
493    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
494        if self.profiling.get() {
495            let (physical, columns) = result?;
496            let (entry, stats) =
497                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
498            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
499            self.profile_entries.borrow_mut().push(entry);
500            Ok((Box::new(profiled), columns))
501        } else {
502            result
503        }
504    }
505
506    /// Plans a single logical operator.
507    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
508        let result = match op {
509            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
510            LogicalOperator::Expand(expand) => {
511                if self.factorized_execution {
512                    let (chain_len, _base) = Self::count_expand_chain(op);
513                    if chain_len >= 2 {
514                        return self.maybe_profile(self.plan_expand_chain(op), op);
515                    }
516                }
517                self.plan_expand(expand)
518            }
519            LogicalOperator::Return(ret) => self.plan_return(ret),
520            LogicalOperator::Filter(filter) => self.plan_filter(filter),
521            LogicalOperator::Project(project) => self.plan_project(project),
522            LogicalOperator::Limit(limit) => self.plan_limit(limit),
523            LogicalOperator::Skip(skip) => self.plan_skip(skip),
524            LogicalOperator::Sort(sort) => self.plan_sort(sort),
525            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
526            LogicalOperator::Join(join) => self.plan_join(join),
527            LogicalOperator::Union(union) => self.plan_union(union),
528            LogicalOperator::Except(except) => self.plan_except(except),
529            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
530            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
531            LogicalOperator::Apply(apply) => self.plan_apply(apply),
532            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
533            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
534            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
535            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
536            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
537            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
538            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
539            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
540            LogicalOperator::Merge(merge) => self.plan_merge(merge),
541            LogicalOperator::MergeRelationship(merge_rel) => {
542                self.plan_merge_relationship(merge_rel)
543            }
544            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
545            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
546            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
547            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
548            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
549            #[cfg(feature = "algos")]
550            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
551            #[cfg(not(feature = "algos"))]
552            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
553                "CALL procedures require the 'algos' feature".to_string(),
554            )),
555            LogicalOperator::ParameterScan(_param_scan) => {
556                let state = self
557                    .correlated_param_state
558                    .borrow()
559                    .clone()
560                    .ok_or_else(|| {
561                        Error::Internal(
562                            "ParameterScan without correlated Apply context".to_string(),
563                        )
564                    })?;
565                // Use the actual column names from the ParameterState (which may
566                // have been expanded from "*" to real variable names in plan_apply)
567                let columns = state.columns.clone();
568                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
569                Ok((operator, columns))
570            }
571            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
572            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
573            LogicalOperator::LoadData(load) => {
574                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
575                    load.path.clone(),
576                    load.format,
577                    load.with_headers,
578                    load.field_terminator,
579                    load.variable.clone(),
580                ));
581                Ok((operator, vec![load.variable.clone()]))
582            }
583            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
584            LogicalOperator::VectorScan(_) => Err(Error::Internal(
585                "VectorScan requires vector-index feature".to_string(),
586            )),
587            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
588                "VectorJoin requires vector-index feature".to_string(),
589            )),
590            _ => Err(Error::Internal(format!(
591                "Unsupported operator: {:?}",
592                std::mem::discriminant(op)
593            ))),
594        };
595        self.maybe_profile(result, op)
596    }
597
598    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
599    fn plan_horizontal_aggregate(
600        &self,
601        ha: &HorizontalAggregateOp,
602    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
603        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
604
605        let list_col_idx = child_columns
606            .iter()
607            .position(|c| c == &ha.list_column)
608            .ok_or_else(|| {
609                Error::Internal(format!(
610                    "HorizontalAggregate list column '{}' not found in {:?}",
611                    ha.list_column, child_columns
612                ))
613            })?;
614
615        let entity_kind = match ha.entity_kind {
616            LogicalEntityKind::Edge => EntityKind::Edge,
617            LogicalEntityKind::Node => EntityKind::Node,
618        };
619
620        let function = convert_aggregate_function(ha.function);
621        let input_column_count = child_columns.len();
622
623        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
624            child_op,
625            list_col_idx,
626            entity_kind,
627            function,
628            ha.property.clone(),
629            Arc::clone(&self.store) as Arc<dyn GraphStore>,
630            input_column_count,
631        ));
632
633        let mut columns = child_columns;
634        columns.push(ha.alias.clone());
635        // Mark the result as a scalar column
636        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
637
638        Ok((operator, columns))
639    }
640
641    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
642    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
643        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
644        let key_idx = child_columns
645            .iter()
646            .position(|c| c == &mc.key_var)
647            .ok_or_else(|| {
648                Error::Internal(format!(
649                    "MapCollect key '{}' not in columns {:?}",
650                    mc.key_var, child_columns
651                ))
652            })?;
653        let value_idx = child_columns
654            .iter()
655            .position(|c| c == &mc.value_var)
656            .ok_or_else(|| {
657                Error::Internal(format!(
658                    "MapCollect value '{}' not in columns {:?}",
659                    mc.value_var, child_columns
660                ))
661            })?;
662        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
663        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
664        Ok((operator, vec![mc.alias.clone()]))
665    }
666}
667
668/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
669#[cfg(feature = "algos")]
670struct StaticResultOperator {
671    rows: Vec<Vec<Value>>,
672    column_indices: Vec<usize>,
673    row_index: usize,
674}
675
676#[cfg(feature = "algos")]
677impl Operator for StaticResultOperator {
678    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
679        use grafeo_core::execution::DataChunk;
680
681        if self.row_index >= self.rows.len() {
682            return Ok(None);
683        }
684
685        let remaining = self.rows.len() - self.row_index;
686        let chunk_rows = remaining.min(1024);
687        let col_count = self.column_indices.len();
688
689        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
690        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
691
692        for row_offset in 0..chunk_rows {
693            let row = &self.rows[self.row_index + row_offset];
694            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
695                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
696                if let Some(col) = chunk.column_mut(col_idx) {
697                    col.push_value(value);
698                }
699            }
700        }
701        chunk.set_count(chunk_rows);
702
703        self.row_index += chunk_rows;
704        Ok(Some(chunk))
705    }
706
707    fn reset(&mut self) {
708        self.row_index = 0;
709    }
710
711    fn name(&self) -> &'static str {
712        "StaticResult"
713    }
714}
715
716#[cfg(test)]
717mod tests {
718    use super::*;
719    use crate::query::plan::{
720        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
721        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
722        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
723        SkipOp as LogicalSkipOp, SortKey, SortOp,
724    };
725    use grafeo_common::types::Value;
726    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
727    use grafeo_core::graph::GraphStoreMut;
728    use grafeo_core::graph::lpg::LpgStore;
729
730    fn create_test_store() -> Arc<dyn GraphStoreMut> {
731        let store = Arc::new(LpgStore::new().unwrap());
732        store.create_node(&["Person"]);
733        store.create_node(&["Person"]);
734        store.create_node(&["Company"]);
735        store
736    }
737
738    // ==================== Simple Scan Tests ====================
739
740    #[test]
741    fn test_plan_simple_scan() {
742        let store = create_test_store();
743        let planner = Planner::new(store);
744
745        // MATCH (n:Person) RETURN n
746        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
747            items: vec![ReturnItem {
748                expression: LogicalExpression::Variable("n".to_string()),
749                alias: None,
750            }],
751            distinct: false,
752            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
753                variable: "n".to_string(),
754                label: Some("Person".to_string()),
755                input: None,
756            })),
757        }));
758
759        let physical = planner.plan(&logical).unwrap();
760        assert_eq!(physical.columns(), &["n"]);
761    }
762
763    #[test]
764    fn test_plan_scan_without_label() {
765        let store = create_test_store();
766        let planner = Planner::new(store);
767
768        // MATCH (n) RETURN n
769        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
770            items: vec![ReturnItem {
771                expression: LogicalExpression::Variable("n".to_string()),
772                alias: None,
773            }],
774            distinct: false,
775            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
776                variable: "n".to_string(),
777                label: None,
778                input: None,
779            })),
780        }));
781
782        let physical = planner.plan(&logical).unwrap();
783        assert_eq!(physical.columns(), &["n"]);
784    }
785
786    #[test]
787    fn test_plan_return_with_alias() {
788        let store = create_test_store();
789        let planner = Planner::new(store);
790
791        // MATCH (n:Person) RETURN n AS person
792        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
793            items: vec![ReturnItem {
794                expression: LogicalExpression::Variable("n".to_string()),
795                alias: Some("person".to_string()),
796            }],
797            distinct: false,
798            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
799                variable: "n".to_string(),
800                label: Some("Person".to_string()),
801                input: None,
802            })),
803        }));
804
805        let physical = planner.plan(&logical).unwrap();
806        assert_eq!(physical.columns(), &["person"]);
807    }
808
809    #[test]
810    fn test_plan_return_property() {
811        let store = create_test_store();
812        let planner = Planner::new(store);
813
814        // MATCH (n:Person) RETURN n.name
815        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
816            items: vec![ReturnItem {
817                expression: LogicalExpression::Property {
818                    variable: "n".to_string(),
819                    property: "name".to_string(),
820                },
821                alias: None,
822            }],
823            distinct: false,
824            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
825                variable: "n".to_string(),
826                label: Some("Person".to_string()),
827                input: None,
828            })),
829        }));
830
831        let physical = planner.plan(&logical).unwrap();
832        assert_eq!(physical.columns(), &["n.name"]);
833    }
834
835    #[test]
836    fn test_plan_return_literal() {
837        let store = create_test_store();
838        let planner = Planner::new(store);
839
840        // MATCH (n) RETURN 42 AS answer
841        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
842            items: vec![ReturnItem {
843                expression: LogicalExpression::Literal(Value::Int64(42)),
844                alias: Some("answer".to_string()),
845            }],
846            distinct: false,
847            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
848                variable: "n".to_string(),
849                label: None,
850                input: None,
851            })),
852        }));
853
854        let physical = planner.plan(&logical).unwrap();
855        assert_eq!(physical.columns(), &["answer"]);
856    }
857
858    // ==================== Filter Tests ====================
859
860    #[test]
861    fn test_plan_filter_equality() {
862        let store = create_test_store();
863        let planner = Planner::new(store);
864
865        // MATCH (n:Person) WHERE n.age = 30 RETURN n
866        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
867            items: vec![ReturnItem {
868                expression: LogicalExpression::Variable("n".to_string()),
869                alias: None,
870            }],
871            distinct: false,
872            input: Box::new(LogicalOperator::Filter(FilterOp {
873                predicate: LogicalExpression::Binary {
874                    left: Box::new(LogicalExpression::Property {
875                        variable: "n".to_string(),
876                        property: "age".to_string(),
877                    }),
878                    op: BinaryOp::Eq,
879                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
880                },
881                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
882                    variable: "n".to_string(),
883                    label: Some("Person".to_string()),
884                    input: None,
885                })),
886                pushdown_hint: None,
887            })),
888        }));
889
890        let physical = planner.plan(&logical).unwrap();
891        assert_eq!(physical.columns(), &["n"]);
892    }
893
894    #[test]
895    fn test_plan_filter_compound_and() {
896        let store = create_test_store();
897        let planner = Planner::new(store);
898
899        // WHERE n.age > 20 AND n.age < 40
900        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
901            items: vec![ReturnItem {
902                expression: LogicalExpression::Variable("n".to_string()),
903                alias: None,
904            }],
905            distinct: false,
906            input: Box::new(LogicalOperator::Filter(FilterOp {
907                predicate: LogicalExpression::Binary {
908                    left: Box::new(LogicalExpression::Binary {
909                        left: Box::new(LogicalExpression::Property {
910                            variable: "n".to_string(),
911                            property: "age".to_string(),
912                        }),
913                        op: BinaryOp::Gt,
914                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
915                    }),
916                    op: BinaryOp::And,
917                    right: Box::new(LogicalExpression::Binary {
918                        left: Box::new(LogicalExpression::Property {
919                            variable: "n".to_string(),
920                            property: "age".to_string(),
921                        }),
922                        op: BinaryOp::Lt,
923                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
924                    }),
925                },
926                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
927                    variable: "n".to_string(),
928                    label: None,
929                    input: None,
930                })),
931                pushdown_hint: None,
932            })),
933        }));
934
935        let physical = planner.plan(&logical).unwrap();
936        assert_eq!(physical.columns(), &["n"]);
937    }
938
939    #[test]
940    fn test_plan_filter_unary_not() {
941        let store = create_test_store();
942        let planner = Planner::new(store);
943
944        // WHERE NOT n.active
945        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
946            items: vec![ReturnItem {
947                expression: LogicalExpression::Variable("n".to_string()),
948                alias: None,
949            }],
950            distinct: false,
951            input: Box::new(LogicalOperator::Filter(FilterOp {
952                predicate: LogicalExpression::Unary {
953                    op: UnaryOp::Not,
954                    operand: Box::new(LogicalExpression::Property {
955                        variable: "n".to_string(),
956                        property: "active".to_string(),
957                    }),
958                },
959                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
960                    variable: "n".to_string(),
961                    label: None,
962                    input: None,
963                })),
964                pushdown_hint: None,
965            })),
966        }));
967
968        let physical = planner.plan(&logical).unwrap();
969        assert_eq!(physical.columns(), &["n"]);
970    }
971
972    #[test]
973    fn test_plan_filter_is_null() {
974        let store = create_test_store();
975        let planner = Planner::new(store);
976
977        // WHERE n.email IS NULL
978        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
979            items: vec![ReturnItem {
980                expression: LogicalExpression::Variable("n".to_string()),
981                alias: None,
982            }],
983            distinct: false,
984            input: Box::new(LogicalOperator::Filter(FilterOp {
985                predicate: LogicalExpression::Unary {
986                    op: UnaryOp::IsNull,
987                    operand: Box::new(LogicalExpression::Property {
988                        variable: "n".to_string(),
989                        property: "email".to_string(),
990                    }),
991                },
992                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
993                    variable: "n".to_string(),
994                    label: None,
995                    input: None,
996                })),
997                pushdown_hint: None,
998            })),
999        }));
1000
1001        let physical = planner.plan(&logical).unwrap();
1002        assert_eq!(physical.columns(), &["n"]);
1003    }
1004
1005    #[test]
1006    fn test_plan_filter_function_call() {
1007        let store = create_test_store();
1008        let planner = Planner::new(store);
1009
1010        // WHERE size(n.friends) > 0
1011        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1012            items: vec![ReturnItem {
1013                expression: LogicalExpression::Variable("n".to_string()),
1014                alias: None,
1015            }],
1016            distinct: false,
1017            input: Box::new(LogicalOperator::Filter(FilterOp {
1018                predicate: LogicalExpression::Binary {
1019                    left: Box::new(LogicalExpression::FunctionCall {
1020                        name: "size".to_string(),
1021                        args: vec![LogicalExpression::Property {
1022                            variable: "n".to_string(),
1023                            property: "friends".to_string(),
1024                        }],
1025                        distinct: false,
1026                    }),
1027                    op: BinaryOp::Gt,
1028                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1029                },
1030                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1031                    variable: "n".to_string(),
1032                    label: None,
1033                    input: None,
1034                })),
1035                pushdown_hint: None,
1036            })),
1037        }));
1038
1039        let physical = planner.plan(&logical).unwrap();
1040        assert_eq!(physical.columns(), &["n"]);
1041    }
1042
1043    // ==================== Expand Tests ====================
1044
1045    #[test]
1046    fn test_plan_expand_outgoing() {
1047        let store = create_test_store();
1048        let planner = Planner::new(store);
1049
1050        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1051        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1052            items: vec![
1053                ReturnItem {
1054                    expression: LogicalExpression::Variable("a".to_string()),
1055                    alias: None,
1056                },
1057                ReturnItem {
1058                    expression: LogicalExpression::Variable("b".to_string()),
1059                    alias: None,
1060                },
1061            ],
1062            distinct: false,
1063            input: Box::new(LogicalOperator::Expand(ExpandOp {
1064                from_variable: "a".to_string(),
1065                to_variable: "b".to_string(),
1066                edge_variable: None,
1067                direction: ExpandDirection::Outgoing,
1068                edge_types: vec!["KNOWS".to_string()],
1069                min_hops: 1,
1070                max_hops: Some(1),
1071                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1072                    variable: "a".to_string(),
1073                    label: Some("Person".to_string()),
1074                    input: None,
1075                })),
1076                path_alias: None,
1077                path_mode: PathMode::Walk,
1078            })),
1079        }));
1080
1081        let physical = planner.plan(&logical).unwrap();
1082        // The return should have columns [a, b]
1083        assert!(physical.columns().contains(&"a".to_string()));
1084        assert!(physical.columns().contains(&"b".to_string()));
1085    }
1086
1087    #[test]
1088    fn test_plan_expand_with_edge_variable() {
1089        let store = create_test_store();
1090        let planner = Planner::new(store);
1091
1092        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1093        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1094            items: vec![
1095                ReturnItem {
1096                    expression: LogicalExpression::Variable("a".to_string()),
1097                    alias: None,
1098                },
1099                ReturnItem {
1100                    expression: LogicalExpression::Variable("r".to_string()),
1101                    alias: None,
1102                },
1103                ReturnItem {
1104                    expression: LogicalExpression::Variable("b".to_string()),
1105                    alias: None,
1106                },
1107            ],
1108            distinct: false,
1109            input: Box::new(LogicalOperator::Expand(ExpandOp {
1110                from_variable: "a".to_string(),
1111                to_variable: "b".to_string(),
1112                edge_variable: Some("r".to_string()),
1113                direction: ExpandDirection::Outgoing,
1114                edge_types: vec!["KNOWS".to_string()],
1115                min_hops: 1,
1116                max_hops: Some(1),
1117                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1118                    variable: "a".to_string(),
1119                    label: None,
1120                    input: None,
1121                })),
1122                path_alias: None,
1123                path_mode: PathMode::Walk,
1124            })),
1125        }));
1126
1127        let physical = planner.plan(&logical).unwrap();
1128        assert!(physical.columns().contains(&"a".to_string()));
1129        assert!(physical.columns().contains(&"r".to_string()));
1130        assert!(physical.columns().contains(&"b".to_string()));
1131    }
1132
1133    // ==================== Limit/Skip/Sort Tests ====================
1134
1135    #[test]
1136    fn test_plan_limit() {
1137        let store = create_test_store();
1138        let planner = Planner::new(store);
1139
1140        // MATCH (n) RETURN n LIMIT 10
1141        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1142            items: vec![ReturnItem {
1143                expression: LogicalExpression::Variable("n".to_string()),
1144                alias: None,
1145            }],
1146            distinct: false,
1147            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1148                count: 10.into(),
1149                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1150                    variable: "n".to_string(),
1151                    label: None,
1152                    input: None,
1153                })),
1154            })),
1155        }));
1156
1157        let physical = planner.plan(&logical).unwrap();
1158        assert_eq!(physical.columns(), &["n"]);
1159    }
1160
1161    #[test]
1162    fn test_plan_skip() {
1163        let store = create_test_store();
1164        let planner = Planner::new(store);
1165
1166        // MATCH (n) RETURN n SKIP 5
1167        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1168            items: vec![ReturnItem {
1169                expression: LogicalExpression::Variable("n".to_string()),
1170                alias: None,
1171            }],
1172            distinct: false,
1173            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1174                count: 5.into(),
1175                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1176                    variable: "n".to_string(),
1177                    label: None,
1178                    input: None,
1179                })),
1180            })),
1181        }));
1182
1183        let physical = planner.plan(&logical).unwrap();
1184        assert_eq!(physical.columns(), &["n"]);
1185    }
1186
1187    #[test]
1188    fn test_plan_sort() {
1189        let store = create_test_store();
1190        let planner = Planner::new(store);
1191
1192        // MATCH (n) RETURN n ORDER BY n.name ASC
1193        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1194            items: vec![ReturnItem {
1195                expression: LogicalExpression::Variable("n".to_string()),
1196                alias: None,
1197            }],
1198            distinct: false,
1199            input: Box::new(LogicalOperator::Sort(SortOp {
1200                keys: vec![SortKey {
1201                    expression: LogicalExpression::Variable("n".to_string()),
1202                    order: SortOrder::Ascending,
1203                    nulls: None,
1204                }],
1205                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1206                    variable: "n".to_string(),
1207                    label: None,
1208                    input: None,
1209                })),
1210            })),
1211        }));
1212
1213        let physical = planner.plan(&logical).unwrap();
1214        assert_eq!(physical.columns(), &["n"]);
1215    }
1216
1217    #[test]
1218    fn test_plan_sort_descending() {
1219        let store = create_test_store();
1220        let planner = Planner::new(store);
1221
1222        // ORDER BY n DESC
1223        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1224            items: vec![ReturnItem {
1225                expression: LogicalExpression::Variable("n".to_string()),
1226                alias: None,
1227            }],
1228            distinct: false,
1229            input: Box::new(LogicalOperator::Sort(SortOp {
1230                keys: vec![SortKey {
1231                    expression: LogicalExpression::Variable("n".to_string()),
1232                    order: SortOrder::Descending,
1233                    nulls: None,
1234                }],
1235                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1236                    variable: "n".to_string(),
1237                    label: None,
1238                    input: None,
1239                })),
1240            })),
1241        }));
1242
1243        let physical = planner.plan(&logical).unwrap();
1244        assert_eq!(physical.columns(), &["n"]);
1245    }
1246
1247    #[test]
1248    fn test_plan_distinct() {
1249        let store = create_test_store();
1250        let planner = Planner::new(store);
1251
1252        // MATCH (n) RETURN DISTINCT n
1253        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1254            items: vec![ReturnItem {
1255                expression: LogicalExpression::Variable("n".to_string()),
1256                alias: None,
1257            }],
1258            distinct: false,
1259            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1260                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1261                    variable: "n".to_string(),
1262                    label: None,
1263                    input: None,
1264                })),
1265                columns: None,
1266            })),
1267        }));
1268
1269        let physical = planner.plan(&logical).unwrap();
1270        assert_eq!(physical.columns(), &["n"]);
1271    }
1272
1273    #[test]
1274    fn test_plan_distinct_with_columns() {
1275        let store = create_test_store();
1276        let planner = Planner::new(store);
1277
1278        // DISTINCT on specific columns (column-specific dedup)
1279        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1280            items: vec![ReturnItem {
1281                expression: LogicalExpression::Variable("n".to_string()),
1282                alias: None,
1283            }],
1284            distinct: false,
1285            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1286                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1287                    variable: "n".to_string(),
1288                    label: None,
1289                    input: None,
1290                })),
1291                columns: Some(vec!["n".to_string()]),
1292            })),
1293        }));
1294
1295        let physical = planner.plan(&logical).unwrap();
1296        assert_eq!(physical.columns(), &["n"]);
1297    }
1298
1299    #[test]
1300    fn test_plan_distinct_with_nonexistent_columns() {
1301        let store = create_test_store();
1302        let planner = Planner::new(store);
1303
1304        // When distinct columns don't match any output columns,
1305        // it falls back to full-row distinct.
1306        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1307            items: vec![ReturnItem {
1308                expression: LogicalExpression::Variable("n".to_string()),
1309                alias: None,
1310            }],
1311            distinct: false,
1312            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1313                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1314                    variable: "n".to_string(),
1315                    label: None,
1316                    input: None,
1317                })),
1318                columns: Some(vec!["nonexistent".to_string()]),
1319            })),
1320        }));
1321
1322        let physical = planner.plan(&logical).unwrap();
1323        assert_eq!(physical.columns(), &["n"]);
1324    }
1325
1326    // ==================== Aggregate Tests ====================
1327
1328    #[test]
1329    fn test_plan_aggregate_count() {
1330        let store = create_test_store();
1331        let planner = Planner::new(store);
1332
1333        // MATCH (n) RETURN count(n)
1334        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1335            items: vec![ReturnItem {
1336                expression: LogicalExpression::Variable("cnt".to_string()),
1337                alias: None,
1338            }],
1339            distinct: false,
1340            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1341                group_by: vec![],
1342                aggregates: vec![LogicalAggregateExpr {
1343                    function: LogicalAggregateFunction::Count,
1344                    expression: Some(LogicalExpression::Variable("n".to_string())),
1345                    expression2: None,
1346                    distinct: false,
1347                    alias: Some("cnt".to_string()),
1348                    percentile: None,
1349                    separator: None,
1350                }],
1351                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1352                    variable: "n".to_string(),
1353                    label: None,
1354                    input: None,
1355                })),
1356                having: None,
1357            })),
1358        }));
1359
1360        let physical = planner.plan(&logical).unwrap();
1361        assert!(physical.columns().contains(&"cnt".to_string()));
1362    }
1363
1364    #[test]
1365    fn test_plan_aggregate_with_group_by() {
1366        let store = create_test_store();
1367        let planner = Planner::new(store);
1368
1369        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1370        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1371            group_by: vec![LogicalExpression::Property {
1372                variable: "n".to_string(),
1373                property: "city".to_string(),
1374            }],
1375            aggregates: vec![LogicalAggregateExpr {
1376                function: LogicalAggregateFunction::Count,
1377                expression: Some(LogicalExpression::Variable("n".to_string())),
1378                expression2: None,
1379                distinct: false,
1380                alias: Some("cnt".to_string()),
1381                percentile: None,
1382                separator: None,
1383            }],
1384            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1385                variable: "n".to_string(),
1386                label: Some("Person".to_string()),
1387                input: None,
1388            })),
1389            having: None,
1390        }));
1391
1392        let physical = planner.plan(&logical).unwrap();
1393        assert_eq!(physical.columns().len(), 2);
1394    }
1395
1396    #[test]
1397    fn test_plan_aggregate_sum() {
1398        let store = create_test_store();
1399        let planner = Planner::new(store);
1400
1401        // SUM(n.value)
1402        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1403            group_by: vec![],
1404            aggregates: vec![LogicalAggregateExpr {
1405                function: LogicalAggregateFunction::Sum,
1406                expression: Some(LogicalExpression::Property {
1407                    variable: "n".to_string(),
1408                    property: "value".to_string(),
1409                }),
1410                expression2: None,
1411                distinct: false,
1412                alias: Some("total".to_string()),
1413                percentile: None,
1414                separator: None,
1415            }],
1416            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1417                variable: "n".to_string(),
1418                label: None,
1419                input: None,
1420            })),
1421            having: None,
1422        }));
1423
1424        let physical = planner.plan(&logical).unwrap();
1425        assert!(physical.columns().contains(&"total".to_string()));
1426    }
1427
1428    #[test]
1429    fn test_plan_aggregate_avg() {
1430        let store = create_test_store();
1431        let planner = Planner::new(store);
1432
1433        // AVG(n.score)
1434        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1435            group_by: vec![],
1436            aggregates: vec![LogicalAggregateExpr {
1437                function: LogicalAggregateFunction::Avg,
1438                expression: Some(LogicalExpression::Property {
1439                    variable: "n".to_string(),
1440                    property: "score".to_string(),
1441                }),
1442                expression2: None,
1443                distinct: false,
1444                alias: Some("average".to_string()),
1445                percentile: None,
1446                separator: None,
1447            }],
1448            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1449                variable: "n".to_string(),
1450                label: None,
1451                input: None,
1452            })),
1453            having: None,
1454        }));
1455
1456        let physical = planner.plan(&logical).unwrap();
1457        assert!(physical.columns().contains(&"average".to_string()));
1458    }
1459
1460    #[test]
1461    fn test_plan_aggregate_min_max() {
1462        let store = create_test_store();
1463        let planner = Planner::new(store);
1464
1465        // MIN(n.age), MAX(n.age)
1466        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1467            group_by: vec![],
1468            aggregates: vec![
1469                LogicalAggregateExpr {
1470                    function: LogicalAggregateFunction::Min,
1471                    expression: Some(LogicalExpression::Property {
1472                        variable: "n".to_string(),
1473                        property: "age".to_string(),
1474                    }),
1475                    expression2: None,
1476                    distinct: false,
1477                    alias: Some("youngest".to_string()),
1478                    percentile: None,
1479                    separator: None,
1480                },
1481                LogicalAggregateExpr {
1482                    function: LogicalAggregateFunction::Max,
1483                    expression: Some(LogicalExpression::Property {
1484                        variable: "n".to_string(),
1485                        property: "age".to_string(),
1486                    }),
1487                    expression2: None,
1488                    distinct: false,
1489                    alias: Some("oldest".to_string()),
1490                    percentile: None,
1491                    separator: None,
1492                },
1493            ],
1494            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1495                variable: "n".to_string(),
1496                label: None,
1497                input: None,
1498            })),
1499            having: None,
1500        }));
1501
1502        let physical = planner.plan(&logical).unwrap();
1503        assert!(physical.columns().contains(&"youngest".to_string()));
1504        assert!(physical.columns().contains(&"oldest".to_string()));
1505    }
1506
1507    // ==================== Join Tests ====================
1508
1509    #[test]
1510    fn test_plan_inner_join() {
1511        let store = create_test_store();
1512        let planner = Planner::new(store);
1513
1514        // Inner join between two scans
1515        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1516            items: vec![
1517                ReturnItem {
1518                    expression: LogicalExpression::Variable("a".to_string()),
1519                    alias: None,
1520                },
1521                ReturnItem {
1522                    expression: LogicalExpression::Variable("b".to_string()),
1523                    alias: None,
1524                },
1525            ],
1526            distinct: false,
1527            input: Box::new(LogicalOperator::Join(JoinOp {
1528                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1529                    variable: "a".to_string(),
1530                    label: Some("Person".to_string()),
1531                    input: None,
1532                })),
1533                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1534                    variable: "b".to_string(),
1535                    label: Some("Company".to_string()),
1536                    input: None,
1537                })),
1538                join_type: JoinType::Inner,
1539                conditions: vec![JoinCondition {
1540                    left: LogicalExpression::Variable("a".to_string()),
1541                    right: LogicalExpression::Variable("b".to_string()),
1542                }],
1543            })),
1544        }));
1545
1546        let physical = planner.plan(&logical).unwrap();
1547        assert!(physical.columns().contains(&"a".to_string()));
1548        assert!(physical.columns().contains(&"b".to_string()));
1549    }
1550
1551    #[test]
1552    fn test_plan_cross_join() {
1553        let store = create_test_store();
1554        let planner = Planner::new(store);
1555
1556        // Cross join (no conditions)
1557        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1558            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1559                variable: "a".to_string(),
1560                label: None,
1561                input: None,
1562            })),
1563            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1564                variable: "b".to_string(),
1565                label: None,
1566                input: None,
1567            })),
1568            join_type: JoinType::Cross,
1569            conditions: vec![],
1570        }));
1571
1572        let physical = planner.plan(&logical).unwrap();
1573        assert_eq!(physical.columns().len(), 2);
1574    }
1575
1576    #[test]
1577    fn test_plan_left_join() {
1578        let store = create_test_store();
1579        let planner = Planner::new(store);
1580
1581        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1582            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1583                variable: "a".to_string(),
1584                label: None,
1585                input: None,
1586            })),
1587            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1588                variable: "b".to_string(),
1589                label: None,
1590                input: None,
1591            })),
1592            join_type: JoinType::Left,
1593            conditions: vec![],
1594        }));
1595
1596        let physical = planner.plan(&logical).unwrap();
1597        assert_eq!(physical.columns().len(), 2);
1598    }
1599
1600    // ==================== Mutation Tests ====================
1601
1602    #[test]
1603    fn test_plan_create_node() {
1604        let store = create_test_store();
1605        let planner = Planner::new(store);
1606
1607        // CREATE (n:Person {name: 'Alix'})
1608        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1609            variable: "n".to_string(),
1610            labels: vec!["Person".to_string()],
1611            properties: vec![(
1612                "name".to_string(),
1613                LogicalExpression::Literal(Value::String("Alix".into())),
1614            )],
1615            input: None,
1616        }));
1617
1618        let physical = planner.plan(&logical).unwrap();
1619        assert!(physical.columns().contains(&"n".to_string()));
1620    }
1621
1622    #[test]
1623    fn test_plan_create_edge() {
1624        let store = create_test_store();
1625        let planner = Planner::new(store);
1626
1627        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1628        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1629            variable: Some("r".to_string()),
1630            from_variable: "a".to_string(),
1631            to_variable: "b".to_string(),
1632            edge_type: "KNOWS".to_string(),
1633            properties: vec![],
1634            input: Box::new(LogicalOperator::Join(JoinOp {
1635                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1636                    variable: "a".to_string(),
1637                    label: None,
1638                    input: None,
1639                })),
1640                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1641                    variable: "b".to_string(),
1642                    label: None,
1643                    input: None,
1644                })),
1645                join_type: JoinType::Cross,
1646                conditions: vec![],
1647            })),
1648        }));
1649
1650        let physical = planner.plan(&logical).unwrap();
1651        assert!(physical.columns().contains(&"r".to_string()));
1652    }
1653
1654    #[test]
1655    fn test_plan_delete_node() {
1656        let store = create_test_store();
1657        let planner = Planner::new(store);
1658
1659        // MATCH (n) DELETE n
1660        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1661            variable: "n".to_string(),
1662            detach: false,
1663            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1664                variable: "n".to_string(),
1665                label: None,
1666                input: None,
1667            })),
1668        }));
1669
1670        let physical = planner.plan(&logical).unwrap();
1671        assert!(physical.columns().contains(&"n".to_string()));
1672    }
1673
1674    // ==================== Error Cases ====================
1675
1676    #[test]
1677    fn test_plan_empty_errors() {
1678        let store = create_test_store();
1679        let planner = Planner::new(store);
1680
1681        let logical = LogicalPlan::new(LogicalOperator::Empty);
1682        let result = planner.plan(&logical);
1683        assert!(result.is_err());
1684    }
1685
1686    #[test]
1687    fn test_plan_missing_variable_in_return() {
1688        let store = create_test_store();
1689        let planner = Planner::new(store);
1690
1691        // Return variable that doesn't exist in input
1692        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1693            items: vec![ReturnItem {
1694                expression: LogicalExpression::Variable("missing".to_string()),
1695                alias: None,
1696            }],
1697            distinct: false,
1698            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1699                variable: "n".to_string(),
1700                label: None,
1701                input: None,
1702            })),
1703        }));
1704
1705        let result = planner.plan(&logical);
1706        assert!(result.is_err());
1707    }
1708
1709    // ==================== Helper Function Tests ====================
1710
1711    #[test]
1712    fn test_convert_binary_ops() {
1713        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1714        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1715        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1716        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1717        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1718        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1719        assert!(convert_binary_op(BinaryOp::And).is_ok());
1720        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1721        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1722        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1723        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1724        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1725    }
1726
1727    #[test]
1728    fn test_convert_unary_ops() {
1729        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1730        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1731        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1732        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1733    }
1734
1735    #[test]
1736    fn test_convert_aggregate_functions() {
1737        assert!(matches!(
1738            convert_aggregate_function(LogicalAggregateFunction::Count),
1739            PhysicalAggregateFunction::Count
1740        ));
1741        assert!(matches!(
1742            convert_aggregate_function(LogicalAggregateFunction::Sum),
1743            PhysicalAggregateFunction::Sum
1744        ));
1745        assert!(matches!(
1746            convert_aggregate_function(LogicalAggregateFunction::Avg),
1747            PhysicalAggregateFunction::Avg
1748        ));
1749        assert!(matches!(
1750            convert_aggregate_function(LogicalAggregateFunction::Min),
1751            PhysicalAggregateFunction::Min
1752        ));
1753        assert!(matches!(
1754            convert_aggregate_function(LogicalAggregateFunction::Max),
1755            PhysicalAggregateFunction::Max
1756        ));
1757    }
1758
1759    #[test]
1760    fn test_planner_accessors() {
1761        let store = create_test_store();
1762        let planner = Planner::new(Arc::clone(&store));
1763
1764        assert!(planner.transaction_id().is_none());
1765        assert!(planner.transaction_manager().is_none());
1766        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1767    }
1768
1769    #[test]
1770    fn test_physical_plan_accessors() {
1771        let store = create_test_store();
1772        let planner = Planner::new(store);
1773
1774        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1775            variable: "n".to_string(),
1776            label: None,
1777            input: None,
1778        }));
1779
1780        let physical = planner.plan(&logical).unwrap();
1781        assert_eq!(physical.columns(), &["n"]);
1782
1783        // Test into_operator
1784        let _ = physical.into_operator();
1785    }
1786
1787    // ==================== Adaptive Planning Tests ====================
1788
1789    #[test]
1790    fn test_plan_adaptive_with_scan() {
1791        let store = create_test_store();
1792        let planner = Planner::new(store);
1793
1794        // MATCH (n:Person) RETURN n
1795        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1796            items: vec![ReturnItem {
1797                expression: LogicalExpression::Variable("n".to_string()),
1798                alias: None,
1799            }],
1800            distinct: false,
1801            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1802                variable: "n".to_string(),
1803                label: Some("Person".to_string()),
1804                input: None,
1805            })),
1806        }));
1807
1808        let physical = planner.plan_adaptive(&logical).unwrap();
1809        assert_eq!(physical.columns(), &["n"]);
1810        // Should have adaptive context with estimates
1811        assert!(physical.adaptive_context.is_some());
1812    }
1813
1814    #[test]
1815    fn test_plan_adaptive_with_filter() {
1816        let store = create_test_store();
1817        let planner = Planner::new(store);
1818
1819        // MATCH (n) WHERE n.age > 30 RETURN n
1820        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1821            items: vec![ReturnItem {
1822                expression: LogicalExpression::Variable("n".to_string()),
1823                alias: None,
1824            }],
1825            distinct: false,
1826            input: Box::new(LogicalOperator::Filter(FilterOp {
1827                predicate: LogicalExpression::Binary {
1828                    left: Box::new(LogicalExpression::Property {
1829                        variable: "n".to_string(),
1830                        property: "age".to_string(),
1831                    }),
1832                    op: BinaryOp::Gt,
1833                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1834                },
1835                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1836                    variable: "n".to_string(),
1837                    label: None,
1838                    input: None,
1839                })),
1840                pushdown_hint: None,
1841            })),
1842        }));
1843
1844        let physical = planner.plan_adaptive(&logical).unwrap();
1845        assert!(physical.adaptive_context.is_some());
1846    }
1847
1848    #[test]
1849    fn test_plan_adaptive_with_expand() {
1850        let store = create_test_store();
1851        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1852
1853        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1854        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1855            items: vec![
1856                ReturnItem {
1857                    expression: LogicalExpression::Variable("a".to_string()),
1858                    alias: None,
1859                },
1860                ReturnItem {
1861                    expression: LogicalExpression::Variable("b".to_string()),
1862                    alias: None,
1863                },
1864            ],
1865            distinct: false,
1866            input: Box::new(LogicalOperator::Expand(ExpandOp {
1867                from_variable: "a".to_string(),
1868                to_variable: "b".to_string(),
1869                edge_variable: None,
1870                direction: ExpandDirection::Outgoing,
1871                edge_types: vec!["KNOWS".to_string()],
1872                min_hops: 1,
1873                max_hops: Some(1),
1874                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1875                    variable: "a".to_string(),
1876                    label: None,
1877                    input: None,
1878                })),
1879                path_alias: None,
1880                path_mode: PathMode::Walk,
1881            })),
1882        }));
1883
1884        let physical = planner.plan_adaptive(&logical).unwrap();
1885        assert!(physical.adaptive_context.is_some());
1886    }
1887
1888    #[test]
1889    fn test_plan_adaptive_with_join() {
1890        let store = create_test_store();
1891        let planner = Planner::new(store);
1892
1893        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1894            items: vec![
1895                ReturnItem {
1896                    expression: LogicalExpression::Variable("a".to_string()),
1897                    alias: None,
1898                },
1899                ReturnItem {
1900                    expression: LogicalExpression::Variable("b".to_string()),
1901                    alias: None,
1902                },
1903            ],
1904            distinct: false,
1905            input: Box::new(LogicalOperator::Join(JoinOp {
1906                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1907                    variable: "a".to_string(),
1908                    label: None,
1909                    input: None,
1910                })),
1911                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1912                    variable: "b".to_string(),
1913                    label: None,
1914                    input: None,
1915                })),
1916                join_type: JoinType::Cross,
1917                conditions: vec![],
1918            })),
1919        }));
1920
1921        let physical = planner.plan_adaptive(&logical).unwrap();
1922        assert!(physical.adaptive_context.is_some());
1923    }
1924
1925    #[test]
1926    fn test_plan_adaptive_with_aggregate() {
1927        let store = create_test_store();
1928        let planner = Planner::new(store);
1929
1930        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1931            group_by: vec![],
1932            aggregates: vec![LogicalAggregateExpr {
1933                function: LogicalAggregateFunction::Count,
1934                expression: Some(LogicalExpression::Variable("n".to_string())),
1935                expression2: None,
1936                distinct: false,
1937                alias: Some("cnt".to_string()),
1938                percentile: None,
1939                separator: None,
1940            }],
1941            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1942                variable: "n".to_string(),
1943                label: None,
1944                input: None,
1945            })),
1946            having: None,
1947        }));
1948
1949        let physical = planner.plan_adaptive(&logical).unwrap();
1950        assert!(physical.adaptive_context.is_some());
1951    }
1952
1953    #[test]
1954    fn test_plan_adaptive_with_distinct() {
1955        let store = create_test_store();
1956        let planner = Planner::new(store);
1957
1958        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1959            items: vec![ReturnItem {
1960                expression: LogicalExpression::Variable("n".to_string()),
1961                alias: None,
1962            }],
1963            distinct: false,
1964            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1965                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1966                    variable: "n".to_string(),
1967                    label: None,
1968                    input: None,
1969                })),
1970                columns: None,
1971            })),
1972        }));
1973
1974        let physical = planner.plan_adaptive(&logical).unwrap();
1975        assert!(physical.adaptive_context.is_some());
1976    }
1977
1978    #[test]
1979    fn test_plan_adaptive_with_limit() {
1980        let store = create_test_store();
1981        let planner = Planner::new(store);
1982
1983        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1984            items: vec![ReturnItem {
1985                expression: LogicalExpression::Variable("n".to_string()),
1986                alias: None,
1987            }],
1988            distinct: false,
1989            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1990                count: 10.into(),
1991                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1992                    variable: "n".to_string(),
1993                    label: None,
1994                    input: None,
1995                })),
1996            })),
1997        }));
1998
1999        let physical = planner.plan_adaptive(&logical).unwrap();
2000        assert!(physical.adaptive_context.is_some());
2001    }
2002
2003    #[test]
2004    fn test_plan_adaptive_with_skip() {
2005        let store = create_test_store();
2006        let planner = Planner::new(store);
2007
2008        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2009            items: vec![ReturnItem {
2010                expression: LogicalExpression::Variable("n".to_string()),
2011                alias: None,
2012            }],
2013            distinct: false,
2014            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2015                count: 5.into(),
2016                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2017                    variable: "n".to_string(),
2018                    label: None,
2019                    input: None,
2020                })),
2021            })),
2022        }));
2023
2024        let physical = planner.plan_adaptive(&logical).unwrap();
2025        assert!(physical.adaptive_context.is_some());
2026    }
2027
2028    #[test]
2029    fn test_plan_adaptive_with_sort() {
2030        let store = create_test_store();
2031        let planner = Planner::new(store);
2032
2033        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2034            items: vec![ReturnItem {
2035                expression: LogicalExpression::Variable("n".to_string()),
2036                alias: None,
2037            }],
2038            distinct: false,
2039            input: Box::new(LogicalOperator::Sort(SortOp {
2040                keys: vec![SortKey {
2041                    expression: LogicalExpression::Variable("n".to_string()),
2042                    order: SortOrder::Ascending,
2043                    nulls: None,
2044                }],
2045                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2046                    variable: "n".to_string(),
2047                    label: None,
2048                    input: None,
2049                })),
2050            })),
2051        }));
2052
2053        let physical = planner.plan_adaptive(&logical).unwrap();
2054        assert!(physical.adaptive_context.is_some());
2055    }
2056
2057    #[test]
2058    fn test_plan_adaptive_with_union() {
2059        let store = create_test_store();
2060        let planner = Planner::new(store);
2061
2062        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2063            items: vec![ReturnItem {
2064                expression: LogicalExpression::Variable("n".to_string()),
2065                alias: None,
2066            }],
2067            distinct: false,
2068            input: Box::new(LogicalOperator::Union(UnionOp {
2069                inputs: vec![
2070                    LogicalOperator::NodeScan(NodeScanOp {
2071                        variable: "n".to_string(),
2072                        label: Some("Person".to_string()),
2073                        input: None,
2074                    }),
2075                    LogicalOperator::NodeScan(NodeScanOp {
2076                        variable: "n".to_string(),
2077                        label: Some("Company".to_string()),
2078                        input: None,
2079                    }),
2080                ],
2081            })),
2082        }));
2083
2084        let physical = planner.plan_adaptive(&logical).unwrap();
2085        assert!(physical.adaptive_context.is_some());
2086    }
2087
2088    // ==================== Variable Length Path Tests ====================
2089
2090    #[test]
2091    fn test_plan_expand_variable_length() {
2092        let store = create_test_store();
2093        let planner = Planner::new(store);
2094
2095        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2096        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2097            items: vec![
2098                ReturnItem {
2099                    expression: LogicalExpression::Variable("a".to_string()),
2100                    alias: None,
2101                },
2102                ReturnItem {
2103                    expression: LogicalExpression::Variable("b".to_string()),
2104                    alias: None,
2105                },
2106            ],
2107            distinct: false,
2108            input: Box::new(LogicalOperator::Expand(ExpandOp {
2109                from_variable: "a".to_string(),
2110                to_variable: "b".to_string(),
2111                edge_variable: None,
2112                direction: ExpandDirection::Outgoing,
2113                edge_types: vec!["KNOWS".to_string()],
2114                min_hops: 1,
2115                max_hops: Some(3),
2116                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2117                    variable: "a".to_string(),
2118                    label: None,
2119                    input: None,
2120                })),
2121                path_alias: None,
2122                path_mode: PathMode::Walk,
2123            })),
2124        }));
2125
2126        let physical = planner.plan(&logical).unwrap();
2127        assert!(physical.columns().contains(&"a".to_string()));
2128        assert!(physical.columns().contains(&"b".to_string()));
2129    }
2130
2131    #[test]
2132    fn test_plan_expand_with_path_alias() {
2133        let store = create_test_store();
2134        let planner = Planner::new(store);
2135
2136        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2137        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2138            items: vec![
2139                ReturnItem {
2140                    expression: LogicalExpression::Variable("a".to_string()),
2141                    alias: None,
2142                },
2143                ReturnItem {
2144                    expression: LogicalExpression::Variable("b".to_string()),
2145                    alias: None,
2146                },
2147            ],
2148            distinct: false,
2149            input: Box::new(LogicalOperator::Expand(ExpandOp {
2150                from_variable: "a".to_string(),
2151                to_variable: "b".to_string(),
2152                edge_variable: None,
2153                direction: ExpandDirection::Outgoing,
2154                edge_types: vec!["KNOWS".to_string()],
2155                min_hops: 1,
2156                max_hops: Some(3),
2157                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2158                    variable: "a".to_string(),
2159                    label: None,
2160                    input: None,
2161                })),
2162                path_alias: Some("p".to_string()),
2163                path_mode: PathMode::Walk,
2164            })),
2165        }));
2166
2167        let physical = planner.plan(&logical).unwrap();
2168        // Verify plan was created successfully with expected output columns
2169        assert!(physical.columns().contains(&"a".to_string()));
2170        assert!(physical.columns().contains(&"b".to_string()));
2171    }
2172
2173    #[test]
2174    fn test_plan_expand_incoming() {
2175        let store = create_test_store();
2176        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2177
2178        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2179        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2180            items: vec![
2181                ReturnItem {
2182                    expression: LogicalExpression::Variable("a".to_string()),
2183                    alias: None,
2184                },
2185                ReturnItem {
2186                    expression: LogicalExpression::Variable("b".to_string()),
2187                    alias: None,
2188                },
2189            ],
2190            distinct: false,
2191            input: Box::new(LogicalOperator::Expand(ExpandOp {
2192                from_variable: "a".to_string(),
2193                to_variable: "b".to_string(),
2194                edge_variable: None,
2195                direction: ExpandDirection::Incoming,
2196                edge_types: vec!["KNOWS".to_string()],
2197                min_hops: 1,
2198                max_hops: Some(1),
2199                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2200                    variable: "a".to_string(),
2201                    label: None,
2202                    input: None,
2203                })),
2204                path_alias: None,
2205                path_mode: PathMode::Walk,
2206            })),
2207        }));
2208
2209        let physical = planner.plan(&logical).unwrap();
2210        assert!(physical.columns().contains(&"a".to_string()));
2211        assert!(physical.columns().contains(&"b".to_string()));
2212    }
2213
2214    #[test]
2215    fn test_plan_expand_both_directions() {
2216        let store = create_test_store();
2217        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2218
2219        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2220        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2221            items: vec![
2222                ReturnItem {
2223                    expression: LogicalExpression::Variable("a".to_string()),
2224                    alias: None,
2225                },
2226                ReturnItem {
2227                    expression: LogicalExpression::Variable("b".to_string()),
2228                    alias: None,
2229                },
2230            ],
2231            distinct: false,
2232            input: Box::new(LogicalOperator::Expand(ExpandOp {
2233                from_variable: "a".to_string(),
2234                to_variable: "b".to_string(),
2235                edge_variable: None,
2236                direction: ExpandDirection::Both,
2237                edge_types: vec!["KNOWS".to_string()],
2238                min_hops: 1,
2239                max_hops: Some(1),
2240                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2241                    variable: "a".to_string(),
2242                    label: None,
2243                    input: None,
2244                })),
2245                path_alias: None,
2246                path_mode: PathMode::Walk,
2247            })),
2248        }));
2249
2250        let physical = planner.plan(&logical).unwrap();
2251        assert!(physical.columns().contains(&"a".to_string()));
2252        assert!(physical.columns().contains(&"b".to_string()));
2253    }
2254
2255    // ==================== With Context Tests ====================
2256
2257    #[test]
2258    fn test_planner_with_context() {
2259        use crate::transaction::TransactionManager;
2260
2261        let store = create_test_store();
2262        let transaction_manager = Arc::new(TransactionManager::new());
2263        let transaction_id = transaction_manager.begin();
2264        let epoch = transaction_manager.current_epoch();
2265
2266        let planner = Planner::with_context(
2267            Arc::clone(&store),
2268            Arc::clone(&transaction_manager),
2269            Some(transaction_id),
2270            epoch,
2271        );
2272
2273        assert_eq!(planner.transaction_id(), Some(transaction_id));
2274        assert!(planner.transaction_manager().is_some());
2275        assert_eq!(planner.viewing_epoch(), epoch);
2276    }
2277
2278    #[test]
2279    fn test_planner_with_factorized_execution_disabled() {
2280        let store = create_test_store();
2281        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2282
2283        // Two consecutive expands - should NOT use factorized execution
2284        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2285            items: vec![
2286                ReturnItem {
2287                    expression: LogicalExpression::Variable("a".to_string()),
2288                    alias: None,
2289                },
2290                ReturnItem {
2291                    expression: LogicalExpression::Variable("c".to_string()),
2292                    alias: None,
2293                },
2294            ],
2295            distinct: false,
2296            input: Box::new(LogicalOperator::Expand(ExpandOp {
2297                from_variable: "b".to_string(),
2298                to_variable: "c".to_string(),
2299                edge_variable: None,
2300                direction: ExpandDirection::Outgoing,
2301                edge_types: vec![],
2302                min_hops: 1,
2303                max_hops: Some(1),
2304                input: Box::new(LogicalOperator::Expand(ExpandOp {
2305                    from_variable: "a".to_string(),
2306                    to_variable: "b".to_string(),
2307                    edge_variable: None,
2308                    direction: ExpandDirection::Outgoing,
2309                    edge_types: vec![],
2310                    min_hops: 1,
2311                    max_hops: Some(1),
2312                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2313                        variable: "a".to_string(),
2314                        label: None,
2315                        input: None,
2316                    })),
2317                    path_alias: None,
2318                    path_mode: PathMode::Walk,
2319                })),
2320                path_alias: None,
2321                path_mode: PathMode::Walk,
2322            })),
2323        }));
2324
2325        let physical = planner.plan(&logical).unwrap();
2326        assert!(physical.columns().contains(&"a".to_string()));
2327        assert!(physical.columns().contains(&"c".to_string()));
2328    }
2329
2330    // ==================== Sort with Property Tests ====================
2331
2332    #[test]
2333    fn test_plan_sort_by_property() {
2334        let store = create_test_store();
2335        let planner = Planner::new(store);
2336
2337        // MATCH (n) RETURN n ORDER BY n.name ASC
2338        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2339            items: vec![ReturnItem {
2340                expression: LogicalExpression::Variable("n".to_string()),
2341                alias: None,
2342            }],
2343            distinct: false,
2344            input: Box::new(LogicalOperator::Sort(SortOp {
2345                keys: vec![SortKey {
2346                    expression: LogicalExpression::Property {
2347                        variable: "n".to_string(),
2348                        property: "name".to_string(),
2349                    },
2350                    order: SortOrder::Ascending,
2351                    nulls: None,
2352                }],
2353                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2354                    variable: "n".to_string(),
2355                    label: None,
2356                    input: None,
2357                })),
2358            })),
2359        }));
2360
2361        let physical = planner.plan(&logical).unwrap();
2362        // Should have the property column projected
2363        assert!(physical.columns().contains(&"n".to_string()));
2364    }
2365
2366    // ==================== Scan with Input Tests ====================
2367
2368    #[test]
2369    fn test_plan_scan_with_input() {
2370        let store = create_test_store();
2371        let planner = Planner::new(store);
2372
2373        // A scan with another scan as input (for chained patterns)
2374        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2375            items: vec![
2376                ReturnItem {
2377                    expression: LogicalExpression::Variable("a".to_string()),
2378                    alias: None,
2379                },
2380                ReturnItem {
2381                    expression: LogicalExpression::Variable("b".to_string()),
2382                    alias: None,
2383                },
2384            ],
2385            distinct: false,
2386            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2387                variable: "b".to_string(),
2388                label: Some("Company".to_string()),
2389                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2390                    variable: "a".to_string(),
2391                    label: Some("Person".to_string()),
2392                    input: None,
2393                }))),
2394            })),
2395        }));
2396
2397        let physical = planner.plan(&logical).unwrap();
2398        assert!(physical.columns().contains(&"a".to_string()));
2399        assert!(physical.columns().contains(&"b".to_string()));
2400    }
2401}