1use anyhow::{Result, anyhow};
30use mangle_ir::physical::{Aggregate, CmpOp, Condition, Constant, DataSource, Expr, Op, Operand};
31use mangle_ir::{Ir, NameId};
32use std::collections::HashMap;
33
34pub use mangle_common::{CompoundKind, Store, Value};
35
/// One slot of a flattened tuple.
///
/// Tuples are stored as a flat pre-order sequence of cells: a non-compound value
/// occupies a single `Val` cell, while a compound value is a
/// `CompoundStart(kind, n)` header followed by the cells of its `n` direct
/// elements (see `flatten_value` / `unflatten_one`).
///
/// The derived `Hash`/`Eq` allow cells to be used as index keys. Note that the
/// variant order determines the derived `Ord`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
enum Cell {
    /// A non-compound value stored inline.
    Val(Value),
    /// Header of a compound value: its kind and the number of direct elements
    /// whose cells follow.
    CompoundStart(CompoundKind, usize),
}
45
46fn flatten_value(v: &Value, out: &mut Vec<Cell>) {
50 match v {
51 Value::Compound(kind, elems) => {
52 out.push(Cell::CompoundStart(*kind, elems.len()));
53 for elem in elems {
54 flatten_value(elem, out);
55 }
56 }
57 _ => out.push(Cell::Val(v.clone())),
58 }
59}
60
61fn flatten_tuple(tuple: &[Value]) -> Vec<Cell> {
63 let mut cells = Vec::new();
64 for v in tuple {
65 flatten_value(v, &mut cells);
66 }
67 cells
68}
69
70fn unflatten_one(cells: &[Cell], pos: &mut usize) -> Value {
72 match &cells[*pos] {
73 Cell::Val(v) => {
74 *pos += 1;
75 v.clone()
76 }
77 Cell::CompoundStart(kind, n) => {
78 let kind = *kind;
79 let n = *n;
80 *pos += 1;
81 let mut elems = Vec::with_capacity(n);
82 for _ in 0..n {
83 elems.push(unflatten_one(cells, pos));
84 }
85 Value::Compound(kind, elems)
86 }
87 }
88}
89
90fn skip_one(cells: &[Cell], pos: &mut usize) {
92 match &cells[*pos] {
93 Cell::Val(_) => *pos += 1,
94 Cell::CompoundStart(_, n) => {
95 let n = *n;
96 *pos += 1;
97 for _ in 0..n {
98 skip_one(cells, pos);
99 }
100 }
101 }
102}
103
104fn unflatten_tuple(cells: &[Cell], n_cols: usize) -> Vec<Value> {
106 let mut pos = 0;
107 let mut tuple = Vec::with_capacity(n_cols);
108 for _ in 0..n_cols {
109 tuple.push(unflatten_one(cells, &mut pos));
110 }
111 tuple
112}
113
/// In-memory implementation of `Store`.
///
/// Each relation's tuples are kept as flattened `Cell` rows, split into three
/// generations: `stable`, `delta` and `next_delta`. `insert` appends to
/// `next_delta`; `merge_deltas` moves `delta` into `stable` and then promotes
/// `next_delta` to become the new `delta` (presumably the per-round step of a
/// semi-naive evaluator — confirm against the caller).
#[derive(Default)]
pub struct MemStore {
    /// Committed rows per relation.
    stable: HashMap<String, Vec<Vec<Cell>>>,
    /// Rows promoted by the most recent `merge_deltas`; returned by `scan` (after
    /// the stable rows) and by `scan_delta`.
    delta: HashMap<String, Vec<Vec<Cell>>>,
    /// Rows inserted since the last `merge_deltas`; only visible via
    /// `scan_next_delta`.
    next_delta: HashMap<String, Vec<Vec<Cell>>>,

    /// Number of top-level columns per relation, fixed by the first tuple stored.
    arity: HashMap<String, usize>,

    /// `(relation, column)` -> leading cell of that column -> row positions in
    /// the relation's `stable` table.
    stable_indexes: HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
    /// Same layout as `stable_indexes`, but row positions refer to `delta`.
    delta_indexes: HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
}
138
139impl MemStore {
140 pub fn new() -> Self {
141 Self::default()
142 }
143
144 pub fn create_relation(&mut self, relation: &str) {
146 self.stable.entry(relation.to_string()).or_default();
147 }
148
149 pub fn add_fact(&mut self, relation: &str, args: Vec<Value>) {
151 let n_cols = args.len();
152 let cells = flatten_tuple(&args);
153 let table = self.stable.entry(relation.to_string()).or_default();
154 if !table.contains(&cells) {
155 let row_idx = table.len();
156 self.arity.entry(relation.to_string()).or_insert(n_cols);
157 table.push(cells.clone());
158 index_cells(&mut self.stable_indexes, relation, &cells, n_cols, row_idx);
159 }
160 }
161
162 fn rebuild_indexes_for(&mut self, relation: &str) {
164 self.stable_indexes.retain(|(rel, _), _| rel != relation);
165 self.delta_indexes.retain(|(rel, _), _| rel != relation);
166
167 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
168
169 if let Some(table) = self.stable.get(relation) {
170 for (row_idx, cells) in table.iter().enumerate() {
171 index_cells(&mut self.stable_indexes, relation, cells, n_cols, row_idx);
172 }
173 }
174
175 if let Some(table) = self.delta.get(relation) {
176 for (row_idx, cells) in table.iter().enumerate() {
177 index_cells(&mut self.delta_indexes, relation, cells, n_cols, row_idx);
178 }
179 }
180 }
181
182 fn to_values(&self, relation: &str, cells: &[Cell]) -> Vec<Value> {
184 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
185 if n_cols == 0 {
186 cells
188 .iter()
189 .map(|c| match c {
190 Cell::Val(v) => v.clone(),
191 Cell::CompoundStart(_, _) => Value::Null,
192 })
193 .collect()
194 } else {
195 unflatten_tuple(cells, n_cols)
196 }
197 }
198
199 pub fn get_facts(&self, relation: &str) -> Vec<Vec<Value>> {
200 let mut all: Vec<Vec<Value>> = self
201 .stable
202 .get(relation)
203 .into_iter()
204 .flatten()
205 .map(|cells| self.to_values(relation, cells))
206 .collect();
207 if let Some(d) = self.delta.get(relation) {
208 for cells in d {
209 all.push(self.to_values(relation, cells));
210 }
211 }
212 all
213 }
214
215 pub fn coalesce_temporal(&mut self, relation: &str) {
220 let n_cols = match self.arity.get(relation) {
221 Some(&n) if n >= 2 => n,
222 _ => return,
223 };
224 let key_cols = n_cols - 2; let mut all_facts: Vec<Vec<Value>> = Vec::new();
228 if let Some(table) = self.stable.get(relation) {
229 for cells in table {
230 all_facts.push(unflatten_tuple(cells, n_cols));
231 }
232 }
233 if let Some(table) = self.delta.get(relation) {
234 for cells in table {
235 all_facts.push(unflatten_tuple(cells, n_cols));
236 }
237 }
238
239 if all_facts.is_empty() {
240 return;
241 }
242
243 let mut groups: HashMap<Vec<Value>, Vec<(i64, i64)>> = HashMap::new();
245 for fact in &all_facts {
246 let key: Vec<Value> = fact[..key_cols].to_vec();
247 let start = match &fact[key_cols] {
248 Value::Time(t) => *t,
249 Value::Number(n) => *n,
250 _ => continue,
251 };
252 let end = match &fact[key_cols + 1] {
253 Value::Time(t) => *t,
254 Value::Number(n) => *n,
255 _ => continue,
256 };
257 groups.entry(key).or_default().push((start, end));
258 }
259
260 let mut coalesced_facts: Vec<Vec<Value>> = Vec::new();
262 for (key, mut intervals) in groups {
263 intervals.sort_by_key(|&(s, _)| s);
264 let mut merged: Vec<(i64, i64)> = vec![intervals[0]];
265 for &(s, e) in &intervals[1..] {
266 let last = merged.last_mut().unwrap();
267 if s <= last.1.saturating_add(1) {
269 last.1 = last.1.max(e);
270 } else {
271 merged.push((s, e));
272 }
273 }
274 for (start, end) in merged {
275 let mut fact = key.clone();
276 fact.push(Value::Time(start));
277 fact.push(Value::Time(end));
278 coalesced_facts.push(fact);
279 }
280 }
281
282 let coalesced_cells: Vec<Vec<Cell>> = coalesced_facts
284 .iter()
285 .map(|fact| flatten_tuple(fact))
286 .collect();
287 self.stable.insert(relation.to_string(), coalesced_cells);
288 self.delta.remove(relation);
289 self.next_delta.remove(relation);
290 self.rebuild_indexes_for(relation);
291 }
292}
293
294fn index_cells(
296 indexes: &mut HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
297 relation: &str,
298 cells: &[Cell],
299 n_cols: usize,
300 row_idx: usize,
301) {
302 let mut pos = 0;
303 for col_idx in 0..n_cols {
304 let key = cells[pos].clone();
305 skip_one(cells, &mut pos);
306 indexes
307 .entry((relation.to_string(), col_idx))
308 .or_default()
309 .entry(key)
310 .or_default()
311 .push(row_idx);
312 }
313}
314
315fn value_to_index_key(v: &Value) -> Cell {
317 match v {
318 Value::Compound(kind, elems) => Cell::CompoundStart(*kind, elems.len()),
319 _ => Cell::Val(v.clone()),
320 }
321}
322
323impl Store for MemStore {
324 fn scan(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
325 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
326 let s = self
327 .stable
328 .get(relation)
329 .into_iter()
330 .flatten()
331 .map(move |cells| unflatten_tuple(cells, n_cols));
332 let d = self
333 .delta
334 .get(relation)
335 .into_iter()
336 .flatten()
337 .map(move |cells| unflatten_tuple(cells, n_cols));
338 Ok(Box::new(s.chain(d)))
339 }
340
341 fn scan_delta(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
342 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
343 match self.delta.get(relation) {
344 Some(tuples) => Ok(Box::new(
345 tuples
346 .iter()
347 .map(move |cells| unflatten_tuple(cells, n_cols)),
348 )),
349 None => Ok(Box::new(std::iter::empty())),
350 }
351 }
352
353 fn scan_next_delta(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
354 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
355 match self.next_delta.get(relation) {
356 Some(tuples) => Ok(Box::new(
357 tuples
358 .iter()
359 .map(move |cells| unflatten_tuple(cells, n_cols)),
360 )),
361 None => Ok(Box::new(std::iter::empty())),
362 }
363 }
364
365 fn scan_index(
366 &self,
367 relation: &str,
368 col_idx: usize,
369 key: &Value,
370 ) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
371 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
372 let cell_key = value_to_index_key(key);
373 let mut results: Vec<Vec<Value>> = Vec::new();
374
375 if let Some(idx_map) = self.stable_indexes.get(&(relation.to_string(), col_idx))
376 && let Some(row_indices) = idx_map.get(&cell_key)
377 && let Some(table) = self.stable.get(relation)
378 {
379 for &i in row_indices {
380 results.push(unflatten_tuple(&table[i], n_cols));
381 }
382 }
383
384 if let Some(idx_map) = self.delta_indexes.get(&(relation.to_string(), col_idx))
385 && let Some(row_indices) = idx_map.get(&cell_key)
386 && let Some(table) = self.delta.get(relation)
387 {
388 for &i in row_indices {
389 results.push(unflatten_tuple(&table[i], n_cols));
390 }
391 }
392
393 Ok(Box::new(results.into_iter()))
394 }
395
396 fn scan_delta_index(
397 &self,
398 relation: &str,
399 col_idx: usize,
400 key: &Value,
401 ) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
402 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
403 let cell_key = value_to_index_key(key);
404 let mut results: Vec<Vec<Value>> = Vec::new();
405
406 if let Some(idx_map) = self.delta_indexes.get(&(relation.to_string(), col_idx))
407 && let Some(row_indices) = idx_map.get(&cell_key)
408 && let Some(table) = self.delta.get(relation)
409 {
410 for &i in row_indices {
411 results.push(unflatten_tuple(&table[i], n_cols));
412 }
413 }
414
415 Ok(Box::new(results.into_iter()))
416 }
417
418 fn insert(&mut self, relation: &str, tuple: Vec<Value>) -> Result<bool> {
419 let n_cols = tuple.len();
420 let cells = flatten_tuple(&tuple);
421
422 if self
424 .stable
425 .get(relation)
426 .is_some_and(|v| v.contains(&cells))
427 || self
428 .delta
429 .get(relation)
430 .is_some_and(|v| v.contains(&cells))
431 || self
432 .next_delta
433 .get(relation)
434 .is_some_and(|v| v.contains(&cells))
435 {
436 return Ok(false);
437 }
438
439 self.arity.entry(relation.to_string()).or_insert(n_cols);
440 self.next_delta
441 .entry(relation.to_string())
442 .or_default()
443 .push(cells);
444 Ok(true)
445 }
446
447 fn merge_deltas(&mut self) {
448 for (rel_name, mut tuples) in self.delta.drain() {
450 let n_cols = self.arity.get(&rel_name).copied().unwrap_or(0);
451 let table = self.stable.entry(rel_name.clone()).or_default();
452 for cells in tuples.drain(..) {
453 let row_idx = table.len();
454 index_cells(
455 &mut self.stable_indexes,
456 &rel_name,
457 &cells,
458 n_cols,
459 row_idx,
460 );
461 table.push(cells);
462 }
463 }
464 self.delta_indexes.clear();
465
466 self.delta = std::mem::take(&mut self.next_delta);
468 for (rel_name, tuples) in &self.delta {
469 let n_cols = self.arity.get(rel_name).copied().unwrap_or(0);
470 for (row_idx, cells) in tuples.iter().enumerate() {
471 index_cells(
472 &mut self.delta_indexes,
473 rel_name,
474 cells,
475 n_cols,
476 row_idx,
477 );
478 }
479 }
480 }
481
482 fn create_relation(&mut self, relation: &str) {
483 self.stable.entry(relation.to_string()).or_default();
484 }
485
486 fn retract(&mut self, relation: &str, tuple: &[Value]) -> Result<bool> {
487 let cells = flatten_tuple(tuple);
488 let removed = if let Some(table) = self.stable.get_mut(relation) {
489 if let Some(pos) = table.iter().position(|t| *t == cells) {
490 table.swap_remove(pos);
491 true
492 } else {
493 false
494 }
495 } else {
496 false
497 };
498
499 if let Some(table) = self.delta.get_mut(relation) {
501 if let Some(pos) = table.iter().position(|t| *t == cells) {
502 table.swap_remove(pos);
503 }
504 }
505 if let Some(table) = self.next_delta.get_mut(relation) {
506 if let Some(pos) = table.iter().position(|t| *t == cells) {
507 table.swap_remove(pos);
508 }
509 }
510
511 if removed {
512 self.rebuild_indexes_for(relation);
513 }
514 Ok(removed)
515 }
516
517 fn clear(&mut self, relation: &str) {
518 if let Some(table) = self.stable.get_mut(relation) {
519 table.clear();
520 }
521 if let Some(table) = self.delta.get_mut(relation) {
522 table.clear();
523 }
524 if let Some(table) = self.next_delta.get_mut(relation) {
525 table.clear();
526 }
527 self.stable_indexes.retain(|(rel, _), _| rel != relation);
528 self.delta_indexes.retain(|(rel, _), _| rel != relation);
529 }
530
531 fn relation_names(&self) -> Vec<String> {
532 let mut names: Vec<String> = self.stable.keys().cloned().collect();
533 for key in self.delta.keys() {
534 if !names.contains(key) {
535 names.push(key.clone());
536 }
537 }
538 names
539 }
540
541 fn coalesce_temporal(&mut self, relation: &str) {
542 MemStore::coalesce_temporal(self, relation);
543 }
544}
545
/// One recorded derivation: a newly inserted fact and the premises active when
/// it was inserted.
#[derive(Debug, Clone)]
pub struct ProvenanceEntry {
    /// Relation name and tuple of the fact that was newly inserted.
    pub derived: (String, Vec<Value>),
    /// Relation name and tuple of each premise that was on the active-premise
    /// stack at insertion time, outermost loop first.
    pub premises: Vec<(String, Vec<Value>)>,
}
554
/// Collects provenance entries while an `Interpreter` runs with provenance
/// enabled.
#[derive(Default)]
pub struct ProvenanceRecorder {
    /// Derivations recorded so far, in insertion order.
    pub entries: Vec<ProvenanceEntry>,
    /// Stack of premises for the loops currently executing. It is pushed and
    /// popped around loop bodies and copied into each new entry.
    active_premises: Vec<(String, Vec<Value>)>,
}
567
/// Executes physical plan operators (`Op`) against a `Store`, resolving name
/// and string ids through an `Ir`.
pub struct Interpreter<'a> {
    /// Program IR used to resolve relation, function, variable-name and string ids.
    ir: &'a Ir,
    /// Fact storage read by scans/lookups and written by inserts.
    store: Box<dyn Store + 'a>,
    /// `Some` only after `with_provenance` was called.
    provenance: Option<ProvenanceRecorder>,
}
574
575struct Env {
576 vars: HashMap<NameId, Value>,
577}
578
579impl Env {
580 fn new() -> Self {
581 Self {
582 vars: HashMap::new(),
583 }
584 }
585}
586
587impl<'a> Interpreter<'a> {
588 pub fn new(ir: &'a Ir, store: Box<dyn Store + 'a>) -> Self {
589 Self {
590 ir,
591 store,
592 provenance: None,
593 }
594 }
595
596 pub fn with_provenance(mut self) -> Self {
599 self.provenance = Some(ProvenanceRecorder::default());
600 self
601 }
602
603 pub fn store(&self) -> &dyn Store {
605 &*self.store
606 }
607
608 pub fn store_mut(&mut self) -> &mut dyn Store {
610 &mut *self.store
611 }
612
613 pub fn into_store(self) -> Box<dyn Store + 'a> {
615 self.store
616 }
617
618 pub fn into_provenance(self) -> Option<ProvenanceRecorder> {
620 self.provenance
621 }
622
623 pub fn into_parts(self) -> (Box<dyn Store + 'a>, Option<ProvenanceRecorder>) {
625 (self.store, self.provenance)
626 }
627
628 pub fn execute(&mut self, op: &Op) -> Result<usize> {
630 let mut env = Env::new();
631 self.exec_op(op, &mut env)
632 }
633
634 fn exec_op(&mut self, op: &Op, env: &mut Env) -> Result<usize> {
635 match op {
636 Op::Nop => Ok(0),
637 Op::Seq(ops) => {
638 let mut count = 0;
639 for o in ops {
640 count += self.exec_op(o, env)?;
641 }
642 Ok(count)
643 }
644 Op::Iterate { source, body } => {
645 let mut count = 0;
646 match source {
647 DataSource::Scan { relation, vars } => {
648 let rel_name = self.ir.resolve_name(*relation);
649 let iter = self.store.scan(rel_name)?;
650 let tuples: Vec<_> = iter.collect();
651
652 for tuple in tuples {
653 if tuple.len() != vars.len() {
654 continue;
655 }
656 for (i, var) in vars.iter().enumerate() {
657 env.vars.insert(*var, tuple[i].clone());
658 }
659 if let Some(ref mut prov) = self.provenance {
660 prov.active_premises
661 .push((rel_name.to_string(), tuple.clone()));
662 }
663 count += self.exec_op(body, env)?;
664 if self.provenance.is_some() {
665 self.provenance.as_mut().unwrap().active_premises.pop();
666 }
667 }
668 }
669 DataSource::ScanDelta { relation, vars } => {
670 let rel_name = self.ir.resolve_name(*relation);
671 let iter = self.store.scan_delta(rel_name)?;
672 let tuples: Vec<_> = iter.collect();
673
674 for tuple in tuples {
675 if tuple.len() != vars.len() {
676 continue;
677 }
678 for (i, var) in vars.iter().enumerate() {
679 env.vars.insert(*var, tuple[i].clone());
680 }
681 if let Some(ref mut prov) = self.provenance {
682 prov.active_premises
683 .push((rel_name.to_string(), tuple.clone()));
684 }
685 count += self.exec_op(body, env)?;
686 if self.provenance.is_some() {
687 self.provenance.as_mut().unwrap().active_premises.pop();
688 }
689 }
690 }
691 DataSource::IndexLookup {
692 relation,
693 col_idx,
694 key,
695 vars,
696 } => {
697 let rel_name = self.ir.resolve_name(*relation);
698 let key_val = self.eval_operand(key, env)?;
699
700 let iter = self.store.scan_index(rel_name, *col_idx, &key_val)?;
701 let tuples: Vec<_> = iter.collect();
702
703 for tuple in tuples {
704 if tuple.len() != vars.len() {
705 continue;
706 }
707 for (i, var) in vars.iter().enumerate() {
708 env.vars.insert(*var, tuple[i].clone());
709 }
710 if let Some(ref mut prov) = self.provenance {
711 prov.active_premises
712 .push((rel_name.to_string(), tuple.clone()));
713 }
714 count += self.exec_op(body, env)?;
715 if self.provenance.is_some() {
716 self.provenance.as_mut().unwrap().active_premises.pop();
717 }
718 }
719 }
720 }
721 Ok(count)
722 }
723 Op::Filter { cond, body } => {
724 if self.eval_cond(cond, env)? {
725 self.exec_op(body, env)
726 } else {
727 Ok(0)
728 }
729 }
730 Op::Insert { relation, args } => {
731 let rel_name = self.ir.resolve_name(*relation);
732 let mut tuple = Vec::new();
733 for arg in args {
734 tuple.push(self.eval_operand(arg, env)?);
735 }
736 let is_new = self.store.insert(rel_name, tuple.clone())?;
737 if is_new {
738 if let Some(ref mut prov) = self.provenance {
739 prov.entries.push(ProvenanceEntry {
740 derived: (rel_name.to_string(), tuple),
741 premises: prov.active_premises.clone(),
742 });
743 }
744 Ok(1)
745 } else {
746 Ok(0)
747 }
748 }
749 Op::Let { var, expr, body } => {
750 let val = self.eval_expr(expr, env)?;
751 env.vars.insert(*var, val);
752 self.exec_op(body, env)
753 }
754 Op::HashJoin {
755 build_source,
756 probe_source,
757 join_keys,
758 body,
759 } => self.exec_hash_join(build_source, probe_source, join_keys, body, env),
760 Op::GroupBy {
761 source,
762 vars,
763 keys,
764 aggregates,
765 body,
766 } => {
767 let rel_name = self.ir.resolve_name(*source);
768
769 let iter = self.store.scan(rel_name)?;
772 let mut tuples: Vec<_> = iter.collect();
773
774 if let Ok(nd_iter) = self.store.scan_next_delta(rel_name) {
776 tuples.extend(nd_iter);
777 }
778
779 let mut groups: HashMap<Vec<Value>, Vec<Vec<Value>>> = HashMap::new();
780
781 for tuple in tuples {
782 if tuple.len() != vars.len() {
783 continue;
784 }
785 for (i, var) in vars.iter().enumerate() {
787 env.vars.insert(*var, tuple[i].clone());
788 }
789
790 let mut key = Vec::new();
791 for k in keys {
792 if let Some(val) = env.vars.get(k) {
793 key.push(val.clone());
794 } else {
795 key.push(Value::Null);
797 }
798 }
799 groups.entry(key).or_default().push(tuple);
800 }
801
802 let mut count = 0;
803 for (key, group_tuples) in groups {
804 for (i, k) in keys.iter().enumerate() {
806 env.vars.insert(*k, key[i].clone());
807 }
808
809 for agg in aggregates {
811 let val = self.eval_aggregate(agg, &group_tuples, vars, env)?;
812 env.vars.insert(agg.var, val);
813 }
814
815 count += self.exec_op(body, env)?;
816 }
817 Ok(count)
818 }
819 }
820 }
821
822 fn collect_data_source(
826 &mut self,
827 source: &DataSource,
828 env: &mut Env,
829 ) -> Result<(Vec<Vec<Value>>, Vec<NameId>)> {
830 match source {
831 DataSource::Scan { relation, vars } => {
832 let rel_name = self.ir.resolve_name(*relation);
833 let tuples: Vec<_> = self.store.scan(rel_name)?.collect();
834 Ok((tuples, vars.clone()))
835 }
836 DataSource::ScanDelta { relation, vars } => {
837 let rel_name = self.ir.resolve_name(*relation);
838 let tuples: Vec<_> = self.store.scan_delta(rel_name)?.collect();
839 Ok((tuples, vars.clone()))
840 }
841 DataSource::IndexLookup {
842 relation,
843 col_idx,
844 key,
845 vars,
846 } => {
847 let rel_name = self.ir.resolve_name(*relation);
848 let key_val = self.eval_operand(key, env)?;
849 let tuples: Vec<_> =
850 self.store.scan_index(rel_name, *col_idx, &key_val)?.collect();
851 Ok((tuples, vars.clone()))
852 }
853 }
854 }
855
856 fn exec_hash_join(
857 &mut self,
858 build_source: &DataSource,
859 probe_source: &DataSource,
860 join_keys: &[NameId],
861 body: &Op,
862 env: &mut Env,
863 ) -> Result<usize> {
864 let (build_tuples, build_vars) = self.collect_data_source(build_source, env)?;
865
866 let build_key_positions: Vec<usize> = join_keys
869 .iter()
870 .map(|k| {
871 build_vars
872 .iter()
873 .position(|v| v == k)
874 .ok_or_else(|| anyhow!("HashJoin: join key not in build_source vars"))
875 })
876 .collect::<Result<Vec<_>>>()?;
877
878 let mut table: HashMap<Vec<Value>, Vec<Vec<Value>>> = HashMap::new();
879 for tuple in build_tuples {
880 if tuple.len() != build_vars.len() {
881 continue;
882 }
883 let key: Vec<Value> = build_key_positions
884 .iter()
885 .map(|&i| tuple[i].clone())
886 .collect();
887 table.entry(key).or_default().push(tuple);
888 }
889
890 let (probe_tuples, probe_vars) = self.collect_data_source(probe_source, env)?;
891 let probe_key_positions: Vec<usize> = join_keys
892 .iter()
893 .map(|k| {
894 probe_vars
895 .iter()
896 .position(|v| v == k)
897 .ok_or_else(|| anyhow!("HashJoin: join key not in probe_source vars"))
898 })
899 .collect::<Result<Vec<_>>>()?;
900
901 let mut count = 0;
902 for probe_tuple in probe_tuples {
903 if probe_tuple.len() != probe_vars.len() {
904 continue;
905 }
906 let key: Vec<Value> = probe_key_positions
907 .iter()
908 .map(|&i| probe_tuple[i].clone())
909 .collect();
910 let Some(matches) = table.get(&key) else {
911 continue;
912 };
913 for (i, var) in probe_vars.iter().enumerate() {
915 env.vars.insert(*var, probe_tuple[i].clone());
916 }
917 for build_tuple in matches {
918 for (i, var) in build_vars.iter().enumerate() {
919 env.vars.insert(*var, build_tuple[i].clone());
920 }
921 count += self.exec_op(body, env)?;
922 }
923 }
924 Ok(count)
925 }
926
927 fn eval_aggregate(
928 &self,
929 agg: &Aggregate,
930 group: &[Vec<Value>],
931 vars: &[NameId],
932 env: &mut Env,
933 ) -> Result<Value> {
934 let fn_name = self.ir.resolve_name(agg.func);
935 match fn_name {
936 "fn:count" => Ok(Value::Number(group.len() as i64)),
937 "fn:sum" => {
938 let arg = agg
939 .args
940 .first()
941 .ok_or_else(|| anyhow!("fn:sum requires 1 argument"))?;
942
943 let mut int_sum: i64 = 0;
944 for tuple in group {
945 for (i, var) in vars.iter().enumerate() {
946 env.vars.insert(*var, tuple[i].clone());
947 }
948 let val = self.eval_operand(arg, env)?;
949 match val {
950 Value::Number(n) => int_sum += n,
951 _ => return Err(anyhow!("fn:sum: expected integer, got {val}")),
952 }
953 }
954 Ok(Value::Number(int_sum))
955 }
956 "fn:float:sum" => {
957 let arg = agg
958 .args
959 .first()
960 .ok_or_else(|| anyhow!("fn:float:sum requires 1 argument"))?;
961
962 let mut float_sum: f64 = 0.0;
963 for tuple in group {
964 for (i, var) in vars.iter().enumerate() {
965 env.vars.insert(*var, tuple[i].clone());
966 }
967 let val = self.eval_operand(arg, env)?;
968 float_sum += value_as_float(&val)?;
969 }
970 Ok(Value::Float(float_sum))
971 }
972 "fn:max" => {
973 let mut max_val = None;
974 let arg = agg
975 .args
976 .first()
977 .ok_or_else(|| anyhow!("fn:max requires 1 argument"))?;
978
979 for tuple in group {
980 for (i, var) in vars.iter().enumerate() {
981 env.vars.insert(*var, tuple[i].clone());
982 }
983 let val = self.eval_operand(arg, env)?;
984 match max_val {
985 None => max_val = Some(val),
986 Some(ref m) => {
987 if val > *m {
988 max_val = Some(val);
989 }
990 }
991 }
992 }
993 max_val.ok_or_else(|| anyhow!("fn:max on empty group"))
994 }
995 "fn:min" => {
996 let mut min_val = None;
997 let arg = agg
998 .args
999 .first()
1000 .ok_or_else(|| anyhow!("fn:min requires 1 argument"))?;
1001
1002 for tuple in group {
1003 for (i, var) in vars.iter().enumerate() {
1004 env.vars.insert(*var, tuple[i].clone());
1005 }
1006 let val = self.eval_operand(arg, env)?;
1007 match min_val {
1008 None => min_val = Some(val),
1009 Some(ref m) => {
1010 if val < *m {
1011 min_val = Some(val);
1012 }
1013 }
1014 }
1015 }
1016 min_val.ok_or_else(|| anyhow!("fn:min on empty group"))
1017 }
1018 "fn:float:max" => {
1019 let mut max_val: Option<f64> = None;
1020 let arg = agg
1021 .args
1022 .first()
1023 .ok_or_else(|| anyhow!("fn:float:max requires 1 argument"))?;
1024
1025 for tuple in group {
1026 for (i, var) in vars.iter().enumerate() {
1027 env.vars.insert(*var, tuple[i].clone());
1028 }
1029 let val = self.eval_operand(arg, env)?;
1030 let f = value_as_float(&val)?;
1031 max_val = Some(match max_val {
1032 None => f,
1033 Some(m) => f.max(m),
1034 });
1035 }
1036 max_val
1037 .map(Value::Float)
1038 .ok_or_else(|| anyhow!("fn:float:max on empty group"))
1039 }
1040 "fn:float:min" => {
1041 let mut min_val: Option<f64> = None;
1042 let arg = agg
1043 .args
1044 .first()
1045 .ok_or_else(|| anyhow!("fn:float:min requires 1 argument"))?;
1046
1047 for tuple in group {
1048 for (i, var) in vars.iter().enumerate() {
1049 env.vars.insert(*var, tuple[i].clone());
1050 }
1051 let val = self.eval_operand(arg, env)?;
1052 let f = value_as_float(&val)?;
1053 min_val = Some(match min_val {
1054 None => f,
1055 Some(m) => f.min(m),
1056 });
1057 }
1058 min_val
1059 .map(Value::Float)
1060 .ok_or_else(|| anyhow!("fn:float:min on empty group"))
1061 }
1062 _ => Err(anyhow!("Unknown aggregation function: {fn_name}")),
1063 }
1064 }
1065
1066 fn eval_cond(&self, cond: &Condition, env: &Env) -> Result<bool> {
1067 match cond {
1068 Condition::Cmp { op, left, right } => {
1069 let l = self.eval_operand(left, env)?;
1070 let r = self.eval_operand(right, env)?;
1071 match op {
1072 CmpOp::Eq => Ok(l == r),
1073 CmpOp::Neq => Ok(l != r),
1074 CmpOp::Lt => Ok(l < r),
1075 CmpOp::Le => Ok(l <= r),
1076 CmpOp::Gt => Ok(l > r),
1077 CmpOp::Ge => Ok(l >= r),
1078 }
1079 }
1080 Condition::Negation { relation, args } => {
1081 let rel_name = self.ir.resolve_name(*relation);
1082 let iter = self.store.scan(rel_name)?;
1083 for tuple in iter {
1084 let mut mat = true;
1085 if tuple.len() != args.len() {
1086 continue;
1087 }
1088 for (i, arg) in args.iter().enumerate() {
1089 let val = self.eval_operand(arg, env)?;
1090 if tuple[i] != val {
1091 mat = false;
1092 break;
1093 }
1094 }
1095 if mat {
1096 return Ok(false); }
1098 }
1099 Ok(true) }
1101 Condition::Call { function, args } => {
1102 let fn_name = self.ir.resolve_name(*function);
1103 let mut vals = Vec::new();
1104 for arg in args {
1105 vals.push(self.eval_operand(arg, env)?);
1106 }
1107 self.eval_builtin_predicate(fn_name, &vals)
1108 }
1109 }
1110 }
1111
1112 fn eval_builtin_predicate(&self, name: &str, vals: &[Value]) -> Result<bool> {
1113 match name {
1114 ":string:starts_with" => match (&vals[0], &vals[1]) {
1115 (Value::String(s), Value::String(p)) => Ok(s.starts_with(p.as_str())),
1116 _ => Err(anyhow!(":string:starts_with: expected string arguments")),
1117 },
1118 ":string:ends_with" => match (&vals[0], &vals[1]) {
1119 (Value::String(s), Value::String(p)) => Ok(s.ends_with(p.as_str())),
1120 _ => Err(anyhow!(":string:ends_with: expected string arguments")),
1121 },
1122 ":string:contains" => match (&vals[0], &vals[1]) {
1123 (Value::String(s), Value::String(p)) => Ok(s.contains(p.as_str())),
1124 _ => Err(anyhow!(":string:contains: expected string arguments")),
1125 },
1126 ":match_prefix" => match (&vals[0], &vals[1]) {
1127 (Value::Name(name), Value::Name(prefix)) => {
1128 Ok(name.starts_with(prefix.as_str()) && name.len() > prefix.len())
1129 }
1130 _ => Err(anyhow!(":match_prefix: expected name arguments")),
1131 },
1132 _ => Err(anyhow!("Unknown built-in predicate: {name}")),
1133 }
1134 }
1135
1136 fn eval_expr(&self, expr: &Expr, env: &Env) -> Result<Value> {
1137 match expr {
1138 Expr::Value(op) => self.eval_operand(op, env),
1139 Expr::Call { function, args } => {
1140 let fn_name = self.ir.resolve_name(*function);
1141 let mut vals = Vec::new();
1142 for arg in args {
1143 vals.push(self.eval_operand(arg, env)?);
1144 }
1145 eval_function(fn_name, &vals)
1146 }
1147 }
1148 }
1149
1150 fn eval_operand(&self, op: &Operand, env: &Env) -> Result<Value> {
1151 match op {
1152 Operand::Var(v) => env
1153 .vars
1154 .get(v)
1155 .cloned()
1156 .ok_or_else(|| anyhow!("Variable not found")),
1157 Operand::Const(c) => match c {
1158 Constant::Number(n) => Ok(Value::Number(*n)),
1159 Constant::Float(f) => Ok(Value::Float(*f)),
1160 Constant::String(sid) => {
1161 Ok(Value::String(self.ir.resolve_string(*sid).to_string()))
1162 }
1163 Constant::Name(nid) => Ok(Value::Name(self.ir.resolve_name(*nid).to_string())),
1164 Constant::Time(t) => Ok(Value::Time(*t)),
1165 Constant::Duration(d) => Ok(Value::Duration(*d)),
1166 },
1167 }
1168 }
1169}
1170
1171fn value_as_float(v: &Value) -> Result<f64> {
1174 match v {
1175 Value::Float(f) => Ok(*f),
1176 Value::Number(n) => Ok(*n as f64),
1177 _ => Err(anyhow!("expected numeric value, got {v}")),
1178 }
1179}
1180
1181fn value_to_string(v: &Value) -> String {
1182 match v {
1183 Value::Number(n) => n.to_string(),
1184 Value::Float(f) => format!("{f}"),
1185 Value::String(s) => s.clone(),
1186 Value::Name(s) => s.clone(),
1187 Value::Time(t) => format!("{}", Value::Time(*t)),
1188 Value::Duration(d) => format!("{}", Value::Duration(*d)),
1189 Value::Compound(kind, elems) => format!("{}", Value::Compound(*kind, elems.clone())),
1190 Value::Null => "null".to_string(),
1191 }
1192}
1193
1194pub fn eval_function(fn_name: &str, vals: &[Value]) -> Result<Value> {
1196 match fn_name {
1197 "fn:plus" => {
1199 let mut sum: i64 = 0;
1200 for v in vals {
1201 match v {
1202 Value::Number(n) => sum += n,
1203 _ => return Err(anyhow!("fn:plus: expected integer, got {v}")),
1204 }
1205 }
1206 Ok(Value::Number(sum))
1207 }
1208 "fn:minus" => {
1209 if vals.is_empty() {
1210 return Err(anyhow!("fn:minus: requires at least 1 argument"));
1211 }
1212 let first = match &vals[0] {
1213 Value::Number(n) => *n,
1214 v => return Err(anyhow!("fn:minus: expected integer, got {v}")),
1215 };
1216 if vals.len() == 1 {
1217 return Ok(Value::Number(-first));
1218 }
1219 let mut result = first;
1220 for v in &vals[1..] {
1221 match v {
1222 Value::Number(n) => result -= n,
1223 _ => return Err(anyhow!("fn:minus: expected integer, got {v}")),
1224 }
1225 }
1226 Ok(Value::Number(result))
1227 }
1228 "fn:mult" => {
1229 let mut product: i64 = 1;
1230 for v in vals {
1231 match v {
1232 Value::Number(n) => product *= n,
1233 _ => return Err(anyhow!("fn:mult: expected integer, got {v}")),
1234 }
1235 }
1236 Ok(Value::Number(product))
1237 }
1238 "fn:div" => {
1239 if vals.is_empty() {
1240 return Err(anyhow!("fn:div: requires at least 1 argument"));
1241 }
1242 let first = match &vals[0] {
1243 Value::Number(n) => *n,
1244 v => return Err(anyhow!("fn:div: expected integer, got {v}")),
1245 };
1246 if vals.len() == 1 {
1247 if first == 0 {
1248 return Err(anyhow!("Division by zero in fn:div"));
1249 }
1250 return Ok(Value::Number(1 / first));
1251 }
1252 let mut result = first;
1253 for v in &vals[1..] {
1254 match v {
1255 Value::Number(0) => return Err(anyhow!("Division by zero in fn:div")),
1256 Value::Number(n) => {
1257 result /= n;
1258 if result == 0 {
1259 return Ok(Value::Number(0));
1260 }
1261 }
1262 _ => return Err(anyhow!("fn:div: expected integer, got {v}")),
1263 }
1264 }
1265 Ok(Value::Number(result))
1266 }
1267
1268 "fn:float:plus" => {
1270 let mut sum: f64 = 0.0;
1271 for v in vals {
1272 sum += value_as_float(v)?;
1273 }
1274 Ok(Value::Float(sum))
1275 }
1276 "fn:float:minus" => {
1277 if vals.is_empty() {
1278 return Err(anyhow!("fn:float:minus: requires at least 1 argument"));
1279 }
1280 let first = value_as_float(&vals[0])?;
1281 if vals.len() == 1 {
1282 return Ok(Value::Float(-first));
1283 }
1284 let mut result = first;
1285 for v in &vals[1..] {
1286 result -= value_as_float(v)?;
1287 }
1288 Ok(Value::Float(result))
1289 }
1290 "fn:float:mult" => {
1291 let mut product: f64 = 1.0;
1292 for v in vals {
1293 product *= value_as_float(v)?;
1294 }
1295 Ok(Value::Float(product))
1296 }
1297 "fn:float:div" => {
1298 if vals.is_empty() {
1299 return Err(anyhow!("fn:float:div: requires at least 1 argument"));
1300 }
1301 let first = value_as_float(&vals[0])?;
1302 if vals.len() == 1 {
1303 if first == 0.0 {
1304 return Err(anyhow!("Division by zero in fn:float:div"));
1305 }
1306 return Ok(Value::Float(1.0 / first));
1307 }
1308 let mut result = first;
1309 for v in &vals[1..] {
1310 let d = value_as_float(v)?;
1311 if d == 0.0 {
1312 return Err(anyhow!("Division by zero in fn:float:div"));
1313 }
1314 result /= d;
1315 }
1316 Ok(Value::Float(result))
1317 }
1318 "fn:sqrt" => {
1319 if vals.len() != 1 {
1320 return Err(anyhow!("fn:sqrt: requires exactly 1 argument"));
1321 }
1322 let f = value_as_float(&vals[0])?;
1323 Ok(Value::Float(f.sqrt()))
1324 }
1325
1326 "fn:string:concat" => {
1328 let mut result = String::new();
1329 for v in vals {
1330 result.push_str(&value_to_string(v));
1331 }
1332 Ok(Value::String(result))
1333 }
1334 "fn:string:replace" => {
1335 if vals.len() != 4 {
1336 return Err(anyhow!("fn:string:replace: requires 4 arguments (string, old, new, count)"));
1337 }
1338 let s = match &vals[0] {
1339 Value::String(s) => s,
1340 v => return Err(anyhow!("fn:string:replace: first arg must be string, got {v}")),
1341 };
1342 let old = match &vals[1] {
1343 Value::String(s) => s,
1344 v => return Err(anyhow!("fn:string:replace: second arg must be string, got {v}")),
1345 };
1346 let new_s = match &vals[2] {
1347 Value::String(s) => s,
1348 v => return Err(anyhow!("fn:string:replace: third arg must be string, got {v}")),
1349 };
1350 let count = match &vals[3] {
1351 Value::Number(n) => *n,
1352 v => return Err(anyhow!("fn:string:replace: fourth arg must be number, got {v}")),
1353 };
1354 let result = if count < 0 {
1355 s.replace(old.as_str(), new_s.as_str())
1356 } else {
1357 s.replacen(old.as_str(), new_s.as_str(), count as usize)
1358 };
1359 Ok(Value::String(result))
1360 }
1361
1362 "fn:number:to_string" => {
1364 if vals.len() != 1 {
1365 return Err(anyhow!("fn:number:to_string: requires 1 argument"));
1366 }
1367 match &vals[0] {
1368 Value::Number(n) => Ok(Value::String(n.to_string())),
1369 v => Err(anyhow!("fn:number:to_string: expected number, got {v}")),
1370 }
1371 }
1372 "fn:float64:to_string" => {
1373 if vals.len() != 1 {
1374 return Err(anyhow!("fn:float64:to_string: requires 1 argument"));
1375 }
1376 match &vals[0] {
1377 Value::Float(f) => Ok(Value::String(format!("{f}"))),
1378 v => Err(anyhow!("fn:float64:to_string: expected float, got {v}")),
1379 }
1380 }
1381 "fn:name:to_string" => {
1382 if vals.len() != 1 {
1383 return Err(anyhow!("fn:name:to_string: requires 1 argument"));
1384 }
1385 match &vals[0] {
1386 Value::Name(s) => Ok(Value::String(s.clone())),
1387 v => Err(anyhow!("fn:name:to_string: expected name, got {v}")),
1388 }
1389 }
1390
1391 "fn:time:now" => {
1393 if !vals.is_empty() {
1394 return Err(anyhow!("fn:time:now: takes no arguments"));
1395 }
1396 let nanos = std::time::SystemTime::now()
1397 .duration_since(std::time::UNIX_EPOCH)
1398 .map_err(|e| anyhow!("fn:time:now: {e}"))?
1399 .as_nanos() as i64;
1400 Ok(Value::Time(nanos))
1401 }
1402 "fn:time:add" => {
1403 if vals.len() != 2 {
1404 return Err(anyhow!("fn:time:add: requires 2 arguments (time, duration)"));
1405 }
1406 match (&vals[0], &vals[1]) {
1407 (Value::Time(t), Value::Duration(d)) => Ok(Value::Time(t + d)),
1408 _ => Err(anyhow!("fn:time:add: expected (time, duration), got ({}, {})", vals[0], vals[1])),
1409 }
1410 }
1411 "fn:time:sub" => {
1412 if vals.len() != 2 {
1413 return Err(anyhow!("fn:time:sub: requires 2 arguments"));
1414 }
1415 match (&vals[0], &vals[1]) {
1416 (Value::Time(t1), Value::Time(t2)) => Ok(Value::Duration(t1 - t2)),
1417 (Value::Time(t), Value::Duration(d)) => Ok(Value::Time(t - d)),
1418 _ => Err(anyhow!("fn:time:sub: expected (time, time) or (time, duration)")),
1419 }
1420 }
1421 "fn:time:year" => time_component(vals, |secs, _| {
1422 let (y, _, _) = civil_from_epoch_secs(secs);
1423 y as i64
1424 }),
1425 "fn:time:month" => time_component(vals, |secs, _| {
1426 let (_, m, _) = civil_from_epoch_secs(secs);
1427 m as i64
1428 }),
1429 "fn:time:day" => time_component(vals, |secs, _| {
1430 let (_, _, d) = civil_from_epoch_secs(secs);
1431 d as i64
1432 }),
1433 "fn:time:hour" => time_component(vals, |secs, _| {
1434 secs.rem_euclid(86400) / 3600
1435 }),
1436 "fn:time:minute" => time_component(vals, |secs, _| {
1437 (secs.rem_euclid(86400) % 3600) / 60
1438 }),
1439 "fn:time:second" => time_component(vals, |secs, _| {
1440 secs.rem_euclid(86400) % 60
1441 }),
1442 "fn:time:from_unix_nanos" => {
1443 if vals.len() != 1 {
1444 return Err(anyhow!("fn:time:from_unix_nanos: requires 1 argument"));
1445 }
1446 match &vals[0] {
1447 Value::Number(n) => Ok(Value::Time(*n)),
1448 v => Err(anyhow!("fn:time:from_unix_nanos: expected number, got {v}")),
1449 }
1450 }
1451 "fn:time:to_unix_nanos" => {
1452 if vals.len() != 1 {
1453 return Err(anyhow!("fn:time:to_unix_nanos: requires 1 argument"));
1454 }
1455 match &vals[0] {
1456 Value::Time(t) => Ok(Value::Number(*t)),
1457 v => Err(anyhow!("fn:time:to_unix_nanos: expected time, got {v}")),
1458 }
1459 }
1460 "fn:time:trunc" => {
1461 if vals.len() != 2 {
1462 return Err(anyhow!("fn:time:trunc: requires 2 arguments (time, unit_name)"));
1463 }
1464 let t = match &vals[0] {
1465 Value::Time(t) => *t,
1466 v => return Err(anyhow!("fn:time:trunc: first arg must be time, got {v}")),
1467 };
1468 let unit_name = match &vals[1] {
1469 Value::Name(s) => s.as_str(),
1470 v => return Err(anyhow!("fn:time:trunc: second arg must be name, got {v}")),
1471 };
1472 let d: i64 = match unit_name {
1473 "/nanosecond" => 1,
1474 "/microsecond" => 1_000,
1475 "/millisecond" => 1_000_000,
1476 "/second" => 1_000_000_000,
1477 "/minute" => 60 * 1_000_000_000,
1478 "/hour" => 3600 * 1_000_000_000,
1479 "/day" => 24 * 3600 * 1_000_000_000,
1480 _ => return Err(anyhow!("fn:time:trunc: unknown unit {unit_name:?}")),
1481 };
1482 Ok(Value::Time(t - t.rem_euclid(d)))
1483 }
1484 "fn:time:format" => {
1485 if vals.len() != 2 {
1486 return Err(anyhow!("fn:time:format: requires 2 arguments (time, precision)"));
1487 }
1488 let t = match &vals[0] {
1489 Value::Time(t) => *t,
1490 v => return Err(anyhow!("fn:time:format: first arg must be time, got {v}")),
1491 };
1492 let precision = match &vals[1] {
1493 Value::String(s) => s.as_str(),
1494 v => return Err(anyhow!("fn:time:format: second arg must be name, got {v}")),
1495 };
1496 Ok(Value::String(format_time_with_precision(t, precision)?))
1497 }
1498 "fn:time:format_civil" => {
1499 if vals.len() != 3 {
1500 return Err(anyhow!("fn:time:format_civil: requires 3 arguments (time, timezone, precision)"));
1501 }
1502 let t = match &vals[0] {
1503 Value::Time(t) => *t,
1504 v => return Err(anyhow!("fn:time:format_civil: first arg must be time, got {v}")),
1505 };
1506 let tz = match &vals[1] {
1507 Value::String(s) => s.as_str(),
1508 v => return Err(anyhow!("fn:time:format_civil: second arg must be string, got {v}")),
1509 };
1510 let precision = match &vals[2] {
1511 Value::String(s) => s.as_str(),
1512 v => return Err(anyhow!("fn:time:format_civil: third arg must be name, got {v}")),
1513 };
1514 let offset = parse_timezone_offset(tz)?;
1515 let adjusted = t + offset * 1_000_000_000;
1516 let formatted = format_time_with_precision(adjusted, precision)?;
1517 Ok(Value::String(formatted))
1518 }
1519 "fn:time:parse_rfc3339" => {
1520 if vals.len() != 1 {
1521 return Err(anyhow!("fn:time:parse_rfc3339: requires 1 argument"));
1522 }
1523 match &vals[0] {
1524 Value::String(s) => {
1525 let nanos = parse_rfc3339_to_nanos(s)?;
1526 Ok(Value::Time(nanos))
1527 }
1528 v => Err(anyhow!("fn:time:parse_rfc3339: expected string, got {v}")),
1529 }
1530 }
1531 "fn:time:parse_civil" => {
1532 if vals.len() != 2 {
1533 return Err(anyhow!("fn:time:parse_civil: requires 2 arguments (string, timezone)"));
1534 }
1535 let s = match &vals[0] {
1536 Value::String(s) => s.as_str(),
1537 v => return Err(anyhow!("fn:time:parse_civil: first arg must be string, got {v}")),
1538 };
1539 let tz = match &vals[1] {
1540 Value::String(s) => s.as_str(),
1541 v => return Err(anyhow!("fn:time:parse_civil: second arg must be string, got {v}")),
1542 };
1543 let offset = parse_timezone_offset(tz)?;
1544 let nanos = parse_civil_datetime_to_nanos(s)?;
1545 Ok(Value::Time(nanos - offset * 1_000_000_000))
1547 }
1548
1549 "fn:duration:add" => {
1551 if vals.len() != 2 {
1552 return Err(anyhow!("fn:duration:add: requires 2 arguments"));
1553 }
1554 match (&vals[0], &vals[1]) {
1555 (Value::Duration(a), Value::Duration(b)) => Ok(Value::Duration(a + b)),
1556 _ => Err(anyhow!("fn:duration:add: expected (duration, duration)")),
1557 }
1558 }
1559 "fn:duration:mult" => {
1560 if vals.len() != 2 {
1561 return Err(anyhow!("fn:duration:mult: requires 2 arguments"));
1562 }
1563 match (&vals[0], &vals[1]) {
1564 (Value::Duration(d), Value::Number(n)) => Ok(Value::Duration(d * n)),
1565 (Value::Number(n), Value::Duration(d)) => Ok(Value::Duration(n * d)),
1566 _ => Err(anyhow!("fn:duration:mult: expected (duration, number) or (number, duration)")),
1567 }
1568 }
1569 "fn:duration:hours" => duration_component_float(vals, |nanos| nanos as f64 / (60.0 * 60.0 * 1_000_000_000.0)),
1570 "fn:duration:minutes" => duration_component_float(vals, |nanos| nanos as f64 / (60.0 * 1_000_000_000.0)),
1571 "fn:duration:seconds" => duration_component_float(vals, |nanos| nanos as f64 / 1_000_000_000.0),
1572 "fn:duration:nanos" => duration_component_int(vals, |nanos| nanos),
1573 "fn:duration:from_nanos" => {
1574 if vals.len() != 1 {
1575 return Err(anyhow!("fn:duration:from_nanos: requires 1 argument"));
1576 }
1577 match &vals[0] {
1578 Value::Number(n) => Ok(Value::Duration(*n)),
1579 v => Err(anyhow!("fn:duration:from_nanos: expected number, got {v}")),
1580 }
1581 }
1582 "fn:duration:from_hours" => duration_from(vals, "hours", 60 * 60 * 1_000_000_000),
1583 "fn:duration:from_minutes" => duration_from(vals, "minutes", 60 * 1_000_000_000),
1584 "fn:duration:from_seconds" => duration_from(vals, "seconds", 1_000_000_000),
1585 "fn:duration:parse" => {
1586 if vals.len() != 1 {
1587 return Err(anyhow!("fn:duration:parse: requires 1 argument"));
1588 }
1589 match &vals[0] {
1590 Value::String(s) => {
1591 let nanos = parse_duration_string(s)?;
1592 Ok(Value::Duration(nanos))
1593 }
1594 v => Err(anyhow!("fn:duration:parse: expected string, got {v}")),
1595 }
1596 }
1597
1598 "fn:list" => Ok(Value::Compound(CompoundKind::List, vals.to_vec())),
1600 "fn:pair" => {
1601 if vals.len() != 2 {
1602 return Err(anyhow!("fn:pair: requires exactly 2 arguments"));
1603 }
1604 Ok(Value::Compound(CompoundKind::Pair, vals.to_vec()))
1605 }
1606 "fn:struct" => {
1607 if vals.len() % 2 != 0 {
1609 return Err(anyhow!(
1610 "fn:struct: requires even number of arguments (field, value pairs)"
1611 ));
1612 }
1613 Ok(Value::Compound(CompoundKind::Struct, vals.to_vec()))
1614 }
1615 "fn:map" => {
1616 if vals.len() % 2 != 0 {
1618 return Err(anyhow!(
1619 "fn:map: requires even number of arguments (key, value pairs)"
1620 ));
1621 }
1622 Ok(Value::Compound(CompoundKind::Map, vals.to_vec()))
1623 }
1624
1625 "fn:list:get" => {
1627 if vals.len() != 2 {
1628 return Err(anyhow!("fn:list:get: requires 2 arguments (list, index)"));
1629 }
1630 match (&vals[0], &vals[1]) {
1631 (Value::Compound(_, elems), Value::Number(idx)) => {
1632 let i = *idx as usize;
1633 elems
1634 .get(i)
1635 .cloned()
1636 .ok_or_else(|| anyhow!("fn:list:get: index {i} out of bounds (len {})", elems.len()))
1637 }
1638 _ => Err(anyhow!("fn:list:get: expected (compound, number)")),
1639 }
1640 }
1641 "fn:list:len" | "fn:len" => {
1642 if vals.len() != 1 {
1643 return Err(anyhow!("fn:len: requires 1 argument"));
1644 }
1645 match &vals[0] {
1646 Value::Compound(_, elems) => Ok(Value::Number(elems.len() as i64)),
1647 _ => Err(anyhow!("fn:len: expected compound value")),
1648 }
1649 }
1650 "fn:pair:first" => {
1651 if vals.len() != 1 {
1652 return Err(anyhow!("fn:pair:first: requires 1 argument"));
1653 }
1654 match &vals[0] {
1655 Value::Compound(_, elems) if elems.len() >= 1 => Ok(elems[0].clone()),
1656 _ => Err(anyhow!("fn:pair:first: expected compound with at least 1 element")),
1657 }
1658 }
1659 "fn:pair:second" => {
1660 if vals.len() != 1 {
1661 return Err(anyhow!("fn:pair:second: requires 1 argument"));
1662 }
1663 match &vals[0] {
1664 Value::Compound(_, elems) if elems.len() >= 2 => Ok(elems[1].clone()),
1665 _ => Err(anyhow!("fn:pair:second: expected compound with at least 2 elements")),
1666 }
1667 }
1668 "fn:struct:get" | "fn:map:get" => {
1669 if vals.len() != 2 {
1670 return Err(anyhow!("{fn_name}: requires 2 arguments (compound, key)"));
1671 }
1672 match &vals[0] {
1673 Value::Compound(_, elems) => {
1674 for pair in elems.chunks_exact(2) {
1676 if pair[0] == vals[1] {
1677 return Ok(pair[1].clone());
1678 }
1679 }
1680 Err(anyhow!("{fn_name}: key not found"))
1681 }
1682 _ => Err(anyhow!("{fn_name}: expected compound value")),
1683 }
1684 }
1685 "fn:map:len" | "fn:struct:len" => {
1686 if vals.len() != 1 {
1687 return Err(anyhow!("{fn_name}: requires 1 argument"));
1688 }
1689 match &vals[0] {
1690 Value::Compound(_, elems) => Ok(Value::Number((elems.len() / 2) as i64)),
1691 _ => Err(anyhow!("{fn_name}: expected compound value")),
1692 }
1693 }
1694 "fn:map:keys" => {
1695 if vals.len() != 1 {
1696 return Err(anyhow!("fn:map:keys: requires 1 argument"));
1697 }
1698 match &vals[0] {
1699 Value::Compound(_, elems) => {
1700 let keys: Vec<Value> = elems.chunks_exact(2).map(|p| p[0].clone()).collect();
1701 Ok(Value::Compound(CompoundKind::List, keys))
1702 }
1703 _ => Err(anyhow!("fn:map:keys: expected compound value")),
1704 }
1705 }
1706 "fn:map:values" | "fn:struct:values" => {
1707 if vals.len() != 1 {
1708 return Err(anyhow!("{fn_name}: requires 1 argument"));
1709 }
1710 match &vals[0] {
1711 Value::Compound(_, elems) => {
1712 let values: Vec<Value> = elems.chunks_exact(2).map(|p| p[1].clone()).collect();
1713 Ok(Value::Compound(CompoundKind::List, values))
1714 }
1715 _ => Err(anyhow!("{fn_name}: expected compound value")),
1716 }
1717 }
1718
1719 _ => Err(anyhow!("Unknown function: {fn_name}")),
1720 }
1721}
1722
1723fn time_component(vals: &[Value], extract: impl Fn(i64, i64) -> i64) -> Result<Value> {
1724 if vals.len() != 1 {
1725 return Err(anyhow!("time component function: requires 1 argument"));
1726 }
1727 match &vals[0] {
1728 Value::Time(nanos) => {
1729 let secs = nanos.div_euclid(1_000_000_000);
1730 let sub_nanos = nanos.rem_euclid(1_000_000_000);
1731 Ok(Value::Number(extract(secs, sub_nanos)))
1732 }
1733 v => Err(anyhow!("time component function: expected time, got {v}")),
1734 }
1735}
1736
/// Converts seconds since the Unix epoch into a proleptic Gregorian `(year, month, day)`.
///
/// This is Howard Hinnant's `civil_from_days`. Days are shifted so that the count starts
/// at 0000-03-01, which puts each leap day at the end of its (March-based) year.
fn civil_from_epoch_secs(secs: i64) -> (i32, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097; // days in one 400-year Gregorian cycle
    let shifted_days = secs.div_euclid(86_400) + 719_468;
    let era = shifted_days.div_euclid(DAYS_PER_ERA);
    let day_of_era = (shifted_days - era * DAYS_PER_ERA) as u32; // 0..=146_096
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let march_month = (5 * day_of_year + 2) / 153; // 0 = March ... 11 = February
    let day = day_of_year - (153 * march_month + 2) / 5 + 1;
    // January and February belong to the next calendar year in the March-based count.
    let (month, year_carry) = if march_month < 10 {
        (march_month + 3, 0)
    } else {
        (march_month - 9, 1)
    };
    let year = i64::from(year_of_era) + era * 400 + year_carry;
    (year as i32, month, day)
}
1751
1752fn duration_component_float(vals: &[Value], extract: impl Fn(i64) -> f64) -> Result<Value> {
1753 if vals.len() != 1 {
1754 return Err(anyhow!("duration component function: requires 1 argument"));
1755 }
1756 match &vals[0] {
1757 Value::Duration(nanos) => Ok(Value::Float(extract(*nanos))),
1758 v => Err(anyhow!("duration component function: expected duration, got {v}")),
1759 }
1760}
1761
1762fn duration_component_int(vals: &[Value], extract: impl Fn(i64) -> i64) -> Result<Value> {
1763 if vals.len() != 1 {
1764 return Err(anyhow!("duration component function: requires 1 argument"));
1765 }
1766 match &vals[0] {
1767 Value::Duration(nanos) => Ok(Value::Number(extract(*nanos))),
1768 v => Err(anyhow!("duration component function: expected duration, got {v}")),
1769 }
1770}
1771
1772fn duration_from(vals: &[Value], name: &str, multiplier: i64) -> Result<Value> {
1773 if vals.len() != 1 {
1774 return Err(anyhow!("fn:duration:from_{name}: requires 1 argument"));
1775 }
1776 match &vals[0] {
1777 Value::Number(n) => Ok(Value::Duration(n * multiplier)),
1778 v => Err(anyhow!("fn:duration:from_{name}: expected number, got {v}")),
1779 }
1780}
1781
1782fn format_time_with_precision(nanos: i64, precision: &str) -> Result<String> {
1784 let secs = nanos.div_euclid(1_000_000_000);
1785 let sub_nanos = nanos.rem_euclid(1_000_000_000);
1786 let (y, m, d) = civil_from_epoch_secs(secs);
1787 let time_of_day = secs.rem_euclid(86400);
1788 let hour = time_of_day / 3600;
1789 let minute = (time_of_day % 3600) / 60;
1790 let second = time_of_day % 60;
1791
1792 match precision {
1793 "/year" => Ok(format!("{y:04}")),
1794 "/month" => Ok(format!("{y:04}-{m:02}")),
1795 "/day" => Ok(format!("{y:04}-{m:02}-{d:02}")),
1796 "/hour" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}Z")),
1797 "/minute" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}Z")),
1798 "/second" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}Z")),
1799 "/millisecond" => {
1800 let ms = sub_nanos / 1_000_000;
1801 Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{ms:03}Z"))
1802 }
1803 "/microsecond" => {
1804 let us = sub_nanos / 1_000;
1805 Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{us:06}Z"))
1806 }
1807 "/nanosecond" => {
1808 if sub_nanos == 0 {
1809 Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}Z"))
1810 } else {
1811 let ns_str = format!("{sub_nanos:09}");
1812 let ns_trimmed = ns_str.trim_end_matches('0');
1813 Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{ns_trimmed}Z"))
1814 }
1815 }
1816 _ => Err(anyhow!("unknown time precision: {precision:?}")),
1817 }
1818}
1819
1820fn parse_timezone_offset(tz: &str) -> Result<i64> {
1823 match tz {
1824 "UTC" => Ok(0),
1825 s if s.starts_with('+') || s.starts_with('-') => {
1826 let sign: i64 = if s.starts_with('-') { -1 } else { 1 };
1827 let rest = &s[1..];
1828 let parts: Vec<&str> = rest.split(':').collect();
1829 if parts.len() != 2 {
1830 return Err(anyhow!("invalid timezone offset: {tz:?}"));
1831 }
1832 let hours: i64 = parts[0].parse().map_err(|_| anyhow!("invalid timezone: {tz:?}"))?;
1833 let minutes: i64 = parts[1].parse().map_err(|_| anyhow!("invalid timezone: {tz:?}"))?;
1834 Ok(sign * (hours * 3600 + minutes * 60))
1835 }
1836 _ => Err(anyhow!("unsupported timezone: {tz:?} (use \"UTC\" or offset like \"+05:30\")")),
1837 }
1838}
1839
1840fn parse_rfc3339_to_nanos(s: &str) -> Result<i64> {
1842 if s.len() < 10 {
1844 return Err(anyhow!("fn:time:parse_rfc3339: string too short: {s:?}"));
1845 }
1846 let year: i64 = s[0..4].parse().map_err(|_| anyhow!("invalid year in {s:?}"))?;
1847 let month: u32 = s[5..7].parse().map_err(|_| anyhow!("invalid month in {s:?}"))?;
1848 let day: u32 = s[8..10].parse().map_err(|_| anyhow!("invalid day in {s:?}"))?;
1849
1850 let (hour, minute, second, frac_nanos) = if s.len() > 10 && s.as_bytes()[10] == b'T' {
1851 if s.len() < 19 {
1852 return Err(anyhow!("fn:time:parse_rfc3339: incomplete time in {s:?}"));
1853 }
1854 let h: u32 = s[11..13].parse().map_err(|_| anyhow!("invalid hour"))?;
1855 let min: u32 = s[14..16].parse().map_err(|_| anyhow!("invalid minute"))?;
1856 let sec: u32 = s[17..19].parse().map_err(|_| anyhow!("invalid second"))?;
1857
1858 let frac = if s.len() > 19 && s.as_bytes()[19] == b'.' {
1859 let end = s.len() - if s.ends_with('Z') { 1 } else { 0 };
1860 let frac_str = &s[20..end];
1861 let padded = format!("{frac_str:0<9}");
1862 padded[..9].parse::<i64>().unwrap_or(0)
1863 } else {
1864 0
1865 };
1866 (h, min, sec, frac)
1867 } else {
1868 (0, 0, 0, 0)
1869 };
1870
1871 let y = if month <= 2 { year - 1 } else { year };
1872 let era = (if y >= 0 { y } else { y - 399 }) / 400;
1873 let yoe = (y - era * 400) as u32;
1874 let m_adj = if month > 2 { month - 3 } else { month + 9 };
1875 let doy = (153 * m_adj + 2) / 5 + day - 1;
1876 let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
1877 let days = era * 146097 + doe as i64 - 719468;
1878
1879 let total_seconds = days * 86400 + hour as i64 * 3600 + minute as i64 * 60 + second as i64;
1880 Ok(total_seconds * 1_000_000_000 + frac_nanos)
1881}
1882
1883fn parse_civil_datetime_to_nanos(s: &str) -> Result<i64> {
1885 parse_rfc3339_to_nanos(s)
1887}
1888
1889fn parse_duration_string(s: &str) -> Result<i64> {
1891 if s.is_empty() {
1892 return Err(anyhow!("fn:duration:parse: empty string"));
1893 }
1894 if s == "0" || s == "0s" {
1895 return Ok(0);
1896 }
1897
1898 let (sign, mut rest) = if s.starts_with('-') {
1899 (-1i64, &s[1..])
1900 } else if s.starts_with('+') {
1901 (1i64, &s[1..])
1902 } else {
1903 (1i64, s)
1904 };
1905
1906 let mut total_nanos: i64 = 0;
1907
1908 while !rest.is_empty() {
1909 let num_end = rest
1911 .find(|c: char| !c.is_ascii_digit() && c != '.')
1912 .unwrap_or(rest.len());
1913 if num_end == 0 {
1914 return Err(anyhow!("fn:duration:parse: expected number in {s:?}"));
1915 }
1916 let num_str = &rest[..num_end];
1917 rest = &rest[num_end..];
1918
1919 let (unit_nanos, unit_len) = if rest.starts_with("ns") {
1921 (1i64, 2)
1922 } else if rest.starts_with("us") || rest.starts_with("µs") {
1923 (1_000i64, if rest.starts_with("µ") { 3 } else { 2 })
1924 } else if rest.starts_with("ms") {
1925 (1_000_000i64, 2)
1926 } else if rest.starts_with('s') {
1927 (1_000_000_000i64, 1)
1928 } else if rest.starts_with('m') {
1929 (60 * 1_000_000_000i64, 1)
1930 } else if rest.starts_with('h') {
1931 (3600 * 1_000_000_000i64, 1)
1932 } else {
1933 return Err(anyhow!("fn:duration:parse: unknown unit in {s:?}"));
1934 };
1935 rest = &rest[unit_len..];
1936
1937 if num_str.contains('.') {
1938 let val: f64 = num_str.parse().map_err(|_| anyhow!("fn:duration:parse: invalid number {num_str:?}"))?;
1939 total_nanos += (val * unit_nanos as f64) as i64;
1940 } else {
1941 let val: i64 = num_str.parse().map_err(|_| anyhow!("fn:duration:parse: invalid number {num_str:?}"))?;
1942 total_nanos += val * unit_nanos;
1943 }
1944 }
1945
1946 Ok(sign * total_nanos)
1947}
1948
1949#[cfg(test)]
1950mod tests {
1951 use super::*;
1952
1953 #[test]
1954 fn test_retract_existing() {
1955 let mut store = MemStore::new();
1956 store.add_fact("r", vec![Value::Number(1), Value::Number(2)]);
1957 store.add_fact("r", vec![Value::Number(3), Value::Number(4)]);
1958
1959 let removed = store
1960 .retract("r", &[Value::Number(1), Value::Number(2)])
1961 .unwrap();
1962 assert!(removed);
1963
1964 let facts = store.get_facts("r");
1965 assert_eq!(facts.len(), 1);
1966 assert_eq!(facts[0], vec![Value::Number(3), Value::Number(4)]);
1967 }
1968
1969 #[test]
1970 fn test_retract_nonexistent() {
1971 let mut store = MemStore::new();
1972 store.add_fact("r", vec![Value::Number(1)]);
1973
1974 let removed = store.retract("r", &[Value::Number(99)]).unwrap();
1975 assert!(!removed);
1976
1977 let facts = store.get_facts("r");
1978 assert_eq!(facts.len(), 1);
1979 assert_eq!(facts[0], vec![Value::Number(1)]);
1980 }
1981
1982 #[test]
1983 fn test_clear() {
1984 let mut store = MemStore::new();
1985 store.add_fact("r", vec![Value::Number(1)]);
1986 store.add_fact("r", vec![Value::Number(2)]);
1987 store.add_fact("s", vec![Value::Number(10)]);
1988
1989 store.clear("r");
1990
1991 let r_facts = store.get_facts("r");
1992 assert!(r_facts.is_empty());
1993
1994 let s_facts = store.get_facts("s");
1996 assert_eq!(s_facts.len(), 1);
1997 }
1998
1999 #[test]
2000 fn test_relation_names() {
2001 let mut store = MemStore::new();
2002 store.create_relation("alpha");
2003 store.create_relation("beta");
2004 store.add_fact("gamma", vec![Value::Number(1)]);
2005
2006 let mut names = store.relation_names();
2007 names.sort();
2008 assert_eq!(names, vec!["alpha", "beta", "gamma"]);
2009 }
2010
2011 #[test]
2012 fn test_provenance_recording() {
2013 use mangle_ir::physical::{DataSource, Operand};
2014
2015 let mut ir = mangle_ir::Ir::new();
2017 let base_name = ir.intern_name("base");
2018 let derived_name = ir.intern_name("derived");
2019 let var_x = ir.intern_name("X");
2020
2021 let op = Op::Iterate {
2023 source: DataSource::Scan {
2024 relation: base_name,
2025 vars: vec![var_x],
2026 },
2027 body: Box::new(Op::Insert {
2028 relation: derived_name,
2029 args: vec![Operand::Var(var_x)],
2030 }),
2031 };
2032
2033 let mut store = Box::new(MemStore::new());
2034 store.add_fact("base", vec![Value::Number(10)]);
2035 store.add_fact("base", vec![Value::Number(20)]);
2036 store.create_relation("derived");
2037
2038 let mut interpreter = Interpreter::new(&ir, store as Box<dyn Store>).with_provenance();
2039
2040 let count = interpreter.execute(&op).unwrap();
2041 assert_eq!(count, 2);
2042
2043 let prov = interpreter.provenance.as_ref().unwrap();
2045 assert_eq!(prov.entries.len(), 2);
2046
2047 for entry in &prov.entries {
2049 assert_eq!(entry.derived.0, "derived");
2050 assert_eq!(entry.premises.len(), 1);
2051 assert_eq!(entry.premises[0].0, "base");
2052 }
2053
2054 let mut derived_vals: Vec<i64> = prov
2056 .entries
2057 .iter()
2058 .map(|e| match &e.derived.1[0] {
2059 Value::Number(n) => *n,
2060 _ => panic!("expected number"),
2061 })
2062 .collect();
2063 derived_vals.sort();
2064 assert_eq!(derived_vals, vec![10, 20]);
2065 }
2066
2067 #[test]
2068 fn test_float_values() {
2069 use mangle_ir::physical::{self, DataSource, Expr, Operand};
2070
2071 let mut ir = mangle_ir::Ir::new();
2072 let temps = ir.intern_name("temps");
2073 let result = ir.intern_name("result");
2074 let var_x = ir.intern_name("X");
2075 let simple_op = Op::Iterate {
2077 source: DataSource::Scan {
2078 relation: temps,
2079 vars: vec![var_x],
2080 },
2081 body: Box::new(Op::Insert {
2082 relation: result,
2083 args: vec![Operand::Var(var_x)],
2084 }),
2085 };
2086
2087 let mut store = Box::new(MemStore::new());
2088 store.add_fact("temps", vec![Value::Float(36.5)]);
2089 store.add_fact("temps", vec![Value::Float(35.9)]);
2090 store.add_fact("temps", vec![Value::Float(37.2)]);
2091 store.create_relation("result");
2092
2093 let mut interpreter = Interpreter::new(&ir, store as Box<dyn Store>);
2094 let count = interpreter.execute(&simple_op).unwrap();
2095 assert_eq!(count, 3, "basic float scan+insert should produce 3 facts");
2096
2097 let mut ir2 = mangle_ir::Ir::new();
2099 let temps2 = ir2.intern_name("temps");
2100 let result2 = ir2.intern_name("result2");
2101 let var_x2 = ir2.intern_name("X");
2102 let var_y2 = ir2.intern_name("Y");
2103 let fn_plus2 = ir2.intern_name("fn:float:plus");
2104
2105 let op = Op::Iterate {
2106 source: DataSource::Scan {
2107 relation: temps2,
2108 vars: vec![var_x2],
2109 },
2110 body: Box::new(Op::Filter {
2111 cond: Condition::Cmp {
2112 op: physical::CmpOp::Gt,
2113 left: Operand::Var(var_x2),
2114 right: Operand::Const(physical::Constant::Float(36.0)),
2115 },
2116 body: Box::new(Op::Let {
2117 var: var_y2,
2118 expr: Expr::Call {
2119 function: fn_plus2,
2120 args: vec![
2121 Operand::Var(var_x2),
2122 Operand::Const(physical::Constant::Float(0.5)),
2123 ],
2124 },
2125 body: Box::new(Op::Insert {
2126 relation: result2,
2127 args: vec![Operand::Var(var_x2), Operand::Var(var_y2)],
2128 }),
2129 }),
2130 }),
2131 };
2132
2133 let mut store2 = Box::new(MemStore::new());
2134 store2.add_fact("temps", vec![Value::Float(36.5)]);
2135 store2.add_fact("temps", vec![Value::Float(35.9)]);
2136 store2.add_fact("temps", vec![Value::Float(37.2)]);
2137 store2.create_relation("result2");
2138
2139 let mut interpreter2 = Interpreter::new(&ir2, store2 as Box<dyn Store>);
2140 let count2 = interpreter2.execute(&op).unwrap();
2141
2142 assert_eq!(count2, 2);
2144
2145 let results: Vec<_> = interpreter2
2147 .store()
2148 .scan_next_delta("result2")
2149 .unwrap()
2150 .collect();
2151 assert_eq!(results.len(), 2);
2152
2153 let mut output: Vec<(f64, f64)> = results
2154 .iter()
2155 .map(|t| match (&t[0], &t[1]) {
2156 (Value::Float(a), Value::Float(b)) => (*a, *b),
2157 _ => panic!("expected floats"),
2158 })
2159 .collect();
2160 output.sort_by(|a, b| a.0.total_cmp(&b.0));
2161
2162 assert_eq!(output[0], (36.5, 37.0));
2163 assert_eq!(output[1], (37.2, 37.7));
2164 }
2165
2166 #[test]
2167 fn test_float_in_memstore() {
2168 let mut store = MemStore::new();
2170 store.add_fact("data", vec![Value::Float(1.5), Value::Number(10)]);
2171 store.add_fact("data", vec![Value::Float(2.5), Value::Number(20)]);
2172 store.add_fact("data", vec![Value::Float(1.5), Value::Number(10)]);
2174
2175 let facts = store.get_facts("data");
2176 assert_eq!(facts.len(), 2);
2177
2178 let results: Vec<_> = store
2180 .scan_index("data", 0, &Value::Float(1.5))
2181 .unwrap()
2182 .collect();
2183 assert_eq!(results.len(), 1);
2184 assert_eq!(results[0][1], Value::Number(10));
2185 }
2186
2187 #[test]
2190 fn test_fn_plus_variadic() {
2191 assert_eq!(
2192 eval_function("fn:plus", &[Value::Number(1), Value::Number(2), Value::Number(3)]).unwrap(),
2193 Value::Number(6)
2194 );
2195 assert_eq!(eval_function("fn:plus", &[]).unwrap(), Value::Number(0));
2197 }
2198
2199 #[test]
2200 fn test_fn_minus_variadic() {
2201 assert_eq!(
2203 eval_function("fn:minus", &[Value::Number(5)]).unwrap(),
2204 Value::Number(-5)
2205 );
2206 assert_eq!(
2208 eval_function("fn:minus", &[Value::Number(10), Value::Number(3)]).unwrap(),
2209 Value::Number(7)
2210 );
2211 assert_eq!(
2213 eval_function("fn:minus", &[Value::Number(100), Value::Number(10), Value::Number(20)]).unwrap(),
2214 Value::Number(70)
2215 );
2216 assert!(eval_function("fn:minus", &[]).is_err());
2218 }
2219
2220 #[test]
2221 fn test_fn_mult_variadic() {
2222 assert_eq!(
2223 eval_function("fn:mult", &[Value::Number(2), Value::Number(3), Value::Number(4)]).unwrap(),
2224 Value::Number(24)
2225 );
2226 assert_eq!(eval_function("fn:mult", &[]).unwrap(), Value::Number(1));
2228 }
2229
2230 #[test]
2231 fn test_fn_div() {
2232 assert_eq!(
2234 eval_function("fn:div", &[Value::Number(10), Value::Number(3)]).unwrap(),
2235 Value::Number(3)
2236 );
2237 assert_eq!(
2239 eval_function("fn:div", &[Value::Number(5)]).unwrap(),
2240 Value::Number(0)
2241 );
2242 assert_eq!(
2243 eval_function("fn:div", &[Value::Number(1)]).unwrap(),
2244 Value::Number(1)
2245 );
2246 assert!(eval_function("fn:div", &[Value::Number(1), Value::Number(0)]).is_err());
2248 assert!(eval_function("fn:div", &[Value::Number(0)]).is_err());
2249 }
2250
2251 #[test]
2252 fn test_fn_float_promotion() {
2253 assert_eq!(
2255 eval_function("fn:float:plus", &[Value::Float(1.5), Value::Number(2)]).unwrap(),
2256 Value::Float(3.5)
2257 );
2258 assert_eq!(
2260 eval_function("fn:sqrt", &[Value::Number(9)]).unwrap(),
2261 Value::Float(3.0)
2262 );
2263 }
2264
2265 #[test]
2266 fn test_fn_string_concat() {
2267 assert_eq!(
2268 eval_function(
2269 "fn:string:concat",
2270 &[Value::String("a".into()), Value::String("b".into()), Value::String("c".into())]
2271 ).unwrap(),
2272 Value::String("abc".to_string())
2273 );
2274 assert_eq!(
2276 eval_function(
2277 "fn:string:concat",
2278 &[Value::String("n=".into()), Value::Number(42)]
2279 ).unwrap(),
2280 Value::String("n=42".to_string())
2281 );
2282 }
2283
2284 #[test]
2285 fn test_fn_string_replace() {
2286 assert_eq!(
2288 eval_function(
2289 "fn:string:replace",
2290 &[Value::String("a-b-c".into()), Value::String("-".into()), Value::String("_".into()), Value::Number(-1)]
2291 ).unwrap(),
2292 Value::String("a_b_c".to_string())
2293 );
2294 assert_eq!(
2296 eval_function(
2297 "fn:string:replace",
2298 &[Value::String("a-b-c".into()), Value::String("-".into()), Value::String("_".into()), Value::Number(1)]
2299 ).unwrap(),
2300 Value::String("a_b-c".to_string())
2301 );
2302 }
2303
2304 #[test]
2305 fn test_fn_to_string() {
2306 assert_eq!(
2307 eval_function("fn:number:to_string", &[Value::Number(42)]).unwrap(),
2308 Value::String("42".to_string())
2309 );
2310 assert_eq!(
2311 eval_function("fn:float64:to_string", &[Value::Float(3.14)]).unwrap(),
2312 Value::String("3.14".to_string())
2313 );
2314 assert_eq!(
2315 eval_function("fn:name:to_string", &[Value::Name("/role/admin".into())]).unwrap(),
2316 Value::String("/role/admin".to_string())
2317 );
2318 }
2319
2320 fn date_nanos(year: i64, month: u32, day: u32) -> i64 {
2326 let m = month;
2328 let y = if m <= 2 { year - 1 } else { year };
2329 let era = (if y >= 0 { y } else { y - 399 }) / 400;
2330 let yoe = (y - era * 400) as u32;
2331 let m_adj = if m > 2 { m - 3 } else { m + 9 };
2332 let doy = (153 * m_adj + 2) / 5 + day - 1;
2333 let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
2334 let days = era * 146097 + doe as i64 - 719468;
2335 days * 86400 * 1_000_000_000
2336 }
2337
2338 fn datetime_nanos(year: i64, month: u32, day: u32, h: u32, m: u32, s: u32, ns: i64) -> i64 {
2339 date_nanos(year, month, day) + (h as i64) * 3_600_000_000_000
2340 + (m as i64) * 60_000_000_000 + (s as i64) * 1_000_000_000 + ns
2341 }
2342
2343 #[test]
2345 fn test_coalesce_overlapping() {
2346 let mut store = MemStore::new();
2347 let jan1 = date_nanos(2024, 1, 1);
2348 let jan15 = date_nanos(2024, 1, 15);
2349 let jan10 = date_nanos(2024, 1, 10);
2350 let jan25 = date_nanos(2024, 1, 25);
2351 let jan20 = date_nanos(2024, 1, 20);
2352 let jan31 = date_nanos(2024, 1, 31);
2353
2354 store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan1), Value::Time(jan15)]);
2355 store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan10), Value::Time(jan25)]);
2356 store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan20), Value::Time(jan31)]);
2357
2358 assert_eq!(store.get_facts("active").len(), 3);
2359
2360 store.coalesce_temporal("active");
2361
2362 let facts = store.get_facts("active");
2363 assert_eq!(facts.len(), 1, "after coalesce: expected 1, got {:?}", facts);
2364 assert_eq!(facts[0][1], Value::Time(jan1), "start should be Jan 1");
2365 assert_eq!(facts[0][2], Value::Time(jan31), "end should be Jan 31");
2366 }
2367
2368 #[test]
2370 fn test_coalesce_adjacent() {
2371 let mut store = MemStore::new();
2372 let shift1_start = datetime_nanos(2024, 1, 1, 8, 0, 0, 0);
2373 let shift1_end = datetime_nanos(2024, 1, 1, 16, 0, 0, 0);
2374 let shift2_start = datetime_nanos(2024, 1, 1, 16, 0, 0, 1); let shift2_end = date_nanos(2024, 1, 2);
2376
2377 store.add_fact("shift", vec![Value::String("/worker".into()), Value::Time(shift1_start), Value::Time(shift1_end)]);
2378 store.add_fact("shift", vec![Value::String("/worker".into()), Value::Time(shift2_start), Value::Time(shift2_end)]);
2379
2380 store.coalesce_temporal("shift");
2381
2382 let facts = store.get_facts("shift");
2383 assert_eq!(facts.len(), 1, "adjacent intervals should coalesce");
2384 assert_eq!(facts[0][1], Value::Time(shift1_start));
2385 assert_eq!(facts[0][2], Value::Time(shift2_end));
2386 }
2387
2388 #[test]
2390 fn test_coalesce_non_overlapping() {
2391 let mut store = MemStore::new();
2392 let jan1 = date_nanos(2024, 1, 1);
2393 let jan7 = date_nanos(2024, 1, 7);
2394 let jun1 = date_nanos(2024, 6, 1);
2395 let jun14 = date_nanos(2024, 6, 14);
2396
2397 store.add_fact("vacation", vec![Value::String("/alice".into()), Value::Time(jan1), Value::Time(jan7)]);
2398 store.add_fact("vacation", vec![Value::String("/alice".into()), Value::Time(jun1), Value::Time(jun14)]);
2399
2400 store.coalesce_temporal("vacation");
2401
2402 let facts = store.get_facts("vacation");
2403 assert_eq!(facts.len(), 2, "non-overlapping intervals should stay separate");
2404 }
2405
2406 #[test]
2408 fn test_coalesce_mixed_granularity() {
2409 let mut store = MemStore::new();
2410 let t1_start = datetime_nanos(2024, 1, 1, 10, 0, 0, 0);
2412 let t1_end = datetime_nanos(2024, 1, 1, 10, 0, 5, 0);
2413 let t2_start = datetime_nanos(2024, 1, 1, 10, 0, 4, 500_000_000);
2415 let t2_end = datetime_nanos(2024, 1, 1, 10, 0, 6, 0);
2416 let t3_start = datetime_nanos(2024, 1, 1, 10, 0, 6, 1);
2418 let t3_end = datetime_nanos(2024, 1, 1, 10, 0, 7, 0);
2419
2420 store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t1_start), Value::Time(t1_end)]);
2421 store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t2_start), Value::Time(t2_end)]);
2422 store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t3_start), Value::Time(t3_end)]);
2423
2424 store.coalesce_temporal("event");
2425
2426 let facts = store.get_facts("event");
2427 assert_eq!(facts.len(), 1, "mixed granularity should coalesce to 1");
2428 assert_eq!(facts[0][1], Value::Time(t1_start), "start: 10:00:00");
2429 assert_eq!(facts[0][2], Value::Time(t3_end), "end: 10:00:07");
2430 }
2431
2432 #[test]
2434 fn test_coalesce_multiple_keys() {
2435 let mut store = MemStore::new();
2436 let jan1 = date_nanos(2024, 1, 1);
2437 let jan10 = date_nanos(2024, 1, 10);
2438 let jan5 = date_nanos(2024, 1, 5);
2439 let jan15 = date_nanos(2024, 1, 15);
2440
2441 store.add_fact("employed", vec![Value::String("/alice".into()), Value::Time(jan1), Value::Time(jan10)]);
2443 store.add_fact("employed", vec![Value::String("/alice".into()), Value::Time(jan5), Value::Time(jan15)]);
2444 store.add_fact("employed", vec![Value::String("/bob".into()), Value::Time(jan1), Value::Time(jan15)]);
2446
2447 store.coalesce_temporal("employed");
2448
2449 let facts = store.get_facts("employed");
2450 assert_eq!(facts.len(), 2, "expected 2 facts after coalesce, got {:?}", facts);
2452 }
2453
2454 fn nested_loop_two_way(
2464 ir: &mangle_ir::Ir,
2465 a: NameId,
2466 b: NameId,
2467 result: NameId,
2468 x: NameId,
2469 z: NameId,
2470 y: NameId,
2471 z_right: NameId,
2472 ) -> Op {
2473 use mangle_ir::physical::{CmpOp, Condition, DataSource, Operand};
2474 let _ = ir;
2475 Op::Iterate {
2476 source: DataSource::Scan {
2477 relation: a,
2478 vars: vec![x, z],
2479 },
2480 body: Box::new(Op::Iterate {
2481 source: DataSource::Scan {
2482 relation: b,
2483 vars: vec![z_right, y],
2484 },
2485 body: Box::new(Op::Filter {
2486 cond: Condition::Cmp {
2487 op: CmpOp::Eq,
2488 left: Operand::Var(z),
2489 right: Operand::Var(z_right),
2490 },
2491 body: Box::new(Op::Insert {
2492 relation: result,
2493 args: vec![Operand::Var(x), Operand::Var(y)],
2494 }),
2495 }),
2496 }),
2497 }
2498 }
2499
2500 fn hash_join_two_way(
2502 a: NameId,
2503 b: NameId,
2504 result: NameId,
2505 x: NameId,
2506 z: NameId,
2507 y: NameId,
2508 ) -> Op {
2509 use mangle_ir::physical::{DataSource, Operand};
2510 Op::HashJoin {
2511 build_source: DataSource::Scan {
2512 relation: a,
2513 vars: vec![x, z],
2514 },
2515 probe_source: DataSource::Scan {
2516 relation: b,
2517 vars: vec![z, y],
2518 },
2519 join_keys: vec![z],
2520 body: Box::new(Op::Insert {
2521 relation: result,
2522 args: vec![Operand::Var(x), Operand::Var(y)],
2523 }),
2524 }
2525 }
2526
2527 fn run_plan(ir: &mangle_ir::Ir, facts: &[(&str, Vec<Value>)], op: &Op) -> Vec<Vec<Value>> {
2528 let mut store = Box::new(MemStore::new());
2529 for (rel, t) in facts {
2530 store.add_fact(rel, t.clone());
2531 }
2532 store.create_relation("result");
2533 let mut interp = Interpreter::new(ir, store as Box<dyn Store>);
2534 interp.execute(op).unwrap();
2535 let mut store = interp.into_store();
2537 store.merge_deltas();
2538 store.merge_deltas();
2539 store.scan("result").unwrap().collect()
2540 }
2541
2542 fn sorted(mut v: Vec<Vec<Value>>) -> Vec<Vec<Value>> {
2543 v.sort();
2544 v
2545 }
2546
2547 fn setup_two_way_ir()
2549 -> (mangle_ir::Ir, NameId, NameId, NameId, NameId, NameId, NameId, NameId) {
2550 let mut ir = mangle_ir::Ir::new();
2551 let a = ir.intern_name("a");
2552 let b = ir.intern_name("b");
2553 let result = ir.intern_name("result");
2554 let x = ir.intern_name("X");
2555 let z = ir.intern_name("Z");
2556 let y = ir.intern_name("Y");
2557 let z_right = ir.intern_name("Z_right");
2560 (ir, a, b, result, x, z, y, z_right)
2561 }
2562
2563 #[test]
2564 fn test_hashjoin_matches_nested_loop_basic() {
2565 let (ir, a, b, result, x, z, y, z_right) = setup_two_way_ir();
2566
2567 let facts: Vec<(&str, Vec<Value>)> = vec![
2568 ("a", vec![Value::Number(1), Value::Number(10)]),
2569 ("a", vec![Value::Number(2), Value::Number(20)]),
2570 ("a", vec![Value::Number(3), Value::Number(10)]),
2571 ("b", vec![Value::Number(10), Value::Number(100)]),
2572 ("b", vec![Value::Number(10), Value::Number(101)]),
2573 ("b", vec![Value::Number(20), Value::Number(200)]),
2574 ("b", vec![Value::Number(30), Value::Number(300)]),
2575 ];
2576
2577 let baseline = run_plan(
2578 &ir,
2579 &facts,
2580 &nested_loop_two_way(&ir, a, b, result, x, z, y, z_right),
2581 );
2582 let via_hash = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2583
2584 assert_eq!(
2585 sorted(baseline.clone()),
2586 sorted(via_hash.clone()),
2587 "HashJoin output must match nested-loop baseline"
2588 );
2589 assert_eq!(sorted(via_hash).len(), 5);
2591 }
2592
2593 #[test]
2594 fn test_hashjoin_empty_build() {
2595 let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2596 let facts: Vec<(&str, Vec<Value>)> = vec![
2597 ("b", vec![Value::Number(10), Value::Number(100)]),
2598 ("b", vec![Value::Number(20), Value::Number(200)]),
2599 ];
2600 let out = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2601 assert!(out.is_empty());
2602 }
2603
2604 #[test]
2605 fn test_hashjoin_empty_probe() {
2606 let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2607 let facts: Vec<(&str, Vec<Value>)> = vec![
2608 ("a", vec![Value::Number(1), Value::Number(10)]),
2609 ("a", vec![Value::Number(2), Value::Number(20)]),
2610 ];
2611 let out = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2612 assert!(out.is_empty());
2613 }
2614
2615 #[test]
2616 fn test_hashjoin_no_matches() {
2617 let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2618 let facts: Vec<(&str, Vec<Value>)> = vec![
2619 ("a", vec![Value::Number(1), Value::Number(10)]),
2620 ("b", vec![Value::Number(99), Value::Number(200)]),
2621 ];
2622 let out = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2623 assert!(out.is_empty());
2624 }
2625
2626 #[test]
2627 fn test_hashjoin_value_variants_as_key() {
2628 let mut ir = mangle_ir::Ir::new();
2631 let a = ir.intern_name("a");
2632 let b = ir.intern_name("b");
2633 let result = ir.intern_name("result");
2634 let x = ir.intern_name("X");
2635 let k = ir.intern_name("K");
2636 let y = ir.intern_name("Y");
2637
2638 let facts: Vec<(&str, Vec<Value>)> = vec![
2639 ("a", vec![Value::Number(1), Value::String("hello".into())]),
2640 ("a", vec![Value::Number(2), Value::Name("/foo".into())]),
2641 ("b", vec![Value::String("hello".into()), Value::Number(100)]),
2642 ("b", vec![Value::Name("/foo".into()), Value::Number(200)]),
2643 (
2645 "b",
2646 vec![Value::String("/foo".into()), Value::Number(999)],
2647 ),
2648 ];
2649 let op = hash_join_two_way(a, b, result, x, k, y);
2650 let out = sorted(run_plan(&ir, &facts, &op));
2651 assert_eq!(
2652 out,
2653 sorted(vec![
2654 vec![Value::Number(1), Value::Number(100)],
2655 vec![Value::Number(2), Value::Number(200)],
2656 ])
2657 );
2658 }
2659
2660 #[test]
2661 fn test_hashjoin_multi_key() {
2662 let mut ir = mangle_ir::Ir::new();
2665 let a = ir.intern_name("a");
2666 let b = ir.intern_name("b");
2667 let result = ir.intern_name("result");
2668 let x = ir.intern_name("X");
2669 let k1 = ir.intern_name("K1");
2670 let k2 = ir.intern_name("K2");
2671 let w = ir.intern_name("W");
2672
2673 let op = Op::HashJoin {
2674 build_source: DataSource::Scan {
2675 relation: a,
2676 vars: vec![x, k1, k2],
2677 },
2678 probe_source: DataSource::Scan {
2679 relation: b,
2680 vars: vec![k1, k2, w],
2681 },
2682 join_keys: vec![k1, k2],
2683 body: Box::new(Op::Insert {
2684 relation: result,
2685 args: vec![Operand::Var(x), Operand::Var(w)],
2686 }),
2687 };
2688
2689 let facts: Vec<(&str, Vec<Value>)> = vec![
2690 (
2691 "a",
2692 vec![Value::Number(1), Value::Number(10), Value::Number(100)],
2693 ),
2694 (
2695 "a",
2696 vec![Value::Number(2), Value::Number(10), Value::Number(200)],
2697 ),
2698 (
2700 "a",
2701 vec![Value::Number(3), Value::Number(10), Value::Number(999)],
2702 ),
2703 (
2704 "b",
2705 vec![Value::Number(10), Value::Number(100), Value::Number(1000)],
2706 ),
2707 (
2708 "b",
2709 vec![Value::Number(10), Value::Number(200), Value::Number(2000)],
2710 ),
2711 ];
2712
2713 let out = sorted(run_plan(&ir, &facts, &op));
2714 assert_eq!(
2715 out,
2716 sorted(vec![
2717 vec![Value::Number(1), Value::Number(1000)],
2718 vec![Value::Number(2), Value::Number(2000)],
2719 ])
2720 );
2721 }
2722
2723 #[test]
2724 fn test_hashjoin_duplicate_build_keys() {
2725 let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2728 let facts: Vec<(&str, Vec<Value>)> = vec![
2729 ("a", vec![Value::Number(1), Value::Number(10)]),
2730 ("a", vec![Value::Number(2), Value::Number(10)]),
2731 ("a", vec![Value::Number(3), Value::Number(10)]),
2732 ("b", vec![Value::Number(10), Value::Number(99)]),
2733 ];
2734 let op = hash_join_two_way(a, b, result, x, z, y);
2735 let out = sorted(run_plan(&ir, &facts, &op));
2736 assert_eq!(
2737 out,
2738 sorted(vec![
2739 vec![Value::Number(1), Value::Number(99)],
2740 vec![Value::Number(2), Value::Number(99)],
2741 vec![Value::Number(3), Value::Number(99)],
2742 ])
2743 );
2744 }
2745}