Skip to main content

omnigraph_compiler/ir/
lower.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use crate::catalog::Catalog;
4use crate::error::Result;
5use crate::query::ast::*;
6use crate::query::typecheck::TypeContext;
7use crate::types::Direction;
8
9use super::*;
10
11pub fn lower_query(
12    catalog: &Catalog,
13    query: &QueryDecl,
14    type_ctx: &TypeContext,
15) -> Result<QueryIR> {
16    if !query.mutations.is_empty() {
17        return Err(crate::error::NanoError::Plan(
18            "cannot lower mutation query with read-query lowerer".to_string(),
19        ));
20    }
21    let param_names: HashSet<String> = query.params.iter().map(|p| p.name.clone()).collect();
22
23    let mut pipeline = Vec::new();
24    let mut bound_vars = HashSet::new();
25
26    lower_clauses(
27        catalog,
28        &query.match_clause,
29        type_ctx,
30        &mut pipeline,
31        &mut bound_vars,
32        &param_names,
33    )?;
34
35    let return_exprs: Vec<IRProjection> = query
36        .return_clause
37        .iter()
38        .map(|p| IRProjection {
39            expr: lower_expr(&p.expr, &param_names),
40            alias: p.alias.clone(),
41        })
42        .collect();
43
44    let order_by: Vec<IROrdering> = query
45        .order_clause
46        .iter()
47        .map(|o| IROrdering {
48            expr: lower_expr(&o.expr, &param_names),
49            descending: o.descending,
50        })
51        .collect();
52
53    Ok(QueryIR {
54        name: query.name.clone(),
55        params: query.params.clone(),
56        pipeline,
57        return_exprs,
58        order_by,
59        limit: query.limit,
60    })
61}
62
63pub fn lower_mutation_query(query: &QueryDecl) -> Result<MutationIR> {
64    if query.mutations.is_empty() {
65        return Err(crate::error::NanoError::Plan(
66            "query does not contain a mutation body".to_string(),
67        ));
68    }
69    let param_names: HashSet<String> = query.params.iter().map(|p| p.name.clone()).collect();
70
71    let ops = query
72        .mutations
73        .iter()
74        .map(|m| lower_single_mutation(m, &param_names))
75        .collect::<Result<Vec<_>>>()?;
76
77    Ok(MutationIR {
78        name: query.name.clone(),
79        params: query.params.clone(),
80        ops,
81    })
82}
83
84fn lower_single_mutation(
85    mutation: &Mutation,
86    param_names: &HashSet<String>,
87) -> Result<MutationOpIR> {
88    match mutation {
89        Mutation::Insert(insert) => Ok(MutationOpIR::Insert {
90            type_name: insert.type_name.clone(),
91            assignments: insert
92                .assignments
93                .iter()
94                .map(|a| IRAssignment {
95                    property: a.property.clone(),
96                    value: lower_match_value(&a.value, param_names),
97                })
98                .collect(),
99        }),
100        Mutation::Update(update) => Ok(MutationOpIR::Update {
101            type_name: update.type_name.clone(),
102            assignments: update
103                .assignments
104                .iter()
105                .map(|a| IRAssignment {
106                    property: a.property.clone(),
107                    value: lower_match_value(&a.value, param_names),
108                })
109                .collect(),
110            predicate: IRMutationPredicate {
111                property: update.predicate.property.clone(),
112                op: update.predicate.op,
113                value: lower_match_value(&update.predicate.value, param_names),
114            },
115        }),
116        Mutation::Delete(delete) => Ok(MutationOpIR::Delete {
117            type_name: delete.type_name.clone(),
118            predicate: IRMutationPredicate {
119                property: delete.predicate.property.clone(),
120                op: delete.predicate.op,
121                value: lower_match_value(&delete.predicate.value, param_names),
122            },
123        }),
124    }
125}
126
127fn lower_clauses(
128    catalog: &Catalog,
129    clauses: &[Clause],
130    type_ctx: &TypeContext,
131    pipeline: &mut Vec<IROp>,
132    bound_vars: &mut HashSet<String>,
133    param_names: &HashSet<String>,
134) -> Result<()> {
135    // Separate clause types for ordering: bindings first, then traversals, then filters
136    let mut bindings = Vec::new();
137    let mut traversals = Vec::new();
138    let mut filters = Vec::new();
139    let mut negations = Vec::new();
140
141    for clause in clauses {
142        match clause {
143            Clause::Binding(b) => bindings.push(b),
144            Clause::Traversal(t) => traversals.push(t),
145            Clause::Filter(f) => filters.push(f),
146            Clause::Negation(inner) => negations.push(inner),
147        }
148    }
149
150    // ── Determine which bindings are "deferred" ─────────────────────────
151    //
152    // When multiple bindings in the same match clause are connected by
153    // traversals, only the first-declared binding needs a NodeScan; the
154    // rest will be introduced by Expand operations.  Making them all
155    // NodeScans triggers expensive cross-joins followed by cycle-closing
156    // filters.
157    //
158    // Algorithm: build an undirected graph of variables connected by
159    // traversals, then walk connected components in binding declaration
160    // order.  The first binding in each component becomes the root (gets
161    // a NodeScan); all other bindings in the same component are deferred
162    // — their inline filters become post-Expand Filter ops.
163
164    let binding_set: HashSet<&str> = bindings.iter().map(|b| b.variable.as_str()).collect();
165
166    // Build undirected traversal adjacency (variable → neighbours).
167    // Exclude the anonymous wildcard "_" so it cannot falsely bridge
168    // otherwise-independent components.
169    let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
170    for t in &traversals {
171        let src = t.src.as_str();
172        let dst = t.dst.as_str();
173        if src != "_" && dst != "_" {
174            adj.entry(src).or_default().push(dst);
175            adj.entry(dst).or_default().push(src);
176        }
177    }
178
179    // Walk components to find deferred binding variables
180    let mut deferred_set: HashSet<String> = HashSet::new();
181    let mut component_visited: HashSet<&str> = HashSet::new();
182
183    for binding in &bindings {
184        if component_visited.contains(binding.variable.as_str()) {
185            continue;
186        }
187        // BFS from this binding through the traversal graph
188        let mut queue = VecDeque::new();
189        queue.push_back(binding.variable.as_str());
190        let mut component_bindings: Vec<&str> = Vec::new();
191
192        while let Some(var) = queue.pop_front() {
193            if !component_visited.insert(var) {
194                continue;
195            }
196            if binding_set.contains(var) {
197                component_bindings.push(var);
198            }
199            if let Some(neighbours) = adj.get(var) {
200                for &n in neighbours {
201                    if !component_visited.contains(n) {
202                        queue.push_back(n);
203                    }
204                }
205            }
206        }
207
208        // First binding in the component is the root; defer the rest.
209        for var in component_bindings.into_iter().skip(1) {
210            deferred_set.insert(var.to_string());
211        }
212    }
213
214    // Build deferred filters map for variables introduced by traversals
215    let mut deferred_filters: HashMap<String, Vec<IRFilter>> = HashMap::new();
216
217    // Lower bindings into NodeScan ops (skip deferred ones)
218    for binding in &bindings {
219        let node_type = catalog
220            .node_types
221            .get(&binding.type_name)
222            .expect("binding type was validated during typecheck");
223
224        let binding_filters = build_binding_filters(binding, node_type, param_names);
225
226        if deferred_set.contains(&binding.variable) {
227            // Save filters for emission after the Expand that introduces
228            // this variable.
229            if !binding_filters.is_empty() {
230                deferred_filters.insert(binding.variable.clone(), binding_filters);
231            }
232            continue;
233        }
234
235        pipeline.push(IROp::NodeScan {
236            variable: binding.variable.clone(),
237            type_name: binding.type_name.clone(),
238            filters: binding_filters,
239        });
240        bound_vars.insert(binding.variable.clone());
241    }
242
243    // Lower traversals into Expand ops.
244    //
245    // Traversals are processed iteratively rather than in a single pass
246    // because deferred bindings mean a traversal's source might not be
247    // bound until a prior traversal introduces it.  Each pass processes
248    // every traversal that has at least one bound endpoint; this repeats
249    // until all traversals are consumed.
250    let mut remaining: Vec<&Traversal> = traversals.to_vec();
251    while !remaining.is_empty() {
252        let mut next_remaining = Vec::new();
253        for traversal in &remaining {
254            let src_bound = bound_vars.contains(&traversal.src);
255            let dst_bound = bound_vars.contains(&traversal.dst);
256            if !src_bound && !dst_bound {
257                next_remaining.push(*traversal);
258                continue;
259            }
260
261            let edge = catalog
262                .lookup_edge_by_name(&traversal.edge_name)
263                .ok_or_else(|| {
264                    crate::error::NanoError::Plan(format!(
265                        "lowering traversal referenced missing edge '{}' after typecheck",
266                        traversal.edge_name
267                    ))
268                })?;
269
270            let direction = type_ctx
271                .traversals
272                .iter()
273                .find(|rt| {
274                    rt.src == traversal.src
275                        && rt.dst == traversal.dst
276                        && rt.edge_type == edge.name
277                })
278                .map(|rt| rt.direction)
279                .unwrap_or(Direction::Out);
280
281            let dst_type = match direction {
282                Direction::Out => edge.to_type.clone(),
283                Direction::In => edge.from_type.clone(),
284            };
285
286            if src_bound && dst_bound {
287                // Cycle closing: expand to a temp var, then filter temp.id = dst.id
288                let temp_var = format!("__temp_{}", traversal.dst);
289                pipeline.push(IROp::Expand {
290                    src_var: traversal.src.clone(),
291                    dst_var: temp_var.clone(),
292                    edge_type: edge.name.clone(),
293                    direction,
294                    dst_type,
295                    min_hops: traversal.min_hops,
296                    max_hops: traversal.max_hops,
297                    dst_filters: vec![],
298                });
299                pipeline.push(IROp::Filter(IRFilter {
300                    left: IRExpr::PropAccess {
301                        variable: temp_var,
302                        property: "id".to_string(),
303                    },
304                    op: CompOp::Eq,
305                    right: IRExpr::PropAccess {
306                        variable: traversal.dst.clone(),
307                        property: "id".to_string(),
308                    },
309                }));
310            } else if !src_bound && dst_bound {
311                // Reverse expand: dst is bound, src is not.
312                let reverse_dir = match direction {
313                    Direction::Out => Direction::In,
314                    Direction::In => Direction::Out,
315                };
316                let src_type = match direction {
317                    Direction::Out => edge.from_type.clone(),
318                    Direction::In => edge.to_type.clone(),
319                };
320                let introduced_filters =
321                    deferred_filters.remove(&traversal.src).unwrap_or_default();
322                pipeline.push(IROp::Expand {
323                    src_var: traversal.dst.clone(),
324                    dst_var: traversal.src.clone(),
325                    edge_type: edge.name.clone(),
326                    direction: reverse_dir,
327                    dst_type: src_type,
328                    min_hops: traversal.min_hops,
329                    max_hops: traversal.max_hops,
330                    dst_filters: introduced_filters,
331                });
332                if traversal.src != "_" {
333                    bound_vars.insert(traversal.src.clone());
334                }
335            } else {
336                // Normal expand: src is bound, dst is not.
337                let introduced_filters =
338                    deferred_filters.remove(&traversal.dst).unwrap_or_default();
339                pipeline.push(IROp::Expand {
340                    src_var: traversal.src.clone(),
341                    dst_var: traversal.dst.clone(),
342                    edge_type: edge.name.clone(),
343                    direction,
344                    dst_type,
345                    min_hops: traversal.min_hops,
346                    max_hops: traversal.max_hops,
347                    dst_filters: introduced_filters,
348                });
349                if traversal.dst != "_" {
350                    bound_vars.insert(traversal.dst.clone());
351                }
352            }
353        }
354        if next_remaining.len() == remaining.len() {
355            break;
356        }
357        remaining = next_remaining;
358    }
359
360    // Lower explicit filters
361    for filter in &filters {
362        pipeline.push(IROp::Filter(IRFilter {
363            left: lower_expr(&filter.left, param_names),
364            op: filter.op,
365            right: lower_expr(&filter.right, param_names),
366        }));
367    }
368
369    // Lower negations into AntiJoin ops
370    for neg_clauses in &negations {
371        // Find outer-bound variable referenced in the negation
372        let outer_var = find_outer_var(neg_clauses, bound_vars);
373
374        let mut inner_pipeline = Vec::new();
375        let mut inner_bound = bound_vars.clone();
376        lower_clauses(
377            catalog,
378            neg_clauses,
379            type_ctx,
380            &mut inner_pipeline,
381            &mut inner_bound,
382            param_names,
383        )?;
384
385        pipeline.push(IROp::AntiJoin {
386            outer_var: outer_var.unwrap_or_default(),
387            inner: inner_pipeline,
388        });
389    }
390
391    Ok(())
392}
393
394/// Build IR filters from a binding's inline property matches.
395fn build_binding_filters(
396    binding: &Binding,
397    node_type: &crate::catalog::NodeType,
398    param_names: &HashSet<String>,
399) -> Vec<IRFilter> {
400    let mut filters = Vec::new();
401    for pm in &binding.prop_matches {
402        let prop = node_type
403            .properties
404            .get(&pm.prop_name)
405            .expect("binding property was validated during typecheck");
406        let op = if prop.list {
407            CompOp::Contains
408        } else {
409            CompOp::Eq
410        };
411        let right = match &pm.value {
412            MatchValue::Literal(lit) => IRExpr::Literal(lit.clone()),
413            MatchValue::Now => IRExpr::Param(NOW_PARAM_NAME.to_string()),
414            MatchValue::Variable(v) => {
415                if param_names.contains(v) {
416                    IRExpr::Param(v.clone())
417                } else {
418                    IRExpr::Variable(v.clone())
419                }
420            }
421        };
422        filters.push(IRFilter {
423            left: IRExpr::PropAccess {
424                variable: binding.variable.clone(),
425                property: pm.prop_name.clone(),
426            },
427            op,
428            right,
429        });
430    }
431    filters
432}
433
434fn find_outer_var(clauses: &[Clause], outer_bound: &HashSet<String>) -> Option<String> {
435    for clause in clauses {
436        match clause {
437            Clause::Traversal(t) => {
438                if outer_bound.contains(&t.src) {
439                    return Some(t.src.clone());
440                }
441                if outer_bound.contains(&t.dst) {
442                    return Some(t.dst.clone());
443                }
444            }
445            Clause::Filter(f) => {
446                if let Some(v) = expr_var(&f.left)
447                    && outer_bound.contains(&v)
448                {
449                    return Some(v);
450                }
451                if let Some(v) = expr_var(&f.right)
452                    && outer_bound.contains(&v)
453                {
454                    return Some(v);
455                }
456            }
457            Clause::Binding(b) => {
458                if outer_bound.contains(&b.variable) {
459                    return Some(b.variable.clone());
460                }
461            }
462            _ => {}
463        }
464    }
465    None
466}
467
468fn expr_var(expr: &Expr) -> Option<String> {
469    match expr {
470        Expr::Now => None,
471        Expr::PropAccess { variable, .. } => Some(variable.clone()),
472        Expr::Variable(v) => Some(v.clone()),
473        Expr::Nearest { variable, .. } => Some(variable.clone()),
474        Expr::Search { field, query } => expr_var(field).or_else(|| expr_var(query)),
475        Expr::Fuzzy {
476            field,
477            query,
478            max_edits,
479        } => expr_var(field)
480            .or_else(|| expr_var(query))
481            .or_else(|| max_edits.as_deref().and_then(expr_var)),
482        Expr::MatchText { field, query } => expr_var(field).or_else(|| expr_var(query)),
483        Expr::Bm25 { field, query } => expr_var(field).or_else(|| expr_var(query)),
484        Expr::Rrf {
485            primary,
486            secondary,
487            k,
488        } => expr_var(primary)
489            .or_else(|| expr_var(secondary))
490            .or_else(|| k.as_deref().and_then(expr_var)),
491        Expr::Aggregate { arg, .. } => expr_var(arg),
492        _ => None,
493    }
494}
495
496fn lower_expr(expr: &Expr, param_names: &HashSet<String>) -> IRExpr {
497    match expr {
498        Expr::Now => IRExpr::Param(NOW_PARAM_NAME.to_string()),
499        Expr::PropAccess { variable, property } => IRExpr::PropAccess {
500            variable: variable.clone(),
501            property: property.clone(),
502        },
503        Expr::Nearest {
504            variable,
505            property,
506            query,
507        } => IRExpr::Nearest {
508            variable: variable.clone(),
509            property: property.clone(),
510            query: Box::new(lower_expr(query, param_names)),
511        },
512        Expr::Search { field, query } => IRExpr::Search {
513            field: Box::new(lower_expr(field, param_names)),
514            query: Box::new(lower_expr(query, param_names)),
515        },
516        Expr::Fuzzy {
517            field,
518            query,
519            max_edits,
520        } => IRExpr::Fuzzy {
521            field: Box::new(lower_expr(field, param_names)),
522            query: Box::new(lower_expr(query, param_names)),
523            max_edits: max_edits
524                .as_ref()
525                .map(|expr| Box::new(lower_expr(expr, param_names))),
526        },
527        Expr::MatchText { field, query } => IRExpr::MatchText {
528            field: Box::new(lower_expr(field, param_names)),
529            query: Box::new(lower_expr(query, param_names)),
530        },
531        Expr::Bm25 { field, query } => IRExpr::Bm25 {
532            field: Box::new(lower_expr(field, param_names)),
533            query: Box::new(lower_expr(query, param_names)),
534        },
535        Expr::Rrf {
536            primary,
537            secondary,
538            k,
539        } => IRExpr::Rrf {
540            primary: Box::new(lower_expr(primary, param_names)),
541            secondary: Box::new(lower_expr(secondary, param_names)),
542            k: k.as_ref()
543                .map(|expr| Box::new(lower_expr(expr, param_names))),
544        },
545        Expr::Variable(v) => {
546            if param_names.contains(v) {
547                IRExpr::Param(v.clone())
548            } else {
549                IRExpr::Variable(v.clone())
550            }
551        }
552        Expr::Literal(l) => IRExpr::Literal(l.clone()),
553        Expr::Aggregate { func, arg } => IRExpr::Aggregate {
554            func: *func,
555            arg: Box::new(lower_expr(arg, param_names)),
556        },
557        Expr::AliasRef(name) => IRExpr::AliasRef(name.clone()),
558    }
559}
560
561fn lower_match_value(value: &MatchValue, param_names: &HashSet<String>) -> IRExpr {
562    match value {
563        MatchValue::Now => IRExpr::Param(NOW_PARAM_NAME.to_string()),
564        MatchValue::Literal(l) => IRExpr::Literal(l.clone()),
565        MatchValue::Variable(v) => {
566            if param_names.contains(v) {
567                IRExpr::Param(v.clone())
568            } else {
569                IRExpr::Variable(v.clone())
570            }
571        }
572    }
573}
574
575#[cfg(test)]
576mod tests {
577    use super::*;
578    use crate::catalog::build_catalog;
579    use crate::query::parser::parse_query;
580    use crate::query::typecheck::{CheckedQuery, typecheck_query, typecheck_query_decl};
581    use crate::schema::parser::parse_schema;
582
583    fn setup() -> Catalog {
584        let schema = parse_schema(
585            r#"
586node Person { name: String  age: I32? }
587node Company { name: String }
588edge Knows: Person -> Person { since: Date? }
589edge WorksAt: Person -> Company
590"#,
591        )
592        .unwrap();
593        build_catalog(&schema).unwrap()
594    }
595
596    #[test]
597    fn test_lower_basic() {
598        let catalog = setup();
599        let qf = parse_query(
600            r#"
601query q($name: String) {
602    match {
603        $p: Person { name: $name }
604        $p knows $f
605    }
606    return { $f.name, $f.age }
607}
608"#,
609        )
610        .unwrap();
611        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
612        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
613
614        assert_eq!(ir.pipeline.len(), 2); // NodeScan + Expand
615        assert_eq!(ir.return_exprs.len(), 2);
616    }
617
618    #[test]
619    fn test_lower_negation() {
620        let catalog = setup();
621        let qf = parse_query(
622            r#"
623query q() {
624    match {
625        $p: Person
626        not { $p worksAt $_ }
627    }
628    return { $p.name }
629}
630"#,
631        )
632        .unwrap();
633        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
634        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
635
636        assert_eq!(ir.pipeline.len(), 2); // NodeScan + AntiJoin
637        assert!(matches!(&ir.pipeline[1], IROp::AntiJoin { .. }));
638    }
639
640    #[test]
641    fn test_lower_mutation_update() {
642        let catalog = setup();
643        let qf = parse_query(
644            r#"
645query q($name: String, $age: I32) {
646    update Person set { age: $age } where name = $name
647}
648"#,
649        )
650        .unwrap();
651        let checked = typecheck_query_decl(&catalog, &qf.queries[0]).unwrap();
652        assert!(matches!(checked, CheckedQuery::Mutation(_)));
653
654        let ir = lower_mutation_query(&qf.queries[0]).unwrap();
655        match &ir.ops[0] {
656            MutationOpIR::Update {
657                type_name,
658                assignments,
659                predicate,
660            } => {
661                assert_eq!(type_name, "Person");
662                assert_eq!(assignments.len(), 1);
663                assert_eq!(assignments[0].property, "age");
664                assert_eq!(predicate.property, "name");
665            }
666            _ => panic!("expected update mutation op"),
667        }
668    }
669
670    #[test]
671    fn test_lower_bounded_traversal() {
672        let catalog = setup();
673        let qf = parse_query(
674            r#"
675query q() {
676    match {
677        $p: Person
678        $p knows{1,3} $f
679    }
680    return { $f.name }
681}
682"#,
683        )
684        .unwrap();
685        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
686        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
687        let expand = ir
688            .pipeline
689            .iter()
690            .find_map(|op| match op {
691                IROp::Expand {
692                    min_hops, max_hops, ..
693                } => Some((*min_hops, *max_hops)),
694                _ => None,
695            })
696            .expect("expected expand op");
697        assert_eq!(expand.0, 1);
698        assert_eq!(expand.1, Some(3));
699    }
700
701    #[test]
702    fn test_lower_now_uses_reserved_runtime_param() {
703        let catalog = setup();
704        let qf = parse_query(
705            r#"
706query stamp() {
707    match { $p: Person }
708    return { now() as ts }
709}
710"#,
711        )
712        .unwrap();
713        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
714        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
715
716        assert!(matches!(
717            ir.return_exprs[0].expr,
718            IRExpr::Param(ref name) if name == NOW_PARAM_NAME
719        ));
720    }
721
722    #[test]
723    fn test_lower_mutation_now_uses_reserved_runtime_param() {
724        let catalog = build_catalog(
725            &parse_schema(
726                r#"
727node Event {
728    slug: String @key
729    updated_at: DateTime?
730}
731"#,
732            )
733            .unwrap(),
734        )
735        .unwrap();
736        let qf = parse_query(
737            r#"
738query stamp() {
739    update Event set { updated_at: now() } where updated_at = now()
740}
741"#,
742        )
743        .unwrap();
744        let checked = typecheck_query_decl(&catalog, &qf.queries[0]).unwrap();
745        assert!(matches!(checked, CheckedQuery::Mutation(_)));
746
747        let ir = lower_mutation_query(&qf.queries[0]).unwrap();
748        match &ir.ops[0] {
749            MutationOpIR::Update {
750                assignments,
751                predicate,
752                ..
753            } => {
754                assert!(matches!(
755                    assignments[0].value,
756                    IRExpr::Param(ref name) if name == NOW_PARAM_NAME
757                ));
758                assert!(matches!(
759                    predicate.value,
760                    IRExpr::Param(ref name) if name == NOW_PARAM_NAME
761                ));
762            }
763            _ => panic!("expected update mutation op"),
764        }
765    }
766
767    #[test]
768    fn test_lower_multi_mutation() {
769        let catalog = setup();
770        let qf = parse_query(
771            r#"
772query q($name: String, $age: I32, $friend: String) {
773    insert Person { name: $name, age: $age }
774    insert Knows { from: $name, to: $friend }
775}
776"#,
777        )
778        .unwrap();
779        let checked = typecheck_query_decl(&catalog, &qf.queries[0]).unwrap();
780        assert!(matches!(checked, CheckedQuery::Mutation(_)));
781
782        let ir = lower_mutation_query(&qf.queries[0]).unwrap();
783        assert_eq!(ir.ops.len(), 2);
784        assert!(
785            matches!(&ir.ops[0], MutationOpIR::Insert { type_name, .. } if type_name == "Person")
786        );
787        assert!(
788            matches!(&ir.ops[1], MutationOpIR::Insert { type_name, .. } if type_name == "Knows")
789        );
790    }
791
792    /// Destination binding is deferred: NodeScan + Expand + Filter (no cross-join).
793    #[test]
794    fn test_lower_traversal_with_destination_binding() {
795        let catalog = setup();
796        let qf = parse_query(
797            r#"
798query q() {
799    match {
800        $p: Person
801        $p worksAt $c
802        $c: Company { name: "Acme" }
803    }
804    return { $p.name, $c.name }
805}
806"#,
807        )
808        .unwrap();
809        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
810        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
811
812        // Should be: NodeScan($p) → Expand($p→$c, dst_filters=[name=="Acme"])
813        // NOT:       NodeScan($p) → NodeScan($c) → cross-join → cycle-close
814        assert_eq!(ir.pipeline.len(), 2);
815        assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "p"));
816        assert!(matches!(
817            &ir.pipeline[1],
818            IROp::Expand { src_var, dst_var, dst_filters, .. }
819            if src_var == "p" && dst_var == "c" && dst_filters.len() == 1
820        ));
821    }
822
823    /// Multi-hop chain: all intermediate and final bindings are deferred.
824    #[test]
825    fn test_lower_chain_defers_all_intermediate_bindings() {
826        let catalog = setup();
827        let qf = parse_query(
828            r#"
829query q() {
830    match {
831        $p: Person { name: "Alice" }
832        $p knows $f
833        $f: Person { name: "Bob" }
834        $f worksAt $c
835        $c: Company { name: "Acme" }
836    }
837    return { $c.name }
838}
839"#,
840        )
841        .unwrap();
842        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
843        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
844
845        // Should be: NodeScan($p,[name=Alice]) → Expand($p→$f, [name==Bob])
846        //            → Expand($f→$c, [name==Acme])
847        assert_eq!(ir.pipeline.len(), 3);
848        assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "p"));
849        assert!(matches!(
850            &ir.pipeline[1],
851            IROp::Expand { src_var, dst_var, dst_filters, .. }
852            if src_var == "p" && dst_var == "f" && dst_filters.len() == 1
853        ));
854        assert!(matches!(
855            &ir.pipeline[2],
856            IROp::Expand { src_var, dst_var, dst_filters, .. }
857            if src_var == "f" && dst_var == "c" && dst_filters.len() == 1
858        ));
859    }
860
861    /// Reverse traversal: source binding is deferred when destination is the root.
862    #[test]
863    fn test_lower_reverse_traversal_defers_source_binding() {
864        let catalog = setup();
865        let qf = parse_query(
866            r#"
867query q() {
868    match {
869        $c: Company { name: "Acme" }
870        $p worksAt $c
871        $p: Person { name: "Alice" }
872    }
873    return { $p.name }
874}
875"#,
876        )
877        .unwrap();
878        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
879        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
880
881        // $c is root (first declared). $p is deferred (connected via traversal).
882        // Traversal $p worksAt $c: $c is bound, $p is not → reverse expand.
883        // Pipeline: NodeScan($c,[name=Acme]) → Expand($c→$p, In, [name==Alice])
884        assert_eq!(ir.pipeline.len(), 2);
885        assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "c"));
886        assert!(matches!(
887            &ir.pipeline[1],
888            IROp::Expand { src_var, dst_var, dst_filters, .. }
889            if src_var == "c" && dst_var == "p" && dst_filters.len() == 1
890        ));
891    }
892
893    /// Independent bindings (no traversal) still cross-join.
894    #[test]
895    fn test_lower_independent_bindings_still_cross_join() {
896        let catalog = setup();
897        let qf = parse_query(
898            r#"
899query q() {
900    match {
901        $p: Person
902        $c: Company
903    }
904    return { $p.name, $c.name }
905}
906"#,
907        )
908        .unwrap();
909        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
910        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
911
912        // No traversal connecting them → both get NodeScans (cross-join at runtime)
913        assert_eq!(ir.pipeline.len(), 2);
914        assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "p"));
915        assert!(matches!(&ir.pipeline[1], IROp::NodeScan { variable, .. } if variable == "c"));
916    }
917
918    /// Destination binding without filters: no NodeScan, no post-expand filter.
919    #[test]
920    fn test_lower_destination_binding_without_filters() {
921        let catalog = setup();
922        let qf = parse_query(
923            r#"
924query q() {
925    match {
926        $p: Person
927        $p worksAt $c
928        $c: Company
929    }
930    return { $p.name, $c.name }
931}
932"#,
933        )
934        .unwrap();
935        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
936        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
937
938        // $c binding is deferred (no filters) → just NodeScan + Expand
939        assert_eq!(ir.pipeline.len(), 2);
940        assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "p"));
941        assert!(matches!(
942            &ir.pipeline[1],
943            IROp::Expand { src_var, dst_var, .. }
944            if src_var == "p" && dst_var == "c"
945        ));
946    }
947
948    /// Traversals declared in non-topological order are reordered automatically.
949    #[test]
950    fn test_lower_out_of_order_traversals() {
951        let catalog = setup();
952        let qf = parse_query(
953            r#"
954query q() {
955    match {
956        $p: Person
957        $f worksAt $c
958        $p knows $f
959        $f: Person
960        $c: Company { name: "Acme" }
961    }
962    return { $c.name }
963}
964"#,
965        )
966        .unwrap();
967        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
968        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
969
970        // Even though "$f worksAt $c" is declared before "$p knows $f",
971        // the iterative lowering processes "$p knows $f" first (because $p
972        // is bound) and then "$f worksAt $c" (once $f is bound).
973        assert_eq!(ir.pipeline.len(), 3);
974        assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "p"));
975        // First expand: $p → $f (knows)
976        assert!(matches!(
977            &ir.pipeline[1],
978            IROp::Expand { src_var, dst_var, .. }
979            if src_var == "p" && dst_var == "f"
980        ));
981        // Second expand: $f → $c (worksAt), with filter from $c binding
982        assert!(matches!(
983            &ir.pipeline[2],
984            IROp::Expand { src_var, dst_var, dst_filters, .. }
985            if src_var == "f" && dst_var == "c" && dst_filters.len() == 1
986        ));
987    }
988
989    /// Wildcard $_ must not bridge unrelated components in the adjacency graph.
990    #[test]
991    fn test_lower_wildcard_does_not_bridge_components() {
992        let catalog = setup();
993        let qf = parse_query(
994            r#"
995query q() {
996    match {
997        $p: Person
998        $p knows $_
999        $c: Company
1000    }
1001    return { $p.name, $c.name }
1002}
1003"#,
1004        )
1005        .unwrap();
1006        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
1007        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
1008
1009        // $p and $c are in separate components (connected only through $_).
1010        // Both must get their own NodeScan — $c must NOT be deferred.
1011        // Bindings are emitted first, then traversals.
1012        assert_eq!(ir.pipeline.len(), 3);
1013        assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "p"));
1014        assert!(matches!(&ir.pipeline[1], IROp::NodeScan { variable, .. } if variable == "c"));
1015        // The expand for $p knows $_ (wildcard destination)
1016        assert!(matches!(
1017            &ir.pipeline[2],
1018            IROp::Expand { src_var, dst_var, .. }
1019            if src_var == "p" && dst_var == "_"
1020        ));
1021    }
1022
1023    /// Fan-out: one root fans to two deferred destinations via different edges.
1024    #[test]
1025    fn test_lower_fan_out_topology() {
1026        let catalog = setup();
1027        let qf = parse_query(
1028            r#"
1029query q() {
1030    match {
1031        $p: Person { name: "Alice" }
1032        $p knows $f
1033        $f: Person { name: "Bob" }
1034        $p worksAt $c
1035        $c: Company { name: "Acme" }
1036    }
1037    return { $f.name, $c.name }
1038}
1039"#,
1040        )
1041        .unwrap();
1042        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
1043        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
1044
1045        // Root: $p. Deferred: $f, $c (both reachable from $p).
1046        assert_eq!(ir.pipeline.len(), 3);
1047        assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "p"));
1048        assert!(matches!(
1049            &ir.pipeline[1],
1050            IROp::Expand { src_var, dst_var, dst_filters, .. }
1051            if src_var == "p" && dst_var == "f" && dst_filters.len() == 1
1052        ));
1053        assert!(matches!(
1054            &ir.pipeline[2],
1055            IROp::Expand { src_var, dst_var, dst_filters, .. }
1056            if src_var == "p" && dst_var == "c" && dst_filters.len() == 1
1057        ));
1058    }
1059
1060    /// Fan-in: two sources converge on one destination; second source is
1061    /// introduced via reverse expand from the shared destination.
1062    #[test]
1063    fn test_lower_fan_in_topology() {
1064        let catalog = setup();
1065        let qf = parse_query(
1066            r#"
1067query q() {
1068    match {
1069        $a: Person { name: "Alice" }
1070        $a knows $c
1071        $b: Person { name: "Bob" }
1072        $b knows $c
1073        $c: Person
1074    }
1075    return { $a.name, $b.name, $c.name }
1076}
1077"#,
1078        )
1079        .unwrap();
1080        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
1081        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
1082
1083        // Root: $a (first in component {a,b,c}). Deferred: $b, $c.
1084        // $a knows $c: expand(a→c). $b knows $c: reverse expand(c→b).
1085        assert_eq!(ir.pipeline.len(), 3);
1086        assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "a"));
1087        assert!(matches!(
1088            &ir.pipeline[1],
1089            IROp::Expand { src_var, dst_var, dst_filters, .. }
1090            if src_var == "a" && dst_var == "c" && dst_filters.is_empty()
1091        ));
1092        assert!(matches!(
1093            &ir.pipeline[2],
1094            IROp::Expand { src_var, dst_var, dst_filters, .. }
1095            if src_var == "c" && dst_var == "b" && dst_filters.len() == 1
1096        ));
1097    }
1098
1099    /// Genuine graph cycle: deferred binding is introduced by first traversal,
1100    /// second traversal triggers cycle-closing.
1101    #[test]
1102    fn test_lower_cycle_with_deferred_binding() {
1103        let catalog = setup();
1104        let qf = parse_query(
1105            r#"
1106query q() {
1107    match {
1108        $a: Person
1109        $a knows $b
1110        $b: Person { name: "Bob" }
1111        $b knows $a
1112    }
1113    return { $a.name }
1114}
1115"#,
1116        )
1117        .unwrap();
1118        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
1119        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
1120
1121        // $b is deferred, introduced by first expand.
1122        // Second traversal ($b knows $a) is genuine cycle-closing.
1123        assert_eq!(ir.pipeline.len(), 4);
1124        assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "a"));
1125        assert!(matches!(
1126            &ir.pipeline[1],
1127            IROp::Expand { src_var, dst_var, dst_filters, .. }
1128            if src_var == "a" && dst_var == "b" && dst_filters.len() == 1
1129        ));
1130        // Cycle-closing expand to __temp_a
1131        assert!(matches!(
1132            &ir.pipeline[2],
1133            IROp::Expand { src_var, dst_var, dst_filters, .. }
1134            if src_var == "b" && dst_var.starts_with("__temp_") && dst_filters.is_empty()
1135        ));
1136        // Cycle-closing filter: __temp_a.id == a.id
1137        assert!(matches!(&ir.pipeline[3], IROp::Filter(_)));
1138    }
1139
1140    /// Multiple filters on a single deferred binding.
1141    #[test]
1142    fn test_lower_multiple_filters_on_deferred_binding() {
1143        let catalog = setup();
1144        let qf = parse_query(
1145            r#"
1146query q() {
1147    match {
1148        $p: Person
1149        $p knows $f
1150        $f: Person { name: "Bob", age: 25 }
1151    }
1152    return { $f.name }
1153}
1154"#,
1155        )
1156        .unwrap();
1157        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
1158        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
1159
1160        // Two prop_matches → two dst_filters on the Expand.
1161        assert_eq!(ir.pipeline.len(), 2);
1162        assert!(matches!(
1163            &ir.pipeline[1],
1164            IROp::Expand { dst_filters, .. }
1165            if dst_filters.len() == 2
1166        ));
1167    }
1168
1169    /// Parameter in a deferred binding filter (unit test level).
1170    #[test]
1171    fn test_lower_param_filter_on_deferred_binding() {
1172        let catalog = setup();
1173        let qf = parse_query(
1174            r#"
1175query q($company: String) {
1176    match {
1177        $p: Person
1178        $p worksAt $c
1179        $c: Company { name: $company }
1180    }
1181    return { $p.name }
1182}
1183"#,
1184        )
1185        .unwrap();
1186        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
1187        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
1188
1189        assert_eq!(ir.pipeline.len(), 2);
1190        assert!(matches!(
1191            &ir.pipeline[1],
1192            IROp::Expand { dst_filters, .. }
1193            if dst_filters.len() == 1
1194        ));
1195        // The filter's right-hand side should be a Param, not a Literal
1196        if let IROp::Expand { dst_filters, .. } = &ir.pipeline[1] {
1197            assert!(matches!(&dst_filters[0].right, IRExpr::Param(name) if name == "company"));
1198        }
1199    }
1200
1201    /// Negation with inner binding: inner binding is NOT deferred because
1202    /// bound_vars (from outer scope) is not in binding_set for the inner call.
1203    /// This documents current behavior — the inner pipeline uses a NodeScan +
1204    /// cycle-closing, which is correct but less efficient than deferral.
1205    #[test]
1206    fn test_lower_negation_with_inner_binding() {
1207        let catalog = setup();
1208        let qf = parse_query(
1209            r#"
1210query q() {
1211    match {
1212        $p: Person
1213        not {
1214            $p worksAt $c
1215            $c: Company { name: "Acme" }
1216        }
1217    }
1218    return { $p.name }
1219}
1220"#,
1221        )
1222        .unwrap();
1223        let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
1224        let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
1225
1226        // Outer: NodeScan($p) + AntiJoin
1227        assert_eq!(ir.pipeline.len(), 2);
1228        assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "p"));
1229        let IROp::AntiJoin { inner, .. } = &ir.pipeline[1] else {
1230            panic!("expected AntiJoin");
1231        };
1232        // Inner pipeline: $c is NOT deferred (it's the only binding in the
1233        // inner scope), so it gets a NodeScan + cycle-closing (3 ops).
1234        assert_eq!(inner.len(), 3);
1235        assert!(matches!(&inner[0], IROp::NodeScan { variable, .. } if variable == "c"));
1236        assert!(matches!(&inner[1], IROp::Expand { .. }));
1237        assert!(matches!(&inner[2], IROp::Filter(_)));
1238    }
1239}