Skip to main content

nervusdb_core/query/
planner.rs

1use crate::error::Error;
2use crate::query::ast::*;
3use std::collections::HashSet;
4
5#[derive(Debug, Clone)]
6pub enum PhysicalPlan {
7    SingleRow(SingleRowNode),
8    Scan(ScanNode),
9    FtsCandidateScan(FtsCandidateScanNode),
10    VectorTopKScan(VectorTopKScanNode),
11    Filter(FilterNode),
12    Project(ProjectNode),
13    Limit(LimitNode),
14    Skip(SkipNode),
15    Sort(SortNode),
16    Distinct(DistinctNode),
17    Aggregate(AggregateNode),
18    NestedLoopJoin(NestedLoopJoinNode),
19    LeftOuterJoin(LeftOuterJoinNode),
20    Expand(ExpandNode),
21    ExpandVarLength(ExpandVarLengthNode),
22    Unwind(UnwindNode),
23    Create(CreateNode),
24    Set(SetNode),
25    Delete(DeleteNode),
26}
27
28#[derive(Debug, Clone)]
29pub struct SingleRowNode;
30
31#[derive(Debug, Clone)]
32pub struct ScanNode {
33    pub alias: String,
34    pub labels: Vec<String>,
35}
36
37#[derive(Debug, Clone)]
38pub struct FtsCandidateScanNode {
39    pub alias: String,
40    pub labels: Vec<String>,
41    pub property: String,
42    pub query: Expression,
43}
44
45#[derive(Debug, Clone)]
46pub struct VectorTopKScanNode {
47    pub alias: String,
48    pub labels: Vec<String>,
49    pub property: String,
50    pub query: Expression,
51    pub limit: u32,
52}
53
54#[derive(Debug, Clone)]
55pub struct FilterNode {
56    pub input: Box<PhysicalPlan>,
57    pub predicate: Expression,
58}
59
60#[derive(Debug, Clone)]
61pub struct ProjectNode {
62    pub input: Box<PhysicalPlan>,
63    pub projections: Vec<(Expression, String)>, // (Expr, Alias)
64}
65
66#[derive(Debug, Clone)]
67pub struct LimitNode {
68    pub input: Box<PhysicalPlan>,
69    pub limit: u32,
70}
71
72#[derive(Debug, Clone)]
73pub struct SkipNode {
74    pub input: Box<PhysicalPlan>,
75    pub skip: u32,
76}
77
78#[derive(Debug, Clone)]
79pub struct SortNode {
80    pub input: Box<PhysicalPlan>,
81    pub order_by: Vec<(Expression, Direction)>,
82}
83
84#[derive(Debug, Clone)]
85pub struct DistinctNode {
86    pub input: Box<PhysicalPlan>,
87}
88
89#[derive(Debug, Clone)]
90pub struct AggregateNode {
91    pub input: Box<PhysicalPlan>,
92    pub aggregations: Vec<(AggregateFunction, String)>, // (function, alias)
93    pub group_by: Vec<Expression>,
94}
95
96#[derive(Debug, Clone)]
97pub enum AggregateFunction {
98    Count(Option<Expression>), // None for count(*)
99    Sum(Expression),
100    Avg(Expression),
101    Min(Expression),
102    Max(Expression),
103    Collect(Expression),
104}
105
106#[derive(Debug, Clone)]
107pub struct NestedLoopJoinNode {
108    pub left: Box<PhysicalPlan>,
109    pub right: Box<PhysicalPlan>,
110    pub predicate: Option<Expression>,
111}
112
113#[derive(Debug, Clone)]
114pub struct LeftOuterJoinNode {
115    pub left: Box<PhysicalPlan>,
116    pub right: Box<PhysicalPlan>,
117    pub right_aliases: Vec<String>, // Variables to set to NULL if no match
118}
119
120#[derive(Debug, Clone)]
121pub struct ExpandNode {
122    pub input: Box<PhysicalPlan>,
123    pub start_node_alias: String,
124    pub rel_alias: String,
125    pub end_node_alias: String,
126    pub direction: RelationshipDirection,
127    pub rel_type: Option<String>,
128}
129
130#[derive(Debug, Clone)]
131pub struct ExpandVarLengthNode {
132    pub input: Box<PhysicalPlan>,
133    pub start_node_alias: String,
134    pub end_node_alias: String,
135    pub direction: RelationshipDirection,
136    pub rel_type: Option<String>,
137    pub min_hops: u32,
138    pub max_hops: u32,
139}
140
141#[derive(Debug, Clone)]
142pub struct UnwindNode {
143    pub input: Box<PhysicalPlan>,
144    pub expression: Expression,
145    pub alias: String,
146}
147
148#[derive(Debug, Clone)]
149pub struct CreateNode {
150    pub pattern: Pattern,
151}
152
153#[derive(Debug, Clone)]
154pub struct SetNode {
155    pub input: Box<PhysicalPlan>,
156    pub items: Vec<SetItem>,
157}
158
159#[derive(Debug, Clone)]
160pub struct DeleteNode {
161    pub input: Box<PhysicalPlan>,
162    pub detach: bool,
163    pub expressions: Vec<Expression>,
164}
165
166pub struct QueryPlanner;
167
168impl Default for QueryPlanner {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174impl QueryPlanner {
175    pub fn new() -> Self {
176        Self
177    }
178
179    pub fn plan(&self, query: Query) -> Result<PhysicalPlan, Error> {
180        fn make_filter(input: PhysicalPlan, predicate: Expression) -> PhysicalPlan {
181            QueryPlanner::maybe_rewrite_txt_score_filter(FilterNode {
182                input: Box::new(input),
183                predicate,
184            })
185        }
186
187        let mut plan: Option<PhysicalPlan> = None;
188        let mut where_clause: Option<WhereClause> = None;
189        let mut return_clause: Option<ReturnClause> = None;
190
191        for clause in query.clauses {
192            match clause {
193                Clause::Match(match_clause) => {
194                    let is_optional = match_clause.optional;
195                    let right_aliases: Vec<String> = if is_optional {
196                        match plan.as_ref() {
197                            Some(current_plan) => {
198                                let left_aliases = Self::extract_plan_output_aliases(current_plan);
199                                let pattern_aliases =
200                                    Self::extract_pattern_aliases(&match_clause.pattern);
201                                pattern_aliases.difference(&left_aliases).cloned().collect()
202                            }
203                            None => Vec::new(),
204                        }
205                    } else {
206                        Vec::new()
207                    };
208                    let match_plan = self.plan_match(match_clause)?;
209
210                    if let Some(current_plan) = plan {
211                        if is_optional {
212                            // OPTIONAL MATCH: Left outer join
213                            plan = Some(PhysicalPlan::LeftOuterJoin(LeftOuterJoinNode {
214                                left: Box::new(current_plan),
215                                right: Box::new(match_plan),
216                                right_aliases,
217                            }));
218                        } else {
219                            // Regular MATCH: Inner join
220                            plan = Some(PhysicalPlan::NestedLoopJoin(NestedLoopJoinNode {
221                                left: Box::new(current_plan),
222                                right: Box::new(match_plan),
223                                predicate: None,
224                            }));
225                        }
226                    } else {
227                        plan = Some(match_plan);
228                    }
229                }
230                Clause::Create(create_clause) => {
231                    let create_plan = PhysicalPlan::Create(CreateNode {
232                        pattern: create_clause.pattern,
233                    });
234                    if let Some(current_plan) = plan {
235                        // CREATE after MATCH: chain them
236                        plan = Some(PhysicalPlan::NestedLoopJoin(NestedLoopJoinNode {
237                            left: Box::new(current_plan),
238                            right: Box::new(create_plan),
239                            predicate: None,
240                        }));
241                    } else {
242                        plan = Some(create_plan);
243                    }
244                }
245                Clause::Unwind(unwind_clause) => {
246                    let input_plan = plan.unwrap_or(PhysicalPlan::SingleRow(SingleRowNode));
247                    plan = Some(PhysicalPlan::Unwind(UnwindNode {
248                        input: Box::new(input_plan),
249                        expression: unwind_clause.expression,
250                        alias: unwind_clause.alias,
251                    }));
252                }
253                Clause::Merge(_) => return Err(Error::NotImplemented("MERGE")),
254                Clause::Call(_) => return Err(Error::NotImplemented("CALL")),
255                Clause::Set(set_clause) => {
256                    // SET requires a previous plan (usually MATCH)
257                    let current_plan = plan.ok_or_else(|| {
258                        Error::Other(
259                            "SET clause requires a preceding MATCH or CREATE clause".to_string(),
260                        )
261                    })?;
262                    plan = Some(PhysicalPlan::Set(SetNode {
263                        input: Box::new(current_plan),
264                        items: set_clause.items,
265                    }));
266                }
267                Clause::Delete(delete_clause) => {
268                    // DELETE requires a previous plan (usually MATCH)
269                    let current_plan = plan.ok_or_else(|| {
270                        Error::Other("DELETE clause requires a preceding MATCH clause".to_string())
271                    })?;
272                    plan = Some(PhysicalPlan::Delete(DeleteNode {
273                        input: Box::new(current_plan),
274                        detach: delete_clause.detach,
275                        expressions: delete_clause.expressions,
276                    }));
277                }
278                Clause::Union(_) => return Err(Error::NotImplemented("UNION")),
279                Clause::Where(w) => {
280                    where_clause = Some(w);
281                }
282                Clause::Return(r) => {
283                    return_clause = Some(r);
284                }
285                Clause::With(with_clause) => {
286                    // WITH acts as a pipeline: project current results, then continue
287                    let current_plan = plan.ok_or_else(|| {
288                        Error::Other("WITH clause requires a preceding clause".to_string())
289                    })?;
290
291                    // Apply any pending WHERE first
292                    let mut with_plan = if let Some(w) = where_clause.take() {
293                        make_filter(current_plan, w.expression)
294                    } else {
295                        current_plan
296                    };
297
298                    // Project the WITH items
299                    let projections: Vec<(Expression, String)> = with_clause
300                        .items
301                        .into_iter()
302                        .map(|item| {
303                            let alias = item
304                                .alias
305                                .unwrap_or_else(|| Self::infer_alias(&item.expression));
306                            (item.expression, alias)
307                        })
308                        .collect();
309
310                    with_plan = PhysicalPlan::Project(ProjectNode {
311                        input: Box::new(with_plan),
312                        projections,
313                    });
314
315                    // Apply WITH's WHERE clause
316                    if let Some(w) = with_clause.where_clause {
317                        with_plan = make_filter(with_plan, w.expression);
318                    }
319
320                    if with_clause.distinct {
321                        with_plan = PhysicalPlan::Distinct(DistinctNode {
322                            input: Box::new(with_plan),
323                        });
324                    }
325
326                    // Apply ORDER BY
327                    if let Some(order_by) = with_clause.order_by {
328                        let order_items = order_by
329                            .items
330                            .into_iter()
331                            .map(|item| (item.expression, item.direction))
332                            .collect();
333                        with_plan = PhysicalPlan::Sort(SortNode {
334                            input: Box::new(with_plan),
335                            order_by: order_items,
336                        });
337                    }
338
339                    // Apply SKIP
340                    if let Some(skip) = with_clause.skip {
341                        with_plan = PhysicalPlan::Skip(SkipNode {
342                            input: Box::new(with_plan),
343                            skip,
344                        });
345                    }
346
347                    // Apply LIMIT
348                    if let Some(limit) = with_clause.limit {
349                        with_plan = PhysicalPlan::Limit(LimitNode {
350                            input: Box::new(with_plan),
351                            limit,
352                        });
353                    }
354
355                    plan = Some(with_plan);
356                }
357            }
358        }
359
360        let mut final_plan =
361            plan.ok_or_else(|| Error::Other("No MATCH or CREATE clause found".to_string()))?;
362
363        if let Some(w) = where_clause {
364            final_plan = make_filter(final_plan, w.expression);
365        }
366
367        if let Some(r) = return_clause {
368            // Check if any return item contains aggregate functions
369            let has_aggregates = r
370                .items
371                .iter()
372                .any(|item| Self::contains_aggregate(&item.expression));
373
374            if has_aggregates {
375                // Extract aggregations and non-aggregate expressions
376                let mut aggregations = Vec::new();
377                let mut projections = Vec::new();
378
379                for item in r.items {
380                    let alias = item
381                        .alias
382                        .clone()
383                        .unwrap_or_else(|| Self::infer_alias(&item.expression));
384
385                    if let Some(agg_func) = Self::extract_aggregate(&item.expression) {
386                        aggregations.push((agg_func, alias));
387                    } else {
388                        projections.push((item.expression, alias));
389                    }
390                }
391
392                // Add aggregate node
393                final_plan = PhysicalPlan::Aggregate(AggregateNode {
394                    input: Box::new(final_plan),
395                    aggregations,
396                    group_by: projections.iter().map(|(e, _)| e.clone()).collect(),
397                });
398
399                // Project the results
400                if !projections.is_empty() {
401                    final_plan = PhysicalPlan::Project(ProjectNode {
402                        input: Box::new(final_plan),
403                        projections,
404                    });
405                }
406            } else {
407                let projections = r
408                    .items
409                    .into_iter()
410                    .map(|item| {
411                        let alias = item
412                            .alias
413                            .unwrap_or_else(|| Self::infer_alias(&item.expression));
414                        (item.expression, alias)
415                    })
416                    .collect();
417
418                final_plan = PhysicalPlan::Project(ProjectNode {
419                    input: Box::new(final_plan),
420                    projections,
421                });
422            }
423
424            if r.distinct {
425                final_plan = PhysicalPlan::Distinct(DistinctNode {
426                    input: Box::new(final_plan),
427                });
428            }
429
430            // ORDER BY must come before SKIP and LIMIT
431            if let Some(order_by) = r.order_by {
432                let order_items = order_by
433                    .items
434                    .into_iter()
435                    .map(|item| (item.expression, item.direction))
436                    .collect();
437                final_plan = PhysicalPlan::Sort(SortNode {
438                    input: Box::new(final_plan),
439                    order_by: order_items,
440                });
441            }
442
443            // SKIP comes after ORDER BY
444            if let Some(skip) = r.skip {
445                final_plan = PhysicalPlan::Skip(SkipNode {
446                    input: Box::new(final_plan),
447                    skip,
448                });
449            }
450
451            // LIMIT comes last
452            if let Some(limit) = r.limit {
453                final_plan = PhysicalPlan::Limit(LimitNode {
454                    input: Box::new(final_plan),
455                    limit,
456                });
457            }
458        }
459
460        Ok(Self::maybe_rewrite_vector_topk(final_plan))
461    }
462
463    fn maybe_rewrite_txt_score_filter(filter: FilterNode) -> PhysicalPlan {
464        if !cfg!(all(feature = "fts", not(target_arch = "wasm32"))) {
465            return PhysicalPlan::Filter(filter);
466        }
467
468        let FilterNode { input, predicate } = filter;
469        let PhysicalPlan::Scan(scan) = *input else {
470            return PhysicalPlan::Filter(FilterNode { input, predicate });
471        };
472
473        let Some((property, query)) =
474            Self::find_txt_score_candidate(&predicate, scan.alias.as_str())
475        else {
476            return PhysicalPlan::Filter(FilterNode {
477                input: Box::new(PhysicalPlan::Scan(scan)),
478                predicate,
479            });
480        };
481
482        PhysicalPlan::Filter(FilterNode {
483            input: Box::new(PhysicalPlan::FtsCandidateScan(FtsCandidateScanNode {
484                alias: scan.alias,
485                labels: scan.labels,
486                property,
487                query,
488            })),
489            predicate,
490        })
491    }
492
493    fn find_txt_score_candidate(
494        predicate: &Expression,
495        scan_alias: &str,
496    ) -> Option<(String, Expression)> {
497        let mut conjuncts = Vec::new();
498        Self::flatten_and(predicate, &mut conjuncts);
499
500        conjuncts
501            .into_iter()
502            .find_map(|expr| Self::match_txt_score_threshold(expr, scan_alias))
503    }
504
505    fn flatten_and<'a>(expr: &'a Expression, out: &mut Vec<&'a Expression>) {
506        if let Expression::Binary(b) = expr
507            && matches!(b.operator, BinaryOperator::And)
508        {
509            Self::flatten_and(&b.left, out);
510            Self::flatten_and(&b.right, out);
511        } else {
512            out.push(expr);
513        }
514    }
515
516    fn match_txt_score_threshold(
517        expr: &Expression,
518        scan_alias: &str,
519    ) -> Option<(String, Expression)> {
520        let Expression::Binary(b) = expr else {
521            return None;
522        };
523        if !matches!(
524            b.operator,
525            BinaryOperator::GreaterThan | BinaryOperator::GreaterThanOrEqual
526        ) {
527            return None;
528        }
529
530        let Expression::FunctionCall(fc) = &b.left else {
531            return None;
532        };
533        if !fc.name.eq_ignore_ascii_case("txt_score") {
534            return None;
535        }
536
537        let Some(Expression::PropertyAccess(pa)) = fc.arguments.first() else {
538            return None;
539        };
540        if pa.variable != scan_alias || pa.property.is_empty() {
541            return None;
542        }
543
544        let query = fc.arguments.get(1)?.clone();
545        if !matches!(
546            query,
547            Expression::Parameter(_) | Expression::Literal(Literal::String(_))
548        ) {
549            return None;
550        }
551
552        let threshold = Self::number_literal(&b.right)?;
553        match b.operator {
554            BinaryOperator::GreaterThan if threshold < 0.0 => return None,
555            BinaryOperator::GreaterThanOrEqual if threshold <= 0.0 => return None,
556            BinaryOperator::GreaterThan | BinaryOperator::GreaterThanOrEqual => {}
557            _ => return None,
558        }
559
560        Some((pa.property.clone(), query))
561    }
562
563    fn number_literal(expr: &Expression) -> Option<f64> {
564        match expr {
565            Expression::Literal(Literal::Integer(i)) => Some(*i as f64),
566            Expression::Literal(Literal::Float(f)) => Some(*f),
567            _ => None,
568        }
569    }
570
571    fn maybe_rewrite_vector_topk(plan: PhysicalPlan) -> PhysicalPlan {
572        if !cfg!(all(feature = "vector", not(target_arch = "wasm32"))) {
573            return plan;
574        }
575
576        let PhysicalPlan::Limit(LimitNode { input, limit }) = plan else {
577            return plan;
578        };
579
580        let PhysicalPlan::Sort(SortNode { input, order_by }) = *input else {
581            return PhysicalPlan::Limit(LimitNode { input, limit });
582        };
583
584        let Some((order_expr, Direction::Descending)) = order_by.first() else {
585            return PhysicalPlan::Limit(LimitNode {
586                input: Box::new(PhysicalPlan::Sort(SortNode { input, order_by })),
587                limit,
588            });
589        };
590        if order_by.len() != 1 {
591            return PhysicalPlan::Limit(LimitNode {
592                input: Box::new(PhysicalPlan::Sort(SortNode { input, order_by })),
593                limit,
594            });
595        }
596
597        let Some((pa, query)) = Self::match_vec_similarity_order_by(order_expr) else {
598            return PhysicalPlan::Limit(LimitNode {
599                input: Box::new(PhysicalPlan::Sort(SortNode { input, order_by })),
600                limit,
601            });
602        };
603
604        let PhysicalPlan::Project(ProjectNode { input, projections }) = *input else {
605            return PhysicalPlan::Limit(LimitNode {
606                input: Box::new(PhysicalPlan::Sort(SortNode { input, order_by })),
607                limit,
608            });
609        };
610
611        let PhysicalPlan::Scan(scan) = *input else {
612            return PhysicalPlan::Limit(LimitNode {
613                input: Box::new(PhysicalPlan::Sort(SortNode {
614                    input: Box::new(PhysicalPlan::Project(ProjectNode { input, projections })),
615                    order_by,
616                })),
617                limit,
618            });
619        };
620
621        if !scan.labels.is_empty() {
622            return PhysicalPlan::Limit(LimitNode {
623                input: Box::new(PhysicalPlan::Sort(SortNode {
624                    input: Box::new(PhysicalPlan::Project(ProjectNode {
625                        input: Box::new(PhysicalPlan::Scan(scan)),
626                        projections,
627                    })),
628                    order_by,
629                })),
630                limit,
631            });
632        }
633        if pa.variable != scan.alias {
634            return PhysicalPlan::Limit(LimitNode {
635                input: Box::new(PhysicalPlan::Sort(SortNode {
636                    input: Box::new(PhysicalPlan::Project(ProjectNode {
637                        input: Box::new(PhysicalPlan::Scan(scan)),
638                        projections,
639                    })),
640                    order_by,
641                })),
642                limit,
643            });
644        }
645
646        if !projections.iter().any(|(expr, alias)| {
647            alias == scan.alias.as_str()
648                && matches!(expr, Expression::Variable(name) if name == scan.alias.as_str())
649        }) {
650            return PhysicalPlan::Limit(LimitNode {
651                input: Box::new(PhysicalPlan::Sort(SortNode {
652                    input: Box::new(PhysicalPlan::Project(ProjectNode {
653                        input: Box::new(PhysicalPlan::Scan(scan)),
654                        projections,
655                    })),
656                    order_by,
657                })),
658                limit,
659            });
660        }
661
662        PhysicalPlan::Project(ProjectNode {
663            input: Box::new(PhysicalPlan::VectorTopKScan(VectorTopKScanNode {
664                alias: scan.alias,
665                labels: scan.labels,
666                property: pa.property,
667                query,
668                limit,
669            })),
670            projections,
671        })
672    }
673
674    fn match_vec_similarity_order_by(expr: &Expression) -> Option<(PropertyAccess, Expression)> {
675        let Expression::FunctionCall(fc) = expr else {
676            return None;
677        };
678        if !fc.name.eq_ignore_ascii_case("vec_similarity") {
679            return None;
680        }
681
682        let Some(Expression::PropertyAccess(pa)) = fc.arguments.first() else {
683            return None;
684        };
685        if pa.variable.is_empty() || pa.property.is_empty() {
686            return None;
687        }
688
689        let query = fc.arguments.get(1)?.clone();
690        if !matches!(
691            query,
692            Expression::Parameter(_)
693                | Expression::List(_)
694                | Expression::Literal(Literal::String(_))
695        ) {
696            return None;
697        }
698
699        Some((pa.clone(), query))
700    }
701
702    /// Check if expression contains aggregate function
703    fn contains_aggregate(expr: &Expression) -> bool {
704        match expr {
705            Expression::FunctionCall(fc) => {
706                matches!(
707                    fc.name.to_uppercase().as_str(),
708                    "COUNT" | "SUM" | "AVG" | "MIN" | "MAX" | "COLLECT"
709                )
710            }
711            _ => false,
712        }
713    }
714
715    /// Extract aggregate function from expression
716    fn extract_aggregate(expr: &Expression) -> Option<AggregateFunction> {
717        if let Expression::FunctionCall(fc) = expr {
718            match fc.name.to_uppercase().as_str() {
719                "COUNT" => Some(AggregateFunction::Count(fc.arguments.first().cloned())),
720                "SUM" => fc.arguments.first().cloned().map(AggregateFunction::Sum),
721                "AVG" => fc.arguments.first().cloned().map(AggregateFunction::Avg),
722                "MIN" => fc.arguments.first().cloned().map(AggregateFunction::Min),
723                "MAX" => fc.arguments.first().cloned().map(AggregateFunction::Max),
724                "COLLECT" => fc
725                    .arguments
726                    .first()
727                    .cloned()
728                    .map(AggregateFunction::Collect),
729                _ => None,
730            }
731        } else {
732            None
733        }
734    }
735
736    fn plan_match(&self, match_clause: MatchClause) -> Result<PhysicalPlan, Error> {
737        let mut plan: Option<PhysicalPlan> = None;
738        let mut last_node_alias: Option<String> = None;
739        let mut elements = match_clause.pattern.elements.into_iter();
740        let mut anon_idx = 0;
741
742        while let Some(element) = elements.next() {
743            match element {
744                PathElement::Node(node) => {
745                    let alias = node.variable.unwrap_or_else(|| {
746                        anon_idx += 1;
747                        format!("_anon{}", anon_idx)
748                    });
749
750                    if let Some(current_plan) = plan {
751                        // If we already have a plan, this node should have been handled by an Expand.
752                        // If we are here, it means we have a disconnected node (Cartesian product).
753                        // Or maybe the previous element was NOT a relationship (impossible in valid Cypher path?).
754                        // Valid path: Node - Rel - Node - Rel - Node
755
756                        // For now, assume Cartesian product if not connected via Expand.
757                        let scan = PhysicalPlan::Scan(ScanNode {
758                            alias: alias.clone(),
759                            labels: node.labels,
760                        });
761
762                        plan = Some(PhysicalPlan::NestedLoopJoin(NestedLoopJoinNode {
763                            left: Box::new(current_plan),
764                            right: Box::new(scan),
765                            predicate: None,
766                        }));
767                    } else {
768                        // First node
769                        plan = Some(PhysicalPlan::Scan(ScanNode {
770                            alias: alias.clone(),
771                            labels: node.labels,
772                        }));
773                    }
774                    last_node_alias = Some(alias);
775                }
776                PathElement::Relationship(rel) => {
777                    let RelationshipPattern {
778                        variable,
779                        types,
780                        direction,
781                        properties,
782                        variable_length,
783                    } = rel;
784
785                    // Expect next element to be a Node
786                    if let Some(PathElement::Node(next_node)) = elements.next() {
787                        let start_alias = last_node_alias.ok_or_else(|| {
788                            Error::Other("Relationship without start node".to_string())
789                        })?;
790                        let end_alias = next_node.variable.unwrap_or_else(|| {
791                            anon_idx += 1;
792                            format!("_anon{}", anon_idx)
793                        });
794
795                        let current_plan = plan.ok_or_else(|| {
796                            Error::Other("Relationship without start node plan".to_string())
797                        })?;
798
799                        plan = if let Some(var_len) = variable_length {
800                            if variable.is_some() {
801                                return Err(Error::NotImplemented(
802                                    "variable-length relationship variables",
803                                ));
804                            }
805                            if properties.is_some() {
806                                return Err(Error::NotImplemented(
807                                    "variable-length relationship properties",
808                                ));
809                            }
810
811                            let min_hops = var_len.min.unwrap_or(1);
812                            let Some(max_hops) = var_len.max else {
813                                return Err(Error::NotImplemented(
814                                    "variable-length relationships without max",
815                                ));
816                            };
817
818                            Some(PhysicalPlan::ExpandVarLength(ExpandVarLengthNode {
819                                input: Box::new(current_plan),
820                                start_node_alias: start_alias,
821                                end_node_alias: end_alias.clone(),
822                                direction,
823                                rel_type: types.first().cloned(), // TODO: Handle multiple types
824                                min_hops,
825                                max_hops,
826                            }))
827                        } else {
828                            let rel_alias = variable.unwrap_or_else(|| "rel".to_string());
829                            Some(PhysicalPlan::Expand(ExpandNode {
830                                input: Box::new(current_plan),
831                                start_node_alias: start_alias,
832                                rel_alias,
833                                end_node_alias: end_alias.clone(),
834                                direction,
835                                rel_type: types.first().cloned(), // TODO: Handle multiple types
836                            }))
837                        };
838
839                        last_node_alias = Some(end_alias);
840                    } else {
841                        return Err(Error::Other(
842                            "Relationship must be followed by a Node".to_string(),
843                        ));
844                    }
845                }
846            }
847        }
848
849        plan.ok_or_else(|| Error::Other("Empty pattern".to_string()))
850    }
851
852    fn infer_alias(expr: &Expression) -> String {
853        match expr {
854            Expression::Variable(name) => name.clone(),
855            Expression::PropertyAccess(pa) => format!("{}.{}", pa.variable, pa.property),
856            _ => "col".to_string(),
857        }
858    }
859
860    fn extract_pattern_aliases(pattern: &Pattern) -> HashSet<String> {
861        let mut aliases = HashSet::new();
862        let mut anon_idx = 0;
863
864        for element in &pattern.elements {
865            match element {
866                PathElement::Node(node) => {
867                    let alias = node.variable.clone().unwrap_or_else(|| {
868                        anon_idx += 1;
869                        format!("_anon{}", anon_idx)
870                    });
871                    aliases.insert(alias);
872                }
873                PathElement::Relationship(rel) => {
874                    let alias = rel.variable.clone().unwrap_or_else(|| "rel".to_string());
875                    aliases.insert(alias);
876                }
877            }
878        }
879
880        aliases
881    }
882
883    fn extract_plan_output_aliases(plan: &PhysicalPlan) -> HashSet<String> {
884        match plan {
885            PhysicalPlan::SingleRow(_) => HashSet::new(),
886            PhysicalPlan::Scan(node) => HashSet::from([node.alias.clone()]),
887            PhysicalPlan::FtsCandidateScan(node) => HashSet::from([node.alias.clone()]),
888            PhysicalPlan::VectorTopKScan(node) => HashSet::from([node.alias.clone()]),
889            PhysicalPlan::Filter(node) => Self::extract_plan_output_aliases(&node.input),
890            PhysicalPlan::Project(node) => node
891                .projections
892                .iter()
893                .map(|(_, alias)| alias.clone())
894                .collect(),
895            PhysicalPlan::Limit(node) => Self::extract_plan_output_aliases(&node.input),
896            PhysicalPlan::Skip(node) => Self::extract_plan_output_aliases(&node.input),
897            PhysicalPlan::Sort(node) => Self::extract_plan_output_aliases(&node.input),
898            PhysicalPlan::Distinct(node) => Self::extract_plan_output_aliases(&node.input),
899            PhysicalPlan::Aggregate(node) => {
900                let mut out: HashSet<String> = node
901                    .aggregations
902                    .iter()
903                    .map(|(_, alias)| alias.clone())
904                    .collect();
905                for expr in &node.group_by {
906                    if let Expression::Variable(name) = expr {
907                        out.insert(name.clone());
908                    }
909                }
910                out
911            }
912            PhysicalPlan::NestedLoopJoin(node) => {
913                let mut out = Self::extract_plan_output_aliases(&node.left);
914                out.extend(Self::extract_plan_output_aliases(&node.right));
915                out
916            }
917            PhysicalPlan::LeftOuterJoin(node) => {
918                let mut out = Self::extract_plan_output_aliases(&node.left);
919                out.extend(Self::extract_plan_output_aliases(&node.right));
920                out
921            }
922            PhysicalPlan::Expand(node) => {
923                let mut out = Self::extract_plan_output_aliases(&node.input);
924                out.insert(node.start_node_alias.clone());
925                out.insert(node.rel_alias.clone());
926                out.insert(node.end_node_alias.clone());
927                out
928            }
929            PhysicalPlan::ExpandVarLength(node) => {
930                let mut out = Self::extract_plan_output_aliases(&node.input);
931                out.insert(node.start_node_alias.clone());
932                out.insert(node.end_node_alias.clone());
933                out
934            }
935            PhysicalPlan::Unwind(node) => {
936                let mut out = Self::extract_plan_output_aliases(&node.input);
937                out.insert(node.alias.clone());
938                out
939            }
940            PhysicalPlan::Create(_) | PhysicalPlan::Set(_) | PhysicalPlan::Delete(_) => {
941                HashSet::new()
942            }
943        }
944    }
945}