Skip to main content

mangle_interpreter/
lib.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # Mangle Interpreter
16//!
17//! A pure Rust interpreter for the Mangle Intermediate Representation (IR).
18//!
19//! This crate enables the **Edge Mode** of execution, allowing Mangle programs
20//! to run on devices or in environments where a WebAssembly runtime is not available
21//! or desired.
22//!
23//! It executes the Physical IR operations (`Op`) directly.
24//!
25//! ## Usage
26//!
27//! See `mangle-driver` for the high-level API to compile and execute source code.
28
29use anyhow::{Result, anyhow};
30use mangle_ir::physical::{Aggregate, CmpOp, Condition, Constant, DataSource, Expr, Op, Operand};
31use mangle_ir::{Ir, NameId};
32use std::collections::HashMap;
33
34pub use mangle_common::{CompoundKind, Store, Value};
35
36/// Internal storage cell. Compound values are stored inline as a `CompoundStart`
37/// marker (holding the kind and element count) followed by that many cells
38/// (which may themselves be nested compounds). Scalar values are stored directly.
39#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
40enum Cell {
41    Val(Value),
42    /// Marks the start of a compound with `n` logical elements following.
43    CompoundStart(CompoundKind, usize),
44}
45
46/// Flatten a `Value` into a sequence of `Cell`s.
47/// Scalar values become a single `Cell::Val`. Compound values become
48/// `Cell::CompoundStart(n)` followed by the flattened cells of each element.
49fn flatten_value(v: &Value, out: &mut Vec<Cell>) {
50    match v {
51        Value::Compound(kind, elems) => {
52            out.push(Cell::CompoundStart(*kind, elems.len()));
53            for elem in elems {
54                flatten_value(elem, out);
55            }
56        }
57        _ => out.push(Cell::Val(v.clone())),
58    }
59}
60
61/// Flatten a tuple of Values into a flat Vec of Cells.
62fn flatten_tuple(tuple: &[Value]) -> Vec<Cell> {
63    let mut cells = Vec::new();
64    for v in tuple {
65        flatten_value(v, &mut cells);
66    }
67    cells
68}
69
70/// Read one logical Value from cells starting at `pos`, advancing `pos`.
71fn unflatten_one(cells: &[Cell], pos: &mut usize) -> Value {
72    match &cells[*pos] {
73        Cell::Val(v) => {
74            *pos += 1;
75            v.clone()
76        }
77        Cell::CompoundStart(kind, n) => {
78            let kind = *kind;
79            let n = *n;
80            *pos += 1;
81            let mut elems = Vec::with_capacity(n);
82            for _ in 0..n {
83                elems.push(unflatten_one(cells, pos));
84            }
85            Value::Compound(kind, elems)
86        }
87    }
88}
89
90/// Skip past one logical value in a cell slice, advancing `pos`.
91fn skip_one(cells: &[Cell], pos: &mut usize) {
92    match &cells[*pos] {
93        Cell::Val(_) => *pos += 1,
94        Cell::CompoundStart(_, n) => {
95            let n = *n;
96            *pos += 1;
97            for _ in 0..n {
98                skip_one(cells, pos);
99            }
100        }
101    }
102}
103
104/// Reconstruct a `Vec<Value>` tuple from flat cells, reading `n_cols` logical values.
105fn unflatten_tuple(cells: &[Cell], n_cols: usize) -> Vec<Value> {
106    let mut pos = 0;
107    let mut tuple = Vec::with_capacity(n_cols);
108    for _ in 0..n_cols {
109        tuple.push(unflatten_one(cells, &mut pos));
110    }
111    tuple
112}
113
114/// A simple in-memory implementation of `Store`.
115/// Supports semi-naive evaluation by tracking "stable" and "delta" facts.
116///
117/// Compound values are stored inline: each `Value::Compound` is flattened into
118/// a `CompoundStart(n)` marker followed by its elements. This avoids nested
119/// heap allocations in stored tuples and enables future per-field indexing.
120#[derive(Default)]
121pub struct MemStore {
122    // Stable facts from previous iterations
123    stable: HashMap<String, Vec<Vec<Cell>>>,
124    // New facts from the current iteration
125    delta: HashMap<String, Vec<Vec<Cell>>>,
126    // Facts being collected for the next iteration
127    next_delta: HashMap<String, Vec<Vec<Cell>>>,
128
129    // Number of logical columns per relation (set on first insert).
130    arity: HashMap<String, usize>,
131
132    // Secondary indexes: (relation_name, col_idx) -> { Cell -> [row_indices] }
133    // Indexes use the first Cell of each logical column (scalars: the value itself,
134    // compounds: CompoundStart(n)). This enables index lookups on scalar columns.
135    stable_indexes: HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
136    delta_indexes: HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
137}
138
139impl MemStore {
140    pub fn new() -> Self {
141        Self::default()
142    }
143
144    /// Registers a relation (creating it if absent) to allow scanning it.
145    pub fn create_relation(&mut self, relation: &str) {
146        self.stable.entry(relation.to_string()).or_default();
147    }
148
149    /// Add a fact manually (for testing/setup). Auto-creates relation in stable.
150    pub fn add_fact(&mut self, relation: &str, args: Vec<Value>) {
151        let n_cols = args.len();
152        let cells = flatten_tuple(&args);
153        let table = self.stable.entry(relation.to_string()).or_default();
154        if !table.contains(&cells) {
155            let row_idx = table.len();
156            self.arity.entry(relation.to_string()).or_insert(n_cols);
157            table.push(cells.clone());
158            index_cells(&mut self.stable_indexes, relation, &cells, n_cols, row_idx);
159        }
160    }
161
162    /// Rebuilds stable and delta indexes for a single relation after mutation.
163    fn rebuild_indexes_for(&mut self, relation: &str) {
164        self.stable_indexes.retain(|(rel, _), _| rel != relation);
165        self.delta_indexes.retain(|(rel, _), _| rel != relation);
166
167        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
168
169        if let Some(table) = self.stable.get(relation) {
170            for (row_idx, cells) in table.iter().enumerate() {
171                index_cells(&mut self.stable_indexes, relation, cells, n_cols, row_idx);
172            }
173        }
174
175        if let Some(table) = self.delta.get(relation) {
176            for (row_idx, cells) in table.iter().enumerate() {
177                index_cells(&mut self.delta_indexes, relation, cells, n_cols, row_idx);
178            }
179        }
180    }
181
182    /// Unflatten stored cells into Values using the relation's arity.
183    fn to_values(&self, relation: &str, cells: &[Cell]) -> Vec<Value> {
184        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
185        if n_cols == 0 {
186            // Fallback: treat each cell as a scalar
187            cells
188                .iter()
189                .map(|c| match c {
190                    Cell::Val(v) => v.clone(),
191                    Cell::CompoundStart(_, _) => Value::Null,
192                })
193                .collect()
194        } else {
195            unflatten_tuple(cells, n_cols)
196        }
197    }
198
199    pub fn get_facts(&self, relation: &str) -> Vec<Vec<Value>> {
200        let mut all: Vec<Vec<Value>> = self
201            .stable
202            .get(relation)
203            .into_iter()
204            .flatten()
205            .map(|cells| self.to_values(relation, cells))
206            .collect();
207        if let Some(d) = self.delta.get(relation) {
208            for cells in d {
209                all.push(self.to_values(relation, cells));
210            }
211        }
212        all
213    }
214
215    /// Coalesce temporal intervals for a relation.
216    /// Groups facts by their non-temporal columns (all except the last 2),
217    /// sorts intervals by start time, and merges overlapping/adjacent intervals.
218    /// This prevents interval explosion in recursive temporal rules.
219    pub fn coalesce_temporal(&mut self, relation: &str) {
220        let n_cols = match self.arity.get(relation) {
221            Some(&n) if n >= 2 => n,
222            _ => return,
223        };
224        let key_cols = n_cols - 2; // non-temporal columns
225
226        // Collect all facts (stable + delta) as Value tuples
227        let mut all_facts: Vec<Vec<Value>> = Vec::new();
228        if let Some(table) = self.stable.get(relation) {
229            for cells in table {
230                all_facts.push(unflatten_tuple(cells, n_cols));
231            }
232        }
233        if let Some(table) = self.delta.get(relation) {
234            for cells in table {
235                all_facts.push(unflatten_tuple(cells, n_cols));
236            }
237        }
238
239        if all_facts.is_empty() {
240            return;
241        }
242
243        // Group by non-temporal key columns
244        let mut groups: HashMap<Vec<Value>, Vec<(i64, i64)>> = HashMap::new();
245        for fact in &all_facts {
246            let key: Vec<Value> = fact[..key_cols].to_vec();
247            let start = match &fact[key_cols] {
248                Value::Time(t) => *t,
249                Value::Number(n) => *n,
250                _ => continue,
251            };
252            let end = match &fact[key_cols + 1] {
253                Value::Time(t) => *t,
254                Value::Number(n) => *n,
255                _ => continue,
256            };
257            groups.entry(key).or_default().push((start, end));
258        }
259
260        // Coalesce intervals within each group
261        let mut coalesced_facts: Vec<Vec<Value>> = Vec::new();
262        for (key, mut intervals) in groups {
263            intervals.sort_by_key(|&(s, _)| s);
264            let mut merged: Vec<(i64, i64)> = vec![intervals[0]];
265            for &(s, e) in &intervals[1..] {
266                let last = merged.last_mut().unwrap();
267                // Merge if overlapping or adjacent (within 1 nanosecond)
268                if s <= last.1.saturating_add(1) {
269                    last.1 = last.1.max(e);
270                } else {
271                    merged.push((s, e));
272                }
273            }
274            for (start, end) in merged {
275                let mut fact = key.clone();
276                fact.push(Value::Time(start));
277                fact.push(Value::Time(end));
278                coalesced_facts.push(fact);
279            }
280        }
281
282        // Replace stable with coalesced facts, clear delta
283        let coalesced_cells: Vec<Vec<Cell>> = coalesced_facts
284            .iter()
285            .map(|fact| flatten_tuple(fact))
286            .collect();
287        self.stable.insert(relation.to_string(), coalesced_cells);
288        self.delta.remove(relation);
289        self.next_delta.remove(relation);
290        self.rebuild_indexes_for(relation);
291    }
292}
293
294/// Index a flattened tuple's logical columns.
295fn index_cells(
296    indexes: &mut HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
297    relation: &str,
298    cells: &[Cell],
299    n_cols: usize,
300    row_idx: usize,
301) {
302    let mut pos = 0;
303    for col_idx in 0..n_cols {
304        let key = cells[pos].clone();
305        skip_one(cells, &mut pos);
306        indexes
307            .entry((relation.to_string(), col_idx))
308            .or_default()
309            .entry(key)
310            .or_default()
311            .push(row_idx);
312    }
313}
314
315/// Convert a Value to its index Cell key (for index lookup matching).
316fn value_to_index_key(v: &Value) -> Cell {
317    match v {
318        Value::Compound(kind, elems) => Cell::CompoundStart(*kind, elems.len()),
319        _ => Cell::Val(v.clone()),
320    }
321}
322
323impl Store for MemStore {
324    fn scan(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
325        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
326        let s = self
327            .stable
328            .get(relation)
329            .into_iter()
330            .flatten()
331            .map(move |cells| unflatten_tuple(cells, n_cols));
332        let d = self
333            .delta
334            .get(relation)
335            .into_iter()
336            .flatten()
337            .map(move |cells| unflatten_tuple(cells, n_cols));
338        Ok(Box::new(s.chain(d)))
339    }
340
341    fn scan_delta(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
342        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
343        match self.delta.get(relation) {
344            Some(tuples) => Ok(Box::new(
345                tuples
346                    .iter()
347                    .map(move |cells| unflatten_tuple(cells, n_cols)),
348            )),
349            None => Ok(Box::new(std::iter::empty())),
350        }
351    }
352
353    fn scan_next_delta(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
354        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
355        match self.next_delta.get(relation) {
356            Some(tuples) => Ok(Box::new(
357                tuples
358                    .iter()
359                    .map(move |cells| unflatten_tuple(cells, n_cols)),
360            )),
361            None => Ok(Box::new(std::iter::empty())),
362        }
363    }
364
365    fn scan_index(
366        &self,
367        relation: &str,
368        col_idx: usize,
369        key: &Value,
370    ) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
371        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
372        let cell_key = value_to_index_key(key);
373        let mut results: Vec<Vec<Value>> = Vec::new();
374
375        if let Some(idx_map) = self.stable_indexes.get(&(relation.to_string(), col_idx))
376            && let Some(row_indices) = idx_map.get(&cell_key)
377            && let Some(table) = self.stable.get(relation)
378        {
379            for &i in row_indices {
380                results.push(unflatten_tuple(&table[i], n_cols));
381            }
382        }
383
384        if let Some(idx_map) = self.delta_indexes.get(&(relation.to_string(), col_idx))
385            && let Some(row_indices) = idx_map.get(&cell_key)
386            && let Some(table) = self.delta.get(relation)
387        {
388            for &i in row_indices {
389                results.push(unflatten_tuple(&table[i], n_cols));
390            }
391        }
392
393        Ok(Box::new(results.into_iter()))
394    }
395
396    fn scan_delta_index(
397        &self,
398        relation: &str,
399        col_idx: usize,
400        key: &Value,
401    ) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
402        let n_cols = self.arity.get(relation).copied().unwrap_or(0);
403        let cell_key = value_to_index_key(key);
404        let mut results: Vec<Vec<Value>> = Vec::new();
405
406        if let Some(idx_map) = self.delta_indexes.get(&(relation.to_string(), col_idx))
407            && let Some(row_indices) = idx_map.get(&cell_key)
408            && let Some(table) = self.delta.get(relation)
409        {
410            for &i in row_indices {
411                results.push(unflatten_tuple(&table[i], n_cols));
412            }
413        }
414
415        Ok(Box::new(results.into_iter()))
416    }
417
418    fn insert(&mut self, relation: &str, tuple: Vec<Value>) -> Result<bool> {
419        let n_cols = tuple.len();
420        let cells = flatten_tuple(&tuple);
421
422        // Check if fact is already in stable, delta, or next_delta
423        if self
424            .stable
425            .get(relation)
426            .is_some_and(|v| v.contains(&cells))
427            || self
428                .delta
429                .get(relation)
430                .is_some_and(|v| v.contains(&cells))
431            || self
432                .next_delta
433                .get(relation)
434                .is_some_and(|v| v.contains(&cells))
435        {
436            return Ok(false);
437        }
438
439        self.arity.entry(relation.to_string()).or_insert(n_cols);
440        self.next_delta
441            .entry(relation.to_string())
442            .or_default()
443            .push(cells);
444        Ok(true)
445    }
446
447    fn merge_deltas(&mut self) {
448        // 1. Move current delta to stable
449        for (rel_name, mut tuples) in self.delta.drain() {
450            let n_cols = self.arity.get(&rel_name).copied().unwrap_or(0);
451            let table = self.stable.entry(rel_name.clone()).or_default();
452            for cells in tuples.drain(..) {
453                let row_idx = table.len();
454                index_cells(
455                    &mut self.stable_indexes,
456                    &rel_name,
457                    &cells,
458                    n_cols,
459                    row_idx,
460                );
461                table.push(cells);
462            }
463        }
464        self.delta_indexes.clear();
465
466        // 2. Move next_delta to delta and build delta index
467        self.delta = std::mem::take(&mut self.next_delta);
468        for (rel_name, tuples) in &self.delta {
469            let n_cols = self.arity.get(rel_name).copied().unwrap_or(0);
470            for (row_idx, cells) in tuples.iter().enumerate() {
471                index_cells(
472                    &mut self.delta_indexes,
473                    rel_name,
474                    cells,
475                    n_cols,
476                    row_idx,
477                );
478            }
479        }
480    }
481
482    fn create_relation(&mut self, relation: &str) {
483        self.stable.entry(relation.to_string()).or_default();
484    }
485
486    fn retract(&mut self, relation: &str, tuple: &[Value]) -> Result<bool> {
487        let cells = flatten_tuple(tuple);
488        let removed = if let Some(table) = self.stable.get_mut(relation) {
489            if let Some(pos) = table.iter().position(|t| *t == cells) {
490                table.swap_remove(pos);
491                true
492            } else {
493                false
494            }
495        } else {
496            false
497        };
498
499        // Also remove from delta and next_delta
500        if let Some(table) = self.delta.get_mut(relation) {
501            if let Some(pos) = table.iter().position(|t| *t == cells) {
502                table.swap_remove(pos);
503            }
504        }
505        if let Some(table) = self.next_delta.get_mut(relation) {
506            if let Some(pos) = table.iter().position(|t| *t == cells) {
507                table.swap_remove(pos);
508            }
509        }
510
511        if removed {
512            self.rebuild_indexes_for(relation);
513        }
514        Ok(removed)
515    }
516
517    fn clear(&mut self, relation: &str) {
518        if let Some(table) = self.stable.get_mut(relation) {
519            table.clear();
520        }
521        if let Some(table) = self.delta.get_mut(relation) {
522            table.clear();
523        }
524        if let Some(table) = self.next_delta.get_mut(relation) {
525            table.clear();
526        }
527        self.stable_indexes.retain(|(rel, _), _| rel != relation);
528        self.delta_indexes.retain(|(rel, _), _| rel != relation);
529    }
530
531    fn relation_names(&self) -> Vec<String> {
532        let mut names: Vec<String> = self.stable.keys().cloned().collect();
533        for key in self.delta.keys() {
534            if !names.contains(key) {
535                names.push(key.clone());
536            }
537        }
538        names
539    }
540
541    fn coalesce_temporal(&mut self, relation: &str) {
542        MemStore::coalesce_temporal(self, relation);
543    }
544}
545
546/// A record of one derivation: which fact was derived and which premises were used.
547#[derive(Debug, Clone)]
548pub struct ProvenanceEntry {
549    /// The derived fact: (relation_name, tuple).
550    pub derived: (String, Vec<Value>),
551    /// The premise facts that contributed: (relation_name, tuple) for each.
552    pub premises: Vec<(String, Vec<Value>)>,
553}
554
555/// Lightweight recorder that captures derivation provenance during execution.
556///
557/// When enabled on the interpreter, every successful `Op::Insert` records
558/// which premise facts (from enclosing `Op::Iterate` scans) contributed to
559/// the derivation. When disabled (the default), there is zero overhead.
560#[derive(Default)]
561pub struct ProvenanceRecorder {
562    /// All recorded derivations.
563    pub entries: Vec<ProvenanceEntry>,
564    /// Stack of currently-active scan sources (pushed on Iterate, popped after).
565    active_premises: Vec<(String, Vec<Value>)>,
566}
567
568/// A pure Rust interpreter for Mangle IR.
569pub struct Interpreter<'a> {
570    ir: &'a Ir,
571    store: Box<dyn Store + 'a>,
572    provenance: Option<ProvenanceRecorder>,
573}
574
575struct Env {
576    vars: HashMap<NameId, Value>,
577}
578
579impl Env {
580    fn new() -> Self {
581        Self {
582            vars: HashMap::new(),
583        }
584    }
585}
586
587impl<'a> Interpreter<'a> {
588    pub fn new(ir: &'a Ir, store: Box<dyn Store + 'a>) -> Self {
589        Self {
590            ir,
591            store,
592            provenance: None,
593        }
594    }
595
596    /// Enable provenance recording. When set, every successful insert
597    /// records which premise facts were in the current environment.
598    pub fn with_provenance(mut self) -> Self {
599        self.provenance = Some(ProvenanceRecorder::default());
600        self
601    }
602
603    /// Helper to get the underlying store (e.g. to inspect results).
604    pub fn store(&self) -> &dyn Store {
605        &*self.store
606    }
607
608    /// Helper to get the underlying store mutably.
609    pub fn store_mut(&mut self) -> &mut dyn Store {
610        &mut *self.store
611    }
612
613    /// Consumes the interpreter and returns the underlying store.
614    pub fn into_store(self) -> Box<dyn Store + 'a> {
615        self.store
616    }
617
618    /// Consume the interpreter, returning the provenance recorder if enabled.
619    pub fn into_provenance(self) -> Option<ProvenanceRecorder> {
620        self.provenance
621    }
622
623    /// Consume the interpreter, returning the store and optional provenance.
624    pub fn into_parts(self) -> (Box<dyn Store + 'a>, Option<ProvenanceRecorder>) {
625        (self.store, self.provenance)
626    }
627
628    /// Executes the operation and returns the number of facts inserted.
629    pub fn execute(&mut self, op: &Op) -> Result<usize> {
630        let mut env = Env::new();
631        self.exec_op(op, &mut env)
632    }
633
634    fn exec_op(&mut self, op: &Op, env: &mut Env) -> Result<usize> {
635        match op {
636            Op::Nop => Ok(0),
637            Op::Seq(ops) => {
638                let mut count = 0;
639                for o in ops {
640                    count += self.exec_op(o, env)?;
641                }
642                Ok(count)
643            }
644            Op::Iterate { source, body } => {
645                let mut count = 0;
646                match source {
647                    DataSource::Scan { relation, vars } => {
648                        let rel_name = self.ir.resolve_name(*relation);
649                        let iter = self.store.scan(rel_name)?;
650                        let tuples: Vec<_> = iter.collect();
651
652                        for tuple in tuples {
653                            if tuple.len() != vars.len() {
654                                continue;
655                            }
656                            for (i, var) in vars.iter().enumerate() {
657                                env.vars.insert(*var, tuple[i].clone());
658                            }
659                            if let Some(ref mut prov) = self.provenance {
660                                prov.active_premises
661                                    .push((rel_name.to_string(), tuple.clone()));
662                            }
663                            count += self.exec_op(body, env)?;
664                            if self.provenance.is_some() {
665                                self.provenance.as_mut().unwrap().active_premises.pop();
666                            }
667                        }
668                    }
669                    DataSource::ScanDelta { relation, vars } => {
670                        let rel_name = self.ir.resolve_name(*relation);
671                        let iter = self.store.scan_delta(rel_name)?;
672                        let tuples: Vec<_> = iter.collect();
673
674                        for tuple in tuples {
675                            if tuple.len() != vars.len() {
676                                continue;
677                            }
678                            for (i, var) in vars.iter().enumerate() {
679                                env.vars.insert(*var, tuple[i].clone());
680                            }
681                            if let Some(ref mut prov) = self.provenance {
682                                prov.active_premises
683                                    .push((rel_name.to_string(), tuple.clone()));
684                            }
685                            count += self.exec_op(body, env)?;
686                            if self.provenance.is_some() {
687                                self.provenance.as_mut().unwrap().active_premises.pop();
688                            }
689                        }
690                    }
691                    DataSource::IndexLookup {
692                        relation,
693                        col_idx,
694                        key,
695                        vars,
696                    } => {
697                        let rel_name = self.ir.resolve_name(*relation);
698                        let key_val = self.eval_operand(key, env)?;
699
700                        let iter = self.store.scan_index(rel_name, *col_idx, &key_val)?;
701                        let tuples: Vec<_> = iter.collect();
702
703                        for tuple in tuples {
704                            if tuple.len() != vars.len() {
705                                continue;
706                            }
707                            for (i, var) in vars.iter().enumerate() {
708                                env.vars.insert(*var, tuple[i].clone());
709                            }
710                            if let Some(ref mut prov) = self.provenance {
711                                prov.active_premises
712                                    .push((rel_name.to_string(), tuple.clone()));
713                            }
714                            count += self.exec_op(body, env)?;
715                            if self.provenance.is_some() {
716                                self.provenance.as_mut().unwrap().active_premises.pop();
717                            }
718                        }
719                    }
720                }
721                Ok(count)
722            }
723            Op::Filter { cond, body } => {
724                if self.eval_cond(cond, env)? {
725                    self.exec_op(body, env)
726                } else {
727                    Ok(0)
728                }
729            }
730            Op::Insert { relation, args } => {
731                let rel_name = self.ir.resolve_name(*relation);
732                let mut tuple = Vec::new();
733                for arg in args {
734                    tuple.push(self.eval_operand(arg, env)?);
735                }
736                let is_new = self.store.insert(rel_name, tuple.clone())?;
737                if is_new {
738                    if let Some(ref mut prov) = self.provenance {
739                        prov.entries.push(ProvenanceEntry {
740                            derived: (rel_name.to_string(), tuple),
741                            premises: prov.active_premises.clone(),
742                        });
743                    }
744                    Ok(1)
745                } else {
746                    Ok(0)
747                }
748            }
749            Op::Let { var, expr, body } => {
750                let val = self.eval_expr(expr, env)?;
751                env.vars.insert(*var, val);
752                self.exec_op(body, env)
753            }
754            Op::GroupBy {
755                source,
756                vars,
757                keys,
758                aggregates,
759                body,
760            } => {
761                let rel_name = self.ir.resolve_name(*source);
762
763                // For GroupBy, we must scan ALL available facts including ones just produced in this stratum
764                // if we want to match Go implementation's behavior for non-recursive strata.
765                let iter = self.store.scan(rel_name)?;
766                let mut tuples: Vec<_> = iter.collect();
767
768                // Also scan next_delta if it's the same relation
769                if let Ok(nd_iter) = self.store.scan_next_delta(rel_name) {
770                    tuples.extend(nd_iter);
771                }
772
773                let mut groups: HashMap<Vec<Value>, Vec<Vec<Value>>> = HashMap::new();
774
775                for tuple in tuples {
776                    if tuple.len() != vars.len() {
777                        continue;
778                    }
779                    // Temporarily bind variables to extract key
780                    for (i, var) in vars.iter().enumerate() {
781                        env.vars.insert(*var, tuple[i].clone());
782                    }
783
784                    let mut key = Vec::new();
785                    for k in keys {
786                        if let Some(val) = env.vars.get(k) {
787                            key.push(val.clone());
788                        } else {
789                            // Should not happen if well-typed
790                            key.push(Value::Null);
791                        }
792                    }
793                    groups.entry(key).or_default().push(tuple);
794                }
795
796                let mut count = 0;
797                for (key, group_tuples) in groups {
798                    // Bind keys
799                    for (i, k) in keys.iter().enumerate() {
800                        env.vars.insert(*k, key[i].clone());
801                    }
802
803                    // Compute aggregates
804                    for agg in aggregates {
805                        let val = self.eval_aggregate(agg, &group_tuples, vars, env)?;
806                        env.vars.insert(agg.var, val);
807                    }
808
809                    count += self.exec_op(body, env)?;
810                }
811                Ok(count)
812            }
813        }
814    }
815
816    fn eval_aggregate(
817        &self,
818        agg: &Aggregate,
819        group: &[Vec<Value>],
820        vars: &[NameId],
821        env: &mut Env,
822    ) -> Result<Value> {
823        let fn_name = self.ir.resolve_name(agg.func);
824        match fn_name {
825            "fn:count" => Ok(Value::Number(group.len() as i64)),
826            "fn:sum" => {
827                let arg = agg
828                    .args
829                    .first()
830                    .ok_or_else(|| anyhow!("fn:sum requires 1 argument"))?;
831
832                let mut int_sum: i64 = 0;
833                for tuple in group {
834                    for (i, var) in vars.iter().enumerate() {
835                        env.vars.insert(*var, tuple[i].clone());
836                    }
837                    let val = self.eval_operand(arg, env)?;
838                    match val {
839                        Value::Number(n) => int_sum += n,
840                        _ => return Err(anyhow!("fn:sum: expected integer, got {val}")),
841                    }
842                }
843                Ok(Value::Number(int_sum))
844            }
845            "fn:float:sum" => {
846                let arg = agg
847                    .args
848                    .first()
849                    .ok_or_else(|| anyhow!("fn:float:sum requires 1 argument"))?;
850
851                let mut float_sum: f64 = 0.0;
852                for tuple in group {
853                    for (i, var) in vars.iter().enumerate() {
854                        env.vars.insert(*var, tuple[i].clone());
855                    }
856                    let val = self.eval_operand(arg, env)?;
857                    float_sum += value_as_float(&val)?;
858                }
859                Ok(Value::Float(float_sum))
860            }
861            "fn:max" => {
862                let mut max_val = None;
863                let arg = agg
864                    .args
865                    .first()
866                    .ok_or_else(|| anyhow!("fn:max requires 1 argument"))?;
867
868                for tuple in group {
869                    for (i, var) in vars.iter().enumerate() {
870                        env.vars.insert(*var, tuple[i].clone());
871                    }
872                    let val = self.eval_operand(arg, env)?;
873                    match max_val {
874                        None => max_val = Some(val),
875                        Some(ref m) => {
876                            if val > *m {
877                                max_val = Some(val);
878                            }
879                        }
880                    }
881                }
882                max_val.ok_or_else(|| anyhow!("fn:max on empty group"))
883            }
884            "fn:min" => {
885                let mut min_val = None;
886                let arg = agg
887                    .args
888                    .first()
889                    .ok_or_else(|| anyhow!("fn:min requires 1 argument"))?;
890
891                for tuple in group {
892                    for (i, var) in vars.iter().enumerate() {
893                        env.vars.insert(*var, tuple[i].clone());
894                    }
895                    let val = self.eval_operand(arg, env)?;
896                    match min_val {
897                        None => min_val = Some(val),
898                        Some(ref m) => {
899                            if val < *m {
900                                min_val = Some(val);
901                            }
902                        }
903                    }
904                }
905                min_val.ok_or_else(|| anyhow!("fn:min on empty group"))
906            }
907            "fn:float:max" => {
908                let mut max_val: Option<f64> = None;
909                let arg = agg
910                    .args
911                    .first()
912                    .ok_or_else(|| anyhow!("fn:float:max requires 1 argument"))?;
913
914                for tuple in group {
915                    for (i, var) in vars.iter().enumerate() {
916                        env.vars.insert(*var, tuple[i].clone());
917                    }
918                    let val = self.eval_operand(arg, env)?;
919                    let f = value_as_float(&val)?;
920                    max_val = Some(match max_val {
921                        None => f,
922                        Some(m) => f.max(m),
923                    });
924                }
925                max_val
926                    .map(Value::Float)
927                    .ok_or_else(|| anyhow!("fn:float:max on empty group"))
928            }
929            "fn:float:min" => {
930                let mut min_val: Option<f64> = None;
931                let arg = agg
932                    .args
933                    .first()
934                    .ok_or_else(|| anyhow!("fn:float:min requires 1 argument"))?;
935
936                for tuple in group {
937                    for (i, var) in vars.iter().enumerate() {
938                        env.vars.insert(*var, tuple[i].clone());
939                    }
940                    let val = self.eval_operand(arg, env)?;
941                    let f = value_as_float(&val)?;
942                    min_val = Some(match min_val {
943                        None => f,
944                        Some(m) => f.min(m),
945                    });
946                }
947                min_val
948                    .map(Value::Float)
949                    .ok_or_else(|| anyhow!("fn:float:min on empty group"))
950            }
951            _ => Err(anyhow!("Unknown aggregation function: {fn_name}")),
952        }
953    }
954
955    fn eval_cond(&self, cond: &Condition, env: &Env) -> Result<bool> {
956        match cond {
957            Condition::Cmp { op, left, right } => {
958                let l = self.eval_operand(left, env)?;
959                let r = self.eval_operand(right, env)?;
960                match op {
961                    CmpOp::Eq => Ok(l == r),
962                    CmpOp::Neq => Ok(l != r),
963                    CmpOp::Lt => Ok(l < r),
964                    CmpOp::Le => Ok(l <= r),
965                    CmpOp::Gt => Ok(l > r),
966                    CmpOp::Ge => Ok(l >= r),
967                }
968            }
969            Condition::Negation { relation, args } => {
970                let rel_name = self.ir.resolve_name(*relation);
971                let iter = self.store.scan(rel_name)?;
972                for tuple in iter {
973                    let mut mat = true;
974                    if tuple.len() != args.len() {
975                        continue;
976                    }
977                    for (i, arg) in args.iter().enumerate() {
978                        let val = self.eval_operand(arg, env)?;
979                        if tuple[i] != val {
980                            mat = false;
981                            break;
982                        }
983                    }
984                    if mat {
985                        return Ok(false); // Found match, negation fails
986                    }
987                }
988                Ok(true) // No match found
989            }
990            Condition::Call { function, args } => {
991                let fn_name = self.ir.resolve_name(*function);
992                let mut vals = Vec::new();
993                for arg in args {
994                    vals.push(self.eval_operand(arg, env)?);
995                }
996                self.eval_builtin_predicate(fn_name, &vals)
997            }
998        }
999    }
1000
1001    fn eval_builtin_predicate(&self, name: &str, vals: &[Value]) -> Result<bool> {
1002        match name {
1003            ":string:starts_with" => match (&vals[0], &vals[1]) {
1004                (Value::String(s), Value::String(p)) => Ok(s.starts_with(p.as_str())),
1005                _ => Err(anyhow!(":string:starts_with: expected string arguments")),
1006            },
1007            ":string:ends_with" => match (&vals[0], &vals[1]) {
1008                (Value::String(s), Value::String(p)) => Ok(s.ends_with(p.as_str())),
1009                _ => Err(anyhow!(":string:ends_with: expected string arguments")),
1010            },
1011            ":string:contains" => match (&vals[0], &vals[1]) {
1012                (Value::String(s), Value::String(p)) => Ok(s.contains(p.as_str())),
1013                _ => Err(anyhow!(":string:contains: expected string arguments")),
1014            },
1015            ":match_prefix" => match (&vals[0], &vals[1]) {
1016                (Value::String(name), Value::String(prefix)) => {
1017                    Ok(name.starts_with(prefix.as_str()) && name.len() > prefix.len())
1018                }
1019                _ => Err(anyhow!(":match_prefix: expected string arguments")),
1020            },
1021            _ => Err(anyhow!("Unknown built-in predicate: {name}")),
1022        }
1023    }
1024
1025    fn eval_expr(&self, expr: &Expr, env: &Env) -> Result<Value> {
1026        match expr {
1027            Expr::Value(op) => self.eval_operand(op, env),
1028            Expr::Call { function, args } => {
1029                let fn_name = self.ir.resolve_name(*function);
1030                let mut vals = Vec::new();
1031                for arg in args {
1032                    vals.push(self.eval_operand(arg, env)?);
1033                }
1034                eval_function(fn_name, &vals)
1035            }
1036        }
1037    }
1038
1039    fn eval_operand(&self, op: &Operand, env: &Env) -> Result<Value> {
1040        match op {
1041            Operand::Var(v) => env
1042                .vars
1043                .get(v)
1044                .cloned()
1045                .ok_or_else(|| anyhow!("Variable not found")),
1046            Operand::Const(c) => match c {
1047                Constant::Number(n) => Ok(Value::Number(*n)),
1048                Constant::Float(f) => Ok(Value::Float(*f)),
1049                Constant::String(sid) => {
1050                    Ok(Value::String(self.ir.resolve_string(*sid).to_string()))
1051                }
1052                Constant::Name(nid) => Ok(Value::String(self.ir.resolve_name(*nid).to_string())),
1053                Constant::Time(t) => Ok(Value::Time(*t)),
1054                Constant::Duration(d) => Ok(Value::Duration(*d)),
1055            },
1056        }
1057    }
1058}
1059
1060// --- Helper functions ---
1061
1062fn value_as_float(v: &Value) -> Result<f64> {
1063    match v {
1064        Value::Float(f) => Ok(*f),
1065        Value::Number(n) => Ok(*n as f64),
1066        _ => Err(anyhow!("expected numeric value, got {v}")),
1067    }
1068}
1069
1070fn value_to_string(v: &Value) -> String {
1071    match v {
1072        Value::Number(n) => n.to_string(),
1073        Value::Float(f) => format!("{f}"),
1074        Value::String(s) => s.clone(),
1075        Value::Time(t) => format!("{}", Value::Time(*t)),
1076        Value::Duration(d) => format!("{}", Value::Duration(*d)),
1077        Value::Compound(kind, elems) => format!("{}", Value::Compound(*kind, elems.clone())),
1078        Value::Null => "null".to_string(),
1079    }
1080}
1081
1082/// Evaluate a built-in function call.
1083pub fn eval_function(fn_name: &str, vals: &[Value]) -> Result<Value> {
1084    match fn_name {
1085        // --- Integer arithmetic (variadic) ---
1086        "fn:plus" => {
1087            let mut sum: i64 = 0;
1088            for v in vals {
1089                match v {
1090                    Value::Number(n) => sum += n,
1091                    _ => return Err(anyhow!("fn:plus: expected integer, got {v}")),
1092                }
1093            }
1094            Ok(Value::Number(sum))
1095        }
1096        "fn:minus" => {
1097            if vals.is_empty() {
1098                return Err(anyhow!("fn:minus: requires at least 1 argument"));
1099            }
1100            let first = match &vals[0] {
1101                Value::Number(n) => *n,
1102                v => return Err(anyhow!("fn:minus: expected integer, got {v}")),
1103            };
1104            if vals.len() == 1 {
1105                return Ok(Value::Number(-first));
1106            }
1107            let mut result = first;
1108            for v in &vals[1..] {
1109                match v {
1110                    Value::Number(n) => result -= n,
1111                    _ => return Err(anyhow!("fn:minus: expected integer, got {v}")),
1112                }
1113            }
1114            Ok(Value::Number(result))
1115        }
1116        "fn:mult" => {
1117            let mut product: i64 = 1;
1118            for v in vals {
1119                match v {
1120                    Value::Number(n) => product *= n,
1121                    _ => return Err(anyhow!("fn:mult: expected integer, got {v}")),
1122                }
1123            }
1124            Ok(Value::Number(product))
1125        }
1126        "fn:div" => {
1127            if vals.is_empty() {
1128                return Err(anyhow!("fn:div: requires at least 1 argument"));
1129            }
1130            let first = match &vals[0] {
1131                Value::Number(n) => *n,
1132                v => return Err(anyhow!("fn:div: expected integer, got {v}")),
1133            };
1134            if vals.len() == 1 {
1135                if first == 0 {
1136                    return Err(anyhow!("Division by zero in fn:div"));
1137                }
1138                return Ok(Value::Number(1 / first));
1139            }
1140            let mut result = first;
1141            for v in &vals[1..] {
1142                match v {
1143                    Value::Number(0) => return Err(anyhow!("Division by zero in fn:div")),
1144                    Value::Number(n) => {
1145                        result /= n;
1146                        if result == 0 {
1147                            return Ok(Value::Number(0));
1148                        }
1149                    }
1150                    _ => return Err(anyhow!("fn:div: expected integer, got {v}")),
1151                }
1152            }
1153            Ok(Value::Number(result))
1154        }
1155
1156        // --- Float arithmetic (variadic, accepts Number via promotion) ---
1157        "fn:float:plus" => {
1158            let mut sum: f64 = 0.0;
1159            for v in vals {
1160                sum += value_as_float(v)?;
1161            }
1162            Ok(Value::Float(sum))
1163        }
1164        "fn:float:minus" => {
1165            if vals.is_empty() {
1166                return Err(anyhow!("fn:float:minus: requires at least 1 argument"));
1167            }
1168            let first = value_as_float(&vals[0])?;
1169            if vals.len() == 1 {
1170                return Ok(Value::Float(-first));
1171            }
1172            let mut result = first;
1173            for v in &vals[1..] {
1174                result -= value_as_float(v)?;
1175            }
1176            Ok(Value::Float(result))
1177        }
1178        "fn:float:mult" => {
1179            let mut product: f64 = 1.0;
1180            for v in vals {
1181                product *= value_as_float(v)?;
1182            }
1183            Ok(Value::Float(product))
1184        }
1185        "fn:float:div" => {
1186            if vals.is_empty() {
1187                return Err(anyhow!("fn:float:div: requires at least 1 argument"));
1188            }
1189            let first = value_as_float(&vals[0])?;
1190            if vals.len() == 1 {
1191                if first == 0.0 {
1192                    return Err(anyhow!("Division by zero in fn:float:div"));
1193                }
1194                return Ok(Value::Float(1.0 / first));
1195            }
1196            let mut result = first;
1197            for v in &vals[1..] {
1198                let d = value_as_float(v)?;
1199                if d == 0.0 {
1200                    return Err(anyhow!("Division by zero in fn:float:div"));
1201                }
1202                result /= d;
1203            }
1204            Ok(Value::Float(result))
1205        }
1206        "fn:sqrt" => {
1207            if vals.len() != 1 {
1208                return Err(anyhow!("fn:sqrt: requires exactly 1 argument"));
1209            }
1210            let f = value_as_float(&vals[0])?;
1211            Ok(Value::Float(f.sqrt()))
1212        }
1213
1214        // --- String functions ---
1215        "fn:string:concat" => {
1216            let mut result = String::new();
1217            for v in vals {
1218                result.push_str(&value_to_string(v));
1219            }
1220            Ok(Value::String(result))
1221        }
1222        "fn:string:replace" => {
1223            if vals.len() != 4 {
1224                return Err(anyhow!("fn:string:replace: requires 4 arguments (string, old, new, count)"));
1225            }
1226            let s = match &vals[0] {
1227                Value::String(s) => s,
1228                v => return Err(anyhow!("fn:string:replace: first arg must be string, got {v}")),
1229            };
1230            let old = match &vals[1] {
1231                Value::String(s) => s,
1232                v => return Err(anyhow!("fn:string:replace: second arg must be string, got {v}")),
1233            };
1234            let new_s = match &vals[2] {
1235                Value::String(s) => s,
1236                v => return Err(anyhow!("fn:string:replace: third arg must be string, got {v}")),
1237            };
1238            let count = match &vals[3] {
1239                Value::Number(n) => *n,
1240                v => return Err(anyhow!("fn:string:replace: fourth arg must be number, got {v}")),
1241            };
1242            let result = if count < 0 {
1243                s.replace(old.as_str(), new_s.as_str())
1244            } else {
1245                s.replacen(old.as_str(), new_s.as_str(), count as usize)
1246            };
1247            Ok(Value::String(result))
1248        }
1249
1250        // --- Type conversion functions ---
1251        "fn:number:to_string" => {
1252            if vals.len() != 1 {
1253                return Err(anyhow!("fn:number:to_string: requires 1 argument"));
1254            }
1255            match &vals[0] {
1256                Value::Number(n) => Ok(Value::String(n.to_string())),
1257                v => Err(anyhow!("fn:number:to_string: expected number, got {v}")),
1258            }
1259        }
1260        "fn:float64:to_string" => {
1261            if vals.len() != 1 {
1262                return Err(anyhow!("fn:float64:to_string: requires 1 argument"));
1263            }
1264            match &vals[0] {
1265                Value::Float(f) => Ok(Value::String(format!("{f}"))),
1266                v => Err(anyhow!("fn:float64:to_string: expected float, got {v}")),
1267            }
1268        }
1269        "fn:name:to_string" => {
1270            if vals.len() != 1 {
1271                return Err(anyhow!("fn:name:to_string: requires 1 argument"));
1272            }
1273            match &vals[0] {
1274                // Names are stored as String in the Rust Value representation
1275                Value::String(s) => Ok(Value::String(s.clone())),
1276                v => Err(anyhow!("fn:name:to_string: expected name, got {v}")),
1277            }
1278        }
1279
1280        // --- Time functions ---
1281        "fn:time:now" => {
1282            if !vals.is_empty() {
1283                return Err(anyhow!("fn:time:now: takes no arguments"));
1284            }
1285            let nanos = std::time::SystemTime::now()
1286                .duration_since(std::time::UNIX_EPOCH)
1287                .map_err(|e| anyhow!("fn:time:now: {e}"))?
1288                .as_nanos() as i64;
1289            Ok(Value::Time(nanos))
1290        }
1291        "fn:time:add" => {
1292            if vals.len() != 2 {
1293                return Err(anyhow!("fn:time:add: requires 2 arguments (time, duration)"));
1294            }
1295            match (&vals[0], &vals[1]) {
1296                (Value::Time(t), Value::Duration(d)) => Ok(Value::Time(t + d)),
1297                _ => Err(anyhow!("fn:time:add: expected (time, duration), got ({}, {})", vals[0], vals[1])),
1298            }
1299        }
1300        "fn:time:sub" => {
1301            if vals.len() != 2 {
1302                return Err(anyhow!("fn:time:sub: requires 2 arguments"));
1303            }
1304            match (&vals[0], &vals[1]) {
1305                (Value::Time(t1), Value::Time(t2)) => Ok(Value::Duration(t1 - t2)),
1306                (Value::Time(t), Value::Duration(d)) => Ok(Value::Time(t - d)),
1307                _ => Err(anyhow!("fn:time:sub: expected (time, time) or (time, duration)")),
1308            }
1309        }
1310        "fn:time:year" => time_component(vals, |secs, _| {
1311            let (y, _, _) = civil_from_epoch_secs(secs);
1312            y as i64
1313        }),
1314        "fn:time:month" => time_component(vals, |secs, _| {
1315            let (_, m, _) = civil_from_epoch_secs(secs);
1316            m as i64
1317        }),
1318        "fn:time:day" => time_component(vals, |secs, _| {
1319            let (_, _, d) = civil_from_epoch_secs(secs);
1320            d as i64
1321        }),
1322        "fn:time:hour" => time_component(vals, |secs, _| {
1323            secs.rem_euclid(86400) / 3600
1324        }),
1325        "fn:time:minute" => time_component(vals, |secs, _| {
1326            (secs.rem_euclid(86400) % 3600) / 60
1327        }),
1328        "fn:time:second" => time_component(vals, |secs, _| {
1329            secs.rem_euclid(86400) % 60
1330        }),
1331        "fn:time:from_unix_nanos" => {
1332            if vals.len() != 1 {
1333                return Err(anyhow!("fn:time:from_unix_nanos: requires 1 argument"));
1334            }
1335            match &vals[0] {
1336                Value::Number(n) => Ok(Value::Time(*n)),
1337                v => Err(anyhow!("fn:time:from_unix_nanos: expected number, got {v}")),
1338            }
1339        }
1340        "fn:time:to_unix_nanos" => {
1341            if vals.len() != 1 {
1342                return Err(anyhow!("fn:time:to_unix_nanos: requires 1 argument"));
1343            }
1344            match &vals[0] {
1345                Value::Time(t) => Ok(Value::Number(*t)),
1346                v => Err(anyhow!("fn:time:to_unix_nanos: expected time, got {v}")),
1347            }
1348        }
1349        "fn:time:trunc" => {
1350            if vals.len() != 2 {
1351                return Err(anyhow!("fn:time:trunc: requires 2 arguments (time, unit_name)"));
1352            }
1353            let t = match &vals[0] {
1354                Value::Time(t) => *t,
1355                v => return Err(anyhow!("fn:time:trunc: first arg must be time, got {v}")),
1356            };
1357            let unit_name = match &vals[1] {
1358                Value::String(s) => s.as_str(),
1359                v => return Err(anyhow!("fn:time:trunc: second arg must be name, got {v}")),
1360            };
1361            let d: i64 = match unit_name {
1362                "/nanosecond" => 1,
1363                "/microsecond" => 1_000,
1364                "/millisecond" => 1_000_000,
1365                "/second" => 1_000_000_000,
1366                "/minute" => 60 * 1_000_000_000,
1367                "/hour" => 3600 * 1_000_000_000,
1368                "/day" => 24 * 3600 * 1_000_000_000,
1369                _ => return Err(anyhow!("fn:time:trunc: unknown unit {unit_name:?}")),
1370            };
1371            Ok(Value::Time(t - t.rem_euclid(d)))
1372        }
1373        "fn:time:format" => {
1374            if vals.len() != 2 {
1375                return Err(anyhow!("fn:time:format: requires 2 arguments (time, precision)"));
1376            }
1377            let t = match &vals[0] {
1378                Value::Time(t) => *t,
1379                v => return Err(anyhow!("fn:time:format: first arg must be time, got {v}")),
1380            };
1381            let precision = match &vals[1] {
1382                Value::String(s) => s.as_str(),
1383                v => return Err(anyhow!("fn:time:format: second arg must be name, got {v}")),
1384            };
1385            Ok(Value::String(format_time_with_precision(t, precision)?))
1386        }
1387        "fn:time:format_civil" => {
1388            if vals.len() != 3 {
1389                return Err(anyhow!("fn:time:format_civil: requires 3 arguments (time, timezone, precision)"));
1390            }
1391            let t = match &vals[0] {
1392                Value::Time(t) => *t,
1393                v => return Err(anyhow!("fn:time:format_civil: first arg must be time, got {v}")),
1394            };
1395            let tz = match &vals[1] {
1396                Value::String(s) => s.as_str(),
1397                v => return Err(anyhow!("fn:time:format_civil: second arg must be string, got {v}")),
1398            };
1399            let precision = match &vals[2] {
1400                Value::String(s) => s.as_str(),
1401                v => return Err(anyhow!("fn:time:format_civil: third arg must be name, got {v}")),
1402            };
1403            let offset = parse_timezone_offset(tz)?;
1404            let adjusted = t + offset * 1_000_000_000;
1405            let formatted = format_time_with_precision(adjusted, precision)?;
1406            Ok(Value::String(formatted))
1407        }
1408        "fn:time:parse_rfc3339" => {
1409            if vals.len() != 1 {
1410                return Err(anyhow!("fn:time:parse_rfc3339: requires 1 argument"));
1411            }
1412            match &vals[0] {
1413                Value::String(s) => {
1414                    let nanos = parse_rfc3339_to_nanos(s)?;
1415                    Ok(Value::Time(nanos))
1416                }
1417                v => Err(anyhow!("fn:time:parse_rfc3339: expected string, got {v}")),
1418            }
1419        }
1420        "fn:time:parse_civil" => {
1421            if vals.len() != 2 {
1422                return Err(anyhow!("fn:time:parse_civil: requires 2 arguments (string, timezone)"));
1423            }
1424            let s = match &vals[0] {
1425                Value::String(s) => s.as_str(),
1426                v => return Err(anyhow!("fn:time:parse_civil: first arg must be string, got {v}")),
1427            };
1428            let tz = match &vals[1] {
1429                Value::String(s) => s.as_str(),
1430                v => return Err(anyhow!("fn:time:parse_civil: second arg must be string, got {v}")),
1431            };
1432            let offset = parse_timezone_offset(tz)?;
1433            let nanos = parse_civil_datetime_to_nanos(s)?;
1434            // Subtract offset to convert local time to UTC
1435            Ok(Value::Time(nanos - offset * 1_000_000_000))
1436        }
1437
1438        // --- Duration functions ---
1439        "fn:duration:add" => {
1440            if vals.len() != 2 {
1441                return Err(anyhow!("fn:duration:add: requires 2 arguments"));
1442            }
1443            match (&vals[0], &vals[1]) {
1444                (Value::Duration(a), Value::Duration(b)) => Ok(Value::Duration(a + b)),
1445                _ => Err(anyhow!("fn:duration:add: expected (duration, duration)")),
1446            }
1447        }
1448        "fn:duration:mult" => {
1449            if vals.len() != 2 {
1450                return Err(anyhow!("fn:duration:mult: requires 2 arguments"));
1451            }
1452            match (&vals[0], &vals[1]) {
1453                (Value::Duration(d), Value::Number(n)) => Ok(Value::Duration(d * n)),
1454                (Value::Number(n), Value::Duration(d)) => Ok(Value::Duration(n * d)),
1455                _ => Err(anyhow!("fn:duration:mult: expected (duration, number) or (number, duration)")),
1456            }
1457        }
1458        "fn:duration:hours" => duration_component_float(vals, |nanos| nanos as f64 / (60.0 * 60.0 * 1_000_000_000.0)),
1459        "fn:duration:minutes" => duration_component_float(vals, |nanos| nanos as f64 / (60.0 * 1_000_000_000.0)),
1460        "fn:duration:seconds" => duration_component_float(vals, |nanos| nanos as f64 / 1_000_000_000.0),
1461        "fn:duration:nanos" => duration_component_int(vals, |nanos| nanos),
1462        "fn:duration:from_nanos" => {
1463            if vals.len() != 1 {
1464                return Err(anyhow!("fn:duration:from_nanos: requires 1 argument"));
1465            }
1466            match &vals[0] {
1467                Value::Number(n) => Ok(Value::Duration(*n)),
1468                v => Err(anyhow!("fn:duration:from_nanos: expected number, got {v}")),
1469            }
1470        }
1471        "fn:duration:from_hours" => duration_from(vals, "hours", 60 * 60 * 1_000_000_000),
1472        "fn:duration:from_minutes" => duration_from(vals, "minutes", 60 * 1_000_000_000),
1473        "fn:duration:from_seconds" => duration_from(vals, "seconds", 1_000_000_000),
1474        "fn:duration:parse" => {
1475            if vals.len() != 1 {
1476                return Err(anyhow!("fn:duration:parse: requires 1 argument"));
1477            }
1478            match &vals[0] {
1479                Value::String(s) => {
1480                    let nanos = parse_duration_string(s)?;
1481                    Ok(Value::Duration(nanos))
1482                }
1483                v => Err(anyhow!("fn:duration:parse: expected string, got {v}")),
1484            }
1485        }
1486
1487        // --- Compound type constructors ---
1488        "fn:list" => Ok(Value::Compound(CompoundKind::List, vals.to_vec())),
1489        "fn:pair" => {
1490            if vals.len() != 2 {
1491                return Err(anyhow!("fn:pair: requires exactly 2 arguments"));
1492            }
1493            Ok(Value::Compound(CompoundKind::Pair, vals.to_vec()))
1494        }
1495        "fn:struct" => {
1496            // Args are interleaved: field_name, value, field_name, value, ...
1497            if vals.len() % 2 != 0 {
1498                return Err(anyhow!(
1499                    "fn:struct: requires even number of arguments (field, value pairs)"
1500                ));
1501            }
1502            Ok(Value::Compound(CompoundKind::Struct, vals.to_vec()))
1503        }
1504        "fn:map" => {
1505            // Args are interleaved: key, value, key, value, ...
1506            if vals.len() % 2 != 0 {
1507                return Err(anyhow!(
1508                    "fn:map: requires even number of arguments (key, value pairs)"
1509                ));
1510            }
1511            Ok(Value::Compound(CompoundKind::Map, vals.to_vec()))
1512        }
1513
1514        // --- Compound type accessors ---
1515        "fn:list:get" => {
1516            if vals.len() != 2 {
1517                return Err(anyhow!("fn:list:get: requires 2 arguments (list, index)"));
1518            }
1519            match (&vals[0], &vals[1]) {
1520                (Value::Compound(_, elems), Value::Number(idx)) => {
1521                    let i = *idx as usize;
1522                    elems
1523                        .get(i)
1524                        .cloned()
1525                        .ok_or_else(|| anyhow!("fn:list:get: index {i} out of bounds (len {})", elems.len()))
1526                }
1527                _ => Err(anyhow!("fn:list:get: expected (compound, number)")),
1528            }
1529        }
1530        "fn:list:len" | "fn:len" => {
1531            if vals.len() != 1 {
1532                return Err(anyhow!("fn:len: requires 1 argument"));
1533            }
1534            match &vals[0] {
1535                Value::Compound(_, elems) => Ok(Value::Number(elems.len() as i64)),
1536                _ => Err(anyhow!("fn:len: expected compound value")),
1537            }
1538        }
1539        "fn:pair:first" => {
1540            if vals.len() != 1 {
1541                return Err(anyhow!("fn:pair:first: requires 1 argument"));
1542            }
1543            match &vals[0] {
1544                Value::Compound(_, elems) if elems.len() >= 1 => Ok(elems[0].clone()),
1545                _ => Err(anyhow!("fn:pair:first: expected compound with at least 1 element")),
1546            }
1547        }
1548        "fn:pair:second" => {
1549            if vals.len() != 1 {
1550                return Err(anyhow!("fn:pair:second: requires 1 argument"));
1551            }
1552            match &vals[0] {
1553                Value::Compound(_, elems) if elems.len() >= 2 => Ok(elems[1].clone()),
1554                _ => Err(anyhow!("fn:pair:second: expected compound with at least 2 elements")),
1555            }
1556        }
1557        "fn:struct:get" | "fn:map:get" => {
1558            if vals.len() != 2 {
1559                return Err(anyhow!("{fn_name}: requires 2 arguments (compound, key)"));
1560            }
1561            match &vals[0] {
1562                Value::Compound(_, elems) => {
1563                    // Interleaved key-value pairs: [k1, v1, k2, v2, ...]
1564                    for pair in elems.chunks_exact(2) {
1565                        if pair[0] == vals[1] {
1566                            return Ok(pair[1].clone());
1567                        }
1568                    }
1569                    Err(anyhow!("{fn_name}: key not found"))
1570                }
1571                _ => Err(anyhow!("{fn_name}: expected compound value")),
1572            }
1573        }
1574        "fn:map:len" | "fn:struct:len" => {
1575            if vals.len() != 1 {
1576                return Err(anyhow!("{fn_name}: requires 1 argument"));
1577            }
1578            match &vals[0] {
1579                Value::Compound(_, elems) => Ok(Value::Number((elems.len() / 2) as i64)),
1580                _ => Err(anyhow!("{fn_name}: expected compound value")),
1581            }
1582        }
1583        "fn:map:keys" => {
1584            if vals.len() != 1 {
1585                return Err(anyhow!("fn:map:keys: requires 1 argument"));
1586            }
1587            match &vals[0] {
1588                Value::Compound(_, elems) => {
1589                    let keys: Vec<Value> = elems.chunks_exact(2).map(|p| p[0].clone()).collect();
1590                    Ok(Value::Compound(CompoundKind::List, keys))
1591                }
1592                _ => Err(anyhow!("fn:map:keys: expected compound value")),
1593            }
1594        }
1595        "fn:map:values" | "fn:struct:values" => {
1596            if vals.len() != 1 {
1597                return Err(anyhow!("{fn_name}: requires 1 argument"));
1598            }
1599            match &vals[0] {
1600                Value::Compound(_, elems) => {
1601                    let values: Vec<Value> = elems.chunks_exact(2).map(|p| p[1].clone()).collect();
1602                    Ok(Value::Compound(CompoundKind::List, values))
1603                }
1604                _ => Err(anyhow!("{fn_name}: expected compound value")),
1605            }
1606        }
1607
1608        _ => Err(anyhow!("Unknown function: {fn_name}")),
1609    }
1610}
1611
1612fn time_component(vals: &[Value], extract: impl Fn(i64, i64) -> i64) -> Result<Value> {
1613    if vals.len() != 1 {
1614        return Err(anyhow!("time component function: requires 1 argument"));
1615    }
1616    match &vals[0] {
1617        Value::Time(nanos) => {
1618            let secs = nanos.div_euclid(1_000_000_000);
1619            let sub_nanos = nanos.rem_euclid(1_000_000_000);
1620            Ok(Value::Number(extract(secs, sub_nanos)))
1621        }
1622        v => Err(anyhow!("time component function: expected time, got {v}")),
1623    }
1624}
1625
/// Converts seconds since the Unix epoch into a `(year, month, day)` civil date.
///
/// The day count is shifted so that it is measured from 0000-03-01, then
/// decomposed into 400-year eras (146097 days each). Counting the year from
/// March puts the leap day last, which lets month and day be recovered with
/// pure integer arithmetic. Months are 1-based, days are 1-based.
fn civil_from_epoch_secs(secs: i64) -> (i32, u32, u32) {
    const SECS_PER_DAY: i64 = 86400;
    const DAYS_PER_ERA: i64 = 146097;
    // Days between 0000-03-01 and 1970-01-01.
    const EPOCH_SHIFT: i64 = 719468;

    let shifted_days = secs.div_euclid(SECS_PER_DAY) + EPOCH_SHIFT;
    let era = shifted_days.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted_days.rem_euclid(DAYS_PER_ERA) as u32;

    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;

    // January and February belong to the following calendar year.
    let (month, year_carry) = if march_based_month < 10 {
        (march_based_month + 3, 0)
    } else {
        (march_based_month - 9, 1)
    };
    let year = year_of_era as i64 + era * 400 + year_carry;

    (year as i32, month, day)
}
1640
1641fn duration_component_float(vals: &[Value], extract: impl Fn(i64) -> f64) -> Result<Value> {
1642    if vals.len() != 1 {
1643        return Err(anyhow!("duration component function: requires 1 argument"));
1644    }
1645    match &vals[0] {
1646        Value::Duration(nanos) => Ok(Value::Float(extract(*nanos))),
1647        v => Err(anyhow!("duration component function: expected duration, got {v}")),
1648    }
1649}
1650
1651fn duration_component_int(vals: &[Value], extract: impl Fn(i64) -> i64) -> Result<Value> {
1652    if vals.len() != 1 {
1653        return Err(anyhow!("duration component function: requires 1 argument"));
1654    }
1655    match &vals[0] {
1656        Value::Duration(nanos) => Ok(Value::Number(extract(*nanos))),
1657        v => Err(anyhow!("duration component function: expected duration, got {v}")),
1658    }
1659}
1660
1661fn duration_from(vals: &[Value], name: &str, multiplier: i64) -> Result<Value> {
1662    if vals.len() != 1 {
1663        return Err(anyhow!("fn:duration:from_{name}: requires 1 argument"));
1664    }
1665    match &vals[0] {
1666        Value::Number(n) => Ok(Value::Duration(n * multiplier)),
1667        v => Err(anyhow!("fn:duration:from_{name}: expected number, got {v}")),
1668    }
1669}
1670
1671/// Format a time (in UTC nanoseconds) with a given precision specifier.
1672fn format_time_with_precision(nanos: i64, precision: &str) -> Result<String> {
1673    let secs = nanos.div_euclid(1_000_000_000);
1674    let sub_nanos = nanos.rem_euclid(1_000_000_000);
1675    let (y, m, d) = civil_from_epoch_secs(secs);
1676    let time_of_day = secs.rem_euclid(86400);
1677    let hour = time_of_day / 3600;
1678    let minute = (time_of_day % 3600) / 60;
1679    let second = time_of_day % 60;
1680
1681    match precision {
1682        "/year" => Ok(format!("{y:04}")),
1683        "/month" => Ok(format!("{y:04}-{m:02}")),
1684        "/day" => Ok(format!("{y:04}-{m:02}-{d:02}")),
1685        "/hour" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}Z")),
1686        "/minute" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}Z")),
1687        "/second" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}Z")),
1688        "/millisecond" => {
1689            let ms = sub_nanos / 1_000_000;
1690            Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{ms:03}Z"))
1691        }
1692        "/microsecond" => {
1693            let us = sub_nanos / 1_000;
1694            Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{us:06}Z"))
1695        }
1696        "/nanosecond" => {
1697            if sub_nanos == 0 {
1698                Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}Z"))
1699            } else {
1700                let ns_str = format!("{sub_nanos:09}");
1701                let ns_trimmed = ns_str.trim_end_matches('0');
1702                Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{ns_trimmed}Z"))
1703            }
1704        }
1705        _ => Err(anyhow!("unknown time precision: {precision:?}")),
1706    }
1707}
1708
1709/// Parse a timezone string. Supports "UTC" and fixed offsets like "+05:30", "-08:00".
1710/// Returns offset in seconds from UTC.
1711fn parse_timezone_offset(tz: &str) -> Result<i64> {
1712    match tz {
1713        "UTC" => Ok(0),
1714        s if s.starts_with('+') || s.starts_with('-') => {
1715            let sign: i64 = if s.starts_with('-') { -1 } else { 1 };
1716            let rest = &s[1..];
1717            let parts: Vec<&str> = rest.split(':').collect();
1718            if parts.len() != 2 {
1719                return Err(anyhow!("invalid timezone offset: {tz:?}"));
1720            }
1721            let hours: i64 = parts[0].parse().map_err(|_| anyhow!("invalid timezone: {tz:?}"))?;
1722            let minutes: i64 = parts[1].parse().map_err(|_| anyhow!("invalid timezone: {tz:?}"))?;
1723            Ok(sign * (hours * 3600 + minutes * 60))
1724        }
1725        _ => Err(anyhow!("unsupported timezone: {tz:?} (use \"UTC\" or offset like \"+05:30\")")),
1726    }
1727}
1728
1729/// Parse an RFC3339-like timestamp string to nanoseconds since epoch.
1730fn parse_rfc3339_to_nanos(s: &str) -> Result<i64> {
1731    // Minimal RFC3339: "2006-01-02T15:04:05Z" or with fractional seconds
1732    if s.len() < 10 {
1733        return Err(anyhow!("fn:time:parse_rfc3339: string too short: {s:?}"));
1734    }
1735    let year: i64 = s[0..4].parse().map_err(|_| anyhow!("invalid year in {s:?}"))?;
1736    let month: u32 = s[5..7].parse().map_err(|_| anyhow!("invalid month in {s:?}"))?;
1737    let day: u32 = s[8..10].parse().map_err(|_| anyhow!("invalid day in {s:?}"))?;
1738
1739    let (hour, minute, second, frac_nanos) = if s.len() > 10 && s.as_bytes()[10] == b'T' {
1740        if s.len() < 19 {
1741            return Err(anyhow!("fn:time:parse_rfc3339: incomplete time in {s:?}"));
1742        }
1743        let h: u32 = s[11..13].parse().map_err(|_| anyhow!("invalid hour"))?;
1744        let min: u32 = s[14..16].parse().map_err(|_| anyhow!("invalid minute"))?;
1745        let sec: u32 = s[17..19].parse().map_err(|_| anyhow!("invalid second"))?;
1746
1747        let frac = if s.len() > 19 && s.as_bytes()[19] == b'.' {
1748            let end = s.len() - if s.ends_with('Z') { 1 } else { 0 };
1749            let frac_str = &s[20..end];
1750            let padded = format!("{frac_str:0<9}");
1751            padded[..9].parse::<i64>().unwrap_or(0)
1752        } else {
1753            0
1754        };
1755        (h, min, sec, frac)
1756    } else {
1757        (0, 0, 0, 0)
1758    };
1759
1760    let y = if month <= 2 { year - 1 } else { year };
1761    let era = (if y >= 0 { y } else { y - 399 }) / 400;
1762    let yoe = (y - era * 400) as u32;
1763    let m_adj = if month > 2 { month - 3 } else { month + 9 };
1764    let doy = (153 * m_adj + 2) / 5 + day - 1;
1765    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
1766    let days = era * 146097 + doe as i64 - 719468;
1767
1768    let total_seconds = days * 86400 + hour as i64 * 3600 + minute as i64 * 60 + second as i64;
1769    Ok(total_seconds * 1_000_000_000 + frac_nanos)
1770}
1771
1772/// Parse a civil datetime string like "2024-01-15T10:30:00" or "2024-01-15T10:30:00.000000000".
1773fn parse_civil_datetime_to_nanos(s: &str) -> Result<i64> {
1774    // Reuse RFC3339 parser โ€” civil format is the same without 'Z' suffix
1775    parse_rfc3339_to_nanos(s)
1776}
1777
1778/// Parse a Go-style duration string like "1h30m", "500ms", "-2h45m30s", "1.5h".
1779fn parse_duration_string(s: &str) -> Result<i64> {
1780    if s.is_empty() {
1781        return Err(anyhow!("fn:duration:parse: empty string"));
1782    }
1783    if s == "0" || s == "0s" {
1784        return Ok(0);
1785    }
1786
1787    let (sign, mut rest) = if s.starts_with('-') {
1788        (-1i64, &s[1..])
1789    } else if s.starts_with('+') {
1790        (1i64, &s[1..])
1791    } else {
1792        (1i64, s)
1793    };
1794
1795    let mut total_nanos: i64 = 0;
1796
1797    while !rest.is_empty() {
1798        // Parse numeric part (integer or float)
1799        let num_end = rest
1800            .find(|c: char| !c.is_ascii_digit() && c != '.')
1801            .unwrap_or(rest.len());
1802        if num_end == 0 {
1803            return Err(anyhow!("fn:duration:parse: expected number in {s:?}"));
1804        }
1805        let num_str = &rest[..num_end];
1806        rest = &rest[num_end..];
1807
1808        // Parse unit suffix
1809        let (unit_nanos, unit_len) = if rest.starts_with("ns") {
1810            (1i64, 2)
1811        } else if rest.starts_with("us") || rest.starts_with("ยตs") {
1812            (1_000i64, if rest.starts_with("ยต") { 3 } else { 2 })
1813        } else if rest.starts_with("ms") {
1814            (1_000_000i64, 2)
1815        } else if rest.starts_with('s') {
1816            (1_000_000_000i64, 1)
1817        } else if rest.starts_with('m') {
1818            (60 * 1_000_000_000i64, 1)
1819        } else if rest.starts_with('h') {
1820            (3600 * 1_000_000_000i64, 1)
1821        } else {
1822            return Err(anyhow!("fn:duration:parse: unknown unit in {s:?}"));
1823        };
1824        rest = &rest[unit_len..];
1825
1826        if num_str.contains('.') {
1827            let val: f64 = num_str.parse().map_err(|_| anyhow!("fn:duration:parse: invalid number {num_str:?}"))?;
1828            total_nanos += (val * unit_nanos as f64) as i64;
1829        } else {
1830            let val: i64 = num_str.parse().map_err(|_| anyhow!("fn:duration:parse: invalid number {num_str:?}"))?;
1831            total_nanos += val * unit_nanos;
1832        }
1833    }
1834
1835    Ok(sign * total_nanos)
1836}
1837
1838#[cfg(test)]
1839mod tests {
1840    use super::*;
1841
1842    #[test]
1843    fn test_retract_existing() {
1844        let mut store = MemStore::new();
1845        store.add_fact("r", vec![Value::Number(1), Value::Number(2)]);
1846        store.add_fact("r", vec![Value::Number(3), Value::Number(4)]);
1847
1848        let removed = store
1849            .retract("r", &[Value::Number(1), Value::Number(2)])
1850            .unwrap();
1851        assert!(removed);
1852
1853        let facts = store.get_facts("r");
1854        assert_eq!(facts.len(), 1);
1855        assert_eq!(facts[0], vec![Value::Number(3), Value::Number(4)]);
1856    }
1857
1858    #[test]
1859    fn test_retract_nonexistent() {
1860        let mut store = MemStore::new();
1861        store.add_fact("r", vec![Value::Number(1)]);
1862
1863        let removed = store.retract("r", &[Value::Number(99)]).unwrap();
1864        assert!(!removed);
1865
1866        let facts = store.get_facts("r");
1867        assert_eq!(facts.len(), 1);
1868        assert_eq!(facts[0], vec![Value::Number(1)]);
1869    }
1870
1871    #[test]
1872    fn test_clear() {
1873        let mut store = MemStore::new();
1874        store.add_fact("r", vec![Value::Number(1)]);
1875        store.add_fact("r", vec![Value::Number(2)]);
1876        store.add_fact("s", vec![Value::Number(10)]);
1877
1878        store.clear("r");
1879
1880        let r_facts = store.get_facts("r");
1881        assert!(r_facts.is_empty());
1882
1883        // "s" should be untouched
1884        let s_facts = store.get_facts("s");
1885        assert_eq!(s_facts.len(), 1);
1886    }
1887
1888    #[test]
1889    fn test_relation_names() {
1890        let mut store = MemStore::new();
1891        store.create_relation("alpha");
1892        store.create_relation("beta");
1893        store.add_fact("gamma", vec![Value::Number(1)]);
1894
1895        let mut names = store.relation_names();
1896        names.sort();
1897        assert_eq!(names, vec!["alpha", "beta", "gamma"]);
1898    }
1899
1900    #[test]
1901    fn test_provenance_recording() {
1902        use mangle_ir::physical::{DataSource, Operand};
1903
1904        // Build a minimal IR manually to test provenance
1905        let mut ir = mangle_ir::Ir::new();
1906        let base_name = ir.intern_name("base");
1907        let derived_name = ir.intern_name("derived");
1908        let var_x = ir.intern_name("X");
1909
1910        // Create an Op: Iterate(Scan("base", [X]), Insert("derived", [X]))
1911        let op = Op::Iterate {
1912            source: DataSource::Scan {
1913                relation: base_name,
1914                vars: vec![var_x],
1915            },
1916            body: Box::new(Op::Insert {
1917                relation: derived_name,
1918                args: vec![Operand::Var(var_x)],
1919            }),
1920        };
1921
1922        let mut store = Box::new(MemStore::new());
1923        store.add_fact("base", vec![Value::Number(10)]);
1924        store.add_fact("base", vec![Value::Number(20)]);
1925        store.create_relation("derived");
1926
1927        let mut interpreter = Interpreter::new(&ir, store as Box<dyn Store>).with_provenance();
1928
1929        let count = interpreter.execute(&op).unwrap();
1930        assert_eq!(count, 2);
1931
1932        // Check provenance was recorded
1933        let prov = interpreter.provenance.as_ref().unwrap();
1934        assert_eq!(prov.entries.len(), 2);
1935
1936        // Each derived fact should have one premise (from "base")
1937        for entry in &prov.entries {
1938            assert_eq!(entry.derived.0, "derived");
1939            assert_eq!(entry.premises.len(), 1);
1940            assert_eq!(entry.premises[0].0, "base");
1941        }
1942
1943        // Check the actual derived facts
1944        let mut derived_vals: Vec<i64> = prov
1945            .entries
1946            .iter()
1947            .map(|e| match &e.derived.1[0] {
1948                Value::Number(n) => *n,
1949                _ => panic!("expected number"),
1950            })
1951            .collect();
1952        derived_vals.sort();
1953        assert_eq!(derived_vals, vec![10, 20]);
1954    }
1955
1956    #[test]
1957    fn test_float_values() {
1958        use mangle_ir::physical::{self, DataSource, Expr, Operand};
1959
1960        let mut ir = mangle_ir::Ir::new();
1961        let temps = ir.intern_name("temps");
1962        let result = ir.intern_name("result");
1963        let var_x = ir.intern_name("X");
1964        // First, verify basic scan+insert of floats works
1965        let simple_op = Op::Iterate {
1966            source: DataSource::Scan {
1967                relation: temps,
1968                vars: vec![var_x],
1969            },
1970            body: Box::new(Op::Insert {
1971                relation: result,
1972                args: vec![Operand::Var(var_x)],
1973            }),
1974        };
1975
1976        let mut store = Box::new(MemStore::new());
1977        store.add_fact("temps", vec![Value::Float(36.5)]);
1978        store.add_fact("temps", vec![Value::Float(35.9)]);
1979        store.add_fact("temps", vec![Value::Float(37.2)]);
1980        store.create_relation("result");
1981
1982        let mut interpreter = Interpreter::new(&ir, store as Box<dyn Store>);
1983        let count = interpreter.execute(&simple_op).unwrap();
1984        assert_eq!(count, 3, "basic float scan+insert should produce 3 facts");
1985
1986        // Now test with filter and arithmetic
1987        let mut ir2 = mangle_ir::Ir::new();
1988        let temps2 = ir2.intern_name("temps");
1989        let result2 = ir2.intern_name("result2");
1990        let var_x2 = ir2.intern_name("X");
1991        let var_y2 = ir2.intern_name("Y");
1992        let fn_plus2 = ir2.intern_name("fn:float:plus");
1993
1994        let op = Op::Iterate {
1995            source: DataSource::Scan {
1996                relation: temps2,
1997                vars: vec![var_x2],
1998            },
1999            body: Box::new(Op::Filter {
2000                cond: Condition::Cmp {
2001                    op: physical::CmpOp::Gt,
2002                    left: Operand::Var(var_x2),
2003                    right: Operand::Const(physical::Constant::Float(36.0)),
2004                },
2005                body: Box::new(Op::Let {
2006                    var: var_y2,
2007                    expr: Expr::Call {
2008                        function: fn_plus2,
2009                        args: vec![
2010                            Operand::Var(var_x2),
2011                            Operand::Const(physical::Constant::Float(0.5)),
2012                        ],
2013                    },
2014                    body: Box::new(Op::Insert {
2015                        relation: result2,
2016                        args: vec![Operand::Var(var_x2), Operand::Var(var_y2)],
2017                    }),
2018                }),
2019            }),
2020        };
2021
2022        let mut store2 = Box::new(MemStore::new());
2023        store2.add_fact("temps", vec![Value::Float(36.5)]);
2024        store2.add_fact("temps", vec![Value::Float(35.9)]);
2025        store2.add_fact("temps", vec![Value::Float(37.2)]);
2026        store2.create_relation("result2");
2027
2028        let mut interpreter2 = Interpreter::new(&ir2, store2 as Box<dyn Store>);
2029        let count2 = interpreter2.execute(&op).unwrap();
2030
2031        // Only 36.5 and 37.2 are > 36.0
2032        assert_eq!(count2, 2);
2033
2034        // Results are in next_delta, check via scan_next_delta
2035        let results: Vec<_> = interpreter2
2036            .store()
2037            .scan_next_delta("result2")
2038            .unwrap()
2039            .collect();
2040        assert_eq!(results.len(), 2);
2041
2042        let mut output: Vec<(f64, f64)> = results
2043            .iter()
2044            .map(|t| match (&t[0], &t[1]) {
2045                (Value::Float(a), Value::Float(b)) => (*a, *b),
2046                _ => panic!("expected floats"),
2047            })
2048            .collect();
2049        output.sort_by(|a, b| a.0.total_cmp(&b.0));
2050
2051        assert_eq!(output[0], (36.5, 37.0));
2052        assert_eq!(output[1], (37.2, 37.7));
2053    }
2054
2055    #[test]
2056    fn test_float_in_memstore() {
2057        // Test that Float values work correctly as HashMap keys (equality, hashing)
2058        let mut store = MemStore::new();
2059        store.add_fact("data", vec![Value::Float(1.5), Value::Number(10)]);
2060        store.add_fact("data", vec![Value::Float(2.5), Value::Number(20)]);
2061        // Duplicate should not be added
2062        store.add_fact("data", vec![Value::Float(1.5), Value::Number(10)]);
2063
2064        let facts = store.get_facts("data");
2065        assert_eq!(facts.len(), 2);
2066
2067        // Test index lookup
2068        let results: Vec<_> = store
2069            .scan_index("data", 0, &Value::Float(1.5))
2070            .unwrap()
2071            .collect();
2072        assert_eq!(results.len(), 1);
2073        assert_eq!(results[0][1], Value::Number(10));
2074    }
2075
2076    // --- eval_function unit tests ---
2077
2078    #[test]
2079    fn test_fn_plus_variadic() {
2080        assert_eq!(
2081            eval_function("fn:plus", &[Value::Number(1), Value::Number(2), Value::Number(3)]).unwrap(),
2082            Value::Number(6)
2083        );
2084        // Zero args returns 0 (identity)
2085        assert_eq!(eval_function("fn:plus", &[]).unwrap(), Value::Number(0));
2086    }
2087
2088    #[test]
2089    fn test_fn_minus_variadic() {
2090        // Unary
2091        assert_eq!(
2092            eval_function("fn:minus", &[Value::Number(5)]).unwrap(),
2093            Value::Number(-5)
2094        );
2095        // Binary
2096        assert_eq!(
2097            eval_function("fn:minus", &[Value::Number(10), Value::Number(3)]).unwrap(),
2098            Value::Number(7)
2099        );
2100        // Variadic: 100 - 10 - 20 = 70
2101        assert_eq!(
2102            eval_function("fn:minus", &[Value::Number(100), Value::Number(10), Value::Number(20)]).unwrap(),
2103            Value::Number(70)
2104        );
2105        // Zero args is an error
2106        assert!(eval_function("fn:minus", &[]).is_err());
2107    }
2108
2109    #[test]
2110    fn test_fn_mult_variadic() {
2111        assert_eq!(
2112            eval_function("fn:mult", &[Value::Number(2), Value::Number(3), Value::Number(4)]).unwrap(),
2113            Value::Number(24)
2114        );
2115        // Zero args returns 1 (identity)
2116        assert_eq!(eval_function("fn:mult", &[]).unwrap(), Value::Number(1));
2117    }
2118
2119    #[test]
2120    fn test_fn_div() {
2121        // Binary
2122        assert_eq!(
2123            eval_function("fn:div", &[Value::Number(10), Value::Number(3)]).unwrap(),
2124            Value::Number(3)
2125        );
2126        // Unary: 1/n
2127        assert_eq!(
2128            eval_function("fn:div", &[Value::Number(5)]).unwrap(),
2129            Value::Number(0)
2130        );
2131        assert_eq!(
2132            eval_function("fn:div", &[Value::Number(1)]).unwrap(),
2133            Value::Number(1)
2134        );
2135        // Division by zero
2136        assert!(eval_function("fn:div", &[Value::Number(1), Value::Number(0)]).is_err());
2137        assert!(eval_function("fn:div", &[Value::Number(0)]).is_err());
2138    }
2139
2140    #[test]
2141    fn test_fn_float_promotion() {
2142        // fn:float:plus accepts both Float and Number
2143        assert_eq!(
2144            eval_function("fn:float:plus", &[Value::Float(1.5), Value::Number(2)]).unwrap(),
2145            Value::Float(3.5)
2146        );
2147        // fn:sqrt accepts Number
2148        assert_eq!(
2149            eval_function("fn:sqrt", &[Value::Number(9)]).unwrap(),
2150            Value::Float(3.0)
2151        );
2152    }
2153
2154    #[test]
2155    fn test_fn_string_concat() {
2156        assert_eq!(
2157            eval_function(
2158                "fn:string:concat",
2159                &[Value::String("a".into()), Value::String("b".into()), Value::String("c".into())]
2160            ).unwrap(),
2161            Value::String("abc".to_string())
2162        );
2163        // Mixed types
2164        assert_eq!(
2165            eval_function(
2166                "fn:string:concat",
2167                &[Value::String("n=".into()), Value::Number(42)]
2168            ).unwrap(),
2169            Value::String("n=42".to_string())
2170        );
2171    }
2172
2173    #[test]
2174    fn test_fn_string_replace() {
2175        // Replace all (-1)
2176        assert_eq!(
2177            eval_function(
2178                "fn:string:replace",
2179                &[Value::String("a-b-c".into()), Value::String("-".into()), Value::String("_".into()), Value::Number(-1)]
2180            ).unwrap(),
2181            Value::String("a_b_c".to_string())
2182        );
2183        // Replace first only (1)
2184        assert_eq!(
2185            eval_function(
2186                "fn:string:replace",
2187                &[Value::String("a-b-c".into()), Value::String("-".into()), Value::String("_".into()), Value::Number(1)]
2188            ).unwrap(),
2189            Value::String("a_b-c".to_string())
2190        );
2191    }
2192
2193    #[test]
2194    fn test_fn_to_string() {
2195        assert_eq!(
2196            eval_function("fn:number:to_string", &[Value::Number(42)]).unwrap(),
2197            Value::String("42".to_string())
2198        );
2199        assert_eq!(
2200            eval_function("fn:float64:to_string", &[Value::Float(3.14)]).unwrap(),
2201            Value::String("3.14".to_string())
2202        );
2203        assert_eq!(
2204            eval_function("fn:name:to_string", &[Value::String("/role/admin".into())]).unwrap(),
2205            Value::String("/role/admin".to_string())
2206        );
2207    }
2208
2209    // -----------------------------------------------------------------------
2210    // Temporal coalescing tests (ported from Go factstore/temporal_test.go)
2211    // -----------------------------------------------------------------------
2212
2213    // Helper: create a time value in nanoseconds for a date
2214    fn date_nanos(year: i64, month: u32, day: u32) -> i64 {
2215        // Howard Hinnant's algorithm (matching the parser)
2216        let m = month;
2217        let y = if m <= 2 { year - 1 } else { year };
2218        let era = (if y >= 0 { y } else { y - 399 }) / 400;
2219        let yoe = (y - era * 400) as u32;
2220        let m_adj = if m > 2 { m - 3 } else { m + 9 };
2221        let doy = (153 * m_adj + 2) / 5 + day - 1;
2222        let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
2223        let days = era * 146097 + doe as i64 - 719468;
2224        days * 86400 * 1_000_000_000
2225    }
2226
2227    fn datetime_nanos(year: i64, month: u32, day: u32, h: u32, m: u32, s: u32, ns: i64) -> i64 {
2228        date_nanos(year, month, day) + (h as i64) * 3_600_000_000_000
2229            + (m as i64) * 60_000_000_000 + (s as i64) * 1_000_000_000 + ns
2230    }
2231
2232    /// Go: TestTemporalStore_Coalesce - overlapping intervals merge into one
2233    #[test]
2234    fn test_coalesce_overlapping() {
2235        let mut store = MemStore::new();
2236        let jan1 = date_nanos(2024, 1, 1);
2237        let jan15 = date_nanos(2024, 1, 15);
2238        let jan10 = date_nanos(2024, 1, 10);
2239        let jan25 = date_nanos(2024, 1, 25);
2240        let jan20 = date_nanos(2024, 1, 20);
2241        let jan31 = date_nanos(2024, 1, 31);
2242
2243        store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan1), Value::Time(jan15)]);
2244        store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan10), Value::Time(jan25)]);
2245        store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan20), Value::Time(jan31)]);
2246
2247        assert_eq!(store.get_facts("active").len(), 3);
2248
2249        store.coalesce_temporal("active");
2250
2251        let facts = store.get_facts("active");
2252        assert_eq!(facts.len(), 1, "after coalesce: expected 1, got {:?}", facts);
2253        assert_eq!(facts[0][1], Value::Time(jan1), "start should be Jan 1");
2254        assert_eq!(facts[0][2], Value::Time(jan31), "end should be Jan 31");
2255    }
2256
2257    /// Go: TestTemporalStore_CoalesceAdjacent - adjacent intervals (1ns gap) coalesce
2258    #[test]
2259    fn test_coalesce_adjacent() {
2260        let mut store = MemStore::new();
2261        let shift1_start = datetime_nanos(2024, 1, 1, 8, 0, 0, 0);
2262        let shift1_end = datetime_nanos(2024, 1, 1, 16, 0, 0, 0);
2263        let shift2_start = datetime_nanos(2024, 1, 1, 16, 0, 0, 1); // 1ns after
2264        let shift2_end = date_nanos(2024, 1, 2);
2265
2266        store.add_fact("shift", vec![Value::String("/worker".into()), Value::Time(shift1_start), Value::Time(shift1_end)]);
2267        store.add_fact("shift", vec![Value::String("/worker".into()), Value::Time(shift2_start), Value::Time(shift2_end)]);
2268
2269        store.coalesce_temporal("shift");
2270
2271        let facts = store.get_facts("shift");
2272        assert_eq!(facts.len(), 1, "adjacent intervals should coalesce");
2273        assert_eq!(facts[0][1], Value::Time(shift1_start));
2274        assert_eq!(facts[0][2], Value::Time(shift2_end));
2275    }
2276
2277    /// Go: TestTemporalStore_CoalesceNonOverlapping - non-overlapping intervals stay separate
2278    #[test]
2279    fn test_coalesce_non_overlapping() {
2280        let mut store = MemStore::new();
2281        let jan1 = date_nanos(2024, 1, 1);
2282        let jan7 = date_nanos(2024, 1, 7);
2283        let jun1 = date_nanos(2024, 6, 1);
2284        let jun14 = date_nanos(2024, 6, 14);
2285
2286        store.add_fact("vacation", vec![Value::String("/alice".into()), Value::Time(jan1), Value::Time(jan7)]);
2287        store.add_fact("vacation", vec![Value::String("/alice".into()), Value::Time(jun1), Value::Time(jun14)]);
2288
2289        store.coalesce_temporal("vacation");
2290
2291        let facts = store.get_facts("vacation");
2292        assert_eq!(facts.len(), 2, "non-overlapping intervals should stay separate");
2293    }
2294
2295    /// Go: TestTemporalStore_CoalesceMixedGranularity - sub-second precision
2296    #[test]
2297    fn test_coalesce_mixed_granularity() {
2298        let mut store = MemStore::new();
2299        // Second-level: 10:00:00 to 10:00:05
2300        let t1_start = datetime_nanos(2024, 1, 1, 10, 0, 0, 0);
2301        let t1_end = datetime_nanos(2024, 1, 1, 10, 0, 5, 0);
2302        // Overlapping millisecond-level: 10:00:04.5 to 10:00:06
2303        let t2_start = datetime_nanos(2024, 1, 1, 10, 0, 4, 500_000_000);
2304        let t2_end = datetime_nanos(2024, 1, 1, 10, 0, 6, 0);
2305        // Adjacent nanosecond-level: 10:00:06.000000001 to 10:00:07
2306        let t3_start = datetime_nanos(2024, 1, 1, 10, 0, 6, 1);
2307        let t3_end = datetime_nanos(2024, 1, 1, 10, 0, 7, 0);
2308
2309        store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t1_start), Value::Time(t1_end)]);
2310        store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t2_start), Value::Time(t2_end)]);
2311        store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t3_start), Value::Time(t3_end)]);
2312
2313        store.coalesce_temporal("event");
2314
2315        let facts = store.get_facts("event");
2316        assert_eq!(facts.len(), 1, "mixed granularity should coalesce to 1");
2317        assert_eq!(facts[0][1], Value::Time(t1_start), "start: 10:00:00");
2318        assert_eq!(facts[0][2], Value::Time(t3_end), "end: 10:00:07");
2319    }
2320
2321    /// Test coalescing with multiple keys: same relation, different key columns
2322    #[test]
2323    fn test_coalesce_multiple_keys() {
2324        let mut store = MemStore::new();
2325        let jan1 = date_nanos(2024, 1, 1);
2326        let jan10 = date_nanos(2024, 1, 10);
2327        let jan5 = date_nanos(2024, 1, 5);
2328        let jan15 = date_nanos(2024, 1, 15);
2329
2330        // Alice: two overlapping intervals
2331        store.add_fact("employed", vec![Value::String("/alice".into()), Value::Time(jan1), Value::Time(jan10)]);
2332        store.add_fact("employed", vec![Value::String("/alice".into()), Value::Time(jan5), Value::Time(jan15)]);
2333        // Bob: one interval
2334        store.add_fact("employed", vec![Value::String("/bob".into()), Value::Time(jan1), Value::Time(jan15)]);
2335
2336        store.coalesce_temporal("employed");
2337
2338        let facts = store.get_facts("employed");
2339        // Alice's 2 intervals merge to 1; Bob stays as 1
2340        assert_eq!(facts.len(), 2, "expected 2 facts after coalesce, got {:?}", facts);
2341    }
2342}