Skip to main content

mangle_interpreter/
lib.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # Mangle Interpreter
16//!
17//! A pure Rust interpreter for the Mangle Intermediate Representation (IR).
18//!
19//! This crate enables the **Edge Mode** of execution, allowing Mangle programs
20//! to run on devices or in environments where a WebAssembly runtime is not available
21//! or desired.
22//!
23//! It executes the Physical IR operations (`Op`) directly.
24//!
25//! ## Usage
26//!
27//! See `mangle-driver` for the high-level API to compile and execute source code.
28
29use anyhow::{Result, anyhow};
30use mangle_ir::physical::{Aggregate, CmpOp, Condition, Constant, DataSource, Expr, Op, Operand};
31use mangle_ir::{Ir, NameId};
32use std::collections::HashMap;
33
34pub use mangle_common::{CompoundKind, Store, Value};
35
36/// Internal storage cell. Compound values are stored inline as a `CompoundStart`
37/// marker (holding the kind and element count) followed by that many cells
38/// (which may themselves be nested compounds). Scalar values are stored directly.
39#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
40enum Cell {
41    Val(Value),
42    /// Marks the start of a compound with `n` logical elements following.
43    CompoundStart(CompoundKind, usize),
44}
45
46/// Flatten a `Value` into a sequence of `Cell`s.
47/// Scalar values become a single `Cell::Val`. Compound values become
48/// `Cell::CompoundStart(n)` followed by the flattened cells of each element.
49fn flatten_value(v: &Value, out: &mut Vec<Cell>) {
50    match v {
51        Value::Compound(kind, elems) => {
52            out.push(Cell::CompoundStart(*kind, elems.len()));
53            for elem in elems {
54                flatten_value(elem, out);
55            }
56        }
57        _ => out.push(Cell::Val(v.clone())),
58    }
59}
60
61/// Flatten a tuple of Values into a flat Vec of Cells.
62fn flatten_tuple(tuple: &[Value]) -> Vec<Cell> {
63    let mut cells = Vec::new();
64    for v in tuple {
65        flatten_value(v, &mut cells);
66    }
67    cells
68}
69
70/// Read one logical Value from cells starting at `pos`, advancing `pos`.
71fn unflatten_one(cells: &[Cell], pos: &mut usize) -> Value {
72    match &cells[*pos] {
73        Cell::Val(v) => {
74            *pos += 1;
75            v.clone()
76        }
77        Cell::CompoundStart(kind, n) => {
78            let kind = *kind;
79            let n = *n;
80            *pos += 1;
81            let mut elems = Vec::with_capacity(n);
82            for _ in 0..n {
83                elems.push(unflatten_one(cells, pos));
84            }
85            Value::Compound(kind, elems)
86        }
87    }
88}
89
90/// Skip past one logical value in a cell slice, advancing `pos`.
91fn skip_one(cells: &[Cell], pos: &mut usize) {
92    match &cells[*pos] {
93        Cell::Val(_) => *pos += 1,
94        Cell::CompoundStart(_, n) => {
95            let n = *n;
96            *pos += 1;
97            for _ in 0..n {
98                skip_one(cells, pos);
99            }
100        }
101    }
102}
103
104/// Reconstruct a `Vec<Value>` tuple from flat cells, reading `n_cols` logical values.
105fn unflatten_tuple(cells: &[Cell], n_cols: usize) -> Vec<Value> {
106    let mut pos = 0;
107    let mut tuple = Vec::with_capacity(n_cols);
108    for _ in 0..n_cols {
109        tuple.push(unflatten_one(cells, &mut pos));
110    }
111    tuple
112}
113
114/// A simple in-memory implementation of `Store`.
115/// Supports semi-naive evaluation by tracking "stable" and "delta" facts.
116///
117/// Compound values are stored inline: each `Value::Compound` is flattened into
118/// a `CompoundStart(n)` marker followed by its elements. This avoids nested
119/// heap allocations in stored tuples and enables future per-field indexing.
120#[derive(Default)]
121pub struct MemStore {
122    // Stable facts from previous iterations
123    stable: HashMap<String, Vec<Vec<Cell>>>,
124    // New facts from the current iteration
125    delta: HashMap<String, Vec<Vec<Cell>>>,
126    // Facts being collected for the next iteration
127    next_delta: HashMap<String, Vec<Vec<Cell>>>,
128
129    // Number of logical columns per relation (set on first insert).
130    arity: HashMap<String, usize>,
131
132    // Secondary indexes: (relation_name, col_idx) -> { Cell -> [row_indices] }
133    // Indexes use the first Cell of each logical column (scalars: the value itself,
134    // compounds: CompoundStart(n)). This enables index lookups on scalar columns.
135    stable_indexes: HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
136    delta_indexes: HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
137}
138
139impl MemStore {
140    pub fn new() -> Self {
141        Self::default()
142    }
143
144    /// Registers a relation (creating it if absent) to allow scanning it.
145    pub fn create_relation(&mut self, relation: &str) {
146        self.stable.entry(relation.to_string()).or_default();
147    }
148
149    /// Add a fact manually (for testing/setup). Auto-creates relation in stable.
150    pub fn add_fact(&mut self, relation: &str, args: Vec<Value>) {
151        let n_cols = args.len();
152        let cells = flatten_tuple(&args);
153        let table = self.stable.entry(relation.to_string()).or_default();
154        if !table.contains(&cells) {
155            let row_idx = table.len();
156            self.arity.entry(relation.to_string()).or_insert(n_cols);
157            table.push(cells.clone());
158            index_cells(&mut self.stable_indexes, relation, &cells, n_cols, row_idx);
159        }
160    }
161
162    /// Rebuilds stable and delta indexes for a single relation after mutation.
163    fn rebuild_indexes_for(&mut self, relation: &str) {
164        self.stable_indexes.retain(|(rel, _), _| rel != relation);
165        self.delta_indexes.retain(|(rel, _), _| rel != relation);
166
167        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
168
169        if let Some(table) = self.stable.get(relation) {
170            for (row_idx, cells) in table.iter().enumerate() {
171                index_cells(&mut self.stable_indexes, relation, cells, n_cols, row_idx);
172            }
173        }
174
175        if let Some(table) = self.delta.get(relation) {
176            for (row_idx, cells) in table.iter().enumerate() {
177                index_cells(&mut self.delta_indexes, relation, cells, n_cols, row_idx);
178            }
179        }
180    }
181
182    /// Unflatten stored cells into Values using the relation's arity.
183    fn to_values(&self, relation: &str, cells: &[Cell]) -> Vec<Value> {
184        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
185        if n_cols == 0 {
186            // Fallback: treat each cell as a scalar
187            cells
188                .iter()
189                .map(|c| match c {
190                    Cell::Val(v) => v.clone(),
191                    Cell::CompoundStart(_, _) => Value::Null,
192                })
193                .collect()
194        } else {
195            unflatten_tuple(cells, n_cols)
196        }
197    }
198
199    pub fn get_facts(&self, relation: &str) -> Vec<Vec<Value>> {
200        let mut all: Vec<Vec<Value>> = self
201            .stable
202            .get(relation)
203            .into_iter()
204            .flatten()
205            .map(|cells| self.to_values(relation, cells))
206            .collect();
207        if let Some(d) = self.delta.get(relation) {
208            for cells in d {
209                all.push(self.to_values(relation, cells));
210            }
211        }
212        all
213    }
214
215    /// Coalesce temporal intervals for a relation.
216    /// Groups facts by their non-temporal columns (all except the last 2),
217    /// sorts intervals by start time, and merges overlapping/adjacent intervals.
218    /// This prevents interval explosion in recursive temporal rules.
219    pub fn coalesce_temporal(&mut self, relation: &str) {
220        let n_cols = match self.arity.get(relation) {
221            Some(&n) if n >= 2 => n,
222            _ => return,
223        };
224        let key_cols = n_cols - 2; // non-temporal columns
225
226        // Collect all facts (stable + delta) as Value tuples
227        let mut all_facts: Vec<Vec<Value>> = Vec::new();
228        if let Some(table) = self.stable.get(relation) {
229            for cells in table {
230                all_facts.push(unflatten_tuple(cells, n_cols));
231            }
232        }
233        if let Some(table) = self.delta.get(relation) {
234            for cells in table {
235                all_facts.push(unflatten_tuple(cells, n_cols));
236            }
237        }
238
239        if all_facts.is_empty() {
240            return;
241        }
242
243        // Group by non-temporal key columns
244        let mut groups: HashMap<Vec<Value>, Vec<(i64, i64)>> = HashMap::new();
245        for fact in &all_facts {
246            let key: Vec<Value> = fact[..key_cols].to_vec();
247            let start = match &fact[key_cols] {
248                Value::Time(t) => *t,
249                Value::Number(n) => *n,
250                _ => continue,
251            };
252            let end = match &fact[key_cols + 1] {
253                Value::Time(t) => *t,
254                Value::Number(n) => *n,
255                _ => continue,
256            };
257            groups.entry(key).or_default().push((start, end));
258        }
259
260        // Coalesce intervals within each group
261        let mut coalesced_facts: Vec<Vec<Value>> = Vec::new();
262        for (key, mut intervals) in groups {
263            intervals.sort_by_key(|&(s, _)| s);
264            let mut merged: Vec<(i64, i64)> = vec![intervals[0]];
265            for &(s, e) in &intervals[1..] {
266                let last = merged.last_mut().unwrap();
267                // Merge if overlapping or adjacent (within 1 nanosecond)
268                if s <= last.1.saturating_add(1) {
269                    last.1 = last.1.max(e);
270                } else {
271                    merged.push((s, e));
272                }
273            }
274            for (start, end) in merged {
275                let mut fact = key.clone();
276                fact.push(Value::Time(start));
277                fact.push(Value::Time(end));
278                coalesced_facts.push(fact);
279            }
280        }
281
282        // Replace stable with coalesced facts, clear delta
283        let coalesced_cells: Vec<Vec<Cell>> = coalesced_facts
284            .iter()
285            .map(|fact| flatten_tuple(fact))
286            .collect();
287        self.stable.insert(relation.to_string(), coalesced_cells);
288        self.delta.remove(relation);
289        self.next_delta.remove(relation);
290        self.rebuild_indexes_for(relation);
291    }
292}
293
294/// Index a flattened tuple's logical columns.
295fn index_cells(
296    indexes: &mut HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
297    relation: &str,
298    cells: &[Cell],
299    n_cols: usize,
300    row_idx: usize,
301) {
302    let mut pos = 0;
303    for col_idx in 0..n_cols {
304        let key = cells[pos].clone();
305        skip_one(cells, &mut pos);
306        indexes
307            .entry((relation.to_string(), col_idx))
308            .or_default()
309            .entry(key)
310            .or_default()
311            .push(row_idx);
312    }
313}
314
315/// Convert a Value to its index Cell key (for index lookup matching).
316fn value_to_index_key(v: &Value) -> Cell {
317    match v {
318        Value::Compound(kind, elems) => Cell::CompoundStart(*kind, elems.len()),
319        _ => Cell::Val(v.clone()),
320    }
321}
322
323impl Store for MemStore {
324    fn scan(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
325        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
326        let s = self
327            .stable
328            .get(relation)
329            .into_iter()
330            .flatten()
331            .map(move |cells| unflatten_tuple(cells, n_cols));
332        let d = self
333            .delta
334            .get(relation)
335            .into_iter()
336            .flatten()
337            .map(move |cells| unflatten_tuple(cells, n_cols));
338        Ok(Box::new(s.chain(d)))
339    }
340
341    fn scan_delta(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
342        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
343        match self.delta.get(relation) {
344            Some(tuples) => Ok(Box::new(
345                tuples
346                    .iter()
347                    .map(move |cells| unflatten_tuple(cells, n_cols)),
348            )),
349            None => Ok(Box::new(std::iter::empty())),
350        }
351    }
352
353    fn scan_next_delta(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
354        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
355        match self.next_delta.get(relation) {
356            Some(tuples) => Ok(Box::new(
357                tuples
358                    .iter()
359                    .map(move |cells| unflatten_tuple(cells, n_cols)),
360            )),
361            None => Ok(Box::new(std::iter::empty())),
362        }
363    }
364
365    fn scan_index(
366        &self,
367        relation: &str,
368        col_idx: usize,
369        key: &Value,
370    ) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
371        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
372        let cell_key = value_to_index_key(key);
373        let mut results: Vec<Vec<Value>> = Vec::new();
374
375        if let Some(idx_map) = self.stable_indexes.get(&(relation.to_string(), col_idx))
376            && let Some(row_indices) = idx_map.get(&cell_key)
377            && let Some(table) = self.stable.get(relation)
378        {
379            for &i in row_indices {
380                results.push(unflatten_tuple(&table[i], n_cols));
381            }
382        }
383
384        if let Some(idx_map) = self.delta_indexes.get(&(relation.to_string(), col_idx))
385            && let Some(row_indices) = idx_map.get(&cell_key)
386            && let Some(table) = self.delta.get(relation)
387        {
388            for &i in row_indices {
389                results.push(unflatten_tuple(&table[i], n_cols));
390            }
391        }
392
393        Ok(Box::new(results.into_iter()))
394    }
395
396    fn scan_delta_index(
397        &self,
398        relation: &str,
399        col_idx: usize,
400        key: &Value,
401    ) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
402        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
403        let cell_key = value_to_index_key(key);
404        let mut results: Vec<Vec<Value>> = Vec::new();
405
406        if let Some(idx_map) = self.delta_indexes.get(&(relation.to_string(), col_idx))
407            && let Some(row_indices) = idx_map.get(&cell_key)
408            && let Some(table) = self.delta.get(relation)
409        {
410            for &i in row_indices {
411                results.push(unflatten_tuple(&table[i], n_cols));
412            }
413        }
414
415        Ok(Box::new(results.into_iter()))
416    }
417
418    fn insert(&mut self, relation: &str, tuple: Vec<Value>) -> Result<bool> {
419        let n_cols = tuple.len();
420        let cells = flatten_tuple(&tuple);
421
422        // Check if fact is already in stable, delta, or next_delta
423        if self
424            .stable
425            .get(relation)
426            .is_some_and(|v| v.contains(&cells))
427            || self
428                .delta
429                .get(relation)
430                .is_some_and(|v| v.contains(&cells))
431            || self
432                .next_delta
433                .get(relation)
434                .is_some_and(|v| v.contains(&cells))
435        {
436            return Ok(false);
437        }
438
439        self.arity.entry(relation.to_string()).or_insert(n_cols);
440        self.next_delta
441            .entry(relation.to_string())
442            .or_default()
443            .push(cells);
444        Ok(true)
445    }
446
447    fn merge_deltas(&mut self) {
448        // 1. Move current delta to stable
449        for (rel_name, mut tuples) in self.delta.drain() {
450            let n_cols = self.arity.get(&rel_name).copied().unwrap_or(0);
451            let table = self.stable.entry(rel_name.clone()).or_default();
452            for cells in tuples.drain(..) {
453                let row_idx = table.len();
454                index_cells(
455                    &mut self.stable_indexes,
456                    &rel_name,
457                    &cells,
458                    n_cols,
459                    row_idx,
460                );
461                table.push(cells);
462            }
463        }
464        self.delta_indexes.clear();
465
466        // 2. Move next_delta to delta and build delta index
467        self.delta = std::mem::take(&mut self.next_delta);
468        for (rel_name, tuples) in &self.delta {
469            let n_cols = self.arity.get(rel_name).copied().unwrap_or(0);
470            for (row_idx, cells) in tuples.iter().enumerate() {
471                index_cells(
472                    &mut self.delta_indexes,
473                    rel_name,
474                    cells,
475                    n_cols,
476                    row_idx,
477                );
478            }
479        }
480    }
481
482    fn create_relation(&mut self, relation: &str) {
483        self.stable.entry(relation.to_string()).or_default();
484    }
485
486    fn retract(&mut self, relation: &str, tuple: &[Value]) -> Result<bool> {
487        let cells = flatten_tuple(tuple);
488        let removed = if let Some(table) = self.stable.get_mut(relation) {
489            if let Some(pos) = table.iter().position(|t| *t == cells) {
490                table.swap_remove(pos);
491                true
492            } else {
493                false
494            }
495        } else {
496            false
497        };
498
499        // Also remove from delta and next_delta
500        if let Some(table) = self.delta.get_mut(relation) {
501            if let Some(pos) = table.iter().position(|t| *t == cells) {
502                table.swap_remove(pos);
503            }
504        }
505        if let Some(table) = self.next_delta.get_mut(relation) {
506            if let Some(pos) = table.iter().position(|t| *t == cells) {
507                table.swap_remove(pos);
508            }
509        }
510
511        if removed {
512            self.rebuild_indexes_for(relation);
513        }
514        Ok(removed)
515    }
516
517    fn clear(&mut self, relation: &str) {
518        if let Some(table) = self.stable.get_mut(relation) {
519            table.clear();
520        }
521        if let Some(table) = self.delta.get_mut(relation) {
522            table.clear();
523        }
524        if let Some(table) = self.next_delta.get_mut(relation) {
525            table.clear();
526        }
527        self.stable_indexes.retain(|(rel, _), _| rel != relation);
528        self.delta_indexes.retain(|(rel, _), _| rel != relation);
529    }
530
531    fn relation_names(&self) -> Vec<String> {
532        let mut names: Vec<String> = self.stable.keys().cloned().collect();
533        for key in self.delta.keys() {
534            if !names.contains(key) {
535                names.push(key.clone());
536            }
537        }
538        names
539    }
540
541    fn coalesce_temporal(&mut self, relation: &str) {
542        MemStore::coalesce_temporal(self, relation);
543    }
544}
545
546/// A record of one derivation: which fact was derived and which premises were used.
547#[derive(Debug, Clone)]
548pub struct ProvenanceEntry {
549    /// The derived fact: (relation_name, tuple).
550    pub derived: (String, Vec<Value>),
551    /// The premise facts that contributed: (relation_name, tuple) for each.
552    pub premises: Vec<(String, Vec<Value>)>,
553}
554
555/// Lightweight recorder that captures derivation provenance during execution.
556///
557/// When enabled on the interpreter, every successful `Op::Insert` records
558/// which premise facts (from enclosing `Op::Iterate` scans) contributed to
559/// the derivation. When disabled (the default), there is zero overhead.
560#[derive(Default)]
561pub struct ProvenanceRecorder {
562    /// All recorded derivations.
563    pub entries: Vec<ProvenanceEntry>,
564    /// Stack of currently-active scan sources (pushed on Iterate, popped after).
565    active_premises: Vec<(String, Vec<Value>)>,
566}
567
568/// A pure Rust interpreter for Mangle IR.
569pub struct Interpreter<'a> {
570    ir: &'a Ir,
571    store: Box<dyn Store + 'a>,
572    provenance: Option<ProvenanceRecorder>,
573}
574
575struct Env {
576    vars: HashMap<NameId, Value>,
577}
578
579impl Env {
580    fn new() -> Self {
581        Self {
582            vars: HashMap::new(),
583        }
584    }
585}
586
587impl<'a> Interpreter<'a> {
588    pub fn new(ir: &'a Ir, store: Box<dyn Store + 'a>) -> Self {
589        Self {
590            ir,
591            store,
592            provenance: None,
593        }
594    }
595
596    /// Enable provenance recording. When set, every successful insert
597    /// records which premise facts were in the current environment.
598    pub fn with_provenance(mut self) -> Self {
599        self.provenance = Some(ProvenanceRecorder::default());
600        self
601    }
602
603    /// Helper to get the underlying store (e.g. to inspect results).
604    pub fn store(&self) -> &dyn Store {
605        &*self.store
606    }
607
608    /// Helper to get the underlying store mutably.
609    pub fn store_mut(&mut self) -> &mut dyn Store {
610        &mut *self.store
611    }
612
613    /// Consumes the interpreter and returns the underlying store.
614    pub fn into_store(self) -> Box<dyn Store + 'a> {
615        self.store
616    }
617
618    /// Consume the interpreter, returning the provenance recorder if enabled.
619    pub fn into_provenance(self) -> Option<ProvenanceRecorder> {
620        self.provenance
621    }
622
623    /// Consume the interpreter, returning the store and optional provenance.
624    pub fn into_parts(self) -> (Box<dyn Store + 'a>, Option<ProvenanceRecorder>) {
625        (self.store, self.provenance)
626    }
627
628    /// Executes the operation and returns the number of facts inserted.
629    pub fn execute(&mut self, op: &Op) -> Result<usize> {
630        let mut env = Env::new();
631        self.exec_op(op, &mut env)
632    }
633
634    fn exec_op(&mut self, op: &Op, env: &mut Env) -> Result<usize> {
635        match op {
636            Op::Nop => Ok(0),
637            Op::Seq(ops) => {
638                let mut count = 0;
639                for o in ops {
640                    count += self.exec_op(o, env)?;
641                }
642                Ok(count)
643            }
644            Op::Iterate { source, body } => {
645                let mut count = 0;
646                match source {
647                    DataSource::Scan { relation, vars } => {
648                        let rel_name = self.ir.resolve_name(*relation);
649                        let iter = self.store.scan(rel_name)?;
650                        let tuples: Vec<_> = iter.collect();
651
652                        for tuple in tuples {
653                            if tuple.len() != vars.len() {
654                                continue;
655                            }
656                            for (i, var) in vars.iter().enumerate() {
657                                env.vars.insert(*var, tuple[i].clone());
658                            }
659                            if let Some(ref mut prov) = self.provenance {
660                                prov.active_premises
661                                    .push((rel_name.to_string(), tuple.clone()));
662                            }
663                            count += self.exec_op(body, env)?;
664                            if self.provenance.is_some() {
665                                self.provenance.as_mut().unwrap().active_premises.pop();
666                            }
667                        }
668                    }
669                    DataSource::ScanDelta { relation, vars } => {
670                        let rel_name = self.ir.resolve_name(*relation);
671                        let iter = self.store.scan_delta(rel_name)?;
672                        let tuples: Vec<_> = iter.collect();
673
674                        for tuple in tuples {
675                            if tuple.len() != vars.len() {
676                                continue;
677                            }
678                            for (i, var) in vars.iter().enumerate() {
679                                env.vars.insert(*var, tuple[i].clone());
680                            }
681                            if let Some(ref mut prov) = self.provenance {
682                                prov.active_premises
683                                    .push((rel_name.to_string(), tuple.clone()));
684                            }
685                            count += self.exec_op(body, env)?;
686                            if self.provenance.is_some() {
687                                self.provenance.as_mut().unwrap().active_premises.pop();
688                            }
689                        }
690                    }
691                    DataSource::IndexLookup {
692                        relation,
693                        col_idx,
694                        key,
695                        vars,
696                    } => {
697                        let rel_name = self.ir.resolve_name(*relation);
698                        let key_val = self.eval_operand(key, env)?;
699
700                        let iter = self.store.scan_index(rel_name, *col_idx, &key_val)?;
701                        let tuples: Vec<_> = iter.collect();
702
703                        for tuple in tuples {
704                            if tuple.len() != vars.len() {
705                                continue;
706                            }
707                            for (i, var) in vars.iter().enumerate() {
708                                env.vars.insert(*var, tuple[i].clone());
709                            }
710                            if let Some(ref mut prov) = self.provenance {
711                                prov.active_premises
712                                    .push((rel_name.to_string(), tuple.clone()));
713                            }
714                            count += self.exec_op(body, env)?;
715                            if self.provenance.is_some() {
716                                self.provenance.as_mut().unwrap().active_premises.pop();
717                            }
718                        }
719                    }
720                }
721                Ok(count)
722            }
723            Op::Filter { cond, body } => {
724                if self.eval_cond(cond, env)? {
725                    self.exec_op(body, env)
726                } else {
727                    Ok(0)
728                }
729            }
730            Op::Insert { relation, args } => {
731                let rel_name = self.ir.resolve_name(*relation);
732                let mut tuple = Vec::new();
733                for arg in args {
734                    tuple.push(self.eval_operand(arg, env)?);
735                }
736                let is_new = self.store.insert(rel_name, tuple.clone())?;
737                if is_new {
738                    if let Some(ref mut prov) = self.provenance {
739                        prov.entries.push(ProvenanceEntry {
740                            derived: (rel_name.to_string(), tuple),
741                            premises: prov.active_premises.clone(),
742                        });
743                    }
744                    Ok(1)
745                } else {
746                    Ok(0)
747                }
748            }
749            Op::Let { var, expr, body } => {
750                let val = self.eval_expr(expr, env)?;
751                env.vars.insert(*var, val);
752                self.exec_op(body, env)
753            }
754            Op::HashJoin {
755                build_source,
756                probe_source,
757                join_keys,
758                body,
759            } => self.exec_hash_join(build_source, probe_source, join_keys, body, env),
760            Op::GroupBy {
761                source,
762                vars,
763                keys,
764                aggregates,
765                body,
766            } => {
767                let rel_name = self.ir.resolve_name(*source);
768
769                // For GroupBy, we must scan ALL available facts including ones just produced in this stratum
770                // if we want to match Go implementation's behavior for non-recursive strata.
771                let iter = self.store.scan(rel_name)?;
772                let mut tuples: Vec<_> = iter.collect();
773
774                // Also scan next_delta if it's the same relation
775                if let Ok(nd_iter) = self.store.scan_next_delta(rel_name) {
776                    tuples.extend(nd_iter);
777                }
778
779                let mut groups: HashMap<Vec<Value>, Vec<Vec<Value>>> = HashMap::new();
780
781                for tuple in tuples {
782                    if tuple.len() != vars.len() {
783                        continue;
784                    }
785                    // Temporarily bind variables to extract key
786                    for (i, var) in vars.iter().enumerate() {
787                        env.vars.insert(*var, tuple[i].clone());
788                    }
789
790                    let mut key = Vec::new();
791                    for k in keys {
792                        if let Some(val) = env.vars.get(k) {
793                            key.push(val.clone());
794                        } else {
795                            // Should not happen if well-typed
796                            key.push(Value::Null);
797                        }
798                    }
799                    groups.entry(key).or_default().push(tuple);
800                }
801
802                let mut count = 0;
803                for (key, group_tuples) in groups {
804                    // Bind keys
805                    for (i, k) in keys.iter().enumerate() {
806                        env.vars.insert(*k, key[i].clone());
807                    }
808
809                    // Compute aggregates
810                    for agg in aggregates {
811                        let val = self.eval_aggregate(agg, &group_tuples, vars, env)?;
812                        env.vars.insert(agg.var, val);
813                    }
814
815                    count += self.exec_op(body, env)?;
816                }
817                Ok(count)
818            }
819        }
820    }
821
822    /// Iterate a `DataSource` and return all tuples along with the var list
823    /// the tuples bind. Used by `HashJoin` to materialize the build side and
824    /// to stream the probe side.
825    fn collect_data_source(
826        &mut self,
827        source: &DataSource,
828        env: &mut Env,
829    ) -> Result<(Vec<Vec<Value>>, Vec<NameId>)> {
830        match source {
831            DataSource::Scan { relation, vars } => {
832                let rel_name = self.ir.resolve_name(*relation);
833                let tuples: Vec<_> = self.store.scan(rel_name)?.collect();
834                Ok((tuples, vars.clone()))
835            }
836            DataSource::ScanDelta { relation, vars } => {
837                let rel_name = self.ir.resolve_name(*relation);
838                let tuples: Vec<_> = self.store.scan_delta(rel_name)?.collect();
839                Ok((tuples, vars.clone()))
840            }
841            DataSource::IndexLookup {
842                relation,
843                col_idx,
844                key,
845                vars,
846            } => {
847                let rel_name = self.ir.resolve_name(*relation);
848                let key_val = self.eval_operand(key, env)?;
849                let tuples: Vec<_> =
850                    self.store.scan_index(rel_name, *col_idx, &key_val)?.collect();
851                Ok((tuples, vars.clone()))
852            }
853        }
854    }
855
856    fn exec_hash_join(
857        &mut self,
858        build_source: &DataSource,
859        probe_source: &DataSource,
860        join_keys: &[NameId],
861        body: &Op,
862        env: &mut Env,
863    ) -> Result<usize> {
864        let (build_tuples, build_vars) = self.collect_data_source(build_source, env)?;
865
866        // Position of each join-key variable inside build_vars. If any
867        // join_key is not in build_vars, the plan is malformed.
868        let build_key_positions: Vec<usize> = join_keys
869            .iter()
870            .map(|k| {
871                build_vars
872                    .iter()
873                    .position(|v| v == k)
874                    .ok_or_else(|| anyhow!("HashJoin: join key not in build_source vars"))
875            })
876            .collect::<Result<Vec<_>>>()?;
877
878        let mut table: HashMap<Vec<Value>, Vec<Vec<Value>>> = HashMap::new();
879        for tuple in build_tuples {
880            if tuple.len() != build_vars.len() {
881                continue;
882            }
883            let key: Vec<Value> = build_key_positions
884                .iter()
885                .map(|&i| tuple[i].clone())
886                .collect();
887            table.entry(key).or_default().push(tuple);
888        }
889
890        let (probe_tuples, probe_vars) = self.collect_data_source(probe_source, env)?;
891        let probe_key_positions: Vec<usize> = join_keys
892            .iter()
893            .map(|k| {
894                probe_vars
895                    .iter()
896                    .position(|v| v == k)
897                    .ok_or_else(|| anyhow!("HashJoin: join key not in probe_source vars"))
898            })
899            .collect::<Result<Vec<_>>>()?;
900
901        let mut count = 0;
902        for probe_tuple in probe_tuples {
903            if probe_tuple.len() != probe_vars.len() {
904                continue;
905            }
906            let key: Vec<Value> = probe_key_positions
907                .iter()
908                .map(|&i| probe_tuple[i].clone())
909                .collect();
910            let Some(matches) = table.get(&key) else {
911                continue;
912            };
913            // Bind probe-side vars once; build-side bindings cycle per match.
914            for (i, var) in probe_vars.iter().enumerate() {
915                env.vars.insert(*var, probe_tuple[i].clone());
916            }
917            for build_tuple in matches {
918                for (i, var) in build_vars.iter().enumerate() {
919                    env.vars.insert(*var, build_tuple[i].clone());
920                }
921                count += self.exec_op(body, env)?;
922            }
923        }
924        Ok(count)
925    }
926
927    fn eval_aggregate(
928        &self,
929        agg: &Aggregate,
930        group: &[Vec<Value>],
931        vars: &[NameId],
932        env: &mut Env,
933    ) -> Result<Value> {
934        let fn_name = self.ir.resolve_name(agg.func);
935        match fn_name {
936            "fn:count" => Ok(Value::Number(group.len() as i64)),
937            "fn:sum" => {
938                let arg = agg
939                    .args
940                    .first()
941                    .ok_or_else(|| anyhow!("fn:sum requires 1 argument"))?;
942
943                let mut int_sum: i64 = 0;
944                for tuple in group {
945                    for (i, var) in vars.iter().enumerate() {
946                        env.vars.insert(*var, tuple[i].clone());
947                    }
948                    let val = self.eval_operand(arg, env)?;
949                    match val {
950                        Value::Number(n) => int_sum += n,
951                        _ => return Err(anyhow!("fn:sum: expected integer, got {val}")),
952                    }
953                }
954                Ok(Value::Number(int_sum))
955            }
956            "fn:float:sum" => {
957                let arg = agg
958                    .args
959                    .first()
960                    .ok_or_else(|| anyhow!("fn:float:sum requires 1 argument"))?;
961
962                let mut float_sum: f64 = 0.0;
963                for tuple in group {
964                    for (i, var) in vars.iter().enumerate() {
965                        env.vars.insert(*var, tuple[i].clone());
966                    }
967                    let val = self.eval_operand(arg, env)?;
968                    float_sum += value_as_float(&val)?;
969                }
970                Ok(Value::Float(float_sum))
971            }
972            "fn:max" => {
973                let mut max_val = None;
974                let arg = agg
975                    .args
976                    .first()
977                    .ok_or_else(|| anyhow!("fn:max requires 1 argument"))?;
978
979                for tuple in group {
980                    for (i, var) in vars.iter().enumerate() {
981                        env.vars.insert(*var, tuple[i].clone());
982                    }
983                    let val = self.eval_operand(arg, env)?;
984                    match max_val {
985                        None => max_val = Some(val),
986                        Some(ref m) => {
987                            if val > *m {
988                                max_val = Some(val);
989                            }
990                        }
991                    }
992                }
993                max_val.ok_or_else(|| anyhow!("fn:max on empty group"))
994            }
995            "fn:min" => {
996                let mut min_val = None;
997                let arg = agg
998                    .args
999                    .first()
1000                    .ok_or_else(|| anyhow!("fn:min requires 1 argument"))?;
1001
1002                for tuple in group {
1003                    for (i, var) in vars.iter().enumerate() {
1004                        env.vars.insert(*var, tuple[i].clone());
1005                    }
1006                    let val = self.eval_operand(arg, env)?;
1007                    match min_val {
1008                        None => min_val = Some(val),
1009                        Some(ref m) => {
1010                            if val < *m {
1011                                min_val = Some(val);
1012                            }
1013                        }
1014                    }
1015                }
1016                min_val.ok_or_else(|| anyhow!("fn:min on empty group"))
1017            }
1018            "fn:float:max" => {
1019                let mut max_val: Option<f64> = None;
1020                let arg = agg
1021                    .args
1022                    .first()
1023                    .ok_or_else(|| anyhow!("fn:float:max requires 1 argument"))?;
1024
1025                for tuple in group {
1026                    for (i, var) in vars.iter().enumerate() {
1027                        env.vars.insert(*var, tuple[i].clone());
1028                    }
1029                    let val = self.eval_operand(arg, env)?;
1030                    let f = value_as_float(&val)?;
1031                    max_val = Some(match max_val {
1032                        None => f,
1033                        Some(m) => f.max(m),
1034                    });
1035                }
1036                max_val
1037                    .map(Value::Float)
1038                    .ok_or_else(|| anyhow!("fn:float:max on empty group"))
1039            }
1040            "fn:float:min" => {
1041                let mut min_val: Option<f64> = None;
1042                let arg = agg
1043                    .args
1044                    .first()
1045                    .ok_or_else(|| anyhow!("fn:float:min requires 1 argument"))?;
1046
1047                for tuple in group {
1048                    for (i, var) in vars.iter().enumerate() {
1049                        env.vars.insert(*var, tuple[i].clone());
1050                    }
1051                    let val = self.eval_operand(arg, env)?;
1052                    let f = value_as_float(&val)?;
1053                    min_val = Some(match min_val {
1054                        None => f,
1055                        Some(m) => f.min(m),
1056                    });
1057                }
1058                min_val
1059                    .map(Value::Float)
1060                    .ok_or_else(|| anyhow!("fn:float:min on empty group"))
1061            }
1062            _ => Err(anyhow!("Unknown aggregation function: {fn_name}")),
1063        }
1064    }
1065
1066    fn eval_cond(&self, cond: &Condition, env: &Env) -> Result<bool> {
1067        match cond {
1068            Condition::Cmp { op, left, right } => {
1069                let l = self.eval_operand(left, env)?;
1070                let r = self.eval_operand(right, env)?;
1071                match op {
1072                    CmpOp::Eq => Ok(l == r),
1073                    CmpOp::Neq => Ok(l != r),
1074                    CmpOp::Lt => Ok(l < r),
1075                    CmpOp::Le => Ok(l <= r),
1076                    CmpOp::Gt => Ok(l > r),
1077                    CmpOp::Ge => Ok(l >= r),
1078                }
1079            }
1080            Condition::Negation { relation, args } => {
1081                let rel_name = self.ir.resolve_name(*relation);
1082                let iter = self.store.scan(rel_name)?;
1083                for tuple in iter {
1084                    let mut mat = true;
1085                    if tuple.len() != args.len() {
1086                        continue;
1087                    }
1088                    for (i, arg) in args.iter().enumerate() {
1089                        let val = self.eval_operand(arg, env)?;
1090                        if tuple[i] != val {
1091                            mat = false;
1092                            break;
1093                        }
1094                    }
1095                    if mat {
1096                        return Ok(false); // Found match, negation fails
1097                    }
1098                }
1099                Ok(true) // No match found
1100            }
1101            Condition::Call { function, args } => {
1102                let fn_name = self.ir.resolve_name(*function);
1103                let mut vals = Vec::new();
1104                for arg in args {
1105                    vals.push(self.eval_operand(arg, env)?);
1106                }
1107                self.eval_builtin_predicate(fn_name, &vals)
1108            }
1109        }
1110    }
1111
1112    fn eval_builtin_predicate(&self, name: &str, vals: &[Value]) -> Result<bool> {
1113        match name {
1114            ":string:starts_with" => match (&vals[0], &vals[1]) {
1115                (Value::String(s), Value::String(p)) => Ok(s.starts_with(p.as_str())),
1116                _ => Err(anyhow!(":string:starts_with: expected string arguments")),
1117            },
1118            ":string:ends_with" => match (&vals[0], &vals[1]) {
1119                (Value::String(s), Value::String(p)) => Ok(s.ends_with(p.as_str())),
1120                _ => Err(anyhow!(":string:ends_with: expected string arguments")),
1121            },
1122            ":string:contains" => match (&vals[0], &vals[1]) {
1123                (Value::String(s), Value::String(p)) => Ok(s.contains(p.as_str())),
1124                _ => Err(anyhow!(":string:contains: expected string arguments")),
1125            },
1126            ":match_prefix" => match (&vals[0], &vals[1]) {
1127                (Value::Name(name), Value::Name(prefix)) => {
1128                    Ok(name.starts_with(prefix.as_str()) && name.len() > prefix.len())
1129                }
1130                _ => Err(anyhow!(":match_prefix: expected name arguments")),
1131            },
1132            _ => Err(anyhow!("Unknown built-in predicate: {name}")),
1133        }
1134    }
1135
1136    fn eval_expr(&self, expr: &Expr, env: &Env) -> Result<Value> {
1137        match expr {
1138            Expr::Value(op) => self.eval_operand(op, env),
1139            Expr::Call { function, args } => {
1140                let fn_name = self.ir.resolve_name(*function);
1141                let mut vals = Vec::new();
1142                for arg in args {
1143                    vals.push(self.eval_operand(arg, env)?);
1144                }
1145                eval_function(fn_name, &vals)
1146            }
1147        }
1148    }
1149
1150    fn eval_operand(&self, op: &Operand, env: &Env) -> Result<Value> {
1151        match op {
1152            Operand::Var(v) => env
1153                .vars
1154                .get(v)
1155                .cloned()
1156                .ok_or_else(|| anyhow!("Variable not found")),
1157            Operand::Const(c) => match c {
1158                Constant::Number(n) => Ok(Value::Number(*n)),
1159                Constant::Float(f) => Ok(Value::Float(*f)),
1160                Constant::String(sid) => {
1161                    Ok(Value::String(self.ir.resolve_string(*sid).to_string()))
1162                }
1163                Constant::Name(nid) => Ok(Value::Name(self.ir.resolve_name(*nid).to_string())),
1164                Constant::Time(t) => Ok(Value::Time(*t)),
1165                Constant::Duration(d) => Ok(Value::Duration(*d)),
1166            },
1167        }
1168    }
1169}
1170
1171// --- Helper functions ---
1172
1173fn value_as_float(v: &Value) -> Result<f64> {
1174    match v {
1175        Value::Float(f) => Ok(*f),
1176        Value::Number(n) => Ok(*n as f64),
1177        _ => Err(anyhow!("expected numeric value, got {v}")),
1178    }
1179}
1180
1181fn value_to_string(v: &Value) -> String {
1182    match v {
1183        Value::Number(n) => n.to_string(),
1184        Value::Float(f) => format!("{f}"),
1185        Value::String(s) => s.clone(),
1186        Value::Name(s) => s.clone(),
1187        Value::Time(t) => format!("{}", Value::Time(*t)),
1188        Value::Duration(d) => format!("{}", Value::Duration(*d)),
1189        Value::Compound(kind, elems) => format!("{}", Value::Compound(*kind, elems.clone())),
1190        Value::Null => "null".to_string(),
1191    }
1192}
1193
1194/// Evaluate a built-in function call.
1195pub fn eval_function(fn_name: &str, vals: &[Value]) -> Result<Value> {
1196    match fn_name {
1197        // --- Integer arithmetic (variadic) ---
1198        "fn:plus" => {
1199            let mut sum: i64 = 0;
1200            for v in vals {
1201                match v {
1202                    Value::Number(n) => sum += n,
1203                    _ => return Err(anyhow!("fn:plus: expected integer, got {v}")),
1204                }
1205            }
1206            Ok(Value::Number(sum))
1207        }
1208        "fn:minus" => {
1209            if vals.is_empty() {
1210                return Err(anyhow!("fn:minus: requires at least 1 argument"));
1211            }
1212            let first = match &vals[0] {
1213                Value::Number(n) => *n,
1214                v => return Err(anyhow!("fn:minus: expected integer, got {v}")),
1215            };
1216            if vals.len() == 1 {
1217                return Ok(Value::Number(-first));
1218            }
1219            let mut result = first;
1220            for v in &vals[1..] {
1221                match v {
1222                    Value::Number(n) => result -= n,
1223                    _ => return Err(anyhow!("fn:minus: expected integer, got {v}")),
1224                }
1225            }
1226            Ok(Value::Number(result))
1227        }
1228        "fn:mult" => {
1229            let mut product: i64 = 1;
1230            for v in vals {
1231                match v {
1232                    Value::Number(n) => product *= n,
1233                    _ => return Err(anyhow!("fn:mult: expected integer, got {v}")),
1234                }
1235            }
1236            Ok(Value::Number(product))
1237        }
1238        "fn:div" => {
1239            if vals.is_empty() {
1240                return Err(anyhow!("fn:div: requires at least 1 argument"));
1241            }
1242            let first = match &vals[0] {
1243                Value::Number(n) => *n,
1244                v => return Err(anyhow!("fn:div: expected integer, got {v}")),
1245            };
1246            if vals.len() == 1 {
1247                if first == 0 {
1248                    return Err(anyhow!("Division by zero in fn:div"));
1249                }
1250                return Ok(Value::Number(1 / first));
1251            }
1252            let mut result = first;
1253            for v in &vals[1..] {
1254                match v {
1255                    Value::Number(0) => return Err(anyhow!("Division by zero in fn:div")),
1256                    Value::Number(n) => {
1257                        result /= n;
1258                        if result == 0 {
1259                            return Ok(Value::Number(0));
1260                        }
1261                    }
1262                    _ => return Err(anyhow!("fn:div: expected integer, got {v}")),
1263                }
1264            }
1265            Ok(Value::Number(result))
1266        }
1267
1268        // --- Float arithmetic (variadic, accepts Number via promotion) ---
1269        "fn:float:plus" => {
1270            let mut sum: f64 = 0.0;
1271            for v in vals {
1272                sum += value_as_float(v)?;
1273            }
1274            Ok(Value::Float(sum))
1275        }
1276        "fn:float:minus" => {
1277            if vals.is_empty() {
1278                return Err(anyhow!("fn:float:minus: requires at least 1 argument"));
1279            }
1280            let first = value_as_float(&vals[0])?;
1281            if vals.len() == 1 {
1282                return Ok(Value::Float(-first));
1283            }
1284            let mut result = first;
1285            for v in &vals[1..] {
1286                result -= value_as_float(v)?;
1287            }
1288            Ok(Value::Float(result))
1289        }
1290        "fn:float:mult" => {
1291            let mut product: f64 = 1.0;
1292            for v in vals {
1293                product *= value_as_float(v)?;
1294            }
1295            Ok(Value::Float(product))
1296        }
1297        "fn:float:div" => {
1298            if vals.is_empty() {
1299                return Err(anyhow!("fn:float:div: requires at least 1 argument"));
1300            }
1301            let first = value_as_float(&vals[0])?;
1302            if vals.len() == 1 {
1303                if first == 0.0 {
1304                    return Err(anyhow!("Division by zero in fn:float:div"));
1305                }
1306                return Ok(Value::Float(1.0 / first));
1307            }
1308            let mut result = first;
1309            for v in &vals[1..] {
1310                let d = value_as_float(v)?;
1311                if d == 0.0 {
1312                    return Err(anyhow!("Division by zero in fn:float:div"));
1313                }
1314                result /= d;
1315            }
1316            Ok(Value::Float(result))
1317        }
1318        "fn:sqrt" => {
1319            if vals.len() != 1 {
1320                return Err(anyhow!("fn:sqrt: requires exactly 1 argument"));
1321            }
1322            let f = value_as_float(&vals[0])?;
1323            Ok(Value::Float(f.sqrt()))
1324        }
1325
1326        // --- String functions ---
1327        "fn:string:concat" => {
1328            let mut result = String::new();
1329            for v in vals {
1330                result.push_str(&value_to_string(v));
1331            }
1332            Ok(Value::String(result))
1333        }
1334        "fn:string:replace" => {
1335            if vals.len() != 4 {
1336                return Err(anyhow!("fn:string:replace: requires 4 arguments (string, old, new, count)"));
1337            }
1338            let s = match &vals[0] {
1339                Value::String(s) => s,
1340                v => return Err(anyhow!("fn:string:replace: first arg must be string, got {v}")),
1341            };
1342            let old = match &vals[1] {
1343                Value::String(s) => s,
1344                v => return Err(anyhow!("fn:string:replace: second arg must be string, got {v}")),
1345            };
1346            let new_s = match &vals[2] {
1347                Value::String(s) => s,
1348                v => return Err(anyhow!("fn:string:replace: third arg must be string, got {v}")),
1349            };
1350            let count = match &vals[3] {
1351                Value::Number(n) => *n,
1352                v => return Err(anyhow!("fn:string:replace: fourth arg must be number, got {v}")),
1353            };
1354            let result = if count < 0 {
1355                s.replace(old.as_str(), new_s.as_str())
1356            } else {
1357                s.replacen(old.as_str(), new_s.as_str(), count as usize)
1358            };
1359            Ok(Value::String(result))
1360        }
1361
1362        // --- Type conversion functions ---
1363        "fn:number:to_string" => {
1364            if vals.len() != 1 {
1365                return Err(anyhow!("fn:number:to_string: requires 1 argument"));
1366            }
1367            match &vals[0] {
1368                Value::Number(n) => Ok(Value::String(n.to_string())),
1369                v => Err(anyhow!("fn:number:to_string: expected number, got {v}")),
1370            }
1371        }
1372        "fn:float64:to_string" => {
1373            if vals.len() != 1 {
1374                return Err(anyhow!("fn:float64:to_string: requires 1 argument"));
1375            }
1376            match &vals[0] {
1377                Value::Float(f) => Ok(Value::String(format!("{f}"))),
1378                v => Err(anyhow!("fn:float64:to_string: expected float, got {v}")),
1379            }
1380        }
1381        "fn:name:to_string" => {
1382            if vals.len() != 1 {
1383                return Err(anyhow!("fn:name:to_string: requires 1 argument"));
1384            }
1385            match &vals[0] {
1386                Value::Name(s) => Ok(Value::String(s.clone())),
1387                v => Err(anyhow!("fn:name:to_string: expected name, got {v}")),
1388            }
1389        }
1390
1391        // --- Time functions ---
1392        "fn:time:now" => {
1393            if !vals.is_empty() {
1394                return Err(anyhow!("fn:time:now: takes no arguments"));
1395            }
1396            let nanos = std::time::SystemTime::now()
1397                .duration_since(std::time::UNIX_EPOCH)
1398                .map_err(|e| anyhow!("fn:time:now: {e}"))?
1399                .as_nanos() as i64;
1400            Ok(Value::Time(nanos))
1401        }
1402        "fn:time:add" => {
1403            if vals.len() != 2 {
1404                return Err(anyhow!("fn:time:add: requires 2 arguments (time, duration)"));
1405            }
1406            match (&vals[0], &vals[1]) {
1407                (Value::Time(t), Value::Duration(d)) => Ok(Value::Time(t + d)),
1408                _ => Err(anyhow!("fn:time:add: expected (time, duration), got ({}, {})", vals[0], vals[1])),
1409            }
1410        }
1411        "fn:time:sub" => {
1412            if vals.len() != 2 {
1413                return Err(anyhow!("fn:time:sub: requires 2 arguments"));
1414            }
1415            match (&vals[0], &vals[1]) {
1416                (Value::Time(t1), Value::Time(t2)) => Ok(Value::Duration(t1 - t2)),
1417                (Value::Time(t), Value::Duration(d)) => Ok(Value::Time(t - d)),
1418                _ => Err(anyhow!("fn:time:sub: expected (time, time) or (time, duration)")),
1419            }
1420        }
1421        "fn:time:year" => time_component(vals, |secs, _| {
1422            let (y, _, _) = civil_from_epoch_secs(secs);
1423            y as i64
1424        }),
1425        "fn:time:month" => time_component(vals, |secs, _| {
1426            let (_, m, _) = civil_from_epoch_secs(secs);
1427            m as i64
1428        }),
1429        "fn:time:day" => time_component(vals, |secs, _| {
1430            let (_, _, d) = civil_from_epoch_secs(secs);
1431            d as i64
1432        }),
1433        "fn:time:hour" => time_component(vals, |secs, _| {
1434            secs.rem_euclid(86400) / 3600
1435        }),
1436        "fn:time:minute" => time_component(vals, |secs, _| {
1437            (secs.rem_euclid(86400) % 3600) / 60
1438        }),
1439        "fn:time:second" => time_component(vals, |secs, _| {
1440            secs.rem_euclid(86400) % 60
1441        }),
1442        "fn:time:from_unix_nanos" => {
1443            if vals.len() != 1 {
1444                return Err(anyhow!("fn:time:from_unix_nanos: requires 1 argument"));
1445            }
1446            match &vals[0] {
1447                Value::Number(n) => Ok(Value::Time(*n)),
1448                v => Err(anyhow!("fn:time:from_unix_nanos: expected number, got {v}")),
1449            }
1450        }
1451        "fn:time:to_unix_nanos" => {
1452            if vals.len() != 1 {
1453                return Err(anyhow!("fn:time:to_unix_nanos: requires 1 argument"));
1454            }
1455            match &vals[0] {
1456                Value::Time(t) => Ok(Value::Number(*t)),
1457                v => Err(anyhow!("fn:time:to_unix_nanos: expected time, got {v}")),
1458            }
1459        }
1460        "fn:time:trunc" => {
1461            if vals.len() != 2 {
1462                return Err(anyhow!("fn:time:trunc: requires 2 arguments (time, unit_name)"));
1463            }
1464            let t = match &vals[0] {
1465                Value::Time(t) => *t,
1466                v => return Err(anyhow!("fn:time:trunc: first arg must be time, got {v}")),
1467            };
1468            let unit_name = match &vals[1] {
1469                Value::Name(s) => s.as_str(),
1470                v => return Err(anyhow!("fn:time:trunc: second arg must be name, got {v}")),
1471            };
1472            let d: i64 = match unit_name {
1473                "/nanosecond" => 1,
1474                "/microsecond" => 1_000,
1475                "/millisecond" => 1_000_000,
1476                "/second" => 1_000_000_000,
1477                "/minute" => 60 * 1_000_000_000,
1478                "/hour" => 3600 * 1_000_000_000,
1479                "/day" => 24 * 3600 * 1_000_000_000,
1480                _ => return Err(anyhow!("fn:time:trunc: unknown unit {unit_name:?}")),
1481            };
1482            Ok(Value::Time(t - t.rem_euclid(d)))
1483        }
1484        "fn:time:format" => {
1485            if vals.len() != 2 {
1486                return Err(anyhow!("fn:time:format: requires 2 arguments (time, precision)"));
1487            }
1488            let t = match &vals[0] {
1489                Value::Time(t) => *t,
1490                v => return Err(anyhow!("fn:time:format: first arg must be time, got {v}")),
1491            };
1492            let precision = match &vals[1] {
1493                Value::String(s) => s.as_str(),
1494                v => return Err(anyhow!("fn:time:format: second arg must be name, got {v}")),
1495            };
1496            Ok(Value::String(format_time_with_precision(t, precision)?))
1497        }
1498        "fn:time:format_civil" => {
1499            if vals.len() != 3 {
1500                return Err(anyhow!("fn:time:format_civil: requires 3 arguments (time, timezone, precision)"));
1501            }
1502            let t = match &vals[0] {
1503                Value::Time(t) => *t,
1504                v => return Err(anyhow!("fn:time:format_civil: first arg must be time, got {v}")),
1505            };
1506            let tz = match &vals[1] {
1507                Value::String(s) => s.as_str(),
1508                v => return Err(anyhow!("fn:time:format_civil: second arg must be string, got {v}")),
1509            };
1510            let precision = match &vals[2] {
1511                Value::String(s) => s.as_str(),
1512                v => return Err(anyhow!("fn:time:format_civil: third arg must be name, got {v}")),
1513            };
1514            let offset = parse_timezone_offset(tz)?;
1515            let adjusted = t + offset * 1_000_000_000;
1516            let formatted = format_time_with_precision(adjusted, precision)?;
1517            Ok(Value::String(formatted))
1518        }
1519        "fn:time:parse_rfc3339" => {
1520            if vals.len() != 1 {
1521                return Err(anyhow!("fn:time:parse_rfc3339: requires 1 argument"));
1522            }
1523            match &vals[0] {
1524                Value::String(s) => {
1525                    let nanos = parse_rfc3339_to_nanos(s)?;
1526                    Ok(Value::Time(nanos))
1527                }
1528                v => Err(anyhow!("fn:time:parse_rfc3339: expected string, got {v}")),
1529            }
1530        }
1531        "fn:time:parse_civil" => {
1532            if vals.len() != 2 {
1533                return Err(anyhow!("fn:time:parse_civil: requires 2 arguments (string, timezone)"));
1534            }
1535            let s = match &vals[0] {
1536                Value::String(s) => s.as_str(),
1537                v => return Err(anyhow!("fn:time:parse_civil: first arg must be string, got {v}")),
1538            };
1539            let tz = match &vals[1] {
1540                Value::String(s) => s.as_str(),
1541                v => return Err(anyhow!("fn:time:parse_civil: second arg must be string, got {v}")),
1542            };
1543            let offset = parse_timezone_offset(tz)?;
1544            let nanos = parse_civil_datetime_to_nanos(s)?;
1545            // Subtract offset to convert local time to UTC
1546            Ok(Value::Time(nanos - offset * 1_000_000_000))
1547        }
1548
1549        // --- Duration functions ---
1550        "fn:duration:add" => {
1551            if vals.len() != 2 {
1552                return Err(anyhow!("fn:duration:add: requires 2 arguments"));
1553            }
1554            match (&vals[0], &vals[1]) {
1555                (Value::Duration(a), Value::Duration(b)) => Ok(Value::Duration(a + b)),
1556                _ => Err(anyhow!("fn:duration:add: expected (duration, duration)")),
1557            }
1558        }
1559        "fn:duration:mult" => {
1560            if vals.len() != 2 {
1561                return Err(anyhow!("fn:duration:mult: requires 2 arguments"));
1562            }
1563            match (&vals[0], &vals[1]) {
1564                (Value::Duration(d), Value::Number(n)) => Ok(Value::Duration(d * n)),
1565                (Value::Number(n), Value::Duration(d)) => Ok(Value::Duration(n * d)),
1566                _ => Err(anyhow!("fn:duration:mult: expected (duration, number) or (number, duration)")),
1567            }
1568        }
1569        "fn:duration:hours" => duration_component_float(vals, |nanos| nanos as f64 / (60.0 * 60.0 * 1_000_000_000.0)),
1570        "fn:duration:minutes" => duration_component_float(vals, |nanos| nanos as f64 / (60.0 * 1_000_000_000.0)),
1571        "fn:duration:seconds" => duration_component_float(vals, |nanos| nanos as f64 / 1_000_000_000.0),
1572        "fn:duration:nanos" => duration_component_int(vals, |nanos| nanos),
1573        "fn:duration:from_nanos" => {
1574            if vals.len() != 1 {
1575                return Err(anyhow!("fn:duration:from_nanos: requires 1 argument"));
1576            }
1577            match &vals[0] {
1578                Value::Number(n) => Ok(Value::Duration(*n)),
1579                v => Err(anyhow!("fn:duration:from_nanos: expected number, got {v}")),
1580            }
1581        }
1582        "fn:duration:from_hours" => duration_from(vals, "hours", 60 * 60 * 1_000_000_000),
1583        "fn:duration:from_minutes" => duration_from(vals, "minutes", 60 * 1_000_000_000),
1584        "fn:duration:from_seconds" => duration_from(vals, "seconds", 1_000_000_000),
1585        "fn:duration:parse" => {
1586            if vals.len() != 1 {
1587                return Err(anyhow!("fn:duration:parse: requires 1 argument"));
1588            }
1589            match &vals[0] {
1590                Value::String(s) => {
1591                    let nanos = parse_duration_string(s)?;
1592                    Ok(Value::Duration(nanos))
1593                }
1594                v => Err(anyhow!("fn:duration:parse: expected string, got {v}")),
1595            }
1596        }
1597
1598        // --- Compound type constructors ---
1599        "fn:list" => Ok(Value::Compound(CompoundKind::List, vals.to_vec())),
1600        "fn:pair" => {
1601            if vals.len() != 2 {
1602                return Err(anyhow!("fn:pair: requires exactly 2 arguments"));
1603            }
1604            Ok(Value::Compound(CompoundKind::Pair, vals.to_vec()))
1605        }
1606        "fn:struct" => {
1607            // Args are interleaved: field_name, value, field_name, value, ...
1608            if vals.len() % 2 != 0 {
1609                return Err(anyhow!(
1610                    "fn:struct: requires even number of arguments (field, value pairs)"
1611                ));
1612            }
1613            Ok(Value::Compound(CompoundKind::Struct, vals.to_vec()))
1614        }
1615        "fn:map" => {
1616            // Args are interleaved: key, value, key, value, ...
1617            if vals.len() % 2 != 0 {
1618                return Err(anyhow!(
1619                    "fn:map: requires even number of arguments (key, value pairs)"
1620                ));
1621            }
1622            Ok(Value::Compound(CompoundKind::Map, vals.to_vec()))
1623        }
1624
1625        // --- Compound type accessors ---
1626        "fn:list:get" => {
1627            if vals.len() != 2 {
1628                return Err(anyhow!("fn:list:get: requires 2 arguments (list, index)"));
1629            }
1630            match (&vals[0], &vals[1]) {
1631                (Value::Compound(_, elems), Value::Number(idx)) => {
1632                    let i = *idx as usize;
1633                    elems
1634                        .get(i)
1635                        .cloned()
1636                        .ok_or_else(|| anyhow!("fn:list:get: index {i} out of bounds (len {})", elems.len()))
1637                }
1638                _ => Err(anyhow!("fn:list:get: expected (compound, number)")),
1639            }
1640        }
1641        "fn:list:len" | "fn:len" => {
1642            if vals.len() != 1 {
1643                return Err(anyhow!("fn:len: requires 1 argument"));
1644            }
1645            match &vals[0] {
1646                Value::Compound(_, elems) => Ok(Value::Number(elems.len() as i64)),
1647                _ => Err(anyhow!("fn:len: expected compound value")),
1648            }
1649        }
1650        "fn:pair:first" => {
1651            if vals.len() != 1 {
1652                return Err(anyhow!("fn:pair:first: requires 1 argument"));
1653            }
1654            match &vals[0] {
1655                Value::Compound(_, elems) if elems.len() >= 1 => Ok(elems[0].clone()),
1656                _ => Err(anyhow!("fn:pair:first: expected compound with at least 1 element")),
1657            }
1658        }
1659        "fn:pair:second" => {
1660            if vals.len() != 1 {
1661                return Err(anyhow!("fn:pair:second: requires 1 argument"));
1662            }
1663            match &vals[0] {
1664                Value::Compound(_, elems) if elems.len() >= 2 => Ok(elems[1].clone()),
1665                _ => Err(anyhow!("fn:pair:second: expected compound with at least 2 elements")),
1666            }
1667        }
1668        "fn:struct:get" | "fn:map:get" => {
1669            if vals.len() != 2 {
1670                return Err(anyhow!("{fn_name}: requires 2 arguments (compound, key)"));
1671            }
1672            match &vals[0] {
1673                Value::Compound(_, elems) => {
1674                    // Interleaved key-value pairs: [k1, v1, k2, v2, ...]
1675                    for pair in elems.chunks_exact(2) {
1676                        if pair[0] == vals[1] {
1677                            return Ok(pair[1].clone());
1678                        }
1679                    }
1680                    Err(anyhow!("{fn_name}: key not found"))
1681                }
1682                _ => Err(anyhow!("{fn_name}: expected compound value")),
1683            }
1684        }
1685        "fn:map:len" | "fn:struct:len" => {
1686            if vals.len() != 1 {
1687                return Err(anyhow!("{fn_name}: requires 1 argument"));
1688            }
1689            match &vals[0] {
1690                Value::Compound(_, elems) => Ok(Value::Number((elems.len() / 2) as i64)),
1691                _ => Err(anyhow!("{fn_name}: expected compound value")),
1692            }
1693        }
1694        "fn:map:keys" => {
1695            if vals.len() != 1 {
1696                return Err(anyhow!("fn:map:keys: requires 1 argument"));
1697            }
1698            match &vals[0] {
1699                Value::Compound(_, elems) => {
1700                    let keys: Vec<Value> = elems.chunks_exact(2).map(|p| p[0].clone()).collect();
1701                    Ok(Value::Compound(CompoundKind::List, keys))
1702                }
1703                _ => Err(anyhow!("fn:map:keys: expected compound value")),
1704            }
1705        }
1706        "fn:map:values" | "fn:struct:values" => {
1707            if vals.len() != 1 {
1708                return Err(anyhow!("{fn_name}: requires 1 argument"));
1709            }
1710            match &vals[0] {
1711                Value::Compound(_, elems) => {
1712                    let values: Vec<Value> = elems.chunks_exact(2).map(|p| p[1].clone()).collect();
1713                    Ok(Value::Compound(CompoundKind::List, values))
1714                }
1715                _ => Err(anyhow!("{fn_name}: expected compound value")),
1716            }
1717        }
1718
1719        _ => Err(anyhow!("Unknown function: {fn_name}")),
1720    }
1721}
1722
1723fn time_component(vals: &[Value], extract: impl Fn(i64, i64) -> i64) -> Result<Value> {
1724    if vals.len() != 1 {
1725        return Err(anyhow!("time component function: requires 1 argument"));
1726    }
1727    match &vals[0] {
1728        Value::Time(nanos) => {
1729            let secs = nanos.div_euclid(1_000_000_000);
1730            let sub_nanos = nanos.rem_euclid(1_000_000_000);
1731            Ok(Value::Number(extract(secs, sub_nanos)))
1732        }
1733        v => Err(anyhow!("time component function: expected time, got {v}")),
1734    }
1735}
1736
1737fn civil_from_epoch_secs(secs: i64) -> (i32, u32, u32) {
1738    let days = secs.div_euclid(86400);
1739    let z = days + 719468;
1740    let era = (if z >= 0 { z } else { z - 146096 }) / 146097;
1741    let doe = (z - era * 146097) as u32;
1742    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
1743    let y = yoe as i64 + era * 400;
1744    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
1745    let mp = (5 * doy + 2) / 153;
1746    let d = doy - (153 * mp + 2) / 5 + 1;
1747    let m = if mp < 10 { mp + 3 } else { mp - 9 };
1748    let y = if m <= 2 { y + 1 } else { y };
1749    (y as i32, m, d)
1750}
1751
1752fn duration_component_float(vals: &[Value], extract: impl Fn(i64) -> f64) -> Result<Value> {
1753    if vals.len() != 1 {
1754        return Err(anyhow!("duration component function: requires 1 argument"));
1755    }
1756    match &vals[0] {
1757        Value::Duration(nanos) => Ok(Value::Float(extract(*nanos))),
1758        v => Err(anyhow!("duration component function: expected duration, got {v}")),
1759    }
1760}
1761
1762fn duration_component_int(vals: &[Value], extract: impl Fn(i64) -> i64) -> Result<Value> {
1763    if vals.len() != 1 {
1764        return Err(anyhow!("duration component function: requires 1 argument"));
1765    }
1766    match &vals[0] {
1767        Value::Duration(nanos) => Ok(Value::Number(extract(*nanos))),
1768        v => Err(anyhow!("duration component function: expected duration, got {v}")),
1769    }
1770}
1771
1772fn duration_from(vals: &[Value], name: &str, multiplier: i64) -> Result<Value> {
1773    if vals.len() != 1 {
1774        return Err(anyhow!("fn:duration:from_{name}: requires 1 argument"));
1775    }
1776    match &vals[0] {
1777        Value::Number(n) => Ok(Value::Duration(n * multiplier)),
1778        v => Err(anyhow!("fn:duration:from_{name}: expected number, got {v}")),
1779    }
1780}
1781
1782/// Format a time (in UTC nanoseconds) with a given precision specifier.
1783fn format_time_with_precision(nanos: i64, precision: &str) -> Result<String> {
1784    let secs = nanos.div_euclid(1_000_000_000);
1785    let sub_nanos = nanos.rem_euclid(1_000_000_000);
1786    let (y, m, d) = civil_from_epoch_secs(secs);
1787    let time_of_day = secs.rem_euclid(86400);
1788    let hour = time_of_day / 3600;
1789    let minute = (time_of_day % 3600) / 60;
1790    let second = time_of_day % 60;
1791
1792    match precision {
1793        "/year" => Ok(format!("{y:04}")),
1794        "/month" => Ok(format!("{y:04}-{m:02}")),
1795        "/day" => Ok(format!("{y:04}-{m:02}-{d:02}")),
1796        "/hour" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}Z")),
1797        "/minute" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}Z")),
1798        "/second" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}Z")),
1799        "/millisecond" => {
1800            let ms = sub_nanos / 1_000_000;
1801            Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{ms:03}Z"))
1802        }
1803        "/microsecond" => {
1804            let us = sub_nanos / 1_000;
1805            Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{us:06}Z"))
1806        }
1807        "/nanosecond" => {
1808            if sub_nanos == 0 {
1809                Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}Z"))
1810            } else {
1811                let ns_str = format!("{sub_nanos:09}");
1812                let ns_trimmed = ns_str.trim_end_matches('0');
1813                Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{ns_trimmed}Z"))
1814            }
1815        }
1816        _ => Err(anyhow!("unknown time precision: {precision:?}")),
1817    }
1818}
1819
1820/// Parse a timezone string. Supports "UTC" and fixed offsets like "+05:30", "-08:00".
1821/// Returns offset in seconds from UTC.
1822fn parse_timezone_offset(tz: &str) -> Result<i64> {
1823    match tz {
1824        "UTC" => Ok(0),
1825        s if s.starts_with('+') || s.starts_with('-') => {
1826            let sign: i64 = if s.starts_with('-') { -1 } else { 1 };
1827            let rest = &s[1..];
1828            let parts: Vec<&str> = rest.split(':').collect();
1829            if parts.len() != 2 {
1830                return Err(anyhow!("invalid timezone offset: {tz:?}"));
1831            }
1832            let hours: i64 = parts[0].parse().map_err(|_| anyhow!("invalid timezone: {tz:?}"))?;
1833            let minutes: i64 = parts[1].parse().map_err(|_| anyhow!("invalid timezone: {tz:?}"))?;
1834            Ok(sign * (hours * 3600 + minutes * 60))
1835        }
1836        _ => Err(anyhow!("unsupported timezone: {tz:?} (use \"UTC\" or offset like \"+05:30\")")),
1837    }
1838}
1839
1840/// Parse an RFC3339-like timestamp string to nanoseconds since epoch.
1841fn parse_rfc3339_to_nanos(s: &str) -> Result<i64> {
1842    // Minimal RFC3339: "2006-01-02T15:04:05Z" or with fractional seconds
1843    if s.len() < 10 {
1844        return Err(anyhow!("fn:time:parse_rfc3339: string too short: {s:?}"));
1845    }
1846    let year: i64 = s[0..4].parse().map_err(|_| anyhow!("invalid year in {s:?}"))?;
1847    let month: u32 = s[5..7].parse().map_err(|_| anyhow!("invalid month in {s:?}"))?;
1848    let day: u32 = s[8..10].parse().map_err(|_| anyhow!("invalid day in {s:?}"))?;
1849
1850    let (hour, minute, second, frac_nanos) = if s.len() > 10 && s.as_bytes()[10] == b'T' {
1851        if s.len() < 19 {
1852            return Err(anyhow!("fn:time:parse_rfc3339: incomplete time in {s:?}"));
1853        }
1854        let h: u32 = s[11..13].parse().map_err(|_| anyhow!("invalid hour"))?;
1855        let min: u32 = s[14..16].parse().map_err(|_| anyhow!("invalid minute"))?;
1856        let sec: u32 = s[17..19].parse().map_err(|_| anyhow!("invalid second"))?;
1857
1858        let frac = if s.len() > 19 && s.as_bytes()[19] == b'.' {
1859            let end = s.len() - if s.ends_with('Z') { 1 } else { 0 };
1860            let frac_str = &s[20..end];
1861            let padded = format!("{frac_str:0<9}");
1862            padded[..9].parse::<i64>().unwrap_or(0)
1863        } else {
1864            0
1865        };
1866        (h, min, sec, frac)
1867    } else {
1868        (0, 0, 0, 0)
1869    };
1870
1871    let y = if month <= 2 { year - 1 } else { year };
1872    let era = (if y >= 0 { y } else { y - 399 }) / 400;
1873    let yoe = (y - era * 400) as u32;
1874    let m_adj = if month > 2 { month - 3 } else { month + 9 };
1875    let doy = (153 * m_adj + 2) / 5 + day - 1;
1876    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
1877    let days = era * 146097 + doe as i64 - 719468;
1878
1879    let total_seconds = days * 86400 + hour as i64 * 3600 + minute as i64 * 60 + second as i64;
1880    Ok(total_seconds * 1_000_000_000 + frac_nanos)
1881}
1882
1883/// Parse a civil datetime string like "2024-01-15T10:30:00" or "2024-01-15T10:30:00.000000000".
1884fn parse_civil_datetime_to_nanos(s: &str) -> Result<i64> {
1885    // Reuse RFC3339 parser — civil format is the same without 'Z' suffix
1886    parse_rfc3339_to_nanos(s)
1887}
1888
1889/// Parse a Go-style duration string like "1h30m", "500ms", "-2h45m30s", "1.5h".
1890fn parse_duration_string(s: &str) -> Result<i64> {
1891    if s.is_empty() {
1892        return Err(anyhow!("fn:duration:parse: empty string"));
1893    }
1894    if s == "0" || s == "0s" {
1895        return Ok(0);
1896    }
1897
1898    let (sign, mut rest) = if s.starts_with('-') {
1899        (-1i64, &s[1..])
1900    } else if s.starts_with('+') {
1901        (1i64, &s[1..])
1902    } else {
1903        (1i64, s)
1904    };
1905
1906    let mut total_nanos: i64 = 0;
1907
1908    while !rest.is_empty() {
1909        // Parse numeric part (integer or float)
1910        let num_end = rest
1911            .find(|c: char| !c.is_ascii_digit() && c != '.')
1912            .unwrap_or(rest.len());
1913        if num_end == 0 {
1914            return Err(anyhow!("fn:duration:parse: expected number in {s:?}"));
1915        }
1916        let num_str = &rest[..num_end];
1917        rest = &rest[num_end..];
1918
1919        // Parse unit suffix
1920        let (unit_nanos, unit_len) = if rest.starts_with("ns") {
1921            (1i64, 2)
1922        } else if rest.starts_with("us") || rest.starts_with("µs") {
1923            (1_000i64, if rest.starts_with("µ") { 3 } else { 2 })
1924        } else if rest.starts_with("ms") {
1925            (1_000_000i64, 2)
1926        } else if rest.starts_with('s') {
1927            (1_000_000_000i64, 1)
1928        } else if rest.starts_with('m') {
1929            (60 * 1_000_000_000i64, 1)
1930        } else if rest.starts_with('h') {
1931            (3600 * 1_000_000_000i64, 1)
1932        } else {
1933            return Err(anyhow!("fn:duration:parse: unknown unit in {s:?}"));
1934        };
1935        rest = &rest[unit_len..];
1936
1937        if num_str.contains('.') {
1938            let val: f64 = num_str.parse().map_err(|_| anyhow!("fn:duration:parse: invalid number {num_str:?}"))?;
1939            total_nanos += (val * unit_nanos as f64) as i64;
1940        } else {
1941            let val: i64 = num_str.parse().map_err(|_| anyhow!("fn:duration:parse: invalid number {num_str:?}"))?;
1942            total_nanos += val * unit_nanos;
1943        }
1944    }
1945
1946    Ok(sign * total_nanos)
1947}
1948
1949#[cfg(test)]
1950mod tests {
1951    use super::*;
1952
1953    #[test]
1954    fn test_retract_existing() {
1955        let mut store = MemStore::new();
1956        store.add_fact("r", vec![Value::Number(1), Value::Number(2)]);
1957        store.add_fact("r", vec![Value::Number(3), Value::Number(4)]);
1958
1959        let removed = store
1960            .retract("r", &[Value::Number(1), Value::Number(2)])
1961            .unwrap();
1962        assert!(removed);
1963
1964        let facts = store.get_facts("r");
1965        assert_eq!(facts.len(), 1);
1966        assert_eq!(facts[0], vec![Value::Number(3), Value::Number(4)]);
1967    }
1968
1969    #[test]
1970    fn test_retract_nonexistent() {
1971        let mut store = MemStore::new();
1972        store.add_fact("r", vec![Value::Number(1)]);
1973
1974        let removed = store.retract("r", &[Value::Number(99)]).unwrap();
1975        assert!(!removed);
1976
1977        let facts = store.get_facts("r");
1978        assert_eq!(facts.len(), 1);
1979        assert_eq!(facts[0], vec![Value::Number(1)]);
1980    }
1981
1982    #[test]
1983    fn test_clear() {
1984        let mut store = MemStore::new();
1985        store.add_fact("r", vec![Value::Number(1)]);
1986        store.add_fact("r", vec![Value::Number(2)]);
1987        store.add_fact("s", vec![Value::Number(10)]);
1988
1989        store.clear("r");
1990
1991        let r_facts = store.get_facts("r");
1992        assert!(r_facts.is_empty());
1993
1994        // "s" should be untouched
1995        let s_facts = store.get_facts("s");
1996        assert_eq!(s_facts.len(), 1);
1997    }
1998
1999    #[test]
2000    fn test_relation_names() {
2001        let mut store = MemStore::new();
2002        store.create_relation("alpha");
2003        store.create_relation("beta");
2004        store.add_fact("gamma", vec![Value::Number(1)]);
2005
2006        let mut names = store.relation_names();
2007        names.sort();
2008        assert_eq!(names, vec!["alpha", "beta", "gamma"]);
2009    }
2010
2011    #[test]
2012    fn test_provenance_recording() {
2013        use mangle_ir::physical::{DataSource, Operand};
2014
2015        // Build a minimal IR manually to test provenance
2016        let mut ir = mangle_ir::Ir::new();
2017        let base_name = ir.intern_name("base");
2018        let derived_name = ir.intern_name("derived");
2019        let var_x = ir.intern_name("X");
2020
2021        // Create an Op: Iterate(Scan("base", [X]), Insert("derived", [X]))
2022        let op = Op::Iterate {
2023            source: DataSource::Scan {
2024                relation: base_name,
2025                vars: vec![var_x],
2026            },
2027            body: Box::new(Op::Insert {
2028                relation: derived_name,
2029                args: vec![Operand::Var(var_x)],
2030            }),
2031        };
2032
2033        let mut store = Box::new(MemStore::new());
2034        store.add_fact("base", vec![Value::Number(10)]);
2035        store.add_fact("base", vec![Value::Number(20)]);
2036        store.create_relation("derived");
2037
2038        let mut interpreter = Interpreter::new(&ir, store as Box<dyn Store>).with_provenance();
2039
2040        let count = interpreter.execute(&op).unwrap();
2041        assert_eq!(count, 2);
2042
2043        // Check provenance was recorded
2044        let prov = interpreter.provenance.as_ref().unwrap();
2045        assert_eq!(prov.entries.len(), 2);
2046
2047        // Each derived fact should have one premise (from "base")
2048        for entry in &prov.entries {
2049            assert_eq!(entry.derived.0, "derived");
2050            assert_eq!(entry.premises.len(), 1);
2051            assert_eq!(entry.premises[0].0, "base");
2052        }
2053
2054        // Check the actual derived facts
2055        let mut derived_vals: Vec<i64> = prov
2056            .entries
2057            .iter()
2058            .map(|e| match &e.derived.1[0] {
2059                Value::Number(n) => *n,
2060                _ => panic!("expected number"),
2061            })
2062            .collect();
2063        derived_vals.sort();
2064        assert_eq!(derived_vals, vec![10, 20]);
2065    }
2066
2067    #[test]
2068    fn test_float_values() {
2069        use mangle_ir::physical::{self, DataSource, Expr, Operand};
2070
2071        let mut ir = mangle_ir::Ir::new();
2072        let temps = ir.intern_name("temps");
2073        let result = ir.intern_name("result");
2074        let var_x = ir.intern_name("X");
2075        // First, verify basic scan+insert of floats works
2076        let simple_op = Op::Iterate {
2077            source: DataSource::Scan {
2078                relation: temps,
2079                vars: vec![var_x],
2080            },
2081            body: Box::new(Op::Insert {
2082                relation: result,
2083                args: vec![Operand::Var(var_x)],
2084            }),
2085        };
2086
2087        let mut store = Box::new(MemStore::new());
2088        store.add_fact("temps", vec![Value::Float(36.5)]);
2089        store.add_fact("temps", vec![Value::Float(35.9)]);
2090        store.add_fact("temps", vec![Value::Float(37.2)]);
2091        store.create_relation("result");
2092
2093        let mut interpreter = Interpreter::new(&ir, store as Box<dyn Store>);
2094        let count = interpreter.execute(&simple_op).unwrap();
2095        assert_eq!(count, 3, "basic float scan+insert should produce 3 facts");
2096
2097        // Now test with filter and arithmetic
2098        let mut ir2 = mangle_ir::Ir::new();
2099        let temps2 = ir2.intern_name("temps");
2100        let result2 = ir2.intern_name("result2");
2101        let var_x2 = ir2.intern_name("X");
2102        let var_y2 = ir2.intern_name("Y");
2103        let fn_plus2 = ir2.intern_name("fn:float:plus");
2104
2105        let op = Op::Iterate {
2106            source: DataSource::Scan {
2107                relation: temps2,
2108                vars: vec![var_x2],
2109            },
2110            body: Box::new(Op::Filter {
2111                cond: Condition::Cmp {
2112                    op: physical::CmpOp::Gt,
2113                    left: Operand::Var(var_x2),
2114                    right: Operand::Const(physical::Constant::Float(36.0)),
2115                },
2116                body: Box::new(Op::Let {
2117                    var: var_y2,
2118                    expr: Expr::Call {
2119                        function: fn_plus2,
2120                        args: vec![
2121                            Operand::Var(var_x2),
2122                            Operand::Const(physical::Constant::Float(0.5)),
2123                        ],
2124                    },
2125                    body: Box::new(Op::Insert {
2126                        relation: result2,
2127                        args: vec![Operand::Var(var_x2), Operand::Var(var_y2)],
2128                    }),
2129                }),
2130            }),
2131        };
2132
2133        let mut store2 = Box::new(MemStore::new());
2134        store2.add_fact("temps", vec![Value::Float(36.5)]);
2135        store2.add_fact("temps", vec![Value::Float(35.9)]);
2136        store2.add_fact("temps", vec![Value::Float(37.2)]);
2137        store2.create_relation("result2");
2138
2139        let mut interpreter2 = Interpreter::new(&ir2, store2 as Box<dyn Store>);
2140        let count2 = interpreter2.execute(&op).unwrap();
2141
2142        // Only 36.5 and 37.2 are > 36.0
2143        assert_eq!(count2, 2);
2144
2145        // Results are in next_delta, check via scan_next_delta
2146        let results: Vec<_> = interpreter2
2147            .store()
2148            .scan_next_delta("result2")
2149            .unwrap()
2150            .collect();
2151        assert_eq!(results.len(), 2);
2152
2153        let mut output: Vec<(f64, f64)> = results
2154            .iter()
2155            .map(|t| match (&t[0], &t[1]) {
2156                (Value::Float(a), Value::Float(b)) => (*a, *b),
2157                _ => panic!("expected floats"),
2158            })
2159            .collect();
2160        output.sort_by(|a, b| a.0.total_cmp(&b.0));
2161
2162        assert_eq!(output[0], (36.5, 37.0));
2163        assert_eq!(output[1], (37.2, 37.7));
2164    }
2165
2166    #[test]
2167    fn test_float_in_memstore() {
2168        // Test that Float values work correctly as HashMap keys (equality, hashing)
2169        let mut store = MemStore::new();
2170        store.add_fact("data", vec![Value::Float(1.5), Value::Number(10)]);
2171        store.add_fact("data", vec![Value::Float(2.5), Value::Number(20)]);
2172        // Duplicate should not be added
2173        store.add_fact("data", vec![Value::Float(1.5), Value::Number(10)]);
2174
2175        let facts = store.get_facts("data");
2176        assert_eq!(facts.len(), 2);
2177
2178        // Test index lookup
2179        let results: Vec<_> = store
2180            .scan_index("data", 0, &Value::Float(1.5))
2181            .unwrap()
2182            .collect();
2183        assert_eq!(results.len(), 1);
2184        assert_eq!(results[0][1], Value::Number(10));
2185    }
2186
2187    // --- eval_function unit tests ---
2188
2189    #[test]
2190    fn test_fn_plus_variadic() {
2191        assert_eq!(
2192            eval_function("fn:plus", &[Value::Number(1), Value::Number(2), Value::Number(3)]).unwrap(),
2193            Value::Number(6)
2194        );
2195        // Zero args returns 0 (identity)
2196        assert_eq!(eval_function("fn:plus", &[]).unwrap(), Value::Number(0));
2197    }
2198
2199    #[test]
2200    fn test_fn_minus_variadic() {
2201        // Unary
2202        assert_eq!(
2203            eval_function("fn:minus", &[Value::Number(5)]).unwrap(),
2204            Value::Number(-5)
2205        );
2206        // Binary
2207        assert_eq!(
2208            eval_function("fn:minus", &[Value::Number(10), Value::Number(3)]).unwrap(),
2209            Value::Number(7)
2210        );
2211        // Variadic: 100 - 10 - 20 = 70
2212        assert_eq!(
2213            eval_function("fn:minus", &[Value::Number(100), Value::Number(10), Value::Number(20)]).unwrap(),
2214            Value::Number(70)
2215        );
2216        // Zero args is an error
2217        assert!(eval_function("fn:minus", &[]).is_err());
2218    }
2219
2220    #[test]
2221    fn test_fn_mult_variadic() {
2222        assert_eq!(
2223            eval_function("fn:mult", &[Value::Number(2), Value::Number(3), Value::Number(4)]).unwrap(),
2224            Value::Number(24)
2225        );
2226        // Zero args returns 1 (identity)
2227        assert_eq!(eval_function("fn:mult", &[]).unwrap(), Value::Number(1));
2228    }
2229
2230    #[test]
2231    fn test_fn_div() {
2232        // Binary
2233        assert_eq!(
2234            eval_function("fn:div", &[Value::Number(10), Value::Number(3)]).unwrap(),
2235            Value::Number(3)
2236        );
2237        // Unary: 1/n
2238        assert_eq!(
2239            eval_function("fn:div", &[Value::Number(5)]).unwrap(),
2240            Value::Number(0)
2241        );
2242        assert_eq!(
2243            eval_function("fn:div", &[Value::Number(1)]).unwrap(),
2244            Value::Number(1)
2245        );
2246        // Division by zero
2247        assert!(eval_function("fn:div", &[Value::Number(1), Value::Number(0)]).is_err());
2248        assert!(eval_function("fn:div", &[Value::Number(0)]).is_err());
2249    }
2250
2251    #[test]
2252    fn test_fn_float_promotion() {
2253        // fn:float:plus accepts both Float and Number
2254        assert_eq!(
2255            eval_function("fn:float:plus", &[Value::Float(1.5), Value::Number(2)]).unwrap(),
2256            Value::Float(3.5)
2257        );
2258        // fn:sqrt accepts Number
2259        assert_eq!(
2260            eval_function("fn:sqrt", &[Value::Number(9)]).unwrap(),
2261            Value::Float(3.0)
2262        );
2263    }
2264
2265    #[test]
2266    fn test_fn_string_concat() {
2267        assert_eq!(
2268            eval_function(
2269                "fn:string:concat",
2270                &[Value::String("a".into()), Value::String("b".into()), Value::String("c".into())]
2271            ).unwrap(),
2272            Value::String("abc".to_string())
2273        );
2274        // Mixed types
2275        assert_eq!(
2276            eval_function(
2277                "fn:string:concat",
2278                &[Value::String("n=".into()), Value::Number(42)]
2279            ).unwrap(),
2280            Value::String("n=42".to_string())
2281        );
2282    }
2283
2284    #[test]
2285    fn test_fn_string_replace() {
2286        // Replace all (-1)
2287        assert_eq!(
2288            eval_function(
2289                "fn:string:replace",
2290                &[Value::String("a-b-c".into()), Value::String("-".into()), Value::String("_".into()), Value::Number(-1)]
2291            ).unwrap(),
2292            Value::String("a_b_c".to_string())
2293        );
2294        // Replace first only (1)
2295        assert_eq!(
2296            eval_function(
2297                "fn:string:replace",
2298                &[Value::String("a-b-c".into()), Value::String("-".into()), Value::String("_".into()), Value::Number(1)]
2299            ).unwrap(),
2300            Value::String("a_b-c".to_string())
2301        );
2302    }
2303
2304    #[test]
2305    fn test_fn_to_string() {
2306        assert_eq!(
2307            eval_function("fn:number:to_string", &[Value::Number(42)]).unwrap(),
2308            Value::String("42".to_string())
2309        );
2310        assert_eq!(
2311            eval_function("fn:float64:to_string", &[Value::Float(3.14)]).unwrap(),
2312            Value::String("3.14".to_string())
2313        );
2314        assert_eq!(
2315            eval_function("fn:name:to_string", &[Value::Name("/role/admin".into())]).unwrap(),
2316            Value::String("/role/admin".to_string())
2317        );
2318    }
2319
2320    // -----------------------------------------------------------------------
2321    // Temporal coalescing tests (ported from Go factstore/temporal_test.go)
2322    // -----------------------------------------------------------------------
2323
2324    // Helper: create a time value in nanoseconds for a date
2325    fn date_nanos(year: i64, month: u32, day: u32) -> i64 {
2326        // Howard Hinnant's algorithm (matching the parser)
2327        let m = month;
2328        let y = if m <= 2 { year - 1 } else { year };
2329        let era = (if y >= 0 { y } else { y - 399 }) / 400;
2330        let yoe = (y - era * 400) as u32;
2331        let m_adj = if m > 2 { m - 3 } else { m + 9 };
2332        let doy = (153 * m_adj + 2) / 5 + day - 1;
2333        let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
2334        let days = era * 146097 + doe as i64 - 719468;
2335        days * 86400 * 1_000_000_000
2336    }
2337
2338    fn datetime_nanos(year: i64, month: u32, day: u32, h: u32, m: u32, s: u32, ns: i64) -> i64 {
2339        date_nanos(year, month, day) + (h as i64) * 3_600_000_000_000
2340            + (m as i64) * 60_000_000_000 + (s as i64) * 1_000_000_000 + ns
2341    }
2342
2343    /// Go: TestTemporalStore_Coalesce - overlapping intervals merge into one
2344    #[test]
2345    fn test_coalesce_overlapping() {
2346        let mut store = MemStore::new();
2347        let jan1 = date_nanos(2024, 1, 1);
2348        let jan15 = date_nanos(2024, 1, 15);
2349        let jan10 = date_nanos(2024, 1, 10);
2350        let jan25 = date_nanos(2024, 1, 25);
2351        let jan20 = date_nanos(2024, 1, 20);
2352        let jan31 = date_nanos(2024, 1, 31);
2353
2354        store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan1), Value::Time(jan15)]);
2355        store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan10), Value::Time(jan25)]);
2356        store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan20), Value::Time(jan31)]);
2357
2358        assert_eq!(store.get_facts("active").len(), 3);
2359
2360        store.coalesce_temporal("active");
2361
2362        let facts = store.get_facts("active");
2363        assert_eq!(facts.len(), 1, "after coalesce: expected 1, got {:?}", facts);
2364        assert_eq!(facts[0][1], Value::Time(jan1), "start should be Jan 1");
2365        assert_eq!(facts[0][2], Value::Time(jan31), "end should be Jan 31");
2366    }
2367
2368    /// Go: TestTemporalStore_CoalesceAdjacent - adjacent intervals (1ns gap) coalesce
2369    #[test]
2370    fn test_coalesce_adjacent() {
2371        let mut store = MemStore::new();
2372        let shift1_start = datetime_nanos(2024, 1, 1, 8, 0, 0, 0);
2373        let shift1_end = datetime_nanos(2024, 1, 1, 16, 0, 0, 0);
2374        let shift2_start = datetime_nanos(2024, 1, 1, 16, 0, 0, 1); // 1ns after
2375        let shift2_end = date_nanos(2024, 1, 2);
2376
2377        store.add_fact("shift", vec![Value::String("/worker".into()), Value::Time(shift1_start), Value::Time(shift1_end)]);
2378        store.add_fact("shift", vec![Value::String("/worker".into()), Value::Time(shift2_start), Value::Time(shift2_end)]);
2379
2380        store.coalesce_temporal("shift");
2381
2382        let facts = store.get_facts("shift");
2383        assert_eq!(facts.len(), 1, "adjacent intervals should coalesce");
2384        assert_eq!(facts[0][1], Value::Time(shift1_start));
2385        assert_eq!(facts[0][2], Value::Time(shift2_end));
2386    }
2387
2388    /// Go: TestTemporalStore_CoalesceNonOverlapping - non-overlapping intervals stay separate
2389    #[test]
2390    fn test_coalesce_non_overlapping() {
2391        let mut store = MemStore::new();
2392        let jan1 = date_nanos(2024, 1, 1);
2393        let jan7 = date_nanos(2024, 1, 7);
2394        let jun1 = date_nanos(2024, 6, 1);
2395        let jun14 = date_nanos(2024, 6, 14);
2396
2397        store.add_fact("vacation", vec![Value::String("/alice".into()), Value::Time(jan1), Value::Time(jan7)]);
2398        store.add_fact("vacation", vec![Value::String("/alice".into()), Value::Time(jun1), Value::Time(jun14)]);
2399
2400        store.coalesce_temporal("vacation");
2401
2402        let facts = store.get_facts("vacation");
2403        assert_eq!(facts.len(), 2, "non-overlapping intervals should stay separate");
2404    }
2405
2406    /// Go: TestTemporalStore_CoalesceMixedGranularity - sub-second precision
2407    #[test]
2408    fn test_coalesce_mixed_granularity() {
2409        let mut store = MemStore::new();
2410        // Second-level: 10:00:00 to 10:00:05
2411        let t1_start = datetime_nanos(2024, 1, 1, 10, 0, 0, 0);
2412        let t1_end = datetime_nanos(2024, 1, 1, 10, 0, 5, 0);
2413        // Overlapping millisecond-level: 10:00:04.5 to 10:00:06
2414        let t2_start = datetime_nanos(2024, 1, 1, 10, 0, 4, 500_000_000);
2415        let t2_end = datetime_nanos(2024, 1, 1, 10, 0, 6, 0);
2416        // Adjacent nanosecond-level: 10:00:06.000000001 to 10:00:07
2417        let t3_start = datetime_nanos(2024, 1, 1, 10, 0, 6, 1);
2418        let t3_end = datetime_nanos(2024, 1, 1, 10, 0, 7, 0);
2419
2420        store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t1_start), Value::Time(t1_end)]);
2421        store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t2_start), Value::Time(t2_end)]);
2422        store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t3_start), Value::Time(t3_end)]);
2423
2424        store.coalesce_temporal("event");
2425
2426        let facts = store.get_facts("event");
2427        assert_eq!(facts.len(), 1, "mixed granularity should coalesce to 1");
2428        assert_eq!(facts[0][1], Value::Time(t1_start), "start: 10:00:00");
2429        assert_eq!(facts[0][2], Value::Time(t3_end), "end: 10:00:07");
2430    }
2431
2432    /// Test coalescing with multiple keys: same relation, different key columns
2433    #[test]
2434    fn test_coalesce_multiple_keys() {
2435        let mut store = MemStore::new();
2436        let jan1 = date_nanos(2024, 1, 1);
2437        let jan10 = date_nanos(2024, 1, 10);
2438        let jan5 = date_nanos(2024, 1, 5);
2439        let jan15 = date_nanos(2024, 1, 15);
2440
2441        // Alice: two overlapping intervals
2442        store.add_fact("employed", vec![Value::String("/alice".into()), Value::Time(jan1), Value::Time(jan10)]);
2443        store.add_fact("employed", vec![Value::String("/alice".into()), Value::Time(jan5), Value::Time(jan15)]);
2444        // Bob: one interval
2445        store.add_fact("employed", vec![Value::String("/bob".into()), Value::Time(jan1), Value::Time(jan15)]);
2446
2447        store.coalesce_temporal("employed");
2448
2449        let facts = store.get_facts("employed");
2450        // Alice's 2 intervals merge to 1; Bob stays as 1
2451        assert_eq!(facts.len(), 2, "expected 2 facts after coalesce, got {:?}", facts);
2452    }
2453
2454    // --- HashJoin tests ---------------------------------------------------
2455    //
2456    // Each test hand-constructs an `Op::HashJoin` and compares its output to
2457    // the equivalent nested-loop plan (`Op::Iterate { body: Op::Iterate {..}
2458    // }` with an equality check). This isolates HashJoin correctness from
2459    // planner emission, which lives in a separate gated path.
2460
2461    /// Build `result(X, Y) :- a(X, Z), b(Z, Y).` as nested-loop (join-key
2462    /// check via Filter on Z) — the correctness baseline for HashJoin.
2463    fn nested_loop_two_way(
2464        ir: &mangle_ir::Ir,
2465        a: NameId,
2466        b: NameId,
2467        result: NameId,
2468        x: NameId,
2469        z: NameId,
2470        y: NameId,
2471        z_right: NameId,
2472    ) -> Op {
2473        use mangle_ir::physical::{CmpOp, Condition, DataSource, Operand};
2474        let _ = ir;
2475        Op::Iterate {
2476            source: DataSource::Scan {
2477                relation: a,
2478                vars: vec![x, z],
2479            },
2480            body: Box::new(Op::Iterate {
2481                source: DataSource::Scan {
2482                    relation: b,
2483                    vars: vec![z_right, y],
2484                },
2485                body: Box::new(Op::Filter {
2486                    cond: Condition::Cmp {
2487                        op: CmpOp::Eq,
2488                        left: Operand::Var(z),
2489                        right: Operand::Var(z_right),
2490                    },
2491                    body: Box::new(Op::Insert {
2492                        relation: result,
2493                        args: vec![Operand::Var(x), Operand::Var(y)],
2494                    }),
2495                }),
2496            }),
2497        }
2498    }
2499
2500    /// Same rule as `nested_loop_two_way` but with HashJoin.
2501    fn hash_join_two_way(
2502        a: NameId,
2503        b: NameId,
2504        result: NameId,
2505        x: NameId,
2506        z: NameId,
2507        y: NameId,
2508    ) -> Op {
2509        use mangle_ir::physical::{DataSource, Operand};
2510        Op::HashJoin {
2511            build_source: DataSource::Scan {
2512                relation: a,
2513                vars: vec![x, z],
2514            },
2515            probe_source: DataSource::Scan {
2516                relation: b,
2517                vars: vec![z, y],
2518            },
2519            join_keys: vec![z],
2520            body: Box::new(Op::Insert {
2521                relation: result,
2522                args: vec![Operand::Var(x), Operand::Var(y)],
2523            }),
2524        }
2525    }
2526
2527    fn run_plan(ir: &mangle_ir::Ir, facts: &[(&str, Vec<Value>)], op: &Op) -> Vec<Vec<Value>> {
2528        let mut store = Box::new(MemStore::new());
2529        for (rel, t) in facts {
2530            store.add_fact(rel, t.clone());
2531        }
2532        store.create_relation("result");
2533        let mut interp = Interpreter::new(ir, store as Box<dyn Store>);
2534        interp.execute(op).unwrap();
2535        // Inserts land in next_delta; promote twice so `scan()` surfaces them.
2536        let mut store = interp.into_store();
2537        store.merge_deltas();
2538        store.merge_deltas();
2539        store.scan("result").unwrap().collect()
2540    }
2541
2542    fn sorted(mut v: Vec<Vec<Value>>) -> Vec<Vec<Value>> {
2543        v.sort();
2544        v
2545    }
2546
2547    /// Build an IR with the names needed for the two-way-join tests.
2548    fn setup_two_way_ir()
2549    -> (mangle_ir::Ir, NameId, NameId, NameId, NameId, NameId, NameId, NameId) {
2550        let mut ir = mangle_ir::Ir::new();
2551        let a = ir.intern_name("a");
2552        let b = ir.intern_name("b");
2553        let result = ir.intern_name("result");
2554        let x = ir.intern_name("X");
2555        let z = ir.intern_name("Z");
2556        let y = ir.intern_name("Y");
2557        // A separate NameId for the nested-loop baseline's Z on the right side
2558        // — nested-loop needs a distinct var so the Filter can check equality.
2559        let z_right = ir.intern_name("Z_right");
2560        (ir, a, b, result, x, z, y, z_right)
2561    }
2562
2563    #[test]
2564    fn test_hashjoin_matches_nested_loop_basic() {
2565        let (ir, a, b, result, x, z, y, z_right) = setup_two_way_ir();
2566
2567        let facts: Vec<(&str, Vec<Value>)> = vec![
2568            ("a", vec![Value::Number(1), Value::Number(10)]),
2569            ("a", vec![Value::Number(2), Value::Number(20)]),
2570            ("a", vec![Value::Number(3), Value::Number(10)]),
2571            ("b", vec![Value::Number(10), Value::Number(100)]),
2572            ("b", vec![Value::Number(10), Value::Number(101)]),
2573            ("b", vec![Value::Number(20), Value::Number(200)]),
2574            ("b", vec![Value::Number(30), Value::Number(300)]),
2575        ];
2576
2577        let baseline = run_plan(
2578            &ir,
2579            &facts,
2580            &nested_loop_two_way(&ir, a, b, result, x, z, y, z_right),
2581        );
2582        let via_hash = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2583
2584        assert_eq!(
2585            sorted(baseline.clone()),
2586            sorted(via_hash.clone()),
2587            "HashJoin output must match nested-loop baseline"
2588        );
2589        // Sanity: we expect (1,100), (1,101), (2,200), (3,100), (3,101).
2590        assert_eq!(sorted(via_hash).len(), 5);
2591    }
2592
2593    #[test]
2594    fn test_hashjoin_empty_build() {
2595        let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2596        let facts: Vec<(&str, Vec<Value>)> = vec![
2597            ("b", vec![Value::Number(10), Value::Number(100)]),
2598            ("b", vec![Value::Number(20), Value::Number(200)]),
2599        ];
2600        let out = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2601        assert!(out.is_empty());
2602    }
2603
2604    #[test]
2605    fn test_hashjoin_empty_probe() {
2606        let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2607        let facts: Vec<(&str, Vec<Value>)> = vec![
2608            ("a", vec![Value::Number(1), Value::Number(10)]),
2609            ("a", vec![Value::Number(2), Value::Number(20)]),
2610        ];
2611        let out = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2612        assert!(out.is_empty());
2613    }
2614
2615    #[test]
2616    fn test_hashjoin_no_matches() {
2617        let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2618        let facts: Vec<(&str, Vec<Value>)> = vec![
2619            ("a", vec![Value::Number(1), Value::Number(10)]),
2620            ("b", vec![Value::Number(99), Value::Number(200)]),
2621        ];
2622        let out = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2623        assert!(out.is_empty());
2624    }
2625
2626    #[test]
2627    fn test_hashjoin_value_variants_as_key() {
2628        // Use strings and names as join keys to exercise value hashing on
2629        // non-integer variants. `r(X, Y) :- a(X, K), b(K, Y).`
2630        let mut ir = mangle_ir::Ir::new();
2631        let a = ir.intern_name("a");
2632        let b = ir.intern_name("b");
2633        let result = ir.intern_name("result");
2634        let x = ir.intern_name("X");
2635        let k = ir.intern_name("K");
2636        let y = ir.intern_name("Y");
2637
2638        let facts: Vec<(&str, Vec<Value>)> = vec![
2639            ("a", vec![Value::Number(1), Value::String("hello".into())]),
2640            ("a", vec![Value::Number(2), Value::Name("/foo".into())]),
2641            ("b", vec![Value::String("hello".into()), Value::Number(100)]),
2642            ("b", vec![Value::Name("/foo".into()), Value::Number(200)]),
2643            // An entry that must NOT match — Name vs String are distinct.
2644            (
2645                "b",
2646                vec![Value::String("/foo".into()), Value::Number(999)],
2647            ),
2648        ];
2649        let op = hash_join_two_way(a, b, result, x, k, y);
2650        let out = sorted(run_plan(&ir, &facts, &op));
2651        assert_eq!(
2652            out,
2653            sorted(vec![
2654                vec![Value::Number(1), Value::Number(100)],
2655                vec![Value::Number(2), Value::Number(200)],
2656            ])
2657        );
2658    }
2659
2660    #[test]
2661    fn test_hashjoin_multi_key() {
2662        // `r(X, W) :- a(X, K1, K2), b(K1, K2, W).` — compound (two-variable)
2663        // join key.
2664        let mut ir = mangle_ir::Ir::new();
2665        let a = ir.intern_name("a");
2666        let b = ir.intern_name("b");
2667        let result = ir.intern_name("result");
2668        let x = ir.intern_name("X");
2669        let k1 = ir.intern_name("K1");
2670        let k2 = ir.intern_name("K2");
2671        let w = ir.intern_name("W");
2672
2673        let op = Op::HashJoin {
2674            build_source: DataSource::Scan {
2675                relation: a,
2676                vars: vec![x, k1, k2],
2677            },
2678            probe_source: DataSource::Scan {
2679                relation: b,
2680                vars: vec![k1, k2, w],
2681            },
2682            join_keys: vec![k1, k2],
2683            body: Box::new(Op::Insert {
2684                relation: result,
2685                args: vec![Operand::Var(x), Operand::Var(w)],
2686            }),
2687        };
2688
2689        let facts: Vec<(&str, Vec<Value>)> = vec![
2690            (
2691                "a",
2692                vec![Value::Number(1), Value::Number(10), Value::Number(100)],
2693            ),
2694            (
2695                "a",
2696                vec![Value::Number(2), Value::Number(10), Value::Number(200)],
2697            ),
2698            // Matching K1 but different K2 — must NOT join with (10, 100, ...).
2699            (
2700                "a",
2701                vec![Value::Number(3), Value::Number(10), Value::Number(999)],
2702            ),
2703            (
2704                "b",
2705                vec![Value::Number(10), Value::Number(100), Value::Number(1000)],
2706            ),
2707            (
2708                "b",
2709                vec![Value::Number(10), Value::Number(200), Value::Number(2000)],
2710            ),
2711        ];
2712
2713        let out = sorted(run_plan(&ir, &facts, &op));
2714        assert_eq!(
2715            out,
2716            sorted(vec![
2717                vec![Value::Number(1), Value::Number(1000)],
2718                vec![Value::Number(2), Value::Number(2000)],
2719            ])
2720        );
2721    }
2722
2723    #[test]
2724    fn test_hashjoin_duplicate_build_keys() {
2725        // Multiple build rows with the same key must all be emitted (one
2726        // probe tuple × N build matches = N results).
2727        let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2728        let facts: Vec<(&str, Vec<Value>)> = vec![
2729            ("a", vec![Value::Number(1), Value::Number(10)]),
2730            ("a", vec![Value::Number(2), Value::Number(10)]),
2731            ("a", vec![Value::Number(3), Value::Number(10)]),
2732            ("b", vec![Value::Number(10), Value::Number(99)]),
2733        ];
2734        let op = hash_join_two_way(a, b, result, x, z, y);
2735        let out = sorted(run_plan(&ir, &facts, &op));
2736        assert_eq!(
2737            out,
2738            sorted(vec![
2739                vec![Value::Number(1), Value::Number(99)],
2740                vec![Value::Number(2), Value::Number(99)],
2741                vec![Value::Number(3), Value::Number(99)],
2742            ])
2743        );
2744    }
2745}