Skip to main content

mangle_interpreter/
lib.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # Mangle Interpreter
16//!
17//! A pure Rust interpreter for the Mangle Intermediate Representation (IR).
18//!
19//! This crate enables the **Edge Mode** of execution, allowing Mangle programs
20//! to run on devices or in environments where a WebAssembly runtime is not available
21//! or desired.
22//!
23//! It executes the Physical IR operations (`Op`) directly.
24//!
25//! ## Usage
26//!
27//! See `mangle-driver` for the high-level API to compile and execute source code.
28
29use anyhow::{Result, anyhow};
30use mangle_ir::physical::{Aggregate, CmpOp, Condition, Constant, DataSource, Expr, Op, Operand};
31use mangle_ir::{Ir, NameId};
32use std::collections::HashMap;
33
34pub use mangle_common::{CompoundKind, Store, Value};
35
/// Internal storage cell: one slot of a flattened tuple row.
///
/// Scalar values occupy a single `Val` cell. A compound value is stored inline
/// as a `CompoundStart` marker (holding the kind and element count) followed by
/// the flattened cells of each of its elements. Because an element may itself be
/// a compound, one logical element can span more than one cell.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
enum Cell {
    /// A non-compound value, stored directly.
    Val(Value),
    /// Marks the start of a compound of the given kind with `n` logical
    /// elements following.
    CompoundStart(CompoundKind, usize),
}
45
46/// Flatten a `Value` into a sequence of `Cell`s.
47/// Scalar values become a single `Cell::Val`. Compound values become
48/// `Cell::CompoundStart(n)` followed by the flattened cells of each element.
49fn flatten_value(v: &Value, out: &mut Vec<Cell>) {
50    match v {
51        Value::Compound(kind, elems) => {
52            out.push(Cell::CompoundStart(*kind, elems.len()));
53            for elem in elems {
54                flatten_value(elem, out);
55            }
56        }
57        _ => out.push(Cell::Val(v.clone())),
58    }
59}
60
61/// Flatten a tuple of Values into a flat Vec of Cells.
62fn flatten_tuple(tuple: &[Value]) -> Vec<Cell> {
63    let mut cells = Vec::new();
64    for v in tuple {
65        flatten_value(v, &mut cells);
66    }
67    cells
68}
69
70/// Read one logical Value from cells starting at `pos`, advancing `pos`.
71fn unflatten_one(cells: &[Cell], pos: &mut usize) -> Value {
72    match &cells[*pos] {
73        Cell::Val(v) => {
74            *pos += 1;
75            v.clone()
76        }
77        Cell::CompoundStart(kind, n) => {
78            let kind = *kind;
79            let n = *n;
80            *pos += 1;
81            let mut elems = Vec::with_capacity(n);
82            for _ in 0..n {
83                elems.push(unflatten_one(cells, pos));
84            }
85            Value::Compound(kind, elems)
86        }
87    }
88}
89
90/// Skip past one logical value in a cell slice, advancing `pos`.
91fn skip_one(cells: &[Cell], pos: &mut usize) {
92    match &cells[*pos] {
93        Cell::Val(_) => *pos += 1,
94        Cell::CompoundStart(_, n) => {
95            let n = *n;
96            *pos += 1;
97            for _ in 0..n {
98                skip_one(cells, pos);
99            }
100        }
101    }
102}
103
104/// Count the number of logical values represented by a flat cell array.
105fn count_logical_values(cells: &[Cell]) -> usize {
106    let mut count = 0;
107    let mut pos = 0;
108    while pos < cells.len() {
109        count += 1;
110        skip_one(cells, &mut pos);
111    }
112    count
113}
114
115/// Reconstruct a `Vec<Value>` tuple from flat cells, reading `n_cols` logical values.
116fn unflatten_tuple(cells: &[Cell], n_cols: usize) -> Vec<Value> {
117    let mut pos = 0;
118    let mut tuple = Vec::with_capacity(n_cols);
119    for _ in 0..n_cols {
120        tuple.push(unflatten_one(cells, &mut pos));
121    }
122    tuple
123}
124
/// A simple in-memory implementation of `Store`.
/// Supports semi-naive evaluation by tracking "stable" and "delta" facts.
///
/// Compound values are stored inline: each `Value::Compound` is flattened into
/// a `CompoundStart(kind, n)` marker followed by the flattened cells of its
/// elements. This avoids nested heap allocations in stored tuples and enables
/// future per-field indexing.
///
/// All maps are keyed by relation name; each table is a list of flattened rows.
#[derive(Default)]
pub struct MemStore {
    // Stable facts from previous iterations
    stable: HashMap<String, Vec<Vec<Cell>>>,
    // New facts from the current iteration
    delta: HashMap<String, Vec<Vec<Cell>>>,
    // Facts being collected for the next iteration
    next_delta: HashMap<String, Vec<Vec<Cell>>>,

    // Number of logical columns per relation (set on first insert).
    // Rows are decoded back into tuples using this count.
    arity: HashMap<String, usize>,

    // Secondary indexes: (relation_name, col_idx) -> { Cell -> [row_indices] }
    // Row indices are positions in the corresponding `stable` / `delta` table.
    // Indexes use the first Cell of each logical column (scalars: the value itself,
    // compounds: the CompoundStart(kind, n) header only). This enables exact index
    // lookups on scalar columns; compound columns are keyed by kind and length only.
    stable_indexes: HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
    delta_indexes: HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
}
149
150impl MemStore {
151    pub fn new() -> Self {
152        Self::default()
153    }
154
155    /// Registers a relation (creating it if absent) to allow scanning it.
156    pub fn create_relation(&mut self, relation: &str) {
157        self.stable.entry(relation.to_string()).or_default();
158    }
159
160    /// Add a fact manually (for testing/setup). Auto-creates relation in stable.
161    pub fn add_fact(&mut self, relation: &str, args: Vec<Value>) {
162        let n_cols = args.len();
163        let cells = flatten_tuple(&args);
164        let table = self.stable.entry(relation.to_string()).or_default();
165        if !table.contains(&cells) {
166            let row_idx = table.len();
167            self.arity.entry(relation.to_string()).or_insert(n_cols);
168            table.push(cells.clone());
169            index_cells(&mut self.stable_indexes, relation, &cells, n_cols, row_idx);
170        }
171    }
172
173    /// Rebuilds stable and delta indexes for a single relation after mutation.
174    fn rebuild_indexes_for(&mut self, relation: &str) {
175        self.stable_indexes.retain(|(rel, _), _| rel != relation);
176        self.delta_indexes.retain(|(rel, _), _| rel != relation);
177
178        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
179
180        if let Some(table) = self.stable.get(relation) {
181            for (row_idx, cells) in table.iter().enumerate() {
182                index_cells(&mut self.stable_indexes, relation, cells, n_cols, row_idx);
183            }
184        }
185
186        if let Some(table) = self.delta.get(relation) {
187            for (row_idx, cells) in table.iter().enumerate() {
188                index_cells(&mut self.delta_indexes, relation, cells, n_cols, row_idx);
189            }
190        }
191    }
192
193    /// Unflatten stored cells into Values using the relation's arity.
194    fn to_values(&self, relation: &str, cells: &[Cell]) -> Vec<Value> {
195        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
196        if n_cols == 0 {
197            // Fallback: treat each cell as a scalar
198            cells
199                .iter()
200                .map(|c| match c {
201                    Cell::Val(v) => v.clone(),
202                    Cell::CompoundStart(_, _) => Value::Null,
203                })
204                .collect()
205        } else {
206            unflatten_tuple(cells, n_cols)
207        }
208    }
209
210    pub fn get_facts(&self, relation: &str) -> Vec<Vec<Value>> {
211        let mut all: Vec<Vec<Value>> = self
212            .stable
213            .get(relation)
214            .into_iter()
215            .flatten()
216            .map(|cells| self.to_values(relation, cells))
217            .collect();
218        if let Some(d) = self.delta.get(relation) {
219            for cells in d {
220                all.push(self.to_values(relation, cells));
221            }
222        }
223        all
224    }
225
226    /// Coalesce temporal intervals for a relation.
227    /// Groups facts by their non-temporal columns (all except the last 2),
228    /// sorts intervals by start time, and merges overlapping/adjacent intervals.
229    /// This prevents interval explosion in recursive temporal rules.
230    pub fn coalesce_temporal(&mut self, relation: &str) {
231        let n_cols = match self.arity.get(relation) {
232            Some(&n) if n >= 2 => n,
233            _ => return,
234        };
235        let key_cols = n_cols - 2; // non-temporal columns
236
237        // Collect all facts (stable + delta) as Value tuples
238        let mut all_facts: Vec<Vec<Value>> = Vec::new();
239        if let Some(table) = self.stable.get(relation) {
240            for cells in table {
241                // Defensive: skip if logical column count doesn't match
242                if count_logical_values(cells) != n_cols {
243                    eprintln!("[mangle] coalesce_temporal: skipping malformed cells in '{relation}' (expected {n_cols} logical cols, got {})", count_logical_values(cells));
244                    continue;
245                }
246                all_facts.push(unflatten_tuple(cells, n_cols));
247            }
248        }
249        if let Some(table) = self.delta.get(relation) {
250            for cells in table {
251                if count_logical_values(cells) != n_cols {
252                    eprintln!("[mangle] coalesce_temporal: skipping malformed cells in delta '{relation}' (expected {n_cols} logical cols, got {})", count_logical_values(cells));
253                    continue;
254                }
255                all_facts.push(unflatten_tuple(cells, n_cols));
256            }
257        }
258
259        if all_facts.is_empty() {
260            return;
261        }
262
263        // Group by non-temporal key columns
264        let mut groups: HashMap<Vec<Value>, Vec<(i64, i64)>> = HashMap::new();
265        for fact in &all_facts {
266            let key: Vec<Value> = fact[..key_cols].to_vec();
267            let start = match &fact[key_cols] {
268                Value::Time(t) => *t,
269                Value::Number(n) => *n,
270                _ => continue,
271            };
272            let end = match &fact[key_cols + 1] {
273                Value::Time(t) => *t,
274                Value::Number(n) => *n,
275                _ => continue,
276            };
277            groups.entry(key).or_default().push((start, end));
278        }
279
280        // Coalesce intervals within each group
281        let mut coalesced_facts: Vec<Vec<Value>> = Vec::new();
282        for (key, mut intervals) in groups {
283            intervals.sort_by_key(|&(s, _)| s);
284            let mut merged: Vec<(i64, i64)> = vec![intervals[0]];
285            for &(s, e) in &intervals[1..] {
286                let last = merged.last_mut().unwrap();
287                // Merge if overlapping or adjacent (within 1 nanosecond)
288                if s <= last.1.saturating_add(1) {
289                    last.1 = last.1.max(e);
290                } else {
291                    merged.push((s, e));
292                }
293            }
294            for (start, end) in merged {
295                let mut fact = key.clone();
296                fact.push(Value::Time(start));
297                fact.push(Value::Time(end));
298                coalesced_facts.push(fact);
299            }
300        }
301
302        // Replace stable with coalesced facts, clear delta
303        let coalesced_cells: Vec<Vec<Cell>> = coalesced_facts
304            .iter()
305            .map(|fact| flatten_tuple(fact))
306            .collect();
307        self.stable.insert(relation.to_string(), coalesced_cells);
308        self.delta.remove(relation);
309        self.next_delta.remove(relation);
310        self.rebuild_indexes_for(relation);
311    }
312}
313
314/// Index a flattened tuple's logical columns.
315fn index_cells(
316    indexes: &mut HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
317    relation: &str,
318    cells: &[Cell],
319    n_cols: usize,
320    row_idx: usize,
321) {
322    let mut pos = 0;
323    for col_idx in 0..n_cols {
324        let key = cells[pos].clone();
325        skip_one(cells, &mut pos);
326        indexes
327            .entry((relation.to_string(), col_idx))
328            .or_default()
329            .entry(key)
330            .or_default()
331            .push(row_idx);
332    }
333}
334
335/// Convert a Value to its index Cell key (for index lookup matching).
336fn value_to_index_key(v: &Value) -> Cell {
337    match v {
338        Value::Compound(kind, elems) => Cell::CompoundStart(*kind, elems.len()),
339        _ => Cell::Val(v.clone()),
340    }
341}
342
343impl Store for MemStore {
344    fn scan(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
345        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
346        let s = self
347            .stable
348            .get(relation)
349            .into_iter()
350            .flatten()
351            .map(move |cells| unflatten_tuple(cells, n_cols));
352        let d = self
353            .delta
354            .get(relation)
355            .into_iter()
356            .flatten()
357            .map(move |cells| unflatten_tuple(cells, n_cols));
358        Ok(Box::new(s.chain(d)))
359    }
360
361    fn scan_delta(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
362        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
363        match self.delta.get(relation) {
364            Some(tuples) => Ok(Box::new(
365                tuples
366                    .iter()
367                    .map(move |cells| unflatten_tuple(cells, n_cols)),
368            )),
369            None => Ok(Box::new(std::iter::empty())),
370        }
371    }
372
373    fn scan_next_delta(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
374        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
375        match self.next_delta.get(relation) {
376            Some(tuples) => Ok(Box::new(
377                tuples
378                    .iter()
379                    .map(move |cells| unflatten_tuple(cells, n_cols)),
380            )),
381            None => Ok(Box::new(std::iter::empty())),
382        }
383    }
384
385    fn scan_index(
386        &self,
387        relation: &str,
388        col_idx: usize,
389        key: &Value,
390    ) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
391        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
392        let cell_key = value_to_index_key(key);
393        let mut results: Vec<Vec<Value>> = Vec::new();
394
395        if let Some(idx_map) = self.stable_indexes.get(&(relation.to_string(), col_idx))
396            && let Some(row_indices) = idx_map.get(&cell_key)
397            && let Some(table) = self.stable.get(relation)
398        {
399            for &i in row_indices {
400                results.push(unflatten_tuple(&table[i], n_cols));
401            }
402        }
403
404        if let Some(idx_map) = self.delta_indexes.get(&(relation.to_string(), col_idx))
405            && let Some(row_indices) = idx_map.get(&cell_key)
406            && let Some(table) = self.delta.get(relation)
407        {
408            for &i in row_indices {
409                results.push(unflatten_tuple(&table[i], n_cols));
410            }
411        }
412
413        Ok(Box::new(results.into_iter()))
414    }
415
416    fn scan_delta_index(
417        &self,
418        relation: &str,
419        col_idx: usize,
420        key: &Value,
421    ) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
422        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
423        let cell_key = value_to_index_key(key);
424        let mut results: Vec<Vec<Value>> = Vec::new();
425
426        if let Some(idx_map) = self.delta_indexes.get(&(relation.to_string(), col_idx))
427            && let Some(row_indices) = idx_map.get(&cell_key)
428            && let Some(table) = self.delta.get(relation)
429        {
430            for &i in row_indices {
431                results.push(unflatten_tuple(&table[i], n_cols));
432            }
433        }
434
435        Ok(Box::new(results.into_iter()))
436    }
437
438    fn insert(&mut self, relation: &str, tuple: Vec<Value>) -> Result<bool> {
439        let n_cols = tuple.len();
440        let cells = flatten_tuple(&tuple);
441
442        // Check if fact is already in stable, delta, or next_delta
443        if self
444            .stable
445            .get(relation)
446            .is_some_and(|v| v.contains(&cells))
447            || self
448                .delta
449                .get(relation)
450                .is_some_and(|v| v.contains(&cells))
451            || self
452                .next_delta
453                .get(relation)
454                .is_some_and(|v| v.contains(&cells))
455        {
456            return Ok(false);
457        }
458
459        // Debug: log arity registration / mismatch
460        let existing_arity = self.arity.get(relation).copied();
461        match existing_arity {
462            Some(reg) if reg != n_cols => {
463                eprintln!("[mangle] ARITY MISMATCH in relation '{relation}': registered arity={reg}, inserting tuple with {n_cols} cols, cells.len()={}. Skipping.", cells.len());
464                return Ok(false);
465            }
466            None => {
467                eprintln!("[mangle] Registering arity for relation '{relation}': {n_cols} cols");
468            }
469            _ => {}
470        }
471
472        self.arity.entry(relation.to_string()).or_insert(n_cols);
473        self.next_delta
474            .entry(relation.to_string())
475            .or_default()
476            .push(cells);
477        Ok(true)
478    }
479
480    fn merge_deltas(&mut self) {
481        // 1. Move current delta to stable
482        for (rel_name, mut tuples) in self.delta.drain() {
483            let n_cols = self.arity.get(&rel_name).copied().unwrap_or(0);
484            let table = self.stable.entry(rel_name.clone()).or_default();
485            for (i, cells) in tuples.drain(..).enumerate() {
486                // Defensive: skip tuples where logical arity doesn't match registered arity
487                if count_logical_values(&cells) != n_cols {
488                    eprintln!("[mangle] SKIP: relation '{rel_name}' arity={n_cols} but tuple[{i}] has {} logical cols. Skipping.", count_logical_values(&cells));
489                    continue;
490                }
491                let row_idx = table.len();
492                index_cells(
493                    &mut self.stable_indexes,
494                    &rel_name,
495                    &cells,
496                    n_cols,
497                    row_idx,
498                );
499                table.push(cells);
500            }
501        }
502        self.delta_indexes.clear();
503
504        // 2. Move next_delta to delta and build delta index
505        self.delta = std::mem::take(&mut self.next_delta);
506        for (rel_name, tuples) in &self.delta {
507            let n_cols = self.arity.get(rel_name).copied().unwrap_or(0);
508            for (row_idx, cells) in tuples.iter().enumerate() {
509                // Defensive: skip tuples where logical arity doesn't match registered arity
510                if count_logical_values(cells) != n_cols {
511                    eprintln!("[mangle] SKIP: relation '{rel_name}' arity={n_cols} but delta tuple[{row_idx}] has {} logical cols. Skipping.", count_logical_values(cells));
512                    continue;
513                }
514                index_cells(
515                    &mut self.delta_indexes,
516                    rel_name,
517                    cells,
518                    n_cols,
519                    row_idx,
520                );
521            }
522        }
523    }
524
525    fn create_relation(&mut self, relation: &str) {
526        self.stable.entry(relation.to_string()).or_default();
527    }
528
529    fn retract(&mut self, relation: &str, tuple: &[Value]) -> Result<bool> {
530        let cells = flatten_tuple(tuple);
531        let removed = if let Some(table) = self.stable.get_mut(relation) {
532            if let Some(pos) = table.iter().position(|t| *t == cells) {
533                table.swap_remove(pos);
534                true
535            } else {
536                false
537            }
538        } else {
539            false
540        };
541
542        // Also remove from delta and next_delta
543        if let Some(table) = self.delta.get_mut(relation) {
544            if let Some(pos) = table.iter().position(|t| *t == cells) {
545                table.swap_remove(pos);
546            }
547        }
548        if let Some(table) = self.next_delta.get_mut(relation) {
549            if let Some(pos) = table.iter().position(|t| *t == cells) {
550                table.swap_remove(pos);
551            }
552        }
553
554        if removed {
555            self.rebuild_indexes_for(relation);
556        }
557        Ok(removed)
558    }
559
560    fn clear(&mut self, relation: &str) {
561        if let Some(table) = self.stable.get_mut(relation) {
562            table.clear();
563        }
564        if let Some(table) = self.delta.get_mut(relation) {
565            table.clear();
566        }
567        if let Some(table) = self.next_delta.get_mut(relation) {
568            table.clear();
569        }
570        self.stable_indexes.retain(|(rel, _), _| rel != relation);
571        self.delta_indexes.retain(|(rel, _), _| rel != relation);
572    }
573
574    fn relation_names(&self) -> Vec<String> {
575        let mut names: Vec<String> = self.stable.keys().cloned().collect();
576        for key in self.delta.keys() {
577            if !names.contains(key) {
578                names.push(key.clone());
579            }
580        }
581        names
582    }
583
584    fn coalesce_temporal(&mut self, relation: &str) {
585        MemStore::coalesce_temporal(self, relation);
586    }
587}
588
/// A record of one derivation: which fact was derived and which premises were used.
///
/// One entry is appended by the interpreter for each insert that the store
/// reports as new while provenance recording is enabled.
#[derive(Debug, Clone)]
pub struct ProvenanceEntry {
    /// The derived fact: (relation_name, tuple).
    pub derived: (String, Vec<Value>),
    /// The premise facts that contributed: (relation_name, tuple) for each.
    /// This is a snapshot of the scans that were active at the time of the insert.
    pub premises: Vec<(String, Vec<Value>)>,
}
597
/// Lightweight recorder that captures derivation provenance during execution.
///
/// When enabled on the interpreter, every successful `Op::Insert` records
/// which premise facts (from enclosing `Op::Iterate` scans) contributed to
/// the derivation. When disabled (the default), no premise tuples are cloned
/// or stored; the interpreter only checks whether a recorder is present.
#[derive(Default)]
pub struct ProvenanceRecorder {
    /// All recorded derivations.
    pub entries: Vec<ProvenanceEntry>,
    /// Stack of currently-active scan sources (pushed on Iterate, popped after).
    /// Each element is the (relation_name, tuple) currently bound by that scan.
    active_premises: Vec<(String, Vec<Value>)>,
}
610
/// A pure Rust interpreter for Mangle IR.
///
/// Executes physical `Op` trees against a `Store`, optionally recording
/// derivation provenance.
pub struct Interpreter<'a> {
    /// The IR the executed ops belong to; used to resolve name ids to strings.
    ir: &'a Ir,
    /// The fact store that scans read from and inserts write to.
    store: Box<dyn Store + 'a>,
    /// Provenance recorder; `None` unless enabled via `with_provenance`.
    provenance: Option<ProvenanceRecorder>,
}
617
/// Variable environment used while executing an op tree.
struct Env {
    /// Current binding of each variable, keyed by its name id. Bindings are
    /// overwritten in place as scans and `Let` ops assign new values.
    vars: HashMap<NameId, Value>,
}
621
622impl Env {
623    fn new() -> Self {
624        Self {
625            vars: HashMap::new(),
626        }
627    }
628}
629
630impl<'a> Interpreter<'a> {
631    pub fn new(ir: &'a Ir, store: Box<dyn Store + 'a>) -> Self {
632        Self {
633            ir,
634            store,
635            provenance: None,
636        }
637    }
638
639    /// Enable provenance recording. When set, every successful insert
640    /// records which premise facts were in the current environment.
641    pub fn with_provenance(mut self) -> Self {
642        self.provenance = Some(ProvenanceRecorder::default());
643        self
644    }
645
646    /// Helper to get the underlying store (e.g. to inspect results).
647    pub fn store(&self) -> &dyn Store {
648        &*self.store
649    }
650
651    /// Helper to get the underlying store mutably.
652    pub fn store_mut(&mut self) -> &mut dyn Store {
653        &mut *self.store
654    }
655
656    /// Consumes the interpreter and returns the underlying store.
657    pub fn into_store(self) -> Box<dyn Store + 'a> {
658        self.store
659    }
660
661    /// Consume the interpreter, returning the provenance recorder if enabled.
662    pub fn into_provenance(self) -> Option<ProvenanceRecorder> {
663        self.provenance
664    }
665
666    /// Borrow the provenance recorder without consuming the
667    /// interpreter. Returns `None` when provenance recording wasn't
668    /// enabled via [`with_provenance`].
669    ///
670    /// [`with_provenance`]: Self::with_provenance
671    pub fn provenance(&self) -> Option<&ProvenanceRecorder> {
672        self.provenance.as_ref()
673    }
674
675    /// Consume the interpreter, returning the store and optional provenance.
676    pub fn into_parts(self) -> (Box<dyn Store + 'a>, Option<ProvenanceRecorder>) {
677        (self.store, self.provenance)
678    }
679
680    /// Executes the operation and returns the number of facts inserted.
681    pub fn execute(&mut self, op: &Op) -> Result<usize> {
682        let mut env = Env::new();
683        self.exec_op(op, &mut env)
684    }
685
686    fn exec_op(&mut self, op: &Op, env: &mut Env) -> Result<usize> {
687        match op {
688            Op::Nop => Ok(0),
689            Op::Seq(ops) => {
690                let mut count = 0;
691                for o in ops {
692                    count += self.exec_op(o, env)?;
693                }
694                Ok(count)
695            }
696            Op::Iterate { source, body } => {
697                let mut count = 0;
698                match source {
699                    DataSource::Scan { relation, vars } => {
700                        let rel_name = self.ir.resolve_name(*relation);
701                        let iter = self.store.scan(rel_name)?;
702                        let tuples: Vec<_> = iter.collect();
703
704                        for tuple in tuples {
705                            if tuple.len() != vars.len() {
706                                continue;
707                            }
708                            for (i, var) in vars.iter().enumerate() {
709                                env.vars.insert(*var, tuple[i].clone());
710                            }
711                            if let Some(ref mut prov) = self.provenance {
712                                prov.active_premises
713                                    .push((rel_name.to_string(), tuple.clone()));
714                            }
715                            count += self.exec_op(body, env)?;
716                            if self.provenance.is_some() {
717                                self.provenance.as_mut().unwrap().active_premises.pop();
718                            }
719                        }
720                    }
721                    DataSource::ScanDelta { relation, vars } => {
722                        let rel_name = self.ir.resolve_name(*relation);
723                        let iter = self.store.scan_delta(rel_name)?;
724                        let tuples: Vec<_> = iter.collect();
725
726                        for tuple in tuples {
727                            if tuple.len() != vars.len() {
728                                continue;
729                            }
730                            for (i, var) in vars.iter().enumerate() {
731                                env.vars.insert(*var, tuple[i].clone());
732                            }
733                            if let Some(ref mut prov) = self.provenance {
734                                prov.active_premises
735                                    .push((rel_name.to_string(), tuple.clone()));
736                            }
737                            count += self.exec_op(body, env)?;
738                            if self.provenance.is_some() {
739                                self.provenance.as_mut().unwrap().active_premises.pop();
740                            }
741                        }
742                    }
743                    DataSource::IndexLookup {
744                        relation,
745                        col_idx,
746                        key,
747                        vars,
748                    } => {
749                        let rel_name = self.ir.resolve_name(*relation);
750                        let key_val = self.eval_operand(key, env)?;
751
752                        let iter = self.store.scan_index(rel_name, *col_idx, &key_val)?;
753                        let tuples: Vec<_> = iter.collect();
754
755                        for tuple in tuples {
756                            if tuple.len() != vars.len() {
757                                continue;
758                            }
759                            for (i, var) in vars.iter().enumerate() {
760                                env.vars.insert(*var, tuple[i].clone());
761                            }
762                            if let Some(ref mut prov) = self.provenance {
763                                prov.active_premises
764                                    .push((rel_name.to_string(), tuple.clone()));
765                            }
766                            count += self.exec_op(body, env)?;
767                            if self.provenance.is_some() {
768                                self.provenance.as_mut().unwrap().active_premises.pop();
769                            }
770                        }
771                    }
772                }
773                Ok(count)
774            }
775            Op::Filter { cond, body } => {
776                if self.eval_cond(cond, env)? {
777                    self.exec_op(body, env)
778                } else {
779                    Ok(0)
780                }
781            }
782            Op::Insert { relation, args } => {
783                let rel_name = self.ir.resolve_name(*relation);
784                let mut tuple = Vec::new();
785                for arg in args {
786                    tuple.push(self.eval_operand(arg, env)?);
787                }
788                let is_new = self.store.insert(rel_name, tuple.clone())?;
789                if is_new {
790                    if let Some(ref mut prov) = self.provenance {
791                        prov.entries.push(ProvenanceEntry {
792                            derived: (rel_name.to_string(), tuple),
793                            premises: prov.active_premises.clone(),
794                        });
795                    }
796                    Ok(1)
797                } else {
798                    Ok(0)
799                }
800            }
801            Op::Let { var, expr, body } => {
802                let val = self.eval_expr(expr, env)?;
803                env.vars.insert(*var, val);
804                self.exec_op(body, env)
805            }
806            Op::HashJoin {
807                build_source,
808                probe_source,
809                join_keys,
810                body,
811            } => self.exec_hash_join(build_source, probe_source, join_keys, body, env),
812            Op::GroupBy {
813                source,
814                vars,
815                keys,
816                aggregates,
817                body,
818            } => {
819                let rel_name = self.ir.resolve_name(*source);
820
821                // For GroupBy, we must scan ALL available facts including ones just produced in this stratum
822                // if we want to match Go implementation's behavior for non-recursive strata.
823                let iter = self.store.scan(rel_name)?;
824                let mut tuples: Vec<_> = iter.collect();
825
826                // Also scan next_delta if it's the same relation
827                if let Ok(nd_iter) = self.store.scan_next_delta(rel_name) {
828                    tuples.extend(nd_iter);
829                }
830
831                let mut groups: HashMap<Vec<Value>, Vec<Vec<Value>>> = HashMap::new();
832
833                for tuple in tuples {
834                    if tuple.len() != vars.len() {
835                        continue;
836                    }
837                    // Temporarily bind variables to extract key
838                    for (i, var) in vars.iter().enumerate() {
839                        env.vars.insert(*var, tuple[i].clone());
840                    }
841
842                    let mut key = Vec::new();
843                    for k in keys {
844                        if let Some(val) = env.vars.get(k) {
845                            key.push(val.clone());
846                        } else {
847                            // Should not happen if well-typed
848                            key.push(Value::Null);
849                        }
850                    }
851                    groups.entry(key).or_default().push(tuple);
852                }
853
854                let mut count = 0;
855                for (key, group_tuples) in groups {
856                    // Bind keys
857                    for (i, k) in keys.iter().enumerate() {
858                        env.vars.insert(*k, key[i].clone());
859                    }
860
861                    // Compute aggregates
862                    for agg in aggregates {
863                        let val = self.eval_aggregate(agg, &group_tuples, vars, env)?;
864                        env.vars.insert(agg.var, val);
865                    }
866
867                    count += self.exec_op(body, env)?;
868                }
869                Ok(count)
870            }
871        }
872    }
873
874    /// Iterate a `DataSource` and return all tuples along with the var list
875    /// the tuples bind. Used by `HashJoin` to materialize the build side and
876    /// to stream the probe side.
877    fn collect_data_source(
878        &mut self,
879        source: &DataSource,
880        env: &mut Env,
881    ) -> Result<(Vec<Vec<Value>>, Vec<NameId>)> {
882        match source {
883            DataSource::Scan { relation, vars } => {
884                let rel_name = self.ir.resolve_name(*relation);
885                let tuples: Vec<_> = self.store.scan(rel_name)?.collect();
886                Ok((tuples, vars.clone()))
887            }
888            DataSource::ScanDelta { relation, vars } => {
889                let rel_name = self.ir.resolve_name(*relation);
890                let tuples: Vec<_> = self.store.scan_delta(rel_name)?.collect();
891                Ok((tuples, vars.clone()))
892            }
893            DataSource::IndexLookup {
894                relation,
895                col_idx,
896                key,
897                vars,
898            } => {
899                let rel_name = self.ir.resolve_name(*relation);
900                let key_val = self.eval_operand(key, env)?;
901                let tuples: Vec<_> =
902                    self.store.scan_index(rel_name, *col_idx, &key_val)?.collect();
903                Ok((tuples, vars.clone()))
904            }
905        }
906    }
907
908    fn exec_hash_join(
909        &mut self,
910        build_source: &DataSource,
911        probe_source: &DataSource,
912        join_keys: &[NameId],
913        body: &Op,
914        env: &mut Env,
915    ) -> Result<usize> {
916        let (build_tuples, build_vars) = self.collect_data_source(build_source, env)?;
917
918        // Position of each join-key variable inside build_vars. If any
919        // join_key is not in build_vars, the plan is malformed.
920        let build_key_positions: Vec<usize> = join_keys
921            .iter()
922            .map(|k| {
923                build_vars
924                    .iter()
925                    .position(|v| v == k)
926                    .ok_or_else(|| anyhow!("HashJoin: join key not in build_source vars"))
927            })
928            .collect::<Result<Vec<_>>>()?;
929
930        let mut table: HashMap<Vec<Value>, Vec<Vec<Value>>> = HashMap::new();
931        for tuple in build_tuples {
932            if tuple.len() != build_vars.len() {
933                continue;
934            }
935            let key: Vec<Value> = build_key_positions
936                .iter()
937                .map(|&i| tuple[i].clone())
938                .collect();
939            table.entry(key).or_default().push(tuple);
940        }
941
942        let (probe_tuples, probe_vars) = self.collect_data_source(probe_source, env)?;
943        let probe_key_positions: Vec<usize> = join_keys
944            .iter()
945            .map(|k| {
946                probe_vars
947                    .iter()
948                    .position(|v| v == k)
949                    .ok_or_else(|| anyhow!("HashJoin: join key not in probe_source vars"))
950            })
951            .collect::<Result<Vec<_>>>()?;
952
953        let mut count = 0;
954        for probe_tuple in probe_tuples {
955            if probe_tuple.len() != probe_vars.len() {
956                continue;
957            }
958            let key: Vec<Value> = probe_key_positions
959                .iter()
960                .map(|&i| probe_tuple[i].clone())
961                .collect();
962            let Some(matches) = table.get(&key) else {
963                continue;
964            };
965            // Bind probe-side vars once; build-side bindings cycle per match.
966            for (i, var) in probe_vars.iter().enumerate() {
967                env.vars.insert(*var, probe_tuple[i].clone());
968            }
969            for build_tuple in matches {
970                for (i, var) in build_vars.iter().enumerate() {
971                    env.vars.insert(*var, build_tuple[i].clone());
972                }
973                count += self.exec_op(body, env)?;
974            }
975        }
976        Ok(count)
977    }
978
979    fn eval_aggregate(
980        &self,
981        agg: &Aggregate,
982        group: &[Vec<Value>],
983        vars: &[NameId],
984        env: &mut Env,
985    ) -> Result<Value> {
986        let fn_name = self.ir.resolve_name(agg.func);
987        match fn_name {
988            "fn:count" => Ok(Value::Number(group.len() as i64)),
989            "fn:sum" => {
990                let arg = agg
991                    .args
992                    .first()
993                    .ok_or_else(|| anyhow!("fn:sum requires 1 argument"))?;
994
995                let mut int_sum: i64 = 0;
996                for tuple in group {
997                    for (i, var) in vars.iter().enumerate() {
998                        env.vars.insert(*var, tuple[i].clone());
999                    }
1000                    let val = self.eval_operand(arg, env)?;
1001                    match val {
1002                        Value::Number(n) => int_sum += n,
1003                        _ => return Err(anyhow!("fn:sum: expected integer, got {val}")),
1004                    }
1005                }
1006                Ok(Value::Number(int_sum))
1007            }
1008            "fn:float:sum" => {
1009                let arg = agg
1010                    .args
1011                    .first()
1012                    .ok_or_else(|| anyhow!("fn:float:sum requires 1 argument"))?;
1013
1014                let mut float_sum: f64 = 0.0;
1015                for tuple in group {
1016                    for (i, var) in vars.iter().enumerate() {
1017                        env.vars.insert(*var, tuple[i].clone());
1018                    }
1019                    let val = self.eval_operand(arg, env)?;
1020                    float_sum += value_as_float(&val)?;
1021                }
1022                Ok(Value::Float(float_sum))
1023            }
1024            "fn:max" => {
1025                let mut max_val = None;
1026                let arg = agg
1027                    .args
1028                    .first()
1029                    .ok_or_else(|| anyhow!("fn:max requires 1 argument"))?;
1030
1031                for tuple in group {
1032                    for (i, var) in vars.iter().enumerate() {
1033                        env.vars.insert(*var, tuple[i].clone());
1034                    }
1035                    let val = self.eval_operand(arg, env)?;
1036                    match max_val {
1037                        None => max_val = Some(val),
1038                        Some(ref m) => {
1039                            if val > *m {
1040                                max_val = Some(val);
1041                            }
1042                        }
1043                    }
1044                }
1045                max_val.ok_or_else(|| anyhow!("fn:max on empty group"))
1046            }
1047            "fn:min" => {
1048                let mut min_val = None;
1049                let arg = agg
1050                    .args
1051                    .first()
1052                    .ok_or_else(|| anyhow!("fn:min requires 1 argument"))?;
1053
1054                for tuple in group {
1055                    for (i, var) in vars.iter().enumerate() {
1056                        env.vars.insert(*var, tuple[i].clone());
1057                    }
1058                    let val = self.eval_operand(arg, env)?;
1059                    match min_val {
1060                        None => min_val = Some(val),
1061                        Some(ref m) => {
1062                            if val < *m {
1063                                min_val = Some(val);
1064                            }
1065                        }
1066                    }
1067                }
1068                min_val.ok_or_else(|| anyhow!("fn:min on empty group"))
1069            }
1070            "fn:float:max" => {
1071                let mut max_val: Option<f64> = None;
1072                let arg = agg
1073                    .args
1074                    .first()
1075                    .ok_or_else(|| anyhow!("fn:float:max requires 1 argument"))?;
1076
1077                for tuple in group {
1078                    for (i, var) in vars.iter().enumerate() {
1079                        env.vars.insert(*var, tuple[i].clone());
1080                    }
1081                    let val = self.eval_operand(arg, env)?;
1082                    let f = value_as_float(&val)?;
1083                    max_val = Some(match max_val {
1084                        None => f,
1085                        Some(m) => f.max(m),
1086                    });
1087                }
1088                max_val
1089                    .map(Value::Float)
1090                    .ok_or_else(|| anyhow!("fn:float:max on empty group"))
1091            }
1092            "fn:float:min" => {
1093                let mut min_val: Option<f64> = None;
1094                let arg = agg
1095                    .args
1096                    .first()
1097                    .ok_or_else(|| anyhow!("fn:float:min requires 1 argument"))?;
1098
1099                for tuple in group {
1100                    for (i, var) in vars.iter().enumerate() {
1101                        env.vars.insert(*var, tuple[i].clone());
1102                    }
1103                    let val = self.eval_operand(arg, env)?;
1104                    let f = value_as_float(&val)?;
1105                    min_val = Some(match min_val {
1106                        None => f,
1107                        Some(m) => f.min(m),
1108                    });
1109                }
1110                min_val
1111                    .map(Value::Float)
1112                    .ok_or_else(|| anyhow!("fn:float:min on empty group"))
1113            }
1114            "fn:collect" | "fn:collect_distinct" => {
1115                let arg = agg
1116                    .args
1117                    .first()
1118                    .ok_or_else(|| anyhow!("{fn_name}: requires 1 argument"))?;
1119                let mut out: Vec<Value> = Vec::with_capacity(group.len());
1120                for tuple in group {
1121                    for (i, var) in vars.iter().enumerate() {
1122                        env.vars.insert(*var, tuple[i].clone());
1123                    }
1124                    let val = self.eval_operand(arg, env)?;
1125                    if fn_name == "fn:collect_distinct" && out.contains(&val) {
1126                        continue;
1127                    }
1128                    out.push(val);
1129                }
1130                Ok(Value::Compound(CompoundKind::List, out))
1131            }
1132            _ => Err(anyhow!("Unknown aggregation function: {fn_name}")),
1133        }
1134    }
1135
1136    fn eval_cond(&self, cond: &Condition, env: &Env) -> Result<bool> {
1137        match cond {
1138            Condition::Cmp { op, left, right } => {
1139                let l = self.eval_operand(left, env)?;
1140                let r = self.eval_operand(right, env)?;
1141                match op {
1142                    CmpOp::Eq => Ok(l == r),
1143                    CmpOp::Neq => Ok(l != r),
1144                    CmpOp::Lt => Ok(l < r),
1145                    CmpOp::Le => Ok(l <= r),
1146                    CmpOp::Gt => Ok(l > r),
1147                    CmpOp::Ge => Ok(l >= r),
1148                }
1149            }
1150            Condition::Negation { relation, args } => {
1151                let rel_name = self.ir.resolve_name(*relation);
1152                let iter = self.store.scan(rel_name)?;
1153                for tuple in iter {
1154                    let mut mat = true;
1155                    if tuple.len() != args.len() {
1156                        continue;
1157                    }
1158                    for (i, arg) in args.iter().enumerate() {
1159                        let val = self.eval_operand(arg, env)?;
1160                        if tuple[i] != val {
1161                            mat = false;
1162                            break;
1163                        }
1164                    }
1165                    if mat {
1166                        return Ok(false); // Found match, negation fails
1167                    }
1168                }
1169                Ok(true) // No match found
1170            }
1171            Condition::Call { function, args } => {
1172                let fn_name = self.ir.resolve_name(*function);
1173                let mut vals = Vec::new();
1174                for arg in args {
1175                    vals.push(self.eval_operand(arg, env)?);
1176                }
1177                self.eval_builtin_predicate(fn_name, &vals)
1178            }
1179        }
1180    }
1181
1182    fn eval_builtin_predicate(&self, name: &str, vals: &[Value]) -> Result<bool> {
1183        match name {
1184            ":string:starts_with" => match (&vals[0], &vals[1]) {
1185                (Value::String(s), Value::String(p)) => Ok(s.starts_with(p.as_str())),
1186                _ => Err(anyhow!(":string:starts_with: expected string arguments")),
1187            },
1188            ":string:ends_with" => match (&vals[0], &vals[1]) {
1189                (Value::String(s), Value::String(p)) => Ok(s.ends_with(p.as_str())),
1190                _ => Err(anyhow!(":string:ends_with: expected string arguments")),
1191            },
1192            ":string:contains" => match (&vals[0], &vals[1]) {
1193                (Value::String(s), Value::String(p)) => Ok(s.contains(p.as_str())),
1194                _ => Err(anyhow!(":string:contains: expected string arguments")),
1195            },
1196            ":match_prefix" => match (&vals[0], &vals[1]) {
1197                (Value::Name(name), Value::Name(prefix)) => {
1198                    Ok(name.starts_with(prefix.as_str()) && name.len() > prefix.len())
1199                }
1200                _ => Err(anyhow!(":match_prefix: expected name arguments")),
1201            },
1202            _ => Err(anyhow!("Unknown built-in predicate: {name}")),
1203        }
1204    }
1205
1206    fn eval_expr(&self, expr: &Expr, env: &Env) -> Result<Value> {
1207        match expr {
1208            Expr::Value(op) => self.eval_operand(op, env),
1209            Expr::Call { function, args } => {
1210                let fn_name = self.ir.resolve_name(*function);
1211                let mut vals = Vec::new();
1212                for arg in args {
1213                    vals.push(self.eval_operand(arg, env)?);
1214                }
1215                eval_function(fn_name, &vals)
1216            }
1217        }
1218    }
1219
1220    fn eval_operand(&self, op: &Operand, env: &Env) -> Result<Value> {
1221        match op {
1222            Operand::Var(v) => env
1223                .vars
1224                .get(v)
1225                .cloned()
1226                .ok_or_else(|| anyhow!("Variable not found")),
1227            Operand::Const(c) => match c {
1228                Constant::Number(n) => Ok(Value::Number(*n)),
1229                Constant::Float(f) => Ok(Value::Float(*f)),
1230                Constant::String(sid) => {
1231                    Ok(Value::String(self.ir.resolve_string(*sid).to_string()))
1232                }
1233                Constant::Name(nid) => Ok(Value::Name(self.ir.resolve_name(*nid).to_string())),
1234                Constant::Time(t) => Ok(Value::Time(*t)),
1235                Constant::Duration(d) => Ok(Value::Duration(*d)),
1236            },
1237        }
1238    }
1239}
1240
1241// --- Helper functions ---
1242
1243fn value_as_float(v: &Value) -> Result<f64> {
1244    match v {
1245        Value::Float(f) => Ok(*f),
1246        Value::Number(n) => Ok(*n as f64),
1247        _ => Err(anyhow!("expected numeric value, got {v}")),
1248    }
1249}
1250
1251fn value_to_string(v: &Value) -> String {
1252    match v {
1253        Value::Number(n) => n.to_string(),
1254        Value::Float(f) => format!("{f}"),
1255        Value::String(s) => s.clone(),
1256        Value::Name(s) => s.clone(),
1257        Value::Time(t) => format!("{}", Value::Time(*t)),
1258        Value::Duration(d) => format!("{}", Value::Duration(*d)),
1259        Value::Compound(kind, elems) => format!("{}", Value::Compound(*kind, elems.clone())),
1260        Value::Null => "null".to_string(),
1261    }
1262}
1263
1264/// Evaluate a built-in function call.
1265pub fn eval_function(fn_name: &str, vals: &[Value]) -> Result<Value> {
1266    match fn_name {
1267        // --- Integer arithmetic (variadic) ---
1268        "fn:plus" => {
1269            let mut sum: i64 = 0;
1270            for v in vals {
1271                match v {
1272                    Value::Number(n) => sum += n,
1273                    _ => return Err(anyhow!("fn:plus: expected integer, got {v}")),
1274                }
1275            }
1276            Ok(Value::Number(sum))
1277        }
1278        "fn:minus" => {
1279            if vals.is_empty() {
1280                return Err(anyhow!("fn:minus: requires at least 1 argument"));
1281            }
1282            let first = match &vals[0] {
1283                Value::Number(n) => *n,
1284                v => return Err(anyhow!("fn:minus: expected integer, got {v}")),
1285            };
1286            if vals.len() == 1 {
1287                return Ok(Value::Number(-first));
1288            }
1289            let mut result = first;
1290            for v in &vals[1..] {
1291                match v {
1292                    Value::Number(n) => result -= n,
1293                    _ => return Err(anyhow!("fn:minus: expected integer, got {v}")),
1294                }
1295            }
1296            Ok(Value::Number(result))
1297        }
1298        "fn:mult" => {
1299            let mut product: i64 = 1;
1300            for v in vals {
1301                match v {
1302                    Value::Number(n) => product *= n,
1303                    _ => return Err(anyhow!("fn:mult: expected integer, got {v}")),
1304                }
1305            }
1306            Ok(Value::Number(product))
1307        }
1308        "fn:div" => {
1309            if vals.is_empty() {
1310                return Err(anyhow!("fn:div: requires at least 1 argument"));
1311            }
1312            let first = match &vals[0] {
1313                Value::Number(n) => *n,
1314                v => return Err(anyhow!("fn:div: expected integer, got {v}")),
1315            };
1316            if vals.len() == 1 {
1317                if first == 0 {
1318                    return Err(anyhow!("Division by zero in fn:div"));
1319                }
1320                return Ok(Value::Number(1 / first));
1321            }
1322            let mut result = first;
1323            for v in &vals[1..] {
1324                match v {
1325                    Value::Number(0) => return Err(anyhow!("Division by zero in fn:div")),
1326                    Value::Number(n) => {
1327                        result /= n;
1328                        if result == 0 {
1329                            return Ok(Value::Number(0));
1330                        }
1331                    }
1332                    _ => return Err(anyhow!("fn:div: expected integer, got {v}")),
1333                }
1334            }
1335            Ok(Value::Number(result))
1336        }
1337
1338        // --- Float arithmetic (variadic, accepts Number via promotion) ---
1339        "fn:float:plus" => {
1340            let mut sum: f64 = 0.0;
1341            for v in vals {
1342                sum += value_as_float(v)?;
1343            }
1344            Ok(Value::Float(sum))
1345        }
1346        "fn:float:minus" => {
1347            if vals.is_empty() {
1348                return Err(anyhow!("fn:float:minus: requires at least 1 argument"));
1349            }
1350            let first = value_as_float(&vals[0])?;
1351            if vals.len() == 1 {
1352                return Ok(Value::Float(-first));
1353            }
1354            let mut result = first;
1355            for v in &vals[1..] {
1356                result -= value_as_float(v)?;
1357            }
1358            Ok(Value::Float(result))
1359        }
1360        "fn:float:mult" => {
1361            let mut product: f64 = 1.0;
1362            for v in vals {
1363                product *= value_as_float(v)?;
1364            }
1365            Ok(Value::Float(product))
1366        }
1367        "fn:float:div" => {
1368            if vals.is_empty() {
1369                return Err(anyhow!("fn:float:div: requires at least 1 argument"));
1370            }
1371            let first = value_as_float(&vals[0])?;
1372            if vals.len() == 1 {
1373                if first == 0.0 {
1374                    return Err(anyhow!("Division by zero in fn:float:div"));
1375                }
1376                return Ok(Value::Float(1.0 / first));
1377            }
1378            let mut result = first;
1379            for v in &vals[1..] {
1380                let d = value_as_float(v)?;
1381                if d == 0.0 {
1382                    return Err(anyhow!("Division by zero in fn:float:div"));
1383                }
1384                result /= d;
1385            }
1386            Ok(Value::Float(result))
1387        }
1388        "fn:sqrt" => {
1389            if vals.len() != 1 {
1390                return Err(anyhow!("fn:sqrt: requires exactly 1 argument"));
1391            }
1392            let f = value_as_float(&vals[0])?;
1393            Ok(Value::Float(f.sqrt()))
1394        }
1395
1396        // --- String functions ---
1397        "fn:string:concat" => {
1398            let mut result = String::new();
1399            for v in vals {
1400                result.push_str(&value_to_string(v));
1401            }
1402            Ok(Value::String(result))
1403        }
1404        "fn:string:replace" => {
1405            if vals.len() != 4 {
1406                return Err(anyhow!("fn:string:replace: requires 4 arguments (string, old, new, count)"));
1407            }
1408            let s = match &vals[0] {
1409                Value::String(s) => s,
1410                v => return Err(anyhow!("fn:string:replace: first arg must be string, got {v}")),
1411            };
1412            let old = match &vals[1] {
1413                Value::String(s) => s,
1414                v => return Err(anyhow!("fn:string:replace: second arg must be string, got {v}")),
1415            };
1416            let new_s = match &vals[2] {
1417                Value::String(s) => s,
1418                v => return Err(anyhow!("fn:string:replace: third arg must be string, got {v}")),
1419            };
1420            let count = match &vals[3] {
1421                Value::Number(n) => *n,
1422                v => return Err(anyhow!("fn:string:replace: fourth arg must be number, got {v}")),
1423            };
1424            let result = if count < 0 {
1425                s.replace(old.as_str(), new_s.as_str())
1426            } else {
1427                s.replacen(old.as_str(), new_s.as_str(), count as usize)
1428            };
1429            Ok(Value::String(result))
1430        }
1431
1432        // --- Type conversion functions ---
1433        "fn:number:to_string" => {
1434            if vals.len() != 1 {
1435                return Err(anyhow!("fn:number:to_string: requires 1 argument"));
1436            }
1437            match &vals[0] {
1438                Value::Number(n) => Ok(Value::String(n.to_string())),
1439                v => Err(anyhow!("fn:number:to_string: expected number, got {v}")),
1440            }
1441        }
1442        "fn:float64:to_string" => {
1443            if vals.len() != 1 {
1444                return Err(anyhow!("fn:float64:to_string: requires 1 argument"));
1445            }
1446            match &vals[0] {
1447                Value::Float(f) => Ok(Value::String(format!("{f}"))),
1448                v => Err(anyhow!("fn:float64:to_string: expected float, got {v}")),
1449            }
1450        }
1451        "fn:name:to_string" => {
1452            if vals.len() != 1 {
1453                return Err(anyhow!("fn:name:to_string: requires 1 argument"));
1454            }
1455            match &vals[0] {
1456                Value::Name(s) => Ok(Value::String(s.clone())),
1457                v => Err(anyhow!("fn:name:to_string: expected name, got {v}")),
1458            }
1459        }
1460
1461        // --- Time functions ---
1462        "fn:time:now" => {
1463            if !vals.is_empty() {
1464                return Err(anyhow!("fn:time:now: takes no arguments"));
1465            }
1466            let nanos = std::time::SystemTime::now()
1467                .duration_since(std::time::UNIX_EPOCH)
1468                .map_err(|e| anyhow!("fn:time:now: {e}"))?
1469                .as_nanos() as i64;
1470            Ok(Value::Time(nanos))
1471        }
1472        "fn:time:add" => {
1473            if vals.len() != 2 {
1474                return Err(anyhow!("fn:time:add: requires 2 arguments (time, duration)"));
1475            }
1476            match (&vals[0], &vals[1]) {
1477                (Value::Time(t), Value::Duration(d)) => Ok(Value::Time(t + d)),
1478                _ => Err(anyhow!("fn:time:add: expected (time, duration), got ({}, {})", vals[0], vals[1])),
1479            }
1480        }
1481        "fn:time:sub" => {
1482            if vals.len() != 2 {
1483                return Err(anyhow!("fn:time:sub: requires 2 arguments"));
1484            }
1485            match (&vals[0], &vals[1]) {
1486                (Value::Time(t1), Value::Time(t2)) => Ok(Value::Duration(t1 - t2)),
1487                (Value::Time(t), Value::Duration(d)) => Ok(Value::Time(t - d)),
1488                _ => Err(anyhow!("fn:time:sub: expected (time, time) or (time, duration)")),
1489            }
1490        }
1491        "fn:time:year" => time_component(vals, |secs, _| {
1492            let (y, _, _) = civil_from_epoch_secs(secs);
1493            y as i64
1494        }),
1495        "fn:time:month" => time_component(vals, |secs, _| {
1496            let (_, m, _) = civil_from_epoch_secs(secs);
1497            m as i64
1498        }),
1499        "fn:time:day" => time_component(vals, |secs, _| {
1500            let (_, _, d) = civil_from_epoch_secs(secs);
1501            d as i64
1502        }),
1503        "fn:time:hour" => time_component(vals, |secs, _| {
1504            secs.rem_euclid(86400) / 3600
1505        }),
1506        "fn:time:minute" => time_component(vals, |secs, _| {
1507            (secs.rem_euclid(86400) % 3600) / 60
1508        }),
1509        "fn:time:second" => time_component(vals, |secs, _| {
1510            secs.rem_euclid(86400) % 60
1511        }),
1512        "fn:time:from_unix_nanos" => {
1513            if vals.len() != 1 {
1514                return Err(anyhow!("fn:time:from_unix_nanos: requires 1 argument"));
1515            }
1516            match &vals[0] {
1517                Value::Number(n) => Ok(Value::Time(*n)),
1518                v => Err(anyhow!("fn:time:from_unix_nanos: expected number, got {v}")),
1519            }
1520        }
1521        "fn:time:to_unix_nanos" => {
1522            if vals.len() != 1 {
1523                return Err(anyhow!("fn:time:to_unix_nanos: requires 1 argument"));
1524            }
1525            match &vals[0] {
1526                Value::Time(t) => Ok(Value::Number(*t)),
1527                v => Err(anyhow!("fn:time:to_unix_nanos: expected time, got {v}")),
1528            }
1529        }
1530        "fn:time:trunc" => {
1531            if vals.len() != 2 {
1532                return Err(anyhow!("fn:time:trunc: requires 2 arguments (time, unit_name)"));
1533            }
1534            let t = match &vals[0] {
1535                Value::Time(t) => *t,
1536                v => return Err(anyhow!("fn:time:trunc: first arg must be time, got {v}")),
1537            };
1538            let unit_name = match &vals[1] {
1539                Value::Name(s) => s.as_str(),
1540                v => return Err(anyhow!("fn:time:trunc: second arg must be name, got {v}")),
1541            };
1542            let d: i64 = match unit_name {
1543                "/nanosecond" => 1,
1544                "/microsecond" => 1_000,
1545                "/millisecond" => 1_000_000,
1546                "/second" => 1_000_000_000,
1547                "/minute" => 60 * 1_000_000_000,
1548                "/hour" => 3600 * 1_000_000_000,
1549                "/day" => 24 * 3600 * 1_000_000_000,
1550                _ => return Err(anyhow!("fn:time:trunc: unknown unit {unit_name:?}")),
1551            };
1552            Ok(Value::Time(t - t.rem_euclid(d)))
1553        }
1554        "fn:time:format" => {
1555            if vals.len() != 2 {
1556                return Err(anyhow!("fn:time:format: requires 2 arguments (time, precision)"));
1557            }
1558            let t = match &vals[0] {
1559                Value::Time(t) => *t,
1560                v => return Err(anyhow!("fn:time:format: first arg must be time, got {v}")),
1561            };
1562            let precision = match &vals[1] {
1563                Value::String(s) => s.as_str(),
1564                v => return Err(anyhow!("fn:time:format: second arg must be name, got {v}")),
1565            };
1566            Ok(Value::String(format_time_with_precision(t, precision)?))
1567        }
1568        "fn:time:format_civil" => {
1569            if vals.len() != 3 {
1570                return Err(anyhow!("fn:time:format_civil: requires 3 arguments (time, timezone, precision)"));
1571            }
1572            let t = match &vals[0] {
1573                Value::Time(t) => *t,
1574                v => return Err(anyhow!("fn:time:format_civil: first arg must be time, got {v}")),
1575            };
1576            let tz = match &vals[1] {
1577                Value::String(s) => s.as_str(),
1578                v => return Err(anyhow!("fn:time:format_civil: second arg must be string, got {v}")),
1579            };
1580            let precision = match &vals[2] {
1581                Value::String(s) => s.as_str(),
1582                v => return Err(anyhow!("fn:time:format_civil: third arg must be name, got {v}")),
1583            };
1584            let offset = parse_timezone_offset(tz)?;
1585            let adjusted = t + offset * 1_000_000_000;
1586            let formatted = format_time_with_precision(adjusted, precision)?;
1587            Ok(Value::String(formatted))
1588        }
1589        "fn:time:parse_rfc3339" => {
1590            if vals.len() != 1 {
1591                return Err(anyhow!("fn:time:parse_rfc3339: requires 1 argument"));
1592            }
1593            match &vals[0] {
1594                Value::String(s) => {
1595                    let nanos = parse_rfc3339_to_nanos(s)?;
1596                    Ok(Value::Time(nanos))
1597                }
1598                v => Err(anyhow!("fn:time:parse_rfc3339: expected string, got {v}")),
1599            }
1600        }
1601        "fn:time:parse_civil" => {
1602            if vals.len() != 2 {
1603                return Err(anyhow!("fn:time:parse_civil: requires 2 arguments (string, timezone)"));
1604            }
1605            let s = match &vals[0] {
1606                Value::String(s) => s.as_str(),
1607                v => return Err(anyhow!("fn:time:parse_civil: first arg must be string, got {v}")),
1608            };
1609            let tz = match &vals[1] {
1610                Value::String(s) => s.as_str(),
1611                v => return Err(anyhow!("fn:time:parse_civil: second arg must be string, got {v}")),
1612            };
1613            let offset = parse_timezone_offset(tz)?;
1614            let nanos = parse_civil_datetime_to_nanos(s)?;
1615            // Subtract offset to convert local time to UTC
1616            Ok(Value::Time(nanos - offset * 1_000_000_000))
1617        }
1618
1619        // --- Duration functions ---
1620        "fn:duration:add" => {
1621            if vals.len() != 2 {
1622                return Err(anyhow!("fn:duration:add: requires 2 arguments"));
1623            }
1624            match (&vals[0], &vals[1]) {
1625                (Value::Duration(a), Value::Duration(b)) => Ok(Value::Duration(a + b)),
1626                _ => Err(anyhow!("fn:duration:add: expected (duration, duration)")),
1627            }
1628        }
1629        "fn:duration:mult" => {
1630            if vals.len() != 2 {
1631                return Err(anyhow!("fn:duration:mult: requires 2 arguments"));
1632            }
1633            match (&vals[0], &vals[1]) {
1634                (Value::Duration(d), Value::Number(n)) => Ok(Value::Duration(d * n)),
1635                (Value::Number(n), Value::Duration(d)) => Ok(Value::Duration(n * d)),
1636                _ => Err(anyhow!("fn:duration:mult: expected (duration, number) or (number, duration)")),
1637            }
1638        }
1639        "fn:duration:hours" => duration_component_float(vals, |nanos| nanos as f64 / (60.0 * 60.0 * 1_000_000_000.0)),
1640        "fn:duration:minutes" => duration_component_float(vals, |nanos| nanos as f64 / (60.0 * 1_000_000_000.0)),
1641        "fn:duration:seconds" => duration_component_float(vals, |nanos| nanos as f64 / 1_000_000_000.0),
1642        "fn:duration:nanos" => duration_component_int(vals, |nanos| nanos),
1643        "fn:duration:from_nanos" => {
1644            if vals.len() != 1 {
1645                return Err(anyhow!("fn:duration:from_nanos: requires 1 argument"));
1646            }
1647            match &vals[0] {
1648                Value::Number(n) => Ok(Value::Duration(*n)),
1649                v => Err(anyhow!("fn:duration:from_nanos: expected number, got {v}")),
1650            }
1651        }
1652        "fn:duration:from_hours" => duration_from(vals, "hours", 60 * 60 * 1_000_000_000),
1653        "fn:duration:from_minutes" => duration_from(vals, "minutes", 60 * 1_000_000_000),
1654        "fn:duration:from_seconds" => duration_from(vals, "seconds", 1_000_000_000),
1655        "fn:duration:parse" => {
1656            if vals.len() != 1 {
1657                return Err(anyhow!("fn:duration:parse: requires 1 argument"));
1658            }
1659            match &vals[0] {
1660                Value::String(s) => {
1661                    let nanos = parse_duration_string(s)?;
1662                    Ok(Value::Duration(nanos))
1663                }
1664                v => Err(anyhow!("fn:duration:parse: expected string, got {v}")),
1665            }
1666        }
1667
1668        // --- Compound type constructors ---
1669        "fn:list" => Ok(Value::Compound(CompoundKind::List, vals.to_vec())),
1670        "fn:pair" => {
1671            if vals.len() != 2 {
1672                return Err(anyhow!("fn:pair: requires exactly 2 arguments"));
1673            }
1674            Ok(Value::Compound(CompoundKind::Pair, vals.to_vec()))
1675        }
1676        "fn:struct" => {
1677            // Args are interleaved: field_name, value, field_name, value, ...
1678            if vals.len() % 2 != 0 {
1679                return Err(anyhow!(
1680                    "fn:struct: requires even number of arguments (field, value pairs)"
1681                ));
1682            }
1683            Ok(Value::Compound(CompoundKind::Struct, vals.to_vec()))
1684        }
1685        "fn:map" => {
1686            // Args are interleaved: key, value, key, value, ...
1687            if vals.len() % 2 != 0 {
1688                return Err(anyhow!(
1689                    "fn:map: requires even number of arguments (key, value pairs)"
1690                ));
1691            }
1692            Ok(Value::Compound(CompoundKind::Map, vals.to_vec()))
1693        }
1694
1695        // --- Compound type accessors ---
1696        "fn:list:get" => {
1697            if vals.len() != 2 {
1698                return Err(anyhow!("fn:list:get: requires 2 arguments (list, index)"));
1699            }
1700            match (&vals[0], &vals[1]) {
1701                (Value::Compound(_, elems), Value::Number(idx)) => {
1702                    let i = *idx as usize;
1703                    elems
1704                        .get(i)
1705                        .cloned()
1706                        .ok_or_else(|| anyhow!("fn:list:get: index {i} out of bounds (len {})", elems.len()))
1707                }
1708                _ => Err(anyhow!("fn:list:get: expected (compound, number)")),
1709            }
1710        }
1711        "fn:list:append" => {
1712            if vals.len() != 2 {
1713                return Err(anyhow!("fn:list:append: requires 2 arguments (list, elem)"));
1714            }
1715            match &vals[0] {
1716                Value::Compound(CompoundKind::List, elems) => {
1717                    let mut out = elems.clone();
1718                    out.push(vals[1].clone());
1719                    Ok(Value::Compound(CompoundKind::List, out))
1720                }
1721                _ => Err(anyhow!("fn:list:append: expected list as first argument")),
1722            }
1723        }
1724        "fn:list:len" | "fn:len" => {
1725            if vals.len() != 1 {
1726                return Err(anyhow!("fn:len: requires 1 argument"));
1727            }
1728            match &vals[0] {
1729                Value::Compound(_, elems) => Ok(Value::Number(elems.len() as i64)),
1730                _ => Err(anyhow!("fn:len: expected compound value")),
1731            }
1732        }
1733        "fn:pair:first" => {
1734            if vals.len() != 1 {
1735                return Err(anyhow!("fn:pair:first: requires 1 argument"));
1736            }
1737            match &vals[0] {
1738                Value::Compound(_, elems) if elems.len() >= 1 => Ok(elems[0].clone()),
1739                _ => Err(anyhow!("fn:pair:first: expected compound with at least 1 element")),
1740            }
1741        }
1742        "fn:pair:second" => {
1743            if vals.len() != 1 {
1744                return Err(anyhow!("fn:pair:second: requires 1 argument"));
1745            }
1746            match &vals[0] {
1747                Value::Compound(_, elems) if elems.len() >= 2 => Ok(elems[1].clone()),
1748                _ => Err(anyhow!("fn:pair:second: expected compound with at least 2 elements")),
1749            }
1750        }
1751        "fn:struct:get" | "fn:map:get" => {
1752            if vals.len() != 2 {
1753                return Err(anyhow!("{fn_name}: requires 2 arguments (compound, key)"));
1754            }
1755            match &vals[0] {
1756                Value::Compound(_, elems) => {
1757                    // Interleaved key-value pairs: [k1, v1, k2, v2, ...]
1758                    for pair in elems.chunks_exact(2) {
1759                        if pair[0] == vals[1] {
1760                            return Ok(pair[1].clone());
1761                        }
1762                    }
1763                    Err(anyhow!("{fn_name}: key not found"))
1764                }
1765                _ => Err(anyhow!("{fn_name}: expected compound value")),
1766            }
1767        }
1768        "fn:map:len" | "fn:struct:len" => {
1769            if vals.len() != 1 {
1770                return Err(anyhow!("{fn_name}: requires 1 argument"));
1771            }
1772            match &vals[0] {
1773                Value::Compound(_, elems) => Ok(Value::Number((elems.len() / 2) as i64)),
1774                _ => Err(anyhow!("{fn_name}: expected compound value")),
1775            }
1776        }
1777        "fn:map:keys" => {
1778            if vals.len() != 1 {
1779                return Err(anyhow!("fn:map:keys: requires 1 argument"));
1780            }
1781            match &vals[0] {
1782                Value::Compound(_, elems) => {
1783                    let keys: Vec<Value> = elems.chunks_exact(2).map(|p| p[0].clone()).collect();
1784                    Ok(Value::Compound(CompoundKind::List, keys))
1785                }
1786                _ => Err(anyhow!("fn:map:keys: expected compound value")),
1787            }
1788        }
1789        "fn:map:values" | "fn:struct:values" => {
1790            if vals.len() != 1 {
1791                return Err(anyhow!("{fn_name}: requires 1 argument"));
1792            }
1793            match &vals[0] {
1794                Value::Compound(_, elems) => {
1795                    let values: Vec<Value> = elems.chunks_exact(2).map(|p| p[1].clone()).collect();
1796                    Ok(Value::Compound(CompoundKind::List, values))
1797                }
1798                _ => Err(anyhow!("{fn_name}: expected compound value")),
1799            }
1800        }
1801
1802        _ => Err(anyhow!("Unknown function: {fn_name}")),
1803    }
1804}
1805
1806fn time_component(vals: &[Value], extract: impl Fn(i64, i64) -> i64) -> Result<Value> {
1807    if vals.len() != 1 {
1808        return Err(anyhow!("time component function: requires 1 argument"));
1809    }
1810    match &vals[0] {
1811        Value::Time(nanos) => {
1812            let secs = nanos.div_euclid(1_000_000_000);
1813            let sub_nanos = nanos.rem_euclid(1_000_000_000);
1814            Ok(Value::Number(extract(secs, sub_nanos)))
1815        }
1816        v => Err(anyhow!("time component function: expected time, got {v}")),
1817    }
1818}
1819
/// Convert seconds since the Unix epoch into a proleptic Gregorian
/// `(year, month, day)` triple (month and day are 1-based).
///
/// The computation works on a calendar whose year starts on March 1st, so
/// that the leap day is the last day of the year, and shifts back to a
/// January-based year at the end.
fn civil_from_epoch_secs(secs: i64) -> (i32, u32, u32) {
    const SECS_PER_DAY: i64 = 86400;
    // Days in one 400-year Gregorian cycle ("era").
    const DAYS_PER_ERA: i64 = 146097;
    // Day-count shift that moves the origin from 1970-01-01 to 0000-03-01.
    const EPOCH_SHIFT: i64 = 719468;

    let shifted_days = secs.div_euclid(SECS_PER_DAY) + EPOCH_SHIFT;
    // Floor division by the era length, spelled out for negative day counts.
    let era = if shifted_days >= 0 {
        shifted_days / DAYS_PER_ERA
    } else {
        (shifted_days - (DAYS_PER_ERA - 1)) / DAYS_PER_ERA
    };
    let day_of_era = (shifted_days - era * DAYS_PER_ERA) as u32;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };

    // January and February belong to the following January-based year.
    let march_based_year = i64::from(year_of_era) + era * 400;
    let year = march_based_year + i64::from(month <= 2);
    (year as i32, month, day)
}
1834
1835fn duration_component_float(vals: &[Value], extract: impl Fn(i64) -> f64) -> Result<Value> {
1836    if vals.len() != 1 {
1837        return Err(anyhow!("duration component function: requires 1 argument"));
1838    }
1839    match &vals[0] {
1840        Value::Duration(nanos) => Ok(Value::Float(extract(*nanos))),
1841        v => Err(anyhow!("duration component function: expected duration, got {v}")),
1842    }
1843}
1844
1845fn duration_component_int(vals: &[Value], extract: impl Fn(i64) -> i64) -> Result<Value> {
1846    if vals.len() != 1 {
1847        return Err(anyhow!("duration component function: requires 1 argument"));
1848    }
1849    match &vals[0] {
1850        Value::Duration(nanos) => Ok(Value::Number(extract(*nanos))),
1851        v => Err(anyhow!("duration component function: expected duration, got {v}")),
1852    }
1853}
1854
1855fn duration_from(vals: &[Value], name: &str, multiplier: i64) -> Result<Value> {
1856    if vals.len() != 1 {
1857        return Err(anyhow!("fn:duration:from_{name}: requires 1 argument"));
1858    }
1859    match &vals[0] {
1860        Value::Number(n) => Ok(Value::Duration(n * multiplier)),
1861        v => Err(anyhow!("fn:duration:from_{name}: expected number, got {v}")),
1862    }
1863}
1864
1865/// Format a time (in UTC nanoseconds) with a given precision specifier.
1866fn format_time_with_precision(nanos: i64, precision: &str) -> Result<String> {
1867    let secs = nanos.div_euclid(1_000_000_000);
1868    let sub_nanos = nanos.rem_euclid(1_000_000_000);
1869    let (y, m, d) = civil_from_epoch_secs(secs);
1870    let time_of_day = secs.rem_euclid(86400);
1871    let hour = time_of_day / 3600;
1872    let minute = (time_of_day % 3600) / 60;
1873    let second = time_of_day % 60;
1874
1875    match precision {
1876        "/year" => Ok(format!("{y:04}")),
1877        "/month" => Ok(format!("{y:04}-{m:02}")),
1878        "/day" => Ok(format!("{y:04}-{m:02}-{d:02}")),
1879        "/hour" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}Z")),
1880        "/minute" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}Z")),
1881        "/second" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}Z")),
1882        "/millisecond" => {
1883            let ms = sub_nanos / 1_000_000;
1884            Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{ms:03}Z"))
1885        }
1886        "/microsecond" => {
1887            let us = sub_nanos / 1_000;
1888            Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{us:06}Z"))
1889        }
1890        "/nanosecond" => {
1891            if sub_nanos == 0 {
1892                Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}Z"))
1893            } else {
1894                let ns_str = format!("{sub_nanos:09}");
1895                let ns_trimmed = ns_str.trim_end_matches('0');
1896                Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{ns_trimmed}Z"))
1897            }
1898        }
1899        _ => Err(anyhow!("unknown time precision: {precision:?}")),
1900    }
1901}
1902
1903/// Parse a timezone string. Supports "UTC" and fixed offsets like "+05:30", "-08:00".
1904/// Returns offset in seconds from UTC.
1905fn parse_timezone_offset(tz: &str) -> Result<i64> {
1906    match tz {
1907        "UTC" => Ok(0),
1908        s if s.starts_with('+') || s.starts_with('-') => {
1909            let sign: i64 = if s.starts_with('-') { -1 } else { 1 };
1910            let rest = &s[1..];
1911            let parts: Vec<&str> = rest.split(':').collect();
1912            if parts.len() != 2 {
1913                return Err(anyhow!("invalid timezone offset: {tz:?}"));
1914            }
1915            let hours: i64 = parts[0].parse().map_err(|_| anyhow!("invalid timezone: {tz:?}"))?;
1916            let minutes: i64 = parts[1].parse().map_err(|_| anyhow!("invalid timezone: {tz:?}"))?;
1917            Ok(sign * (hours * 3600 + minutes * 60))
1918        }
1919        _ => Err(anyhow!("unsupported timezone: {tz:?} (use \"UTC\" or offset like \"+05:30\")")),
1920    }
1921}
1922
1923/// Parse an RFC3339-like timestamp string to nanoseconds since epoch.
1924fn parse_rfc3339_to_nanos(s: &str) -> Result<i64> {
1925    // Minimal RFC3339: "2006-01-02T15:04:05Z" or with fractional seconds
1926    if s.len() < 10 {
1927        return Err(anyhow!("fn:time:parse_rfc3339: string too short: {s:?}"));
1928    }
1929    let year: i64 = s[0..4].parse().map_err(|_| anyhow!("invalid year in {s:?}"))?;
1930    let month: u32 = s[5..7].parse().map_err(|_| anyhow!("invalid month in {s:?}"))?;
1931    let day: u32 = s[8..10].parse().map_err(|_| anyhow!("invalid day in {s:?}"))?;
1932
1933    let (hour, minute, second, frac_nanos) = if s.len() > 10 && s.as_bytes()[10] == b'T' {
1934        if s.len() < 19 {
1935            return Err(anyhow!("fn:time:parse_rfc3339: incomplete time in {s:?}"));
1936        }
1937        let h: u32 = s[11..13].parse().map_err(|_| anyhow!("invalid hour"))?;
1938        let min: u32 = s[14..16].parse().map_err(|_| anyhow!("invalid minute"))?;
1939        let sec: u32 = s[17..19].parse().map_err(|_| anyhow!("invalid second"))?;
1940
1941        let frac = if s.len() > 19 && s.as_bytes()[19] == b'.' {
1942            let end = s.len() - if s.ends_with('Z') { 1 } else { 0 };
1943            let frac_str = &s[20..end];
1944            let padded = format!("{frac_str:0<9}");
1945            padded[..9].parse::<i64>().unwrap_or(0)
1946        } else {
1947            0
1948        };
1949        (h, min, sec, frac)
1950    } else {
1951        (0, 0, 0, 0)
1952    };
1953
1954    let y = if month <= 2 { year - 1 } else { year };
1955    let era = (if y >= 0 { y } else { y - 399 }) / 400;
1956    let yoe = (y - era * 400) as u32;
1957    let m_adj = if month > 2 { month - 3 } else { month + 9 };
1958    let doy = (153 * m_adj + 2) / 5 + day - 1;
1959    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
1960    let days = era * 146097 + doe as i64 - 719468;
1961
1962    let total_seconds = days * 86400 + hour as i64 * 3600 + minute as i64 * 60 + second as i64;
1963    Ok(total_seconds * 1_000_000_000 + frac_nanos)
1964}
1965
1966/// Parse a civil datetime string like "2024-01-15T10:30:00" or "2024-01-15T10:30:00.000000000".
1967fn parse_civil_datetime_to_nanos(s: &str) -> Result<i64> {
1968    // Reuse RFC3339 parser — civil format is the same without 'Z' suffix
1969    parse_rfc3339_to_nanos(s)
1970}
1971
1972/// Parse a Go-style duration string like "1h30m", "500ms", "-2h45m30s", "1.5h".
1973fn parse_duration_string(s: &str) -> Result<i64> {
1974    if s.is_empty() {
1975        return Err(anyhow!("fn:duration:parse: empty string"));
1976    }
1977    if s == "0" || s == "0s" {
1978        return Ok(0);
1979    }
1980
1981    let (sign, mut rest) = if s.starts_with('-') {
1982        (-1i64, &s[1..])
1983    } else if s.starts_with('+') {
1984        (1i64, &s[1..])
1985    } else {
1986        (1i64, s)
1987    };
1988
1989    let mut total_nanos: i64 = 0;
1990
1991    while !rest.is_empty() {
1992        // Parse numeric part (integer or float)
1993        let num_end = rest
1994            .find(|c: char| !c.is_ascii_digit() && c != '.')
1995            .unwrap_or(rest.len());
1996        if num_end == 0 {
1997            return Err(anyhow!("fn:duration:parse: expected number in {s:?}"));
1998        }
1999        let num_str = &rest[..num_end];
2000        rest = &rest[num_end..];
2001
2002        // Parse unit suffix
2003        let (unit_nanos, unit_len) = if rest.starts_with("ns") {
2004            (1i64, 2)
2005        } else if rest.starts_with("us") || rest.starts_with("µs") {
2006            (1_000i64, if rest.starts_with("µ") { 3 } else { 2 })
2007        } else if rest.starts_with("ms") {
2008            (1_000_000i64, 2)
2009        } else if rest.starts_with('s') {
2010            (1_000_000_000i64, 1)
2011        } else if rest.starts_with('m') {
2012            (60 * 1_000_000_000i64, 1)
2013        } else if rest.starts_with('h') {
2014            (3600 * 1_000_000_000i64, 1)
2015        } else {
2016            return Err(anyhow!("fn:duration:parse: unknown unit in {s:?}"));
2017        };
2018        rest = &rest[unit_len..];
2019
2020        if num_str.contains('.') {
2021            let val: f64 = num_str.parse().map_err(|_| anyhow!("fn:duration:parse: invalid number {num_str:?}"))?;
2022            total_nanos += (val * unit_nanos as f64) as i64;
2023        } else {
2024            let val: i64 = num_str.parse().map_err(|_| anyhow!("fn:duration:parse: invalid number {num_str:?}"))?;
2025            total_nanos += val * unit_nanos;
2026        }
2027    }
2028
2029    Ok(sign * total_nanos)
2030}
2031
2032#[cfg(test)]
2033mod tests {
2034    use super::*;
2035
2036    #[test]
2037    fn test_retract_existing() {
2038        let mut store = MemStore::new();
2039        store.add_fact("r", vec![Value::Number(1), Value::Number(2)]);
2040        store.add_fact("r", vec![Value::Number(3), Value::Number(4)]);
2041
2042        let removed = store
2043            .retract("r", &[Value::Number(1), Value::Number(2)])
2044            .unwrap();
2045        assert!(removed);
2046
2047        let facts = store.get_facts("r");
2048        assert_eq!(facts.len(), 1);
2049        assert_eq!(facts[0], vec![Value::Number(3), Value::Number(4)]);
2050    }
2051
2052    #[test]
2053    fn test_retract_nonexistent() {
2054        let mut store = MemStore::new();
2055        store.add_fact("r", vec![Value::Number(1)]);
2056
2057        let removed = store.retract("r", &[Value::Number(99)]).unwrap();
2058        assert!(!removed);
2059
2060        let facts = store.get_facts("r");
2061        assert_eq!(facts.len(), 1);
2062        assert_eq!(facts[0], vec![Value::Number(1)]);
2063    }
2064
2065    #[test]
2066    fn test_clear() {
2067        let mut store = MemStore::new();
2068        store.add_fact("r", vec![Value::Number(1)]);
2069        store.add_fact("r", vec![Value::Number(2)]);
2070        store.add_fact("s", vec![Value::Number(10)]);
2071
2072        store.clear("r");
2073
2074        let r_facts = store.get_facts("r");
2075        assert!(r_facts.is_empty());
2076
2077        // "s" should be untouched
2078        let s_facts = store.get_facts("s");
2079        assert_eq!(s_facts.len(), 1);
2080    }
2081
2082    #[test]
2083    fn test_relation_names() {
2084        let mut store = MemStore::new();
2085        store.create_relation("alpha");
2086        store.create_relation("beta");
2087        store.add_fact("gamma", vec![Value::Number(1)]);
2088
2089        let mut names = store.relation_names();
2090        names.sort();
2091        assert_eq!(names, vec!["alpha", "beta", "gamma"]);
2092    }
2093
2094    #[test]
2095    fn test_provenance_recording() {
2096        use mangle_ir::physical::{DataSource, Operand};
2097
2098        // Build a minimal IR manually to test provenance
2099        let mut ir = mangle_ir::Ir::new();
2100        let base_name = ir.intern_name("base");
2101        let derived_name = ir.intern_name("derived");
2102        let var_x = ir.intern_name("X");
2103
2104        // Create an Op: Iterate(Scan("base", [X]), Insert("derived", [X]))
2105        let op = Op::Iterate {
2106            source: DataSource::Scan {
2107                relation: base_name,
2108                vars: vec![var_x],
2109            },
2110            body: Box::new(Op::Insert {
2111                relation: derived_name,
2112                args: vec![Operand::Var(var_x)],
2113            }),
2114        };
2115
2116        let mut store = Box::new(MemStore::new());
2117        store.add_fact("base", vec![Value::Number(10)]);
2118        store.add_fact("base", vec![Value::Number(20)]);
2119        store.create_relation("derived");
2120
2121        let mut interpreter = Interpreter::new(&ir, store as Box<dyn Store>).with_provenance();
2122
2123        let count = interpreter.execute(&op).unwrap();
2124        assert_eq!(count, 2);
2125
2126        // Check provenance was recorded
2127        let prov = interpreter.provenance.as_ref().unwrap();
2128        assert_eq!(prov.entries.len(), 2);
2129
2130        // Each derived fact should have one premise (from "base")
2131        for entry in &prov.entries {
2132            assert_eq!(entry.derived.0, "derived");
2133            assert_eq!(entry.premises.len(), 1);
2134            assert_eq!(entry.premises[0].0, "base");
2135        }
2136
2137        // Check the actual derived facts
2138        let mut derived_vals: Vec<i64> = prov
2139            .entries
2140            .iter()
2141            .map(|e| match &e.derived.1[0] {
2142                Value::Number(n) => *n,
2143                _ => panic!("expected number"),
2144            })
2145            .collect();
2146        derived_vals.sort();
2147        assert_eq!(derived_vals, vec![10, 20]);
2148    }
2149
2150    #[test]
2151    fn test_float_values() {
2152        use mangle_ir::physical::{self, DataSource, Expr, Operand};
2153
2154        let mut ir = mangle_ir::Ir::new();
2155        let temps = ir.intern_name("temps");
2156        let result = ir.intern_name("result");
2157        let var_x = ir.intern_name("X");
2158        // First, verify basic scan+insert of floats works
2159        let simple_op = Op::Iterate {
2160            source: DataSource::Scan {
2161                relation: temps,
2162                vars: vec![var_x],
2163            },
2164            body: Box::new(Op::Insert {
2165                relation: result,
2166                args: vec![Operand::Var(var_x)],
2167            }),
2168        };
2169
2170        let mut store = Box::new(MemStore::new());
2171        store.add_fact("temps", vec![Value::Float(36.5)]);
2172        store.add_fact("temps", vec![Value::Float(35.9)]);
2173        store.add_fact("temps", vec![Value::Float(37.2)]);
2174        store.create_relation("result");
2175
2176        let mut interpreter = Interpreter::new(&ir, store as Box<dyn Store>);
2177        let count = interpreter.execute(&simple_op).unwrap();
2178        assert_eq!(count, 3, "basic float scan+insert should produce 3 facts");
2179
2180        // Now test with filter and arithmetic
2181        let mut ir2 = mangle_ir::Ir::new();
2182        let temps2 = ir2.intern_name("temps");
2183        let result2 = ir2.intern_name("result2");
2184        let var_x2 = ir2.intern_name("X");
2185        let var_y2 = ir2.intern_name("Y");
2186        let fn_plus2 = ir2.intern_name("fn:float:plus");
2187
2188        let op = Op::Iterate {
2189            source: DataSource::Scan {
2190                relation: temps2,
2191                vars: vec![var_x2],
2192            },
2193            body: Box::new(Op::Filter {
2194                cond: Condition::Cmp {
2195                    op: physical::CmpOp::Gt,
2196                    left: Operand::Var(var_x2),
2197                    right: Operand::Const(physical::Constant::Float(36.0)),
2198                },
2199                body: Box::new(Op::Let {
2200                    var: var_y2,
2201                    expr: Expr::Call {
2202                        function: fn_plus2,
2203                        args: vec![
2204                            Operand::Var(var_x2),
2205                            Operand::Const(physical::Constant::Float(0.5)),
2206                        ],
2207                    },
2208                    body: Box::new(Op::Insert {
2209                        relation: result2,
2210                        args: vec![Operand::Var(var_x2), Operand::Var(var_y2)],
2211                    }),
2212                }),
2213            }),
2214        };
2215
2216        let mut store2 = Box::new(MemStore::new());
2217        store2.add_fact("temps", vec![Value::Float(36.5)]);
2218        store2.add_fact("temps", vec![Value::Float(35.9)]);
2219        store2.add_fact("temps", vec![Value::Float(37.2)]);
2220        store2.create_relation("result2");
2221
2222        let mut interpreter2 = Interpreter::new(&ir2, store2 as Box<dyn Store>);
2223        let count2 = interpreter2.execute(&op).unwrap();
2224
2225        // Only 36.5 and 37.2 are > 36.0
2226        assert_eq!(count2, 2);
2227
2228        // Results are in next_delta, check via scan_next_delta
2229        let results: Vec<_> = interpreter2
2230            .store()
2231            .scan_next_delta("result2")
2232            .unwrap()
2233            .collect();
2234        assert_eq!(results.len(), 2);
2235
2236        let mut output: Vec<(f64, f64)> = results
2237            .iter()
2238            .map(|t| match (&t[0], &t[1]) {
2239                (Value::Float(a), Value::Float(b)) => (*a, *b),
2240                _ => panic!("expected floats"),
2241            })
2242            .collect();
2243        output.sort_by(|a, b| a.0.total_cmp(&b.0));
2244
2245        assert_eq!(output[0], (36.5, 37.0));
2246        assert_eq!(output[1], (37.2, 37.7));
2247    }
2248
2249    #[test]
2250    fn test_float_in_memstore() {
2251        // Test that Float values work correctly as HashMap keys (equality, hashing)
2252        let mut store = MemStore::new();
2253        store.add_fact("data", vec![Value::Float(1.5), Value::Number(10)]);
2254        store.add_fact("data", vec![Value::Float(2.5), Value::Number(20)]);
2255        // Duplicate should not be added
2256        store.add_fact("data", vec![Value::Float(1.5), Value::Number(10)]);
2257
2258        let facts = store.get_facts("data");
2259        assert_eq!(facts.len(), 2);
2260
2261        // Test index lookup
2262        let results: Vec<_> = store
2263            .scan_index("data", 0, &Value::Float(1.5))
2264            .unwrap()
2265            .collect();
2266        assert_eq!(results.len(), 1);
2267        assert_eq!(results[0][1], Value::Number(10));
2268    }
2269
2270    // --- eval_function unit tests ---
2271
2272    #[test]
2273    fn test_fn_plus_variadic() {
2274        assert_eq!(
2275            eval_function("fn:plus", &[Value::Number(1), Value::Number(2), Value::Number(3)]).unwrap(),
2276            Value::Number(6)
2277        );
2278        // Zero args returns 0 (identity)
2279        assert_eq!(eval_function("fn:plus", &[]).unwrap(), Value::Number(0));
2280    }
2281
2282    #[test]
2283    fn test_fn_minus_variadic() {
2284        // Unary
2285        assert_eq!(
2286            eval_function("fn:minus", &[Value::Number(5)]).unwrap(),
2287            Value::Number(-5)
2288        );
2289        // Binary
2290        assert_eq!(
2291            eval_function("fn:minus", &[Value::Number(10), Value::Number(3)]).unwrap(),
2292            Value::Number(7)
2293        );
2294        // Variadic: 100 - 10 - 20 = 70
2295        assert_eq!(
2296            eval_function("fn:minus", &[Value::Number(100), Value::Number(10), Value::Number(20)]).unwrap(),
2297            Value::Number(70)
2298        );
2299        // Zero args is an error
2300        assert!(eval_function("fn:minus", &[]).is_err());
2301    }
2302
/// Checks `fn:mult` with three integer arguments and with none.
#[test]
fn test_fn_mult_variadic() {
    // 2 * 3 * 4 = 24
    let factors = [Value::Number(2), Value::Number(3), Value::Number(4)];
    let product = eval_function("fn:mult", &factors).unwrap();
    assert_eq!(product, Value::Number(24));

    // Zero args returns 1 (identity)
    let empty_product = eval_function("fn:mult", &[]).unwrap();
    assert_eq!(empty_product, Value::Number(1));
}
2312
/// Checks `fn:div` in its binary and unary forms, including zero divisors.
#[test]
fn test_fn_div() {
    let div = |args: &[Value]| eval_function("fn:div", args);

    // Binary: integer division, 10 / 3 = 3
    let quotient = div(&[Value::Number(10), Value::Number(3)]).unwrap();
    assert_eq!(quotient, Value::Number(3));

    // Unary: 1/n, so 1/5 yields 0 and 1/1 yields 1
    let one_over_five = div(&[Value::Number(5)]).unwrap();
    assert_eq!(one_over_five, Value::Number(0));
    let one_over_one = div(&[Value::Number(1)]).unwrap();
    assert_eq!(one_over_one, Value::Number(1));

    // Division by zero is rejected in both forms
    let binary_by_zero = div(&[Value::Number(1), Value::Number(0)]);
    assert!(binary_by_zero.is_err());
    let unary_by_zero = div(&[Value::Number(0)]);
    assert!(unary_by_zero.is_err());
}
2333
/// Checks that float functions accept `Value::Number` arguments.
#[test]
fn test_fn_float_promotion() {
    // fn:float:plus accepts both Float and Number
    let mixed_operands = [Value::Float(1.5), Value::Number(2)];
    let mixed_sum = eval_function("fn:float:plus", &mixed_operands).unwrap();
    assert_eq!(mixed_sum, Value::Float(3.5));

    // fn:sqrt accepts Number
    let root = eval_function("fn:sqrt", &[Value::Number(9)]).unwrap();
    assert_eq!(root, Value::Float(3.0));
}
2347
/// Checks `fn:string:concat` on all-string and on mixed-type arguments.
#[test]
fn test_fn_string_concat() {
    let text = |s: &str| Value::String(s.into());

    // Three string pieces are joined in argument order.
    let pieces = [text("a"), text("b"), text("c")];
    let joined = eval_function("fn:string:concat", &pieces).unwrap();
    assert_eq!(joined, Value::String("abc".to_string()));

    // Mixed types
    let labelled_number = [text("n="), Value::Number(42)];
    let rendered = eval_function("fn:string:concat", &labelled_number).unwrap();
    assert_eq!(rendered, Value::String("n=42".to_string()));
}
2366
/// Checks `fn:string:replace` with a replace-all limit and a limit of one.
#[test]
fn test_fn_string_replace() {
    // Replaces "-" with "_" in "a-b-c"; only the limit argument varies.
    let replace_dashes = |limit| {
        let args = [
            Value::String("a-b-c".into()),
            Value::String("-".into()),
            Value::String("_".into()),
            Value::Number(limit),
        ];
        eval_function("fn:string:replace", &args).unwrap()
    };

    // Replace all (-1)
    assert_eq!(replace_dashes(-1), Value::String("a_b_c".to_string()));
    // Replace first only (1)
    assert_eq!(replace_dashes(1), Value::String("a_b-c".to_string()));
}
2386
/// Checks the `to_string` conversions for numbers, floats, and names.
#[test]
fn test_fn_to_string() {
    // Integer
    let from_number = eval_function("fn:number:to_string", &[Value::Number(42)]).unwrap();
    assert_eq!(from_number, Value::String("42".to_string()));

    // Float
    let from_float = eval_function("fn:float64:to_string", &[Value::Float(3.14)]).unwrap();
    assert_eq!(from_float, Value::String("3.14".to_string()));

    // Name: the resulting string is the name's text.
    let role = [Value::Name("/role/admin".into())];
    let from_name = eval_function("fn:name:to_string", &role).unwrap();
    assert_eq!(from_name, Value::String("/role/admin".to_string()));
}
2402
2403    // -----------------------------------------------------------------------
2404    // Temporal coalescing tests (ported from Go factstore/temporal_test.go)
2405    // -----------------------------------------------------------------------
2406
// Helper: nanoseconds between 1970-01-01 and midnight of the given calendar
// date (negative for earlier dates).
fn date_nanos(year: i64, month: u32, day: u32) -> i64 {
    // Howard Hinnant's algorithm (matching the parser)
    // January and February are counted as months 10 and 11 of the previous
    // year, so the year effectively starts in March.
    let (shifted_year, shifted_month) = if month > 2 {
        (year, month - 3)
    } else {
        (year - 1, month + 9)
    };
    // Floor-divide into 400-year eras; the adjustment handles negative years.
    let era = if shifted_year >= 0 {
        shifted_year / 400
    } else {
        (shifted_year - 399) / 400
    };
    let year_of_era = (shifted_year - era * 400) as u32;
    let day_of_year = (153 * shifted_month + 2) / 5 + day - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    // 146097 days per era; 719468 shifts the origin to 1970-01-01.
    let days_since_epoch = era * 146097 + day_of_era as i64 - 719468;
    days_since_epoch * 86400 * 1_000_000_000
}
2420
2421    fn datetime_nanos(year: i64, month: u32, day: u32, h: u32, m: u32, s: u32, ns: i64) -> i64 {
2422        date_nanos(year, month, day) + (h as i64) * 3_600_000_000_000
2423            + (m as i64) * 60_000_000_000 + (s as i64) * 1_000_000_000 + ns
2424    }
2425
/// Go: TestTemporalStore_Coalesce - overlapping intervals merge into one
#[test]
fn test_coalesce_overlapping() {
    let mut store = MemStore::new();
    let jan = |day: u32| date_nanos(2024, 1, day);

    // Each interval overlaps the next: Jan 1-15, Jan 10-25, Jan 20-31.
    let intervals = [(jan(1), jan(15)), (jan(10), jan(25)), (jan(20), jan(31))];
    for (from, to) in intervals {
        store.add_fact(
            "active",
            vec![Value::String("/service".into()), Value::Time(from), Value::Time(to)],
        );
    }

    // Before coalescing all three facts are stored individually.
    assert_eq!(store.get_facts("active").len(), 3);

    store.coalesce_temporal("active");

    let merged = store.get_facts("active");
    assert_eq!(merged.len(), 1, "after coalesce: expected 1, got {:?}", merged);
    assert_eq!(merged[0][1], Value::Time(jan(1)), "start should be Jan 1");
    assert_eq!(merged[0][2], Value::Time(jan(31)), "end should be Jan 31");
}
2450
/// Go: TestTemporalStore_CoalesceAdjacent - adjacent intervals (1ns gap) coalesce
#[test]
fn test_coalesce_adjacent() {
    let mut store = MemStore::new();
    let shift_fact = |from: i64, to: i64| {
        vec![Value::String("/worker".into()), Value::Time(from), Value::Time(to)]
    };

    // First shift: 08:00 to 16:00 on Jan 1.
    let first_start = datetime_nanos(2024, 1, 1, 8, 0, 0, 0);
    let first_end = datetime_nanos(2024, 1, 1, 16, 0, 0, 0);
    // Second shift starts 1ns after the first ends and runs to midnight.
    let second_start = datetime_nanos(2024, 1, 1, 16, 0, 0, 1);
    let second_end = date_nanos(2024, 1, 2);

    store.add_fact("shift", shift_fact(first_start, first_end));
    store.add_fact("shift", shift_fact(second_start, second_end));

    store.coalesce_temporal("shift");

    let merged = store.get_facts("shift");
    assert_eq!(merged.len(), 1, "adjacent intervals should coalesce");
    assert_eq!(merged[0][1], Value::Time(first_start));
    assert_eq!(merged[0][2], Value::Time(second_end));
}
2470
/// Go: TestTemporalStore_CoalesceNonOverlapping - non-overlapping intervals stay separate
#[test]
fn test_coalesce_non_overlapping() {
    let mut store = MemStore::new();

    // Two vacations months apart: Jan 1-7 and Jun 1-14.
    let vacations = [
        (date_nanos(2024, 1, 1), date_nanos(2024, 1, 7)),
        (date_nanos(2024, 6, 1), date_nanos(2024, 6, 14)),
    ];
    for (from, to) in vacations {
        store.add_fact(
            "vacation",
            vec![Value::String("/alice".into()), Value::Time(from), Value::Time(to)],
        );
    }

    store.coalesce_temporal("vacation");

    let remaining = store.get_facts("vacation");
    assert_eq!(remaining.len(), 2, "non-overlapping intervals should stay separate");
}
2488
/// Go: TestTemporalStore_CoalesceMixedGranularity - sub-second precision
#[test]
fn test_coalesce_mixed_granularity() {
    let mut store = MemStore::new();
    // All timestamps are on 2024-01-01 at 10:00; only seconds and nanos vary.
    let at = |sec: u32, nanos: i64| datetime_nanos(2024, 1, 1, 10, 0, sec, nanos);

    // Second-level: 10:00:00 to 10:00:05
    let (first_start, first_end) = (at(0, 0), at(5, 0));
    // Overlapping millisecond-level: 10:00:04.5 to 10:00:06
    let (second_start, second_end) = (at(4, 500_000_000), at(6, 0));
    // Adjacent nanosecond-level: 10:00:06.000000001 to 10:00:07
    let (third_start, third_end) = (at(6, 1), at(7, 0));

    let readings = [
        (first_start, first_end),
        (second_start, second_end),
        (third_start, third_end),
    ];
    for (from, to) in readings {
        store.add_fact(
            "event",
            vec![Value::String("/sensor".into()), Value::Time(from), Value::Time(to)],
        );
    }

    store.coalesce_temporal("event");

    let merged = store.get_facts("event");
    assert_eq!(merged.len(), 1, "mixed granularity should coalesce to 1");
    assert_eq!(merged[0][1], Value::Time(first_start), "start: 10:00:00");
    assert_eq!(merged[0][2], Value::Time(third_end), "end: 10:00:07");
}
2514
/// Test coalescing with multiple keys: same relation, different key columns
#[test]
fn test_coalesce_multiple_keys() {
    let mut store = MemStore::new();
    let jan = |day: u32| date_nanos(2024, 1, day);
    let employment = |who: &str, from: i64, to: i64| {
        vec![Value::String(who.into()), Value::Time(from), Value::Time(to)]
    };

    // Alice: two overlapping intervals
    store.add_fact("employed", employment("/alice", jan(1), jan(10)));
    store.add_fact("employed", employment("/alice", jan(5), jan(15)));
    // Bob: one interval
    store.add_fact("employed", employment("/bob", jan(1), jan(15)));

    store.coalesce_temporal("employed");

    // Alice's 2 intervals merge to 1; Bob stays as 1
    let remaining = store.get_facts("employed");
    assert_eq!(remaining.len(), 2, "expected 2 facts after coalesce, got {:?}", remaining);
}
2536
2537    // --- HashJoin tests ---------------------------------------------------
2538    //
2539    // Each test hand-constructs an `Op::HashJoin` and compares its output to
2540    // the equivalent nested-loop plan (`Op::Iterate { body: Op::Iterate {..}
2541    // }` with an equality check). This isolates HashJoin correctness from
2542    // planner emission, which lives in a separate gated path.
2543
2544    /// Build `result(X, Y) :- a(X, Z), b(Z, Y).` as nested-loop (join-key
2545    /// check via Filter on Z) — the correctness baseline for HashJoin.
2546    fn nested_loop_two_way(
2547        ir: &mangle_ir::Ir,
2548        a: NameId,
2549        b: NameId,
2550        result: NameId,
2551        x: NameId,
2552        z: NameId,
2553        y: NameId,
2554        z_right: NameId,
2555    ) -> Op {
2556        use mangle_ir::physical::{CmpOp, Condition, DataSource, Operand};
2557        let _ = ir;
2558        Op::Iterate {
2559            source: DataSource::Scan {
2560                relation: a,
2561                vars: vec![x, z],
2562            },
2563            body: Box::new(Op::Iterate {
2564                source: DataSource::Scan {
2565                    relation: b,
2566                    vars: vec![z_right, y],
2567                },
2568                body: Box::new(Op::Filter {
2569                    cond: Condition::Cmp {
2570                        op: CmpOp::Eq,
2571                        left: Operand::Var(z),
2572                        right: Operand::Var(z_right),
2573                    },
2574                    body: Box::new(Op::Insert {
2575                        relation: result,
2576                        args: vec![Operand::Var(x), Operand::Var(y)],
2577                    }),
2578                }),
2579            }),
2580        }
2581    }
2582
2583    /// Same rule as `nested_loop_two_way` but with HashJoin.
2584    fn hash_join_two_way(
2585        a: NameId,
2586        b: NameId,
2587        result: NameId,
2588        x: NameId,
2589        z: NameId,
2590        y: NameId,
2591    ) -> Op {
2592        use mangle_ir::physical::{DataSource, Operand};
2593        Op::HashJoin {
2594            build_source: DataSource::Scan {
2595                relation: a,
2596                vars: vec![x, z],
2597            },
2598            probe_source: DataSource::Scan {
2599                relation: b,
2600                vars: vec![z, y],
2601            },
2602            join_keys: vec![z],
2603            body: Box::new(Op::Insert {
2604                relation: result,
2605                args: vec![Operand::Var(x), Operand::Var(y)],
2606            }),
2607        }
2608    }
2609
2610    fn run_plan(ir: &mangle_ir::Ir, facts: &[(&str, Vec<Value>)], op: &Op) -> Vec<Vec<Value>> {
2611        let mut store = Box::new(MemStore::new());
2612        for (rel, t) in facts {
2613            store.add_fact(rel, t.clone());
2614        }
2615        store.create_relation("result");
2616        let mut interp = Interpreter::new(ir, store as Box<dyn Store>);
2617        interp.execute(op).unwrap();
2618        // Inserts land in next_delta; promote twice so `scan()` surfaces them.
2619        let mut store = interp.into_store();
2620        store.merge_deltas();
2621        store.merge_deltas();
2622        store.scan("result").unwrap().collect()
2623    }
2624
2625    fn sorted(mut v: Vec<Vec<Value>>) -> Vec<Vec<Value>> {
2626        v.sort();
2627        v
2628    }
2629
2630    /// Build an IR with the names needed for the two-way-join tests.
2631    fn setup_two_way_ir()
2632    -> (mangle_ir::Ir, NameId, NameId, NameId, NameId, NameId, NameId, NameId) {
2633        let mut ir = mangle_ir::Ir::new();
2634        let a = ir.intern_name("a");
2635        let b = ir.intern_name("b");
2636        let result = ir.intern_name("result");
2637        let x = ir.intern_name("X");
2638        let z = ir.intern_name("Z");
2639        let y = ir.intern_name("Y");
2640        // A separate NameId for the nested-loop baseline's Z on the right side
2641        // — nested-loop needs a distinct var so the Filter can check equality.
2642        let z_right = ir.intern_name("Z_right");
2643        (ir, a, b, result, x, z, y, z_right)
2644    }
2645
/// Runs `result(X, Y) :- a(X, Z), b(Z, Y).` as both a nested-loop plan and a
/// HashJoin plan over the same facts, then checks that the two agree and
/// that the output is exactly the expected set of tuples.
#[test]
fn test_hashjoin_matches_nested_loop_basic() {
    let (ir, a, b, result, x, z, y, z_right) = setup_two_way_ir();

    let facts: Vec<(&str, Vec<Value>)> = vec![
        ("a", vec![Value::Number(1), Value::Number(10)]),
        ("a", vec![Value::Number(2), Value::Number(20)]),
        ("a", vec![Value::Number(3), Value::Number(10)]),
        ("b", vec![Value::Number(10), Value::Number(100)]),
        ("b", vec![Value::Number(10), Value::Number(101)]),
        ("b", vec![Value::Number(20), Value::Number(200)]),
        // Key 30 has no partner in `a`, so it must not appear in the output.
        ("b", vec![Value::Number(30), Value::Number(300)]),
    ];

    // Sort each result once up front; neither needs to be cloned afterwards.
    let baseline = sorted(run_plan(
        &ir,
        &facts,
        &nested_loop_two_way(&ir, a, b, result, x, z, y, z_right),
    ));
    let via_hash = sorted(run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y)));

    assert_eq!(
        baseline, via_hash,
        "HashJoin output must match nested-loop baseline"
    );

    // Sanity: pin the exact tuples rather than only their count, so a plan
    // that yields five wrong rows cannot pass.
    let expected = sorted(vec![
        vec![Value::Number(1), Value::Number(100)],
        vec![Value::Number(1), Value::Number(101)],
        vec![Value::Number(2), Value::Number(200)],
        vec![Value::Number(3), Value::Number(100)],
        vec![Value::Number(3), Value::Number(101)],
    ]);
    assert_eq!(via_hash, expected);
}
2675
/// With no `a` facts on the build side, the HashJoin must produce nothing.
#[test]
fn test_hashjoin_empty_build() {
    let (ir, a, b, result, x, z, y, _) = setup_two_way_ir();
    let pair = |l, r| vec![Value::Number(l), Value::Number(r)];

    // Only the probe relation `b` has facts.
    let facts = [("b", pair(10, 100)), ("b", pair(20, 200))];
    let plan = hash_join_two_way(a, b, result, x, z, y);

    let out = run_plan(&ir, &facts, &plan);
    assert!(out.is_empty());
}
2686
/// With no `b` facts on the probe side, the HashJoin must produce nothing.
#[test]
fn test_hashjoin_empty_probe() {
    let (ir, a, b, result, x, z, y, _) = setup_two_way_ir();
    let pair = |l, r| vec![Value::Number(l), Value::Number(r)];

    // Only the build relation `a` has facts.
    let facts = [("a", pair(1, 10)), ("a", pair(2, 20))];
    let plan = hash_join_two_way(a, b, result, x, z, y);

    let out = run_plan(&ir, &facts, &plan);
    assert!(out.is_empty());
}
2697
/// Both sides have facts but no join key is shared, so the output is empty.
#[test]
fn test_hashjoin_no_matches() {
    let (ir, a, b, result, x, z, y, _) = setup_two_way_ir();
    let pair = |l, r| vec![Value::Number(l), Value::Number(r)];

    // `a` carries key 10 while `b` carries key 99.
    let facts = [("a", pair(1, 10)), ("b", pair(99, 200))];
    let plan = hash_join_two_way(a, b, result, x, z, y);

    let out = run_plan(&ir, &facts, &plan);
    assert!(out.is_empty());
}
2708
/// Use strings and names as join keys to exercise value hashing on
/// non-integer variants. `r(X, Y) :- a(X, K), b(K, Y).`
#[test]
fn test_hashjoin_value_variants_as_key() {
    let mut ir = mangle_ir::Ir::new();
    let a = ir.intern_name("a");
    let b = ir.intern_name("b");
    let result = ir.intern_name("result");
    let x = ir.intern_name("X");
    let k = ir.intern_name("K");
    let y = ir.intern_name("Y");

    let hello = || Value::String("hello".into());
    let foo_name = || Value::Name("/foo".into());
    let foo_string = || Value::String("/foo".into());

    let facts = [
        ("a", vec![Value::Number(1), hello()]),
        ("a", vec![Value::Number(2), foo_name()]),
        ("b", vec![hello(), Value::Number(100)]),
        ("b", vec![foo_name(), Value::Number(200)]),
        // An entry that must NOT match: Name vs String are distinct.
        ("b", vec![foo_string(), Value::Number(999)]),
    ];

    let plan = hash_join_two_way(a, b, result, x, k, y);
    let out = sorted(run_plan(&ir, &facts, &plan));

    let expected = sorted(vec![
        vec![Value::Number(1), Value::Number(100)],
        vec![Value::Number(2), Value::Number(200)],
    ]);
    assert_eq!(out, expected);
}
2742
/// `r(X, W) :- a(X, K1, K2), b(K1, K2, W).`: compound (two-variable)
/// join key.
#[test]
fn test_hashjoin_multi_key() {
    let mut ir = mangle_ir::Ir::new();
    let a = ir.intern_name("a");
    let b = ir.intern_name("b");
    let result = ir.intern_name("result");
    let x = ir.intern_name("X");
    let k1 = ir.intern_name("K1");
    let k2 = ir.intern_name("K2");
    let w = ir.intern_name("W");

    let build_side = DataSource::Scan {
        relation: a,
        vars: vec![x, k1, k2],
    };
    let probe_side = DataSource::Scan {
        relation: b,
        vars: vec![k1, k2, w],
    };
    let emit_result = Op::Insert {
        relation: result,
        args: vec![Operand::Var(x), Operand::Var(w)],
    };
    let op = Op::HashJoin {
        build_source: build_side,
        probe_source: probe_side,
        join_keys: vec![k1, k2],
        body: Box::new(emit_result),
    };

    let triple = |p, q, r| vec![Value::Number(p), Value::Number(q), Value::Number(r)];
    let facts = [
        ("a", triple(1, 10, 100)),
        ("a", triple(2, 10, 200)),
        // Matching K1 but different K2: must NOT join with (10, 100, ...).
        ("a", triple(3, 10, 999)),
        ("b", triple(10, 100, 1000)),
        ("b", triple(10, 200, 2000)),
    ];

    let out = sorted(run_plan(&ir, &facts, &op));

    let expected = sorted(vec![
        vec![Value::Number(1), Value::Number(1000)],
        vec![Value::Number(2), Value::Number(2000)],
    ]);
    assert_eq!(out, expected);
}
2805
/// Multiple build rows with the same key must all be emitted (one
/// probe tuple x N build matches = N results).
#[test]
fn test_hashjoin_duplicate_build_keys() {
    let (ir, a, b, result, x, z, y, _) = setup_two_way_ir();
    let pair = |l, r| vec![Value::Number(l), Value::Number(r)];

    // Three build rows share key 10; a single probe row carries that key.
    let facts = [
        ("a", pair(1, 10)),
        ("a", pair(2, 10)),
        ("a", pair(3, 10)),
        ("b", pair(10, 99)),
    ];

    let plan = hash_join_two_way(a, b, result, x, z, y);
    let out = sorted(run_plan(&ir, &facts, &plan));

    let expected = sorted(vec![pair(1, 99), pair(2, 99), pair(3, 99)]);
    assert_eq!(out, expected);
}
2828}