Skip to main content

mangle_analysis/
planner.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Result, anyhow};
16use fxhash::FxHashSet;
17use mangle_ir::physical::{self, Aggregate, CmpOp, Condition, DataSource, Expr, Op, Operand};
18use mangle_ir::{Inst, InstId, Ir, NameId};
19
/// Plans a single IR rule into a tree of physical operators ([`Op`]).
///
/// A planner is created with [`Planner::new`], optionally configured with
/// `with_delta` / `with_hash_join`, and then consumed by `plan_rule`.
pub struct Planner<'a> {
    /// The IR being planned. Borrowed mutably because planning interns fresh
    /// names (temporaries) and may add instructions.
    ir: &'a mut Ir,
    /// When set, a full scan over this predicate is emitted as
    /// `DataSource::ScanDelta` instead of `DataSource::Scan`. Setting it also
    /// disables the hash-join fast path.
    delta_pred: Option<NameId>,
    /// Monotonic counter used to suffix the names produced by `fresh_var`.
    fresh_counter: usize,
    /// When true, emit `Op::HashJoin` for eligible two-premise joins.
    /// Seeded at construction time from the `MANGLE_HASHJOIN` environment
    /// variable (off when the variable is unset); tests and callers can
    /// override it with `.with_hash_join(...)`.
    hash_join: bool,
}
29
/// Look up `MANGLE_HASHJOIN` in the process environment and interpret it as a
/// boolean switch: exactly `1`, or `true` in any ASCII casing, means "on".
/// An unset (or non-Unicode) variable and every other value mean "off".
///
/// The result seeds `Planner::hash_join`, so the flag can be flipped from the
/// shell; individual planners can still override it via `with_hash_join`.
fn hash_join_env_default() -> bool {
    match std::env::var("MANGLE_HASHJOIN") {
        Ok(value) => value == "1" || value.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}
38
39/// Replace the placeholder `Op::Nop` body inside a freshly-built `Op::HashJoin`
40/// with the real body planned after the join.
41fn splice_hash_join_body(op: Op, body: Op) -> Op {
42    match op {
43        Op::HashJoin {
44            build_source,
45            probe_source,
46            join_keys,
47            ..
48        } => Op::HashJoin {
49            build_source,
50            probe_source,
51            join_keys,
52            body: Box::new(body),
53        },
54        other => other,
55    }
56}
57
58impl<'a> Planner<'a> {
59    pub fn new(ir: &'a mut Ir) -> Self {
60        Self {
61            ir,
62            delta_pred: None,
63            fresh_counter: 0,
64            hash_join: hash_join_env_default(),
65        }
66    }
67
68    pub fn with_delta(mut self, delta_pred: NameId) -> Self {
69        self.delta_pred = Some(delta_pred);
70        self
71    }
72
73    /// Override the HashJoin emission flag (seeded from `MANGLE_HASHJOIN` env
74    /// var by default). Primarily for testing and for callers that want to
75    /// opt in or out explicitly.
76    pub fn with_hash_join(mut self, enabled: bool) -> Self {
77        self.hash_join = enabled;
78        self
79    }
80
81    pub fn plan_rule(mut self, rule_id: InstId) -> Result<Op> {
82        let (head, premises, transform) = match self.ir.get(rule_id) {
83            Inst::Rule {
84                head,
85                premises,
86                transform,
87            } => (*head, premises.clone(), transform.clone()),
88            _ => return Err(anyhow!("Not a rule")),
89        };
90
91        // Split transforms into blocks by 'do' statements
92        let blocks = self.split_transforms(transform);
93        let num_blocks = blocks.len();
94
95        let mut ops = Vec::new();
96        let mut current_source: Option<(NameId, Vec<NameId>)> = None;
97        let mut bound_vars = FxHashSet::default();
98
99        for (i, block) in blocks.into_iter().enumerate() {
100            let is_last = i == num_blocks - 1;
101
102            if i == 0 {
103                // Block 0: Premises + Lets
104                if is_last {
105                    // Only one block, no aggregations
106                    let op = self.plan_join_sequence(
107                        premises.clone(),
108                        &mut bound_vars,
109                        |planner, vars| {
110                            planner.plan_transforms_sequence(&block, vars, |p, v| {
111                                p.plan_head_insert(head, v)
112                            })
113                        },
114                    )?;
115                    ops.push(op);
116                } else {
117                    // Materialize to temp
118                    let temp_rel = self.fresh_var("temp_grp");
119                    let mut capture_vars: Vec<NameId> = Vec::new(); // Will be populated by continuation
120
121                    let op = self.plan_join_sequence(
122                        premises.clone(),
123                        &mut bound_vars,
124                        |planner, vars| {
125                            planner.plan_transforms_sequence(&block, vars, |_, v| {
126                                let mut sorted_vars: Vec<NameId> = v.iter().cloned().collect();
127                                sorted_vars.sort();
128                                capture_vars = sorted_vars.clone();
129                                let args =
130                                    sorted_vars.iter().map(|&var| Operand::Var(var)).collect();
131                                Ok(Op::Insert {
132                                    relation: temp_rel,
133                                    args,
134                                })
135                            })
136                        },
137                    )?;
138                    ops.push(op);
139                    current_source = Some((temp_rel, capture_vars));
140                }
141            } else {
142                // Block i > 0: Starts with 'do'
143                let (src_rel, src_vars) = current_source.take().expect("No source for aggregation");
144
145                if is_last {
146                    let op = self.plan_block_k(src_rel, src_vars, &block, |p, v| {
147                        p.plan_head_insert(head, v)
148                    })?;
149                    ops.push(op);
150                } else {
151                    let next_temp = self.fresh_var("temp_grp");
152                    let mut next_vars: Vec<NameId> = Vec::new();
153
154                    let op = self.plan_block_k(src_rel, src_vars, &block, |_, v| {
155                        let mut sorted_vars: Vec<NameId> = v.iter().cloned().collect();
156                        sorted_vars.sort();
157                        next_vars = sorted_vars.clone();
158                        let args = sorted_vars.iter().map(|&var| Operand::Var(var)).collect();
159                        Ok(Op::Insert {
160                            relation: next_temp,
161                            args,
162                        })
163                    })?;
164                    ops.push(op);
165                    current_source = Some((next_temp, next_vars));
166                }
167            }
168        }
169
170        if ops.len() == 1 {
171            Ok(ops.remove(0))
172        } else {
173            Ok(Op::Seq(ops))
174        }
175    }
176
177    fn split_transforms(&self, transforms: Vec<InstId>) -> Vec<Vec<InstId>> {
178        let mut blocks = Vec::new();
179        let mut current = Vec::new();
180        for t in transforms {
181            let inst = self.ir.get(t);
182            if let Inst::Transform { var: None, .. } = inst {
183                blocks.push(current);
184                current = Vec::new();
185            }
186            current.push(t);
187        }
188        blocks.push(current);
189        blocks
190    }
191
192    fn plan_block_k<F>(
193        &mut self,
194        source_rel: NameId,
195        source_vars: Vec<NameId>,
196        block: &[InstId],
197        continuation: F,
198    ) -> Result<Op>
199    where
200        F: FnOnce(&mut Self, &mut FxHashSet<NameId>) -> Result<Op>,
201    {
202        let do_stmt = block[0];
203        let rest = &block[1..];
204
205        let keys_insts = self.get_transform_app_args(do_stmt)?;
206        let mut keys = Vec::new();
207        for k in keys_insts {
208            if let Inst::Var(v) = self.ir.get(k) {
209                keys.push(*v);
210            } else {
211                return Err(anyhow!("GroupBy keys must be variables"));
212            }
213        }
214
215        let mut aggregates = Vec::new();
216        let mut lets = Vec::new();
217        for &t in rest {
218            if let Some(agg) = self.try_parse_aggregate(t)? {
219                aggregates.push(agg);
220            } else {
221                lets.push(t);
222            }
223        }
224
225        let mut inner_vars = FxHashSet::default();
226        for &k in &keys {
227            inner_vars.insert(k);
228        }
229        for agg in &aggregates {
230            inner_vars.insert(agg.var);
231        }
232
233        let body = self.plan_transforms_sequence(&lets, &mut inner_vars, continuation)?;
234
235        Ok(Op::GroupBy {
236            source: source_rel,
237            vars: source_vars,
238            keys,
239            aggregates,
240            body: Box::new(body),
241        })
242    }
243
244    fn plan_transforms_sequence<F>(
245        &mut self,
246        transforms: &[InstId],
247        bound_vars: &mut FxHashSet<NameId>,
248        continuation: F,
249    ) -> Result<Op>
250    where
251        F: FnOnce(&mut Self, &mut FxHashSet<NameId>) -> Result<Op>,
252    {
253        if transforms.is_empty() {
254            return continuation(self, bound_vars);
255        }
256
257        let t_id = transforms[0];
258        let rest = &transforms[1..];
259
260        let inst = self.ir.get(t_id).clone();
261        if let Inst::Transform {
262            var: Some(var),
263            app,
264        } = inst
265        {
266            self.inst_to_expr(app, |planner, expr| {
267                bound_vars.insert(var);
268                let body = planner.plan_transforms_sequence(rest, bound_vars, continuation)?;
269                Ok(Op::Let {
270                    var,
271                    expr,
272                    body: Box::new(body),
273                })
274            })
275        } else {
276            // Should not happen if split_transforms is correct
277            Err(anyhow!("Unexpected transform in sequence"))
278        }
279    }
280
281    fn fresh_var(&mut self, prefix: &str) -> NameId {
282        let id = self.fresh_counter;
283        self.fresh_counter += 1;
284        let name = format!("${}_{}", prefix, id);
285        self.ir.intern_name(name)
286    }
287
288    /// If this premise and the next one are both Atoms of the shape
289    /// `p(V1, V2, ...)` — every arg a fresh, unbound, non-repeating variable
290    /// — and they share at least one variable, consume the next premise and
291    /// return a `HashJoin` op whose `body` is `Op::Nop` (spliced in by the
292    /// caller). Returns `None` if the pattern does not match.
293    fn try_plan_hash_join(
294        &self,
295        predicate: NameId,
296        args: &[InstId],
297        premises: &mut Vec<InstId>,
298        bound_vars: &mut FxHashSet<NameId>,
299    ) -> Result<Option<Op>> {
300        let Some(build_vars) = self.all_fresh_distinct_vars(args, bound_vars) else {
301            return Ok(None);
302        };
303        let next_id = *premises.first().unwrap();
304        let next_inst = self.ir.get(next_id).clone();
305        let Inst::Atom {
306            predicate: next_pred,
307            args: next_args,
308        } = next_inst
309        else {
310            return Ok(None);
311        };
312        if Some(next_pred) == self.delta_pred {
313            return Ok(None);
314        }
315        let Some(probe_vars) = self.all_fresh_distinct_vars(&next_args, bound_vars) else {
316            return Ok(None);
317        };
318        let join_keys: Vec<NameId> = build_vars
319            .iter()
320            .filter(|v| probe_vars.contains(v))
321            .copied()
322            .collect();
323        if join_keys.is_empty() {
324            return Ok(None);
325        }
326
327        // Commit: consume the next premise and mark both sides' vars as bound.
328        premises.remove(0);
329        for v in build_vars.iter().chain(probe_vars.iter()) {
330            bound_vars.insert(*v);
331        }
332
333        Ok(Some(Op::HashJoin {
334            build_source: DataSource::Scan {
335                relation: predicate,
336                vars: build_vars,
337            },
338            probe_source: DataSource::Scan {
339                relation: next_pred,
340                vars: probe_vars,
341            },
342            join_keys,
343            body: Box::new(Op::Nop),
344        }))
345    }
346
347    /// Return the list of variables an atom's args bind, or `None` if any
348    /// arg is already bound, not a `Var`, or repeats. Used to gate the
349    /// HashJoin fast path — the existing nested-Iterate path handles the
350    /// general case.
351    fn all_fresh_distinct_vars(
352        &self,
353        args: &[InstId],
354        bound_vars: &FxHashSet<NameId>,
355    ) -> Option<Vec<NameId>> {
356        let mut out: Vec<NameId> = Vec::with_capacity(args.len());
357        for arg in args {
358            let Inst::Var(v) = self.ir.get(*arg) else {
359                return None;
360            };
361            if bound_vars.contains(v) || out.contains(v) {
362                return None;
363            }
364            out.push(*v);
365        }
366        Some(out)
367    }
368
369    fn plan_join_sequence<F>(
370        &mut self,
371        mut premises: Vec<InstId>,
372        bound_vars: &mut FxHashSet<NameId>,
373        continuation: F,
374    ) -> Result<Op>
375    where
376        F: FnOnce(&mut Self, &mut FxHashSet<NameId>) -> Result<Op>,
377    {
378        if premises.is_empty() {
379            return continuation(self, bound_vars);
380        }
381
382        let current_premise = premises.remove(0);
383        let inst = self.ir.get(current_premise).clone();
384
385        match inst {
386            Inst::Atom { predicate, args }
387                if matches!(
388                    self.ir.resolve_name(predicate),
389                    ":lt" | ":le" | ":gt" | ":ge"
390                        | ":time:lt" | ":time:le" | ":time:gt" | ":time:ge"
391                        | ":duration:lt" | ":duration:le" | ":duration:gt" | ":duration:ge"
392                ) =>
393            {
394                let cmp_op = match self.ir.resolve_name(predicate) {
395                    ":lt" | ":time:lt" | ":duration:lt" => CmpOp::Lt,
396                    ":le" | ":time:le" | ":duration:le" => CmpOp::Le,
397                    ":gt" | ":time:gt" | ":duration:gt" => CmpOp::Gt,
398                    ":ge" | ":time:ge" | ":duration:ge" => CmpOp::Ge,
399                    _ => unreachable!(),
400                };
401                if args.len() != 2 {
402                    return Err(anyhow!("Comparison predicate requires exactly 2 arguments"));
403                }
404                let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
405                self.with_eval(args[0], |this, left_op| {
406                    this.with_eval(args[1], |_this, right_op| {
407                        Ok(Op::Filter {
408                            cond: Condition::Cmp {
409                                op: cmp_op,
410                                left: left_op.clone(),
411                                right: right_op,
412                            },
413                            body: Box::new(body),
414                        })
415                    })
416                })
417            }
418            Inst::Atom { predicate, args }
419                if matches!(
420                    self.ir.resolve_name(predicate),
421                    ":string:starts_with"
422                        | ":string:ends_with"
423                        | ":string:contains"
424                        | ":match_prefix"
425                ) =>
426            {
427                if args.len() != 2 {
428                    return Err(anyhow!(
429                        "Built-in predicate requires exactly 2 arguments"
430                    ));
431                }
432                let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
433                self.with_eval(args[0], |this, left_op| {
434                    this.with_eval(args[1], |_this, right_op| {
435                        Ok(Op::Filter {
436                            cond: Condition::Call {
437                                function: predicate,
438                                args: vec![left_op.clone(), right_op],
439                            },
440                            body: Box::new(body),
441                        })
442                    })
443                })
444            }
445            Inst::Atom { predicate, args } => {
446                // Fast path: emit HashJoin when the env var
447                // `MANGLE_HASHJOIN=1` is set, neither side is the delta
448                // predicate, and both this premise and the next one are
449                // simple Atoms whose args are all fresh (unbound, unique,
450                // non-constant) variables sharing at least one join key.
451                //
452                // The planner is otherwise conservative: falling through to
453                // the existing nested-Iterate + IndexLookup path is always
454                // safe, so this is purely an opt-in performance path for
455                // the use cases where hash join beats index-nested-loop.
456                if self.hash_join
457                    && self.delta_pred.is_none()
458                    && !premises.is_empty()
459                {
460                    if let Some(op) = self.try_plan_hash_join(
461                        predicate,
462                        &args,
463                        &mut premises,
464                        bound_vars,
465                    )? {
466                        let body_op = self.plan_join_sequence(premises, bound_vars, continuation)?;
467                        return Ok(splice_hash_join_body(op, body_op));
468                    }
469                }
470
471                let mut scan_vars = Vec::new();
472                let mut new_bindings = Vec::new();
473
474                // Look for a potential index lookup
475                let mut index_lookup: Option<(usize, Operand)> = None;
476
477                for (i, arg) in args.iter().enumerate() {
478                    let arg_inst = self.ir.get(*arg).clone();
479                    match arg_inst {
480                        Inst::Var(v) if bound_vars.contains(&v) => {
481                            if index_lookup.is_none() {
482                                index_lookup = Some((i, Operand::Var(v)));
483                            }
484                        }
485                        Inst::Number(n) => {
486                            if index_lookup.is_none() {
487                                index_lookup =
488                                    Some((i, Operand::Const(physical::Constant::Number(n))));
489                            }
490                        }
491                        Inst::String(s) => {
492                            if index_lookup.is_none() {
493                                index_lookup =
494                                    Some((i, Operand::Const(physical::Constant::String(s))));
495                            }
496                        }
497                        Inst::Name(n) => {
498                            if index_lookup.is_none() {
499                                index_lookup =
500                                    Some((i, Operand::Const(physical::Constant::Name(n))));
501                            }
502                        }
503                        Inst::Float(fl) => {
504                            if index_lookup.is_none() {
505                                index_lookup =
506                                    Some((i, Operand::Const(physical::Constant::Float(fl))));
507                            }
508                        }
509                        Inst::Time(t) => {
510                            if index_lookup.is_none() {
511                                index_lookup =
512                                    Some((i, Operand::Const(physical::Constant::Time(t))));
513                            }
514                        }
515                        Inst::Duration(d) => {
516                            if index_lookup.is_none() {
517                                index_lookup =
518                                    Some((i, Operand::Const(physical::Constant::Duration(d))));
519                            }
520                        }
521                        _ => {}
522                    }
523                }
524
525                for arg in &args {
526                    if let Inst::Var(v) = self.ir.get(*arg)
527                        && !bound_vars.contains(v)
528                    {
529                        scan_vars.push(*v);
530                        new_bindings.push(*v);
531                        continue;
532                    }
533                    let tmp = self.fresh_var("scan");
534                    scan_vars.push(tmp);
535                    new_bindings.push(tmp);
536                }
537
538                for v in &new_bindings {
539                    bound_vars.insert(*v);
540                }
541
542                let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
543                let wrapped_body = self.apply_constraints(&args, &scan_vars, body)?;
544
545                let source = if let Some((col_idx, key)) = index_lookup {
546                    DataSource::IndexLookup {
547                        relation: predicate,
548                        col_idx,
549                        key,
550                        vars: scan_vars,
551                    }
552                } else if Some(predicate) == self.delta_pred {
553                    DataSource::ScanDelta {
554                        relation: predicate,
555                        vars: scan_vars,
556                    }
557                } else {
558                    DataSource::Scan {
559                        relation: predicate,
560                        vars: scan_vars,
561                    }
562                };
563                Ok(Op::Iterate {
564                    source,
565                    body: Box::new(wrapped_body),
566                })
567            }
568            Inst::Eq(l, r) => {
569                let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
570                self.wrap_eq_check(l, r, body)
571            }
572            Inst::Ineq(l, r) => {
573                let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
574                self.with_eval(l, |this, left_op| {
575                    this.with_eval(r, |_this, right_op| {
576                        Ok(Op::Filter {
577                            cond: Condition::Cmp {
578                                op: CmpOp::Neq,
579                                left: left_op.clone(),
580                                right: right_op,
581                            },
582                            body: Box::new(body),
583                        })
584                    })
585                })
586            }
587            Inst::NegAtom(inner) => {
588                let inner_inst = self.ir.get(inner).clone();
589                if let Inst::Atom { predicate, args } = inner_inst {
590                    let body = self.plan_join_sequence(premises, bound_vars, continuation)?;
591                    let mut neg_args = Vec::new();
592                    for arg in &args {
593                        let arg_inst = self.ir.get(*arg).clone();
594                        match arg_inst {
595                            Inst::Var(v) => neg_args.push(Operand::Var(v)),
596                            Inst::Number(n) => {
597                                neg_args.push(Operand::Const(physical::Constant::Number(n)))
598                            }
599                            Inst::String(s) => {
600                                neg_args.push(Operand::Const(physical::Constant::String(s)))
601                            }
602                            Inst::Name(n) => {
603                                neg_args.push(Operand::Const(physical::Constant::Name(n)))
604                            }
605                            Inst::Float(fl) => {
606                                neg_args.push(Operand::Const(physical::Constant::Float(fl)))
607                            }
608                            Inst::Time(t) => {
609                                neg_args.push(Operand::Const(physical::Constant::Time(t)))
610                            }
611                            Inst::Duration(d) => {
612                                neg_args.push(Operand::Const(physical::Constant::Duration(d)))
613                            }
614                            _ => return Err(anyhow!("Complex expression in negated atom")),
615                        }
616                    }
617                    Ok(Op::Filter {
618                        cond: Condition::Negation {
619                            relation: predicate,
620                            args: neg_args,
621                        },
622                        body: Box::new(body),
623                    })
624                } else {
625                    Err(anyhow!("NegAtom wraps non-atom"))
626                }
627            }
628            _ => Err(anyhow!("Unsupported premise type: {:?}", inst)),
629        }
630    }
631
632    fn apply_constraints(
633        &mut self,
634        args: &[InstId],
635        scan_vars: &[NameId],
636        mut body: Op,
637    ) -> Result<Op> {
638        for (i, arg) in args.iter().enumerate().rev() {
639            let scan_var = scan_vars[i];
640            let arg_inst = self.ir.get(*arg).clone();
641            match arg_inst {
642                Inst::Var(v) => {
643                    if v == scan_var {
644                        continue;
645                    }
646                    body = Op::Filter {
647                        cond: Condition::Cmp {
648                            op: CmpOp::Eq,
649                            left: Operand::Var(scan_var),
650                            right: Operand::Var(v),
651                        },
652                        body: Box::new(body),
653                    };
654                }
655                _ => {
656                    body = self.wrap_eval_check(*arg, Operand::Var(scan_var), body)?;
657                }
658            }
659        }
660        Ok(body)
661    }
662
663    fn wrap_eq_check(&mut self, l: InstId, r: InstId, body: Op) -> Result<Op> {
664        self.with_eval(l, |this, op_l| {
665            this.with_eval(r, |_this, op_r| {
666                Ok(Op::Filter {
667                    cond: Condition::Cmp {
668                        op: CmpOp::Eq,
669                        left: op_l,
670                        right: op_r,
671                    },
672                    body: Box::new(body),
673                })
674            })
675        })
676    }
677
678    fn wrap_eval_check(&mut self, inst: InstId, target: Operand, body: Op) -> Result<Op> {
679        self.with_eval(inst, |_this, op| {
680            Ok(Op::Filter {
681                cond: Condition::Cmp {
682                    op: CmpOp::Eq,
683                    left: target,
684                    right: op,
685                },
686                body: Box::new(body),
687            })
688        })
689    }
690
691    fn with_eval<F>(&mut self, inst: InstId, f: F) -> Result<Op>
692    where
693        F: FnOnce(&mut Self, Operand) -> Result<Op>,
694    {
695        let i = self.ir.get(inst).clone();
696        match i {
697            Inst::Var(v) => f(self, Operand::Var(v)),
698            Inst::String(s) => f(self, Operand::Const(physical::Constant::String(s))),
699            Inst::Number(n) => f(self, Operand::Const(physical::Constant::Number(n))),
700            Inst::Name(n) => f(self, Operand::Const(physical::Constant::Name(n))),
701            Inst::Float(fl) => f(self, Operand::Const(physical::Constant::Float(fl))),
702            Inst::Time(t) => f(self, Operand::Const(physical::Constant::Time(t))),
703            Inst::Duration(d) => f(self, Operand::Const(physical::Constant::Duration(d))),
704            Inst::ApplyFn { function, args } => self.with_eval_args(
705                &args,
706                0,
707                Vec::new(),
708                Box::new(|this, ops| {
709                    let tmp = this.fresh_var("call");
710                    let inner = f(this, Operand::Var(tmp))?;
711                    Ok(Op::Let {
712                        var: tmp,
713                        expr: Expr::Call {
714                            function,
715                            args: ops,
716                        },
717                        body: Box::new(inner),
718                    })
719                }),
720            ),
721            // Compound types: route through function calls
722            Inst::List(args) => {
723                let fn_name = self.ir.intern_name("fn:list".to_string());
724                self.with_eval_args(
725                    &args,
726                    0,
727                    Vec::new(),
728                    Box::new(|this, ops| {
729                        let tmp = this.fresh_var("list");
730                        let inner = f(this, Operand::Var(tmp))?;
731                        Ok(Op::Let {
732                            var: tmp,
733                            expr: Expr::Call {
734                                function: fn_name,
735                                args: ops,
736                            },
737                            body: Box::new(inner),
738                        })
739                    }),
740                )
741            }
742            Inst::Map { keys, values } => {
743                // Interleave keys and values: [k1, v1, k2, v2, ...]
744                let mut interleaved = Vec::with_capacity(keys.len() + values.len());
745                for (k, v) in keys.iter().zip(values.iter()) {
746                    interleaved.push(*k);
747                    interleaved.push(*v);
748                }
749                let fn_name = self.ir.intern_name("fn:map".to_string());
750                self.with_eval_args(
751                    &interleaved,
752                    0,
753                    Vec::new(),
754                    Box::new(|this, ops| {
755                        let tmp = this.fresh_var("map");
756                        let inner = f(this, Operand::Var(tmp))?;
757                        Ok(Op::Let {
758                            var: tmp,
759                            expr: Expr::Call {
760                                function: fn_name,
761                                args: ops,
762                            },
763                            body: Box::new(inner),
764                        })
765                    }),
766                )
767            }
768            Inst::Struct { fields, values } => {
769                // Interleave field names and values: [name1, val1, name2, val2, ...]
770                let mut interleaved = Vec::with_capacity(fields.len() + values.len());
771                for (field, val) in fields.iter().zip(values.iter()) {
772                    // Field names are NameIds, emit as Name constants
773                    let name_inst = self.ir.add_inst(Inst::Name(*field));
774                    interleaved.push(name_inst);
775                    interleaved.push(*val);
776                }
777                let fn_name = self.ir.intern_name("fn:struct".to_string());
778                self.with_eval_args(
779                    &interleaved,
780                    0,
781                    Vec::new(),
782                    Box::new(|this, ops| {
783                        let tmp = this.fresh_var("struct");
784                        let inner = f(this, Operand::Var(tmp))?;
785                        Ok(Op::Let {
786                            var: tmp,
787                            expr: Expr::Call {
788                                function: fn_name,
789                                args: ops,
790                            },
791                            body: Box::new(inner),
792                        })
793                    }),
794                )
795            }
796            _ => Err(anyhow!("Unsupported expression in evaluation")),
797        }
798    }
799
800    fn inst_to_expr<F>(&mut self, inst: InstId, f: F) -> Result<Op>
801    where
802        F: FnOnce(&mut Self, Expr) -> Result<Op>,
803    {
804        let i = self.ir.get(inst).clone();
805        match i {
806            Inst::ApplyFn { function, args } => self.with_eval_args(
807                &args,
808                0,
809                Vec::new(),
810                Box::new(|this, ops| {
811                    f(
812                        this,
813                        Expr::Call {
814                            function,
815                            args: ops,
816                        },
817                    )
818                }),
819            ),
820            _ => self.with_eval(inst, |this, op| f(this, Expr::Value(op))),
821        }
822    }
823
824    fn with_eval_args(
825        &mut self,
826        args: &[InstId],
827        index: usize,
828        mut acc: Vec<Operand>,
829        f: Box<dyn FnOnce(&mut Self, Vec<Operand>) -> Result<Op> + '_>,
830    ) -> Result<Op> {
831        if index >= args.len() {
832            return f(self, acc);
833        }
834        self.with_eval(args[index], |this, op| {
835            acc.push(op);
836            this.with_eval_args(args, index + 1, acc, f)
837        })
838    }
839
840    fn plan_head_insert(
841        &mut self,
842        head: InstId,
843        _bound_vars: &mut FxHashSet<NameId>,
844    ) -> Result<Op> {
845        let inst = self.ir.get(head).clone();
846        if let Inst::Atom { predicate, args } = inst {
847            self.with_eval_args(
848                &args,
849                0,
850                Vec::new(),
851                Box::new(|_this, ops| {
852                    Ok(Op::Insert {
853                        relation: predicate,
854                        args: ops,
855                    })
856                }),
857            )
858        } else {
859            Err(anyhow!("Head must be an atom"))
860        }
861    }
862
863    fn get_transform_app_args(&self, t_id: InstId) -> Result<Vec<InstId>> {
864        if let Inst::Transform { app, .. } = self.ir.get(t_id)
865            && let Inst::ApplyFn { args, .. } = self.ir.get(*app)
866        {
867            return Ok(args.clone());
868        }
869        Err(anyhow!("Invalid transform structure"))
870    }
871
872    fn try_parse_aggregate(&mut self, t_id: InstId) -> Result<Option<Aggregate>> {
873        let inst = self.ir.get(t_id).clone();
874        if let Inst::Transform {
875            var: Some(var),
876            app,
877        } = inst
878            && let Inst::ApplyFn { function, args } = self.ir.get(app).clone()
879        {
880            let func_name = self.ir.resolve_name(function);
881            if matches!(
882                func_name,
883                "fn:sum"
884                    | "fn:count"
885                    | "fn:max"
886                    | "fn:min"
887                    | "fn:collect"
888                    | "fn:float:sum"
889                    | "fn:float:max"
890                    | "fn:float:min"
891            ) {
892                let mut op_args = Vec::new();
893                for arg in args {
894                    let arg_inst = self.ir.get(arg).clone();
895                    match arg_inst {
896                        Inst::Var(v) => op_args.push(Operand::Var(v)),
897                        Inst::Number(n) => {
898                            op_args.push(Operand::Const(physical::Constant::Number(n)))
899                        }
900                        Inst::Float(fl) => {
901                            op_args.push(Operand::Const(physical::Constant::Float(fl)))
902                        }
903                        _ => {
904                            return Err(anyhow!(
905                                "Complex expressions in aggregates not supported yet"
906                            ));
907                        }
908                    }
909                }
910                return Ok(Some(Aggregate {
911                    var,
912                    func: function,
913                    args: op_args,
914                }));
915            }
916        }
917        Ok(None)
918    }
919}