1use anyhow::{Result, anyhow};
30use mangle_ir::physical::{Aggregate, CmpOp, Condition, Constant, DataSource, Expr, Op, Operand};
31use mangle_ir::{Ir, NameId};
32use std::collections::HashMap;
33
34pub use mangle_common::{CompoundKind, Store, Value};
35
/// One slot of the flattened encoding of a tuple.
///
/// Tuples are stored as a flat `Vec<Cell>`: scalar values occupy one `Val`
/// cell, while a compound value is written as a `CompoundStart` header followed
/// by the encodings of its elements, in order (see `flatten_value`).
///
/// The derived `Hash`/`Eq` let a cell serve as an index key in the
/// `MemStore` index maps. Note that the derived `PartialOrd`/`Ord` depend on
/// the variant declaration order below.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
enum Cell {
    /// A non-compound value stored as-is.
    Val(Value),
    /// Header of a compound value: its kind and the number of direct elements
    /// that follow (each of which may itself span several cells).
    CompoundStart(CompoundKind, usize),
}
45
46fn flatten_value(v: &Value, out: &mut Vec<Cell>) {
50 match v {
51 Value::Compound(kind, elems) => {
52 out.push(Cell::CompoundStart(*kind, elems.len()));
53 for elem in elems {
54 flatten_value(elem, out);
55 }
56 }
57 _ => out.push(Cell::Val(v.clone())),
58 }
59}
60
61fn flatten_tuple(tuple: &[Value]) -> Vec<Cell> {
63 let mut cells = Vec::new();
64 for v in tuple {
65 flatten_value(v, &mut cells);
66 }
67 cells
68}
69
70fn unflatten_one(cells: &[Cell], pos: &mut usize) -> Value {
72 match &cells[*pos] {
73 Cell::Val(v) => {
74 *pos += 1;
75 v.clone()
76 }
77 Cell::CompoundStart(kind, n) => {
78 let kind = *kind;
79 let n = *n;
80 *pos += 1;
81 let mut elems = Vec::with_capacity(n);
82 for _ in 0..n {
83 elems.push(unflatten_one(cells, pos));
84 }
85 Value::Compound(kind, elems)
86 }
87 }
88}
89
90fn skip_one(cells: &[Cell], pos: &mut usize) {
92 match &cells[*pos] {
93 Cell::Val(_) => *pos += 1,
94 Cell::CompoundStart(_, n) => {
95 let n = *n;
96 *pos += 1;
97 for _ in 0..n {
98 skip_one(cells, pos);
99 }
100 }
101 }
102}
103
104fn count_logical_values(cells: &[Cell]) -> usize {
106 let mut count = 0;
107 let mut pos = 0;
108 while pos < cells.len() {
109 count += 1;
110 skip_one(cells, &mut pos);
111 }
112 count
113}
114
115fn unflatten_tuple(cells: &[Cell], n_cols: usize) -> Vec<Value> {
117 let mut pos = 0;
118 let mut tuple = Vec::with_capacity(n_cols);
119 for _ in 0..n_cols {
120 tuple.push(unflatten_one(cells, &mut pos));
121 }
122 tuple
123}
124
/// In-memory fact store.
///
/// Every relation is kept as rows of flattened cells (see `flatten_tuple`),
/// split into three generations: `stable`, `delta`, and `next_delta`.
/// `Store::insert` adds to `next_delta`, and `Store::merge_deltas` moves
/// `delta` into `stable` and `next_delta` into `delta`.
#[derive(Default)]
pub struct MemStore {
    /// Rows already merged (or added directly via `add_fact`), per relation.
    stable: HashMap<String, Vec<Vec<Cell>>>,
    /// Rows produced since the previous merge; returned by `scan_delta`.
    delta: HashMap<String, Vec<Vec<Cell>>>,
    /// Rows inserted since the last `merge_deltas`; not returned by `scan`.
    next_delta: HashMap<String, Vec<Vec<Cell>>>,

    /// Number of logical columns per relation, fixed by the first fact stored.
    arity: HashMap<String, usize>,

    /// (relation, column) -> first cell of that column -> row numbers in `stable`.
    /// Compound columns are keyed only by their `CompoundStart(kind, len)` header.
    stable_indexes: HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
    /// Same layout as `stable_indexes`, but with row numbers into `delta`.
    delta_indexes: HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
}
149
150impl MemStore {
151 pub fn new() -> Self {
152 Self::default()
153 }
154
155 pub fn create_relation(&mut self, relation: &str) {
157 self.stable.entry(relation.to_string()).or_default();
158 }
159
160 pub fn add_fact(&mut self, relation: &str, args: Vec<Value>) {
162 let n_cols = args.len();
163 let cells = flatten_tuple(&args);
164 let table = self.stable.entry(relation.to_string()).or_default();
165 if !table.contains(&cells) {
166 let row_idx = table.len();
167 self.arity.entry(relation.to_string()).or_insert(n_cols);
168 table.push(cells.clone());
169 index_cells(&mut self.stable_indexes, relation, &cells, n_cols, row_idx);
170 }
171 }
172
173 fn rebuild_indexes_for(&mut self, relation: &str) {
175 self.stable_indexes.retain(|(rel, _), _| rel != relation);
176 self.delta_indexes.retain(|(rel, _), _| rel != relation);
177
178 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
179
180 if let Some(table) = self.stable.get(relation) {
181 for (row_idx, cells) in table.iter().enumerate() {
182 index_cells(&mut self.stable_indexes, relation, cells, n_cols, row_idx);
183 }
184 }
185
186 if let Some(table) = self.delta.get(relation) {
187 for (row_idx, cells) in table.iter().enumerate() {
188 index_cells(&mut self.delta_indexes, relation, cells, n_cols, row_idx);
189 }
190 }
191 }
192
193 fn to_values(&self, relation: &str, cells: &[Cell]) -> Vec<Value> {
195 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
196 if n_cols == 0 {
197 cells
199 .iter()
200 .map(|c| match c {
201 Cell::Val(v) => v.clone(),
202 Cell::CompoundStart(_, _) => Value::Null,
203 })
204 .collect()
205 } else {
206 unflatten_tuple(cells, n_cols)
207 }
208 }
209
210 pub fn get_facts(&self, relation: &str) -> Vec<Vec<Value>> {
211 let mut all: Vec<Vec<Value>> = self
212 .stable
213 .get(relation)
214 .into_iter()
215 .flatten()
216 .map(|cells| self.to_values(relation, cells))
217 .collect();
218 if let Some(d) = self.delta.get(relation) {
219 for cells in d {
220 all.push(self.to_values(relation, cells));
221 }
222 }
223 all
224 }
225
226 pub fn coalesce_temporal(&mut self, relation: &str) {
231 let n_cols = match self.arity.get(relation) {
232 Some(&n) if n >= 2 => n,
233 _ => return,
234 };
235 let key_cols = n_cols - 2; let mut all_facts: Vec<Vec<Value>> = Vec::new();
239 if let Some(table) = self.stable.get(relation) {
240 for cells in table {
241 if count_logical_values(cells) != n_cols {
243 eprintln!("[mangle] coalesce_temporal: skipping malformed cells in '{relation}' (expected {n_cols} logical cols, got {})", count_logical_values(cells));
244 continue;
245 }
246 all_facts.push(unflatten_tuple(cells, n_cols));
247 }
248 }
249 if let Some(table) = self.delta.get(relation) {
250 for cells in table {
251 if count_logical_values(cells) != n_cols {
252 eprintln!("[mangle] coalesce_temporal: skipping malformed cells in delta '{relation}' (expected {n_cols} logical cols, got {})", count_logical_values(cells));
253 continue;
254 }
255 all_facts.push(unflatten_tuple(cells, n_cols));
256 }
257 }
258
259 if all_facts.is_empty() {
260 return;
261 }
262
263 let mut groups: HashMap<Vec<Value>, Vec<(i64, i64)>> = HashMap::new();
265 for fact in &all_facts {
266 let key: Vec<Value> = fact[..key_cols].to_vec();
267 let start = match &fact[key_cols] {
268 Value::Time(t) => *t,
269 Value::Number(n) => *n,
270 _ => continue,
271 };
272 let end = match &fact[key_cols + 1] {
273 Value::Time(t) => *t,
274 Value::Number(n) => *n,
275 _ => continue,
276 };
277 groups.entry(key).or_default().push((start, end));
278 }
279
280 let mut coalesced_facts: Vec<Vec<Value>> = Vec::new();
282 for (key, mut intervals) in groups {
283 intervals.sort_by_key(|&(s, _)| s);
284 let mut merged: Vec<(i64, i64)> = vec![intervals[0]];
285 for &(s, e) in &intervals[1..] {
286 let last = merged.last_mut().unwrap();
287 if s <= last.1.saturating_add(1) {
289 last.1 = last.1.max(e);
290 } else {
291 merged.push((s, e));
292 }
293 }
294 for (start, end) in merged {
295 let mut fact = key.clone();
296 fact.push(Value::Time(start));
297 fact.push(Value::Time(end));
298 coalesced_facts.push(fact);
299 }
300 }
301
302 let coalesced_cells: Vec<Vec<Cell>> = coalesced_facts
304 .iter()
305 .map(|fact| flatten_tuple(fact))
306 .collect();
307 self.stable.insert(relation.to_string(), coalesced_cells);
308 self.delta.remove(relation);
309 self.next_delta.remove(relation);
310 self.rebuild_indexes_for(relation);
311 }
312}
313
314fn index_cells(
316 indexes: &mut HashMap<(String, usize), HashMap<Cell, Vec<usize>>>,
317 relation: &str,
318 cells: &[Cell],
319 n_cols: usize,
320 row_idx: usize,
321) {
322 let mut pos = 0;
323 for col_idx in 0..n_cols {
324 let key = cells[pos].clone();
325 skip_one(cells, &mut pos);
326 indexes
327 .entry((relation.to_string(), col_idx))
328 .or_default()
329 .entry(key)
330 .or_default()
331 .push(row_idx);
332 }
333}
334
335fn value_to_index_key(v: &Value) -> Cell {
337 match v {
338 Value::Compound(kind, elems) => Cell::CompoundStart(*kind, elems.len()),
339 _ => Cell::Val(v.clone()),
340 }
341}
342
343impl Store for MemStore {
344 fn scan(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
345 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
346 let s = self
347 .stable
348 .get(relation)
349 .into_iter()
350 .flatten()
351 .map(move |cells| unflatten_tuple(cells, n_cols));
352 let d = self
353 .delta
354 .get(relation)
355 .into_iter()
356 .flatten()
357 .map(move |cells| unflatten_tuple(cells, n_cols));
358 Ok(Box::new(s.chain(d)))
359 }
360
361 fn scan_delta(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
362 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
363 match self.delta.get(relation) {
364 Some(tuples) => Ok(Box::new(
365 tuples
366 .iter()
367 .map(move |cells| unflatten_tuple(cells, n_cols)),
368 )),
369 None => Ok(Box::new(std::iter::empty())),
370 }
371 }
372
373 fn scan_next_delta(&self, relation: &str) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
374 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
375 match self.next_delta.get(relation) {
376 Some(tuples) => Ok(Box::new(
377 tuples
378 .iter()
379 .map(move |cells| unflatten_tuple(cells, n_cols)),
380 )),
381 None => Ok(Box::new(std::iter::empty())),
382 }
383 }
384
385 fn scan_index(
386 &self,
387 relation: &str,
388 col_idx: usize,
389 key: &Value,
390 ) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
391 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
392 let cell_key = value_to_index_key(key);
393 let mut results: Vec<Vec<Value>> = Vec::new();
394
395 if let Some(idx_map) = self.stable_indexes.get(&(relation.to_string(), col_idx))
396 && let Some(row_indices) = idx_map.get(&cell_key)
397 && let Some(table) = self.stable.get(relation)
398 {
399 for &i in row_indices {
400 results.push(unflatten_tuple(&table[i], n_cols));
401 }
402 }
403
404 if let Some(idx_map) = self.delta_indexes.get(&(relation.to_string(), col_idx))
405 && let Some(row_indices) = idx_map.get(&cell_key)
406 && let Some(table) = self.delta.get(relation)
407 {
408 for &i in row_indices {
409 results.push(unflatten_tuple(&table[i], n_cols));
410 }
411 }
412
413 Ok(Box::new(results.into_iter()))
414 }
415
416 fn scan_delta_index(
417 &self,
418 relation: &str,
419 col_idx: usize,
420 key: &Value,
421 ) -> Result<Box<dyn Iterator<Item = Vec<Value>> + '_>> {
422 let n_cols = self.arity.get(relation).copied().unwrap_or(0);
423 let cell_key = value_to_index_key(key);
424 let mut results: Vec<Vec<Value>> = Vec::new();
425
426 if let Some(idx_map) = self.delta_indexes.get(&(relation.to_string(), col_idx))
427 && let Some(row_indices) = idx_map.get(&cell_key)
428 && let Some(table) = self.delta.get(relation)
429 {
430 for &i in row_indices {
431 results.push(unflatten_tuple(&table[i], n_cols));
432 }
433 }
434
435 Ok(Box::new(results.into_iter()))
436 }
437
438 fn insert(&mut self, relation: &str, tuple: Vec<Value>) -> Result<bool> {
439 let n_cols = tuple.len();
440 let cells = flatten_tuple(&tuple);
441
442 if self
444 .stable
445 .get(relation)
446 .is_some_and(|v| v.contains(&cells))
447 || self
448 .delta
449 .get(relation)
450 .is_some_and(|v| v.contains(&cells))
451 || self
452 .next_delta
453 .get(relation)
454 .is_some_and(|v| v.contains(&cells))
455 {
456 return Ok(false);
457 }
458
459 let existing_arity = self.arity.get(relation).copied();
461 match existing_arity {
462 Some(reg) if reg != n_cols => {
463 eprintln!("[mangle] ARITY MISMATCH in relation '{relation}': registered arity={reg}, inserting tuple with {n_cols} cols, cells.len()={}. Skipping.", cells.len());
464 return Ok(false);
465 }
466 None => {
467 eprintln!("[mangle] Registering arity for relation '{relation}': {n_cols} cols");
468 }
469 _ => {}
470 }
471
472 self.arity.entry(relation.to_string()).or_insert(n_cols);
473 self.next_delta
474 .entry(relation.to_string())
475 .or_default()
476 .push(cells);
477 Ok(true)
478 }
479
480 fn merge_deltas(&mut self) {
481 for (rel_name, mut tuples) in self.delta.drain() {
483 let n_cols = self.arity.get(&rel_name).copied().unwrap_or(0);
484 let table = self.stable.entry(rel_name.clone()).or_default();
485 for (i, cells) in tuples.drain(..).enumerate() {
486 if count_logical_values(&cells) != n_cols {
488 eprintln!("[mangle] SKIP: relation '{rel_name}' arity={n_cols} but tuple[{i}] has {} logical cols. Skipping.", count_logical_values(&cells));
489 continue;
490 }
491 let row_idx = table.len();
492 index_cells(
493 &mut self.stable_indexes,
494 &rel_name,
495 &cells,
496 n_cols,
497 row_idx,
498 );
499 table.push(cells);
500 }
501 }
502 self.delta_indexes.clear();
503
504 self.delta = std::mem::take(&mut self.next_delta);
506 for (rel_name, tuples) in &self.delta {
507 let n_cols = self.arity.get(rel_name).copied().unwrap_or(0);
508 for (row_idx, cells) in tuples.iter().enumerate() {
509 if count_logical_values(cells) != n_cols {
511 eprintln!("[mangle] SKIP: relation '{rel_name}' arity={n_cols} but delta tuple[{row_idx}] has {} logical cols. Skipping.", count_logical_values(cells));
512 continue;
513 }
514 index_cells(
515 &mut self.delta_indexes,
516 rel_name,
517 cells,
518 n_cols,
519 row_idx,
520 );
521 }
522 }
523 }
524
525 fn create_relation(&mut self, relation: &str) {
526 self.stable.entry(relation.to_string()).or_default();
527 }
528
529 fn retract(&mut self, relation: &str, tuple: &[Value]) -> Result<bool> {
530 let cells = flatten_tuple(tuple);
531 let removed = if let Some(table) = self.stable.get_mut(relation) {
532 if let Some(pos) = table.iter().position(|t| *t == cells) {
533 table.swap_remove(pos);
534 true
535 } else {
536 false
537 }
538 } else {
539 false
540 };
541
542 if let Some(table) = self.delta.get_mut(relation) {
544 if let Some(pos) = table.iter().position(|t| *t == cells) {
545 table.swap_remove(pos);
546 }
547 }
548 if let Some(table) = self.next_delta.get_mut(relation) {
549 if let Some(pos) = table.iter().position(|t| *t == cells) {
550 table.swap_remove(pos);
551 }
552 }
553
554 if removed {
555 self.rebuild_indexes_for(relation);
556 }
557 Ok(removed)
558 }
559
560 fn clear(&mut self, relation: &str) {
561 if let Some(table) = self.stable.get_mut(relation) {
562 table.clear();
563 }
564 if let Some(table) = self.delta.get_mut(relation) {
565 table.clear();
566 }
567 if let Some(table) = self.next_delta.get_mut(relation) {
568 table.clear();
569 }
570 self.stable_indexes.retain(|(rel, _), _| rel != relation);
571 self.delta_indexes.retain(|(rel, _), _| rel != relation);
572 }
573
574 fn relation_names(&self) -> Vec<String> {
575 let mut names: Vec<String> = self.stable.keys().cloned().collect();
576 for key in self.delta.keys() {
577 if !names.contains(key) {
578 names.push(key.clone());
579 }
580 }
581 names
582 }
583
584 fn coalesce_temporal(&mut self, relation: &str) {
585 MemStore::coalesce_temporal(self, relation);
586 }
587}
588
/// One recorded derivation: a newly stored fact and the facts it was derived from.
#[derive(Debug, Clone)]
pub struct ProvenanceEntry {
    /// The derived fact as `(relation name, tuple)`.
    pub derived: (String, Vec<Value>),
    /// The premise facts, as `(relation name, tuple)`, that the interpreter
    /// held as active when the derivation happened.
    pub premises: Vec<(String, Vec<Value>)>,
}
597
/// Collects provenance while an `Interpreter` runs with provenance enabled.
#[derive(Default)]
pub struct ProvenanceRecorder {
    /// One entry per fact that was newly inserted into the store.
    pub entries: Vec<ProvenanceEntry>,
    /// Stack of `(relation name, tuple)` facts currently bound by the enclosing
    /// iteration operators; pushed before running a body and popped after it.
    active_premises: Vec<(String, Vec<Value>)>,
}
610
/// Executes physical operator trees (`Op`) against a fact store.
pub struct Interpreter<'a> {
    /// IR used to resolve relation, function, and constant names.
    ir: &'a Ir,
    /// Fact storage read by scans and written by `Op::Insert`.
    store: Box<dyn Store + 'a>,
    /// Provenance recorder; `None` unless enabled with `with_provenance`.
    provenance: Option<ProvenanceRecorder>,
}
617
618struct Env {
619 vars: HashMap<NameId, Value>,
620}
621
622impl Env {
623 fn new() -> Self {
624 Self {
625 vars: HashMap::new(),
626 }
627 }
628}
629
630impl<'a> Interpreter<'a> {
631 pub fn new(ir: &'a Ir, store: Box<dyn Store + 'a>) -> Self {
632 Self {
633 ir,
634 store,
635 provenance: None,
636 }
637 }
638
639 pub fn with_provenance(mut self) -> Self {
642 self.provenance = Some(ProvenanceRecorder::default());
643 self
644 }
645
646 pub fn store(&self) -> &dyn Store {
648 &*self.store
649 }
650
651 pub fn store_mut(&mut self) -> &mut dyn Store {
653 &mut *self.store
654 }
655
656 pub fn into_store(self) -> Box<dyn Store + 'a> {
658 self.store
659 }
660
661 pub fn into_provenance(self) -> Option<ProvenanceRecorder> {
663 self.provenance
664 }
665
666 pub fn provenance(&self) -> Option<&ProvenanceRecorder> {
672 self.provenance.as_ref()
673 }
674
675 pub fn into_parts(self) -> (Box<dyn Store + 'a>, Option<ProvenanceRecorder>) {
677 (self.store, self.provenance)
678 }
679
680 pub fn execute(&mut self, op: &Op) -> Result<usize> {
682 let mut env = Env::new();
683 self.exec_op(op, &mut env)
684 }
685
686 fn exec_op(&mut self, op: &Op, env: &mut Env) -> Result<usize> {
687 match op {
688 Op::Nop => Ok(0),
689 Op::Seq(ops) => {
690 let mut count = 0;
691 for o in ops {
692 count += self.exec_op(o, env)?;
693 }
694 Ok(count)
695 }
696 Op::Iterate { source, body } => {
697 let mut count = 0;
698 match source {
699 DataSource::Scan { relation, vars } => {
700 let rel_name = self.ir.resolve_name(*relation);
701 let iter = self.store.scan(rel_name)?;
702 let tuples: Vec<_> = iter.collect();
703
704 for tuple in tuples {
705 if tuple.len() != vars.len() {
706 continue;
707 }
708 for (i, var) in vars.iter().enumerate() {
709 env.vars.insert(*var, tuple[i].clone());
710 }
711 if let Some(ref mut prov) = self.provenance {
712 prov.active_premises
713 .push((rel_name.to_string(), tuple.clone()));
714 }
715 count += self.exec_op(body, env)?;
716 if self.provenance.is_some() {
717 self.provenance.as_mut().unwrap().active_premises.pop();
718 }
719 }
720 }
721 DataSource::ScanDelta { relation, vars } => {
722 let rel_name = self.ir.resolve_name(*relation);
723 let iter = self.store.scan_delta(rel_name)?;
724 let tuples: Vec<_> = iter.collect();
725
726 for tuple in tuples {
727 if tuple.len() != vars.len() {
728 continue;
729 }
730 for (i, var) in vars.iter().enumerate() {
731 env.vars.insert(*var, tuple[i].clone());
732 }
733 if let Some(ref mut prov) = self.provenance {
734 prov.active_premises
735 .push((rel_name.to_string(), tuple.clone()));
736 }
737 count += self.exec_op(body, env)?;
738 if self.provenance.is_some() {
739 self.provenance.as_mut().unwrap().active_premises.pop();
740 }
741 }
742 }
743 DataSource::IndexLookup {
744 relation,
745 col_idx,
746 key,
747 vars,
748 } => {
749 let rel_name = self.ir.resolve_name(*relation);
750 let key_val = self.eval_operand(key, env)?;
751
752 let iter = self.store.scan_index(rel_name, *col_idx, &key_val)?;
753 let tuples: Vec<_> = iter.collect();
754
755 for tuple in tuples {
756 if tuple.len() != vars.len() {
757 continue;
758 }
759 for (i, var) in vars.iter().enumerate() {
760 env.vars.insert(*var, tuple[i].clone());
761 }
762 if let Some(ref mut prov) = self.provenance {
763 prov.active_premises
764 .push((rel_name.to_string(), tuple.clone()));
765 }
766 count += self.exec_op(body, env)?;
767 if self.provenance.is_some() {
768 self.provenance.as_mut().unwrap().active_premises.pop();
769 }
770 }
771 }
772 }
773 Ok(count)
774 }
775 Op::Filter { cond, body } => {
776 if self.eval_cond(cond, env)? {
777 self.exec_op(body, env)
778 } else {
779 Ok(0)
780 }
781 }
782 Op::Insert { relation, args } => {
783 let rel_name = self.ir.resolve_name(*relation);
784 let mut tuple = Vec::new();
785 for arg in args {
786 tuple.push(self.eval_operand(arg, env)?);
787 }
788 let is_new = self.store.insert(rel_name, tuple.clone())?;
789 if is_new {
790 if let Some(ref mut prov) = self.provenance {
791 prov.entries.push(ProvenanceEntry {
792 derived: (rel_name.to_string(), tuple),
793 premises: prov.active_premises.clone(),
794 });
795 }
796 Ok(1)
797 } else {
798 Ok(0)
799 }
800 }
801 Op::Let { var, expr, body } => {
802 let val = self.eval_expr(expr, env)?;
803 env.vars.insert(*var, val);
804 self.exec_op(body, env)
805 }
806 Op::HashJoin {
807 build_source,
808 probe_source,
809 join_keys,
810 body,
811 } => self.exec_hash_join(build_source, probe_source, join_keys, body, env),
812 Op::GroupBy {
813 source,
814 vars,
815 keys,
816 aggregates,
817 body,
818 } => {
819 let rel_name = self.ir.resolve_name(*source);
820
821 let iter = self.store.scan(rel_name)?;
824 let mut tuples: Vec<_> = iter.collect();
825
826 if let Ok(nd_iter) = self.store.scan_next_delta(rel_name) {
828 tuples.extend(nd_iter);
829 }
830
831 let mut groups: HashMap<Vec<Value>, Vec<Vec<Value>>> = HashMap::new();
832
833 for tuple in tuples {
834 if tuple.len() != vars.len() {
835 continue;
836 }
837 for (i, var) in vars.iter().enumerate() {
839 env.vars.insert(*var, tuple[i].clone());
840 }
841
842 let mut key = Vec::new();
843 for k in keys {
844 if let Some(val) = env.vars.get(k) {
845 key.push(val.clone());
846 } else {
847 key.push(Value::Null);
849 }
850 }
851 groups.entry(key).or_default().push(tuple);
852 }
853
854 let mut count = 0;
855 for (key, group_tuples) in groups {
856 for (i, k) in keys.iter().enumerate() {
858 env.vars.insert(*k, key[i].clone());
859 }
860
861 for agg in aggregates {
863 let val = self.eval_aggregate(agg, &group_tuples, vars, env)?;
864 env.vars.insert(agg.var, val);
865 }
866
867 count += self.exec_op(body, env)?;
868 }
869 Ok(count)
870 }
871 }
872 }
873
874 fn collect_data_source(
878 &mut self,
879 source: &DataSource,
880 env: &mut Env,
881 ) -> Result<(Vec<Vec<Value>>, Vec<NameId>)> {
882 match source {
883 DataSource::Scan { relation, vars } => {
884 let rel_name = self.ir.resolve_name(*relation);
885 let tuples: Vec<_> = self.store.scan(rel_name)?.collect();
886 Ok((tuples, vars.clone()))
887 }
888 DataSource::ScanDelta { relation, vars } => {
889 let rel_name = self.ir.resolve_name(*relation);
890 let tuples: Vec<_> = self.store.scan_delta(rel_name)?.collect();
891 Ok((tuples, vars.clone()))
892 }
893 DataSource::IndexLookup {
894 relation,
895 col_idx,
896 key,
897 vars,
898 } => {
899 let rel_name = self.ir.resolve_name(*relation);
900 let key_val = self.eval_operand(key, env)?;
901 let tuples: Vec<_> =
902 self.store.scan_index(rel_name, *col_idx, &key_val)?.collect();
903 Ok((tuples, vars.clone()))
904 }
905 }
906 }
907
908 fn exec_hash_join(
909 &mut self,
910 build_source: &DataSource,
911 probe_source: &DataSource,
912 join_keys: &[NameId],
913 body: &Op,
914 env: &mut Env,
915 ) -> Result<usize> {
916 let (build_tuples, build_vars) = self.collect_data_source(build_source, env)?;
917
918 let build_key_positions: Vec<usize> = join_keys
921 .iter()
922 .map(|k| {
923 build_vars
924 .iter()
925 .position(|v| v == k)
926 .ok_or_else(|| anyhow!("HashJoin: join key not in build_source vars"))
927 })
928 .collect::<Result<Vec<_>>>()?;
929
930 let mut table: HashMap<Vec<Value>, Vec<Vec<Value>>> = HashMap::new();
931 for tuple in build_tuples {
932 if tuple.len() != build_vars.len() {
933 continue;
934 }
935 let key: Vec<Value> = build_key_positions
936 .iter()
937 .map(|&i| tuple[i].clone())
938 .collect();
939 table.entry(key).or_default().push(tuple);
940 }
941
942 let (probe_tuples, probe_vars) = self.collect_data_source(probe_source, env)?;
943 let probe_key_positions: Vec<usize> = join_keys
944 .iter()
945 .map(|k| {
946 probe_vars
947 .iter()
948 .position(|v| v == k)
949 .ok_or_else(|| anyhow!("HashJoin: join key not in probe_source vars"))
950 })
951 .collect::<Result<Vec<_>>>()?;
952
953 let mut count = 0;
954 for probe_tuple in probe_tuples {
955 if probe_tuple.len() != probe_vars.len() {
956 continue;
957 }
958 let key: Vec<Value> = probe_key_positions
959 .iter()
960 .map(|&i| probe_tuple[i].clone())
961 .collect();
962 let Some(matches) = table.get(&key) else {
963 continue;
964 };
965 for (i, var) in probe_vars.iter().enumerate() {
967 env.vars.insert(*var, probe_tuple[i].clone());
968 }
969 for build_tuple in matches {
970 for (i, var) in build_vars.iter().enumerate() {
971 env.vars.insert(*var, build_tuple[i].clone());
972 }
973 count += self.exec_op(body, env)?;
974 }
975 }
976 Ok(count)
977 }
978
979 fn eval_aggregate(
980 &self,
981 agg: &Aggregate,
982 group: &[Vec<Value>],
983 vars: &[NameId],
984 env: &mut Env,
985 ) -> Result<Value> {
986 let fn_name = self.ir.resolve_name(agg.func);
987 match fn_name {
988 "fn:count" => Ok(Value::Number(group.len() as i64)),
989 "fn:sum" => {
990 let arg = agg
991 .args
992 .first()
993 .ok_or_else(|| anyhow!("fn:sum requires 1 argument"))?;
994
995 let mut int_sum: i64 = 0;
996 for tuple in group {
997 for (i, var) in vars.iter().enumerate() {
998 env.vars.insert(*var, tuple[i].clone());
999 }
1000 let val = self.eval_operand(arg, env)?;
1001 match val {
1002 Value::Number(n) => int_sum += n,
1003 _ => return Err(anyhow!("fn:sum: expected integer, got {val}")),
1004 }
1005 }
1006 Ok(Value::Number(int_sum))
1007 }
1008 "fn:float:sum" => {
1009 let arg = agg
1010 .args
1011 .first()
1012 .ok_or_else(|| anyhow!("fn:float:sum requires 1 argument"))?;
1013
1014 let mut float_sum: f64 = 0.0;
1015 for tuple in group {
1016 for (i, var) in vars.iter().enumerate() {
1017 env.vars.insert(*var, tuple[i].clone());
1018 }
1019 let val = self.eval_operand(arg, env)?;
1020 float_sum += value_as_float(&val)?;
1021 }
1022 Ok(Value::Float(float_sum))
1023 }
1024 "fn:max" => {
1025 let mut max_val = None;
1026 let arg = agg
1027 .args
1028 .first()
1029 .ok_or_else(|| anyhow!("fn:max requires 1 argument"))?;
1030
1031 for tuple in group {
1032 for (i, var) in vars.iter().enumerate() {
1033 env.vars.insert(*var, tuple[i].clone());
1034 }
1035 let val = self.eval_operand(arg, env)?;
1036 match max_val {
1037 None => max_val = Some(val),
1038 Some(ref m) => {
1039 if val > *m {
1040 max_val = Some(val);
1041 }
1042 }
1043 }
1044 }
1045 max_val.ok_or_else(|| anyhow!("fn:max on empty group"))
1046 }
1047 "fn:min" => {
1048 let mut min_val = None;
1049 let arg = agg
1050 .args
1051 .first()
1052 .ok_or_else(|| anyhow!("fn:min requires 1 argument"))?;
1053
1054 for tuple in group {
1055 for (i, var) in vars.iter().enumerate() {
1056 env.vars.insert(*var, tuple[i].clone());
1057 }
1058 let val = self.eval_operand(arg, env)?;
1059 match min_val {
1060 None => min_val = Some(val),
1061 Some(ref m) => {
1062 if val < *m {
1063 min_val = Some(val);
1064 }
1065 }
1066 }
1067 }
1068 min_val.ok_or_else(|| anyhow!("fn:min on empty group"))
1069 }
1070 "fn:float:max" => {
1071 let mut max_val: Option<f64> = None;
1072 let arg = agg
1073 .args
1074 .first()
1075 .ok_or_else(|| anyhow!("fn:float:max requires 1 argument"))?;
1076
1077 for tuple in group {
1078 for (i, var) in vars.iter().enumerate() {
1079 env.vars.insert(*var, tuple[i].clone());
1080 }
1081 let val = self.eval_operand(arg, env)?;
1082 let f = value_as_float(&val)?;
1083 max_val = Some(match max_val {
1084 None => f,
1085 Some(m) => f.max(m),
1086 });
1087 }
1088 max_val
1089 .map(Value::Float)
1090 .ok_or_else(|| anyhow!("fn:float:max on empty group"))
1091 }
1092 "fn:float:min" => {
1093 let mut min_val: Option<f64> = None;
1094 let arg = agg
1095 .args
1096 .first()
1097 .ok_or_else(|| anyhow!("fn:float:min requires 1 argument"))?;
1098
1099 for tuple in group {
1100 for (i, var) in vars.iter().enumerate() {
1101 env.vars.insert(*var, tuple[i].clone());
1102 }
1103 let val = self.eval_operand(arg, env)?;
1104 let f = value_as_float(&val)?;
1105 min_val = Some(match min_val {
1106 None => f,
1107 Some(m) => f.min(m),
1108 });
1109 }
1110 min_val
1111 .map(Value::Float)
1112 .ok_or_else(|| anyhow!("fn:float:min on empty group"))
1113 }
1114 "fn:collect" | "fn:collect_distinct" => {
1115 let arg = agg
1116 .args
1117 .first()
1118 .ok_or_else(|| anyhow!("{fn_name}: requires 1 argument"))?;
1119 let mut out: Vec<Value> = Vec::with_capacity(group.len());
1120 for tuple in group {
1121 for (i, var) in vars.iter().enumerate() {
1122 env.vars.insert(*var, tuple[i].clone());
1123 }
1124 let val = self.eval_operand(arg, env)?;
1125 if fn_name == "fn:collect_distinct" && out.contains(&val) {
1126 continue;
1127 }
1128 out.push(val);
1129 }
1130 Ok(Value::Compound(CompoundKind::List, out))
1131 }
1132 _ => Err(anyhow!("Unknown aggregation function: {fn_name}")),
1133 }
1134 }
1135
1136 fn eval_cond(&self, cond: &Condition, env: &Env) -> Result<bool> {
1137 match cond {
1138 Condition::Cmp { op, left, right } => {
1139 let l = self.eval_operand(left, env)?;
1140 let r = self.eval_operand(right, env)?;
1141 match op {
1142 CmpOp::Eq => Ok(l == r),
1143 CmpOp::Neq => Ok(l != r),
1144 CmpOp::Lt => Ok(l < r),
1145 CmpOp::Le => Ok(l <= r),
1146 CmpOp::Gt => Ok(l > r),
1147 CmpOp::Ge => Ok(l >= r),
1148 }
1149 }
1150 Condition::Negation { relation, args } => {
1151 let rel_name = self.ir.resolve_name(*relation);
1152 let iter = self.store.scan(rel_name)?;
1153 for tuple in iter {
1154 let mut mat = true;
1155 if tuple.len() != args.len() {
1156 continue;
1157 }
1158 for (i, arg) in args.iter().enumerate() {
1159 let val = self.eval_operand(arg, env)?;
1160 if tuple[i] != val {
1161 mat = false;
1162 break;
1163 }
1164 }
1165 if mat {
1166 return Ok(false); }
1168 }
1169 Ok(true) }
1171 Condition::Call { function, args } => {
1172 let fn_name = self.ir.resolve_name(*function);
1173 let mut vals = Vec::new();
1174 for arg in args {
1175 vals.push(self.eval_operand(arg, env)?);
1176 }
1177 self.eval_builtin_predicate(fn_name, &vals)
1178 }
1179 }
1180 }
1181
1182 fn eval_builtin_predicate(&self, name: &str, vals: &[Value]) -> Result<bool> {
1183 match name {
1184 ":string:starts_with" => match (&vals[0], &vals[1]) {
1185 (Value::String(s), Value::String(p)) => Ok(s.starts_with(p.as_str())),
1186 _ => Err(anyhow!(":string:starts_with: expected string arguments")),
1187 },
1188 ":string:ends_with" => match (&vals[0], &vals[1]) {
1189 (Value::String(s), Value::String(p)) => Ok(s.ends_with(p.as_str())),
1190 _ => Err(anyhow!(":string:ends_with: expected string arguments")),
1191 },
1192 ":string:contains" => match (&vals[0], &vals[1]) {
1193 (Value::String(s), Value::String(p)) => Ok(s.contains(p.as_str())),
1194 _ => Err(anyhow!(":string:contains: expected string arguments")),
1195 },
1196 ":match_prefix" => match (&vals[0], &vals[1]) {
1197 (Value::Name(name), Value::Name(prefix)) => {
1198 Ok(name.starts_with(prefix.as_str()) && name.len() > prefix.len())
1199 }
1200 _ => Err(anyhow!(":match_prefix: expected name arguments")),
1201 },
1202 _ => Err(anyhow!("Unknown built-in predicate: {name}")),
1203 }
1204 }
1205
1206 fn eval_expr(&self, expr: &Expr, env: &Env) -> Result<Value> {
1207 match expr {
1208 Expr::Value(op) => self.eval_operand(op, env),
1209 Expr::Call { function, args } => {
1210 let fn_name = self.ir.resolve_name(*function);
1211 let mut vals = Vec::new();
1212 for arg in args {
1213 vals.push(self.eval_operand(arg, env)?);
1214 }
1215 eval_function(fn_name, &vals)
1216 }
1217 }
1218 }
1219
1220 fn eval_operand(&self, op: &Operand, env: &Env) -> Result<Value> {
1221 match op {
1222 Operand::Var(v) => env
1223 .vars
1224 .get(v)
1225 .cloned()
1226 .ok_or_else(|| anyhow!("Variable not found")),
1227 Operand::Const(c) => match c {
1228 Constant::Number(n) => Ok(Value::Number(*n)),
1229 Constant::Float(f) => Ok(Value::Float(*f)),
1230 Constant::String(sid) => {
1231 Ok(Value::String(self.ir.resolve_string(*sid).to_string()))
1232 }
1233 Constant::Name(nid) => Ok(Value::Name(self.ir.resolve_name(*nid).to_string())),
1234 Constant::Time(t) => Ok(Value::Time(*t)),
1235 Constant::Duration(d) => Ok(Value::Duration(*d)),
1236 },
1237 }
1238 }
1239}
1240
1241fn value_as_float(v: &Value) -> Result<f64> {
1244 match v {
1245 Value::Float(f) => Ok(*f),
1246 Value::Number(n) => Ok(*n as f64),
1247 _ => Err(anyhow!("expected numeric value, got {v}")),
1248 }
1249}
1250
1251fn value_to_string(v: &Value) -> String {
1252 match v {
1253 Value::Number(n) => n.to_string(),
1254 Value::Float(f) => format!("{f}"),
1255 Value::String(s) => s.clone(),
1256 Value::Name(s) => s.clone(),
1257 Value::Time(t) => format!("{}", Value::Time(*t)),
1258 Value::Duration(d) => format!("{}", Value::Duration(*d)),
1259 Value::Compound(kind, elems) => format!("{}", Value::Compound(*kind, elems.clone())),
1260 Value::Null => "null".to_string(),
1261 }
1262}
1263
1264pub fn eval_function(fn_name: &str, vals: &[Value]) -> Result<Value> {
1266 match fn_name {
1267 "fn:plus" => {
1269 let mut sum: i64 = 0;
1270 for v in vals {
1271 match v {
1272 Value::Number(n) => sum += n,
1273 _ => return Err(anyhow!("fn:plus: expected integer, got {v}")),
1274 }
1275 }
1276 Ok(Value::Number(sum))
1277 }
1278 "fn:minus" => {
1279 if vals.is_empty() {
1280 return Err(anyhow!("fn:minus: requires at least 1 argument"));
1281 }
1282 let first = match &vals[0] {
1283 Value::Number(n) => *n,
1284 v => return Err(anyhow!("fn:minus: expected integer, got {v}")),
1285 };
1286 if vals.len() == 1 {
1287 return Ok(Value::Number(-first));
1288 }
1289 let mut result = first;
1290 for v in &vals[1..] {
1291 match v {
1292 Value::Number(n) => result -= n,
1293 _ => return Err(anyhow!("fn:minus: expected integer, got {v}")),
1294 }
1295 }
1296 Ok(Value::Number(result))
1297 }
1298 "fn:mult" => {
1299 let mut product: i64 = 1;
1300 for v in vals {
1301 match v {
1302 Value::Number(n) => product *= n,
1303 _ => return Err(anyhow!("fn:mult: expected integer, got {v}")),
1304 }
1305 }
1306 Ok(Value::Number(product))
1307 }
1308 "fn:div" => {
1309 if vals.is_empty() {
1310 return Err(anyhow!("fn:div: requires at least 1 argument"));
1311 }
1312 let first = match &vals[0] {
1313 Value::Number(n) => *n,
1314 v => return Err(anyhow!("fn:div: expected integer, got {v}")),
1315 };
1316 if vals.len() == 1 {
1317 if first == 0 {
1318 return Err(anyhow!("Division by zero in fn:div"));
1319 }
1320 return Ok(Value::Number(1 / first));
1321 }
1322 let mut result = first;
1323 for v in &vals[1..] {
1324 match v {
1325 Value::Number(0) => return Err(anyhow!("Division by zero in fn:div")),
1326 Value::Number(n) => {
1327 result /= n;
1328 if result == 0 {
1329 return Ok(Value::Number(0));
1330 }
1331 }
1332 _ => return Err(anyhow!("fn:div: expected integer, got {v}")),
1333 }
1334 }
1335 Ok(Value::Number(result))
1336 }
1337
1338 "fn:float:plus" => {
1340 let mut sum: f64 = 0.0;
1341 for v in vals {
1342 sum += value_as_float(v)?;
1343 }
1344 Ok(Value::Float(sum))
1345 }
1346 "fn:float:minus" => {
1347 if vals.is_empty() {
1348 return Err(anyhow!("fn:float:minus: requires at least 1 argument"));
1349 }
1350 let first = value_as_float(&vals[0])?;
1351 if vals.len() == 1 {
1352 return Ok(Value::Float(-first));
1353 }
1354 let mut result = first;
1355 for v in &vals[1..] {
1356 result -= value_as_float(v)?;
1357 }
1358 Ok(Value::Float(result))
1359 }
1360 "fn:float:mult" => {
1361 let mut product: f64 = 1.0;
1362 for v in vals {
1363 product *= value_as_float(v)?;
1364 }
1365 Ok(Value::Float(product))
1366 }
1367 "fn:float:div" => {
1368 if vals.is_empty() {
1369 return Err(anyhow!("fn:float:div: requires at least 1 argument"));
1370 }
1371 let first = value_as_float(&vals[0])?;
1372 if vals.len() == 1 {
1373 if first == 0.0 {
1374 return Err(anyhow!("Division by zero in fn:float:div"));
1375 }
1376 return Ok(Value::Float(1.0 / first));
1377 }
1378 let mut result = first;
1379 for v in &vals[1..] {
1380 let d = value_as_float(v)?;
1381 if d == 0.0 {
1382 return Err(anyhow!("Division by zero in fn:float:div"));
1383 }
1384 result /= d;
1385 }
1386 Ok(Value::Float(result))
1387 }
1388 "fn:sqrt" => {
1389 if vals.len() != 1 {
1390 return Err(anyhow!("fn:sqrt: requires exactly 1 argument"));
1391 }
1392 let f = value_as_float(&vals[0])?;
1393 Ok(Value::Float(f.sqrt()))
1394 }
1395
1396 "fn:string:concat" => {
1398 let mut result = String::new();
1399 for v in vals {
1400 result.push_str(&value_to_string(v));
1401 }
1402 Ok(Value::String(result))
1403 }
1404 "fn:string:replace" => {
1405 if vals.len() != 4 {
1406 return Err(anyhow!("fn:string:replace: requires 4 arguments (string, old, new, count)"));
1407 }
1408 let s = match &vals[0] {
1409 Value::String(s) => s,
1410 v => return Err(anyhow!("fn:string:replace: first arg must be string, got {v}")),
1411 };
1412 let old = match &vals[1] {
1413 Value::String(s) => s,
1414 v => return Err(anyhow!("fn:string:replace: second arg must be string, got {v}")),
1415 };
1416 let new_s = match &vals[2] {
1417 Value::String(s) => s,
1418 v => return Err(anyhow!("fn:string:replace: third arg must be string, got {v}")),
1419 };
1420 let count = match &vals[3] {
1421 Value::Number(n) => *n,
1422 v => return Err(anyhow!("fn:string:replace: fourth arg must be number, got {v}")),
1423 };
1424 let result = if count < 0 {
1425 s.replace(old.as_str(), new_s.as_str())
1426 } else {
1427 s.replacen(old.as_str(), new_s.as_str(), count as usize)
1428 };
1429 Ok(Value::String(result))
1430 }
1431
1432 "fn:number:to_string" => {
1434 if vals.len() != 1 {
1435 return Err(anyhow!("fn:number:to_string: requires 1 argument"));
1436 }
1437 match &vals[0] {
1438 Value::Number(n) => Ok(Value::String(n.to_string())),
1439 v => Err(anyhow!("fn:number:to_string: expected number, got {v}")),
1440 }
1441 }
1442 "fn:float64:to_string" => {
1443 if vals.len() != 1 {
1444 return Err(anyhow!("fn:float64:to_string: requires 1 argument"));
1445 }
1446 match &vals[0] {
1447 Value::Float(f) => Ok(Value::String(format!("{f}"))),
1448 v => Err(anyhow!("fn:float64:to_string: expected float, got {v}")),
1449 }
1450 }
1451 "fn:name:to_string" => {
1452 if vals.len() != 1 {
1453 return Err(anyhow!("fn:name:to_string: requires 1 argument"));
1454 }
1455 match &vals[0] {
1456 Value::Name(s) => Ok(Value::String(s.clone())),
1457 v => Err(anyhow!("fn:name:to_string: expected name, got {v}")),
1458 }
1459 }
1460
1461 "fn:time:now" => {
1463 if !vals.is_empty() {
1464 return Err(anyhow!("fn:time:now: takes no arguments"));
1465 }
1466 let nanos = std::time::SystemTime::now()
1467 .duration_since(std::time::UNIX_EPOCH)
1468 .map_err(|e| anyhow!("fn:time:now: {e}"))?
1469 .as_nanos() as i64;
1470 Ok(Value::Time(nanos))
1471 }
1472 "fn:time:add" => {
1473 if vals.len() != 2 {
1474 return Err(anyhow!("fn:time:add: requires 2 arguments (time, duration)"));
1475 }
1476 match (&vals[0], &vals[1]) {
1477 (Value::Time(t), Value::Duration(d)) => Ok(Value::Time(t + d)),
1478 _ => Err(anyhow!("fn:time:add: expected (time, duration), got ({}, {})", vals[0], vals[1])),
1479 }
1480 }
1481 "fn:time:sub" => {
1482 if vals.len() != 2 {
1483 return Err(anyhow!("fn:time:sub: requires 2 arguments"));
1484 }
1485 match (&vals[0], &vals[1]) {
1486 (Value::Time(t1), Value::Time(t2)) => Ok(Value::Duration(t1 - t2)),
1487 (Value::Time(t), Value::Duration(d)) => Ok(Value::Time(t - d)),
1488 _ => Err(anyhow!("fn:time:sub: expected (time, time) or (time, duration)")),
1489 }
1490 }
1491 "fn:time:year" => time_component(vals, |secs, _| {
1492 let (y, _, _) = civil_from_epoch_secs(secs);
1493 y as i64
1494 }),
1495 "fn:time:month" => time_component(vals, |secs, _| {
1496 let (_, m, _) = civil_from_epoch_secs(secs);
1497 m as i64
1498 }),
1499 "fn:time:day" => time_component(vals, |secs, _| {
1500 let (_, _, d) = civil_from_epoch_secs(secs);
1501 d as i64
1502 }),
1503 "fn:time:hour" => time_component(vals, |secs, _| {
1504 secs.rem_euclid(86400) / 3600
1505 }),
1506 "fn:time:minute" => time_component(vals, |secs, _| {
1507 (secs.rem_euclid(86400) % 3600) / 60
1508 }),
1509 "fn:time:second" => time_component(vals, |secs, _| {
1510 secs.rem_euclid(86400) % 60
1511 }),
1512 "fn:time:from_unix_nanos" => {
1513 if vals.len() != 1 {
1514 return Err(anyhow!("fn:time:from_unix_nanos: requires 1 argument"));
1515 }
1516 match &vals[0] {
1517 Value::Number(n) => Ok(Value::Time(*n)),
1518 v => Err(anyhow!("fn:time:from_unix_nanos: expected number, got {v}")),
1519 }
1520 }
1521 "fn:time:to_unix_nanos" => {
1522 if vals.len() != 1 {
1523 return Err(anyhow!("fn:time:to_unix_nanos: requires 1 argument"));
1524 }
1525 match &vals[0] {
1526 Value::Time(t) => Ok(Value::Number(*t)),
1527 v => Err(anyhow!("fn:time:to_unix_nanos: expected time, got {v}")),
1528 }
1529 }
1530 "fn:time:trunc" => {
1531 if vals.len() != 2 {
1532 return Err(anyhow!("fn:time:trunc: requires 2 arguments (time, unit_name)"));
1533 }
1534 let t = match &vals[0] {
1535 Value::Time(t) => *t,
1536 v => return Err(anyhow!("fn:time:trunc: first arg must be time, got {v}")),
1537 };
1538 let unit_name = match &vals[1] {
1539 Value::Name(s) => s.as_str(),
1540 v => return Err(anyhow!("fn:time:trunc: second arg must be name, got {v}")),
1541 };
1542 let d: i64 = match unit_name {
1543 "/nanosecond" => 1,
1544 "/microsecond" => 1_000,
1545 "/millisecond" => 1_000_000,
1546 "/second" => 1_000_000_000,
1547 "/minute" => 60 * 1_000_000_000,
1548 "/hour" => 3600 * 1_000_000_000,
1549 "/day" => 24 * 3600 * 1_000_000_000,
1550 _ => return Err(anyhow!("fn:time:trunc: unknown unit {unit_name:?}")),
1551 };
1552 Ok(Value::Time(t - t.rem_euclid(d)))
1553 }
1554 "fn:time:format" => {
1555 if vals.len() != 2 {
1556 return Err(anyhow!("fn:time:format: requires 2 arguments (time, precision)"));
1557 }
1558 let t = match &vals[0] {
1559 Value::Time(t) => *t,
1560 v => return Err(anyhow!("fn:time:format: first arg must be time, got {v}")),
1561 };
1562 let precision = match &vals[1] {
1563 Value::String(s) => s.as_str(),
1564 v => return Err(anyhow!("fn:time:format: second arg must be name, got {v}")),
1565 };
1566 Ok(Value::String(format_time_with_precision(t, precision)?))
1567 }
1568 "fn:time:format_civil" => {
1569 if vals.len() != 3 {
1570 return Err(anyhow!("fn:time:format_civil: requires 3 arguments (time, timezone, precision)"));
1571 }
1572 let t = match &vals[0] {
1573 Value::Time(t) => *t,
1574 v => return Err(anyhow!("fn:time:format_civil: first arg must be time, got {v}")),
1575 };
1576 let tz = match &vals[1] {
1577 Value::String(s) => s.as_str(),
1578 v => return Err(anyhow!("fn:time:format_civil: second arg must be string, got {v}")),
1579 };
1580 let precision = match &vals[2] {
1581 Value::String(s) => s.as_str(),
1582 v => return Err(anyhow!("fn:time:format_civil: third arg must be name, got {v}")),
1583 };
1584 let offset = parse_timezone_offset(tz)?;
1585 let adjusted = t + offset * 1_000_000_000;
1586 let formatted = format_time_with_precision(adjusted, precision)?;
1587 Ok(Value::String(formatted))
1588 }
1589 "fn:time:parse_rfc3339" => {
1590 if vals.len() != 1 {
1591 return Err(anyhow!("fn:time:parse_rfc3339: requires 1 argument"));
1592 }
1593 match &vals[0] {
1594 Value::String(s) => {
1595 let nanos = parse_rfc3339_to_nanos(s)?;
1596 Ok(Value::Time(nanos))
1597 }
1598 v => Err(anyhow!("fn:time:parse_rfc3339: expected string, got {v}")),
1599 }
1600 }
1601 "fn:time:parse_civil" => {
1602 if vals.len() != 2 {
1603 return Err(anyhow!("fn:time:parse_civil: requires 2 arguments (string, timezone)"));
1604 }
1605 let s = match &vals[0] {
1606 Value::String(s) => s.as_str(),
1607 v => return Err(anyhow!("fn:time:parse_civil: first arg must be string, got {v}")),
1608 };
1609 let tz = match &vals[1] {
1610 Value::String(s) => s.as_str(),
1611 v => return Err(anyhow!("fn:time:parse_civil: second arg must be string, got {v}")),
1612 };
1613 let offset = parse_timezone_offset(tz)?;
1614 let nanos = parse_civil_datetime_to_nanos(s)?;
1615 Ok(Value::Time(nanos - offset * 1_000_000_000))
1617 }
1618
1619 "fn:duration:add" => {
1621 if vals.len() != 2 {
1622 return Err(anyhow!("fn:duration:add: requires 2 arguments"));
1623 }
1624 match (&vals[0], &vals[1]) {
1625 (Value::Duration(a), Value::Duration(b)) => Ok(Value::Duration(a + b)),
1626 _ => Err(anyhow!("fn:duration:add: expected (duration, duration)")),
1627 }
1628 }
1629 "fn:duration:mult" => {
1630 if vals.len() != 2 {
1631 return Err(anyhow!("fn:duration:mult: requires 2 arguments"));
1632 }
1633 match (&vals[0], &vals[1]) {
1634 (Value::Duration(d), Value::Number(n)) => Ok(Value::Duration(d * n)),
1635 (Value::Number(n), Value::Duration(d)) => Ok(Value::Duration(n * d)),
1636 _ => Err(anyhow!("fn:duration:mult: expected (duration, number) or (number, duration)")),
1637 }
1638 }
1639 "fn:duration:hours" => duration_component_float(vals, |nanos| nanos as f64 / (60.0 * 60.0 * 1_000_000_000.0)),
1640 "fn:duration:minutes" => duration_component_float(vals, |nanos| nanos as f64 / (60.0 * 1_000_000_000.0)),
1641 "fn:duration:seconds" => duration_component_float(vals, |nanos| nanos as f64 / 1_000_000_000.0),
1642 "fn:duration:nanos" => duration_component_int(vals, |nanos| nanos),
1643 "fn:duration:from_nanos" => {
1644 if vals.len() != 1 {
1645 return Err(anyhow!("fn:duration:from_nanos: requires 1 argument"));
1646 }
1647 match &vals[0] {
1648 Value::Number(n) => Ok(Value::Duration(*n)),
1649 v => Err(anyhow!("fn:duration:from_nanos: expected number, got {v}")),
1650 }
1651 }
1652 "fn:duration:from_hours" => duration_from(vals, "hours", 60 * 60 * 1_000_000_000),
1653 "fn:duration:from_minutes" => duration_from(vals, "minutes", 60 * 1_000_000_000),
1654 "fn:duration:from_seconds" => duration_from(vals, "seconds", 1_000_000_000),
1655 "fn:duration:parse" => {
1656 if vals.len() != 1 {
1657 return Err(anyhow!("fn:duration:parse: requires 1 argument"));
1658 }
1659 match &vals[0] {
1660 Value::String(s) => {
1661 let nanos = parse_duration_string(s)?;
1662 Ok(Value::Duration(nanos))
1663 }
1664 v => Err(anyhow!("fn:duration:parse: expected string, got {v}")),
1665 }
1666 }
1667
1668 "fn:list" => Ok(Value::Compound(CompoundKind::List, vals.to_vec())),
1670 "fn:pair" => {
1671 if vals.len() != 2 {
1672 return Err(anyhow!("fn:pair: requires exactly 2 arguments"));
1673 }
1674 Ok(Value::Compound(CompoundKind::Pair, vals.to_vec()))
1675 }
1676 "fn:struct" => {
1677 if vals.len() % 2 != 0 {
1679 return Err(anyhow!(
1680 "fn:struct: requires even number of arguments (field, value pairs)"
1681 ));
1682 }
1683 Ok(Value::Compound(CompoundKind::Struct, vals.to_vec()))
1684 }
1685 "fn:map" => {
1686 if vals.len() % 2 != 0 {
1688 return Err(anyhow!(
1689 "fn:map: requires even number of arguments (key, value pairs)"
1690 ));
1691 }
1692 Ok(Value::Compound(CompoundKind::Map, vals.to_vec()))
1693 }
1694
1695 "fn:list:get" => {
1697 if vals.len() != 2 {
1698 return Err(anyhow!("fn:list:get: requires 2 arguments (list, index)"));
1699 }
1700 match (&vals[0], &vals[1]) {
1701 (Value::Compound(_, elems), Value::Number(idx)) => {
1702 let i = *idx as usize;
1703 elems
1704 .get(i)
1705 .cloned()
1706 .ok_or_else(|| anyhow!("fn:list:get: index {i} out of bounds (len {})", elems.len()))
1707 }
1708 _ => Err(anyhow!("fn:list:get: expected (compound, number)")),
1709 }
1710 }
1711 "fn:list:append" => {
1712 if vals.len() != 2 {
1713 return Err(anyhow!("fn:list:append: requires 2 arguments (list, elem)"));
1714 }
1715 match &vals[0] {
1716 Value::Compound(CompoundKind::List, elems) => {
1717 let mut out = elems.clone();
1718 out.push(vals[1].clone());
1719 Ok(Value::Compound(CompoundKind::List, out))
1720 }
1721 _ => Err(anyhow!("fn:list:append: expected list as first argument")),
1722 }
1723 }
1724 "fn:list:len" | "fn:len" => {
1725 if vals.len() != 1 {
1726 return Err(anyhow!("fn:len: requires 1 argument"));
1727 }
1728 match &vals[0] {
1729 Value::Compound(_, elems) => Ok(Value::Number(elems.len() as i64)),
1730 _ => Err(anyhow!("fn:len: expected compound value")),
1731 }
1732 }
1733 "fn:pair:first" => {
1734 if vals.len() != 1 {
1735 return Err(anyhow!("fn:pair:first: requires 1 argument"));
1736 }
1737 match &vals[0] {
1738 Value::Compound(_, elems) if elems.len() >= 1 => Ok(elems[0].clone()),
1739 _ => Err(anyhow!("fn:pair:first: expected compound with at least 1 element")),
1740 }
1741 }
1742 "fn:pair:second" => {
1743 if vals.len() != 1 {
1744 return Err(anyhow!("fn:pair:second: requires 1 argument"));
1745 }
1746 match &vals[0] {
1747 Value::Compound(_, elems) if elems.len() >= 2 => Ok(elems[1].clone()),
1748 _ => Err(anyhow!("fn:pair:second: expected compound with at least 2 elements")),
1749 }
1750 }
1751 "fn:struct:get" | "fn:map:get" => {
1752 if vals.len() != 2 {
1753 return Err(anyhow!("{fn_name}: requires 2 arguments (compound, key)"));
1754 }
1755 match &vals[0] {
1756 Value::Compound(_, elems) => {
1757 for pair in elems.chunks_exact(2) {
1759 if pair[0] == vals[1] {
1760 return Ok(pair[1].clone());
1761 }
1762 }
1763 Err(anyhow!("{fn_name}: key not found"))
1764 }
1765 _ => Err(anyhow!("{fn_name}: expected compound value")),
1766 }
1767 }
1768 "fn:map:len" | "fn:struct:len" => {
1769 if vals.len() != 1 {
1770 return Err(anyhow!("{fn_name}: requires 1 argument"));
1771 }
1772 match &vals[0] {
1773 Value::Compound(_, elems) => Ok(Value::Number((elems.len() / 2) as i64)),
1774 _ => Err(anyhow!("{fn_name}: expected compound value")),
1775 }
1776 }
1777 "fn:map:keys" => {
1778 if vals.len() != 1 {
1779 return Err(anyhow!("fn:map:keys: requires 1 argument"));
1780 }
1781 match &vals[0] {
1782 Value::Compound(_, elems) => {
1783 let keys: Vec<Value> = elems.chunks_exact(2).map(|p| p[0].clone()).collect();
1784 Ok(Value::Compound(CompoundKind::List, keys))
1785 }
1786 _ => Err(anyhow!("fn:map:keys: expected compound value")),
1787 }
1788 }
1789 "fn:map:values" | "fn:struct:values" => {
1790 if vals.len() != 1 {
1791 return Err(anyhow!("{fn_name}: requires 1 argument"));
1792 }
1793 match &vals[0] {
1794 Value::Compound(_, elems) => {
1795 let values: Vec<Value> = elems.chunks_exact(2).map(|p| p[1].clone()).collect();
1796 Ok(Value::Compound(CompoundKind::List, values))
1797 }
1798 _ => Err(anyhow!("{fn_name}: expected compound value")),
1799 }
1800 }
1801
1802 _ => Err(anyhow!("Unknown function: {fn_name}")),
1803 }
1804}
1805
1806fn time_component(vals: &[Value], extract: impl Fn(i64, i64) -> i64) -> Result<Value> {
1807 if vals.len() != 1 {
1808 return Err(anyhow!("time component function: requires 1 argument"));
1809 }
1810 match &vals[0] {
1811 Value::Time(nanos) => {
1812 let secs = nanos.div_euclid(1_000_000_000);
1813 let sub_nanos = nanos.rem_euclid(1_000_000_000);
1814 Ok(Value::Number(extract(secs, sub_nanos)))
1815 }
1816 v => Err(anyhow!("time component function: expected time, got {v}")),
1817 }
1818}
1819
/// Converts seconds since 1970-01-01T00:00:00Z to a proleptic-Gregorian
/// `(year, month, day)`. Months run 1..=12 and days 1..=31.
///
/// Uses the days-from-civil inverse, which counts years from March 1st so the
/// leap day falls at the end of the year. Pre-epoch inputs are floored to the
/// containing day.
fn civil_from_epoch_secs(secs: i64) -> (i32, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097; // days in a 400-year Gregorian cycle
    // Rebase day 0 from 1970-01-01 to 0000-03-01.
    let days_from_0000_03_01 = secs.div_euclid(86_400) + 719_468;
    let era = days_from_0000_03_01.div_euclid(DAYS_PER_ERA);
    let day_of_era = days_from_0000_03_01.rem_euclid(DAYS_PER_ERA) as u32; // 0..=146096
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let march_based_month = (5 * day_of_year + 2) / 153; // 0 = March, ..., 11 = February
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;
    let month = if march_based_month < 10 {
        march_based_month + 3
    } else {
        march_based_month - 9
    };
    // January and February belong to the following calendar year.
    let year = i64::from(year_of_era) + era * 400 + i64::from(month <= 2);
    (year as i32, month, day)
}
1834
1835fn duration_component_float(vals: &[Value], extract: impl Fn(i64) -> f64) -> Result<Value> {
1836 if vals.len() != 1 {
1837 return Err(anyhow!("duration component function: requires 1 argument"));
1838 }
1839 match &vals[0] {
1840 Value::Duration(nanos) => Ok(Value::Float(extract(*nanos))),
1841 v => Err(anyhow!("duration component function: expected duration, got {v}")),
1842 }
1843}
1844
1845fn duration_component_int(vals: &[Value], extract: impl Fn(i64) -> i64) -> Result<Value> {
1846 if vals.len() != 1 {
1847 return Err(anyhow!("duration component function: requires 1 argument"));
1848 }
1849 match &vals[0] {
1850 Value::Duration(nanos) => Ok(Value::Number(extract(*nanos))),
1851 v => Err(anyhow!("duration component function: expected duration, got {v}")),
1852 }
1853}
1854
1855fn duration_from(vals: &[Value], name: &str, multiplier: i64) -> Result<Value> {
1856 if vals.len() != 1 {
1857 return Err(anyhow!("fn:duration:from_{name}: requires 1 argument"));
1858 }
1859 match &vals[0] {
1860 Value::Number(n) => Ok(Value::Duration(n * multiplier)),
1861 v => Err(anyhow!("fn:duration:from_{name}: expected number, got {v}")),
1862 }
1863}
1864
1865fn format_time_with_precision(nanos: i64, precision: &str) -> Result<String> {
1867 let secs = nanos.div_euclid(1_000_000_000);
1868 let sub_nanos = nanos.rem_euclid(1_000_000_000);
1869 let (y, m, d) = civil_from_epoch_secs(secs);
1870 let time_of_day = secs.rem_euclid(86400);
1871 let hour = time_of_day / 3600;
1872 let minute = (time_of_day % 3600) / 60;
1873 let second = time_of_day % 60;
1874
1875 match precision {
1876 "/year" => Ok(format!("{y:04}")),
1877 "/month" => Ok(format!("{y:04}-{m:02}")),
1878 "/day" => Ok(format!("{y:04}-{m:02}-{d:02}")),
1879 "/hour" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}Z")),
1880 "/minute" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}Z")),
1881 "/second" => Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}Z")),
1882 "/millisecond" => {
1883 let ms = sub_nanos / 1_000_000;
1884 Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{ms:03}Z"))
1885 }
1886 "/microsecond" => {
1887 let us = sub_nanos / 1_000;
1888 Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{us:06}Z"))
1889 }
1890 "/nanosecond" => {
1891 if sub_nanos == 0 {
1892 Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}Z"))
1893 } else {
1894 let ns_str = format!("{sub_nanos:09}");
1895 let ns_trimmed = ns_str.trim_end_matches('0');
1896 Ok(format!("{y:04}-{m:02}-{d:02}T{hour:02}:{minute:02}:{second:02}.{ns_trimmed}Z"))
1897 }
1898 }
1899 _ => Err(anyhow!("unknown time precision: {precision:?}")),
1900 }
1901}
1902
1903fn parse_timezone_offset(tz: &str) -> Result<i64> {
1906 match tz {
1907 "UTC" => Ok(0),
1908 s if s.starts_with('+') || s.starts_with('-') => {
1909 let sign: i64 = if s.starts_with('-') { -1 } else { 1 };
1910 let rest = &s[1..];
1911 let parts: Vec<&str> = rest.split(':').collect();
1912 if parts.len() != 2 {
1913 return Err(anyhow!("invalid timezone offset: {tz:?}"));
1914 }
1915 let hours: i64 = parts[0].parse().map_err(|_| anyhow!("invalid timezone: {tz:?}"))?;
1916 let minutes: i64 = parts[1].parse().map_err(|_| anyhow!("invalid timezone: {tz:?}"))?;
1917 Ok(sign * (hours * 3600 + minutes * 60))
1918 }
1919 _ => Err(anyhow!("unsupported timezone: {tz:?} (use \"UTC\" or offset like \"+05:30\")")),
1920 }
1921}
1922
1923fn parse_rfc3339_to_nanos(s: &str) -> Result<i64> {
1925 if s.len() < 10 {
1927 return Err(anyhow!("fn:time:parse_rfc3339: string too short: {s:?}"));
1928 }
1929 let year: i64 = s[0..4].parse().map_err(|_| anyhow!("invalid year in {s:?}"))?;
1930 let month: u32 = s[5..7].parse().map_err(|_| anyhow!("invalid month in {s:?}"))?;
1931 let day: u32 = s[8..10].parse().map_err(|_| anyhow!("invalid day in {s:?}"))?;
1932
1933 let (hour, minute, second, frac_nanos) = if s.len() > 10 && s.as_bytes()[10] == b'T' {
1934 if s.len() < 19 {
1935 return Err(anyhow!("fn:time:parse_rfc3339: incomplete time in {s:?}"));
1936 }
1937 let h: u32 = s[11..13].parse().map_err(|_| anyhow!("invalid hour"))?;
1938 let min: u32 = s[14..16].parse().map_err(|_| anyhow!("invalid minute"))?;
1939 let sec: u32 = s[17..19].parse().map_err(|_| anyhow!("invalid second"))?;
1940
1941 let frac = if s.len() > 19 && s.as_bytes()[19] == b'.' {
1942 let end = s.len() - if s.ends_with('Z') { 1 } else { 0 };
1943 let frac_str = &s[20..end];
1944 let padded = format!("{frac_str:0<9}");
1945 padded[..9].parse::<i64>().unwrap_or(0)
1946 } else {
1947 0
1948 };
1949 (h, min, sec, frac)
1950 } else {
1951 (0, 0, 0, 0)
1952 };
1953
1954 let y = if month <= 2 { year - 1 } else { year };
1955 let era = (if y >= 0 { y } else { y - 399 }) / 400;
1956 let yoe = (y - era * 400) as u32;
1957 let m_adj = if month > 2 { month - 3 } else { month + 9 };
1958 let doy = (153 * m_adj + 2) / 5 + day - 1;
1959 let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
1960 let days = era * 146097 + doe as i64 - 719468;
1961
1962 let total_seconds = days * 86400 + hour as i64 * 3600 + minute as i64 * 60 + second as i64;
1963 Ok(total_seconds * 1_000_000_000 + frac_nanos)
1964}
1965
1966fn parse_civil_datetime_to_nanos(s: &str) -> Result<i64> {
1968 parse_rfc3339_to_nanos(s)
1970}
1971
1972fn parse_duration_string(s: &str) -> Result<i64> {
1974 if s.is_empty() {
1975 return Err(anyhow!("fn:duration:parse: empty string"));
1976 }
1977 if s == "0" || s == "0s" {
1978 return Ok(0);
1979 }
1980
1981 let (sign, mut rest) = if s.starts_with('-') {
1982 (-1i64, &s[1..])
1983 } else if s.starts_with('+') {
1984 (1i64, &s[1..])
1985 } else {
1986 (1i64, s)
1987 };
1988
1989 let mut total_nanos: i64 = 0;
1990
1991 while !rest.is_empty() {
1992 let num_end = rest
1994 .find(|c: char| !c.is_ascii_digit() && c != '.')
1995 .unwrap_or(rest.len());
1996 if num_end == 0 {
1997 return Err(anyhow!("fn:duration:parse: expected number in {s:?}"));
1998 }
1999 let num_str = &rest[..num_end];
2000 rest = &rest[num_end..];
2001
2002 let (unit_nanos, unit_len) = if rest.starts_with("ns") {
2004 (1i64, 2)
2005 } else if rest.starts_with("us") || rest.starts_with("µs") {
2006 (1_000i64, if rest.starts_with("µ") { 3 } else { 2 })
2007 } else if rest.starts_with("ms") {
2008 (1_000_000i64, 2)
2009 } else if rest.starts_with('s') {
2010 (1_000_000_000i64, 1)
2011 } else if rest.starts_with('m') {
2012 (60 * 1_000_000_000i64, 1)
2013 } else if rest.starts_with('h') {
2014 (3600 * 1_000_000_000i64, 1)
2015 } else {
2016 return Err(anyhow!("fn:duration:parse: unknown unit in {s:?}"));
2017 };
2018 rest = &rest[unit_len..];
2019
2020 if num_str.contains('.') {
2021 let val: f64 = num_str.parse().map_err(|_| anyhow!("fn:duration:parse: invalid number {num_str:?}"))?;
2022 total_nanos += (val * unit_nanos as f64) as i64;
2023 } else {
2024 let val: i64 = num_str.parse().map_err(|_| anyhow!("fn:duration:parse: invalid number {num_str:?}"))?;
2025 total_nanos += val * unit_nanos;
2026 }
2027 }
2028
2029 Ok(sign * total_nanos)
2030}
2031
2032#[cfg(test)]
2033mod tests {
2034 use super::*;
2035
2036 #[test]
2037 fn test_retract_existing() {
2038 let mut store = MemStore::new();
2039 store.add_fact("r", vec![Value::Number(1), Value::Number(2)]);
2040 store.add_fact("r", vec![Value::Number(3), Value::Number(4)]);
2041
2042 let removed = store
2043 .retract("r", &[Value::Number(1), Value::Number(2)])
2044 .unwrap();
2045 assert!(removed);
2046
2047 let facts = store.get_facts("r");
2048 assert_eq!(facts.len(), 1);
2049 assert_eq!(facts[0], vec![Value::Number(3), Value::Number(4)]);
2050 }
2051
2052 #[test]
2053 fn test_retract_nonexistent() {
2054 let mut store = MemStore::new();
2055 store.add_fact("r", vec![Value::Number(1)]);
2056
2057 let removed = store.retract("r", &[Value::Number(99)]).unwrap();
2058 assert!(!removed);
2059
2060 let facts = store.get_facts("r");
2061 assert_eq!(facts.len(), 1);
2062 assert_eq!(facts[0], vec![Value::Number(1)]);
2063 }
2064
2065 #[test]
2066 fn test_clear() {
2067 let mut store = MemStore::new();
2068 store.add_fact("r", vec![Value::Number(1)]);
2069 store.add_fact("r", vec![Value::Number(2)]);
2070 store.add_fact("s", vec![Value::Number(10)]);
2071
2072 store.clear("r");
2073
2074 let r_facts = store.get_facts("r");
2075 assert!(r_facts.is_empty());
2076
2077 let s_facts = store.get_facts("s");
2079 assert_eq!(s_facts.len(), 1);
2080 }
2081
2082 #[test]
2083 fn test_relation_names() {
2084 let mut store = MemStore::new();
2085 store.create_relation("alpha");
2086 store.create_relation("beta");
2087 store.add_fact("gamma", vec![Value::Number(1)]);
2088
2089 let mut names = store.relation_names();
2090 names.sort();
2091 assert_eq!(names, vec!["alpha", "beta", "gamma"]);
2092 }
2093
2094 #[test]
2095 fn test_provenance_recording() {
2096 use mangle_ir::physical::{DataSource, Operand};
2097
2098 let mut ir = mangle_ir::Ir::new();
2100 let base_name = ir.intern_name("base");
2101 let derived_name = ir.intern_name("derived");
2102 let var_x = ir.intern_name("X");
2103
2104 let op = Op::Iterate {
2106 source: DataSource::Scan {
2107 relation: base_name,
2108 vars: vec![var_x],
2109 },
2110 body: Box::new(Op::Insert {
2111 relation: derived_name,
2112 args: vec![Operand::Var(var_x)],
2113 }),
2114 };
2115
2116 let mut store = Box::new(MemStore::new());
2117 store.add_fact("base", vec![Value::Number(10)]);
2118 store.add_fact("base", vec![Value::Number(20)]);
2119 store.create_relation("derived");
2120
2121 let mut interpreter = Interpreter::new(&ir, store as Box<dyn Store>).with_provenance();
2122
2123 let count = interpreter.execute(&op).unwrap();
2124 assert_eq!(count, 2);
2125
2126 let prov = interpreter.provenance.as_ref().unwrap();
2128 assert_eq!(prov.entries.len(), 2);
2129
2130 for entry in &prov.entries {
2132 assert_eq!(entry.derived.0, "derived");
2133 assert_eq!(entry.premises.len(), 1);
2134 assert_eq!(entry.premises[0].0, "base");
2135 }
2136
2137 let mut derived_vals: Vec<i64> = prov
2139 .entries
2140 .iter()
2141 .map(|e| match &e.derived.1[0] {
2142 Value::Number(n) => *n,
2143 _ => panic!("expected number"),
2144 })
2145 .collect();
2146 derived_vals.sort();
2147 assert_eq!(derived_vals, vec![10, 20]);
2148 }
2149
2150 #[test]
2151 fn test_float_values() {
2152 use mangle_ir::physical::{self, DataSource, Expr, Operand};
2153
2154 let mut ir = mangle_ir::Ir::new();
2155 let temps = ir.intern_name("temps");
2156 let result = ir.intern_name("result");
2157 let var_x = ir.intern_name("X");
2158 let simple_op = Op::Iterate {
2160 source: DataSource::Scan {
2161 relation: temps,
2162 vars: vec![var_x],
2163 },
2164 body: Box::new(Op::Insert {
2165 relation: result,
2166 args: vec![Operand::Var(var_x)],
2167 }),
2168 };
2169
2170 let mut store = Box::new(MemStore::new());
2171 store.add_fact("temps", vec![Value::Float(36.5)]);
2172 store.add_fact("temps", vec![Value::Float(35.9)]);
2173 store.add_fact("temps", vec![Value::Float(37.2)]);
2174 store.create_relation("result");
2175
2176 let mut interpreter = Interpreter::new(&ir, store as Box<dyn Store>);
2177 let count = interpreter.execute(&simple_op).unwrap();
2178 assert_eq!(count, 3, "basic float scan+insert should produce 3 facts");
2179
2180 let mut ir2 = mangle_ir::Ir::new();
2182 let temps2 = ir2.intern_name("temps");
2183 let result2 = ir2.intern_name("result2");
2184 let var_x2 = ir2.intern_name("X");
2185 let var_y2 = ir2.intern_name("Y");
2186 let fn_plus2 = ir2.intern_name("fn:float:plus");
2187
2188 let op = Op::Iterate {
2189 source: DataSource::Scan {
2190 relation: temps2,
2191 vars: vec![var_x2],
2192 },
2193 body: Box::new(Op::Filter {
2194 cond: Condition::Cmp {
2195 op: physical::CmpOp::Gt,
2196 left: Operand::Var(var_x2),
2197 right: Operand::Const(physical::Constant::Float(36.0)),
2198 },
2199 body: Box::new(Op::Let {
2200 var: var_y2,
2201 expr: Expr::Call {
2202 function: fn_plus2,
2203 args: vec![
2204 Operand::Var(var_x2),
2205 Operand::Const(physical::Constant::Float(0.5)),
2206 ],
2207 },
2208 body: Box::new(Op::Insert {
2209 relation: result2,
2210 args: vec![Operand::Var(var_x2), Operand::Var(var_y2)],
2211 }),
2212 }),
2213 }),
2214 };
2215
2216 let mut store2 = Box::new(MemStore::new());
2217 store2.add_fact("temps", vec![Value::Float(36.5)]);
2218 store2.add_fact("temps", vec![Value::Float(35.9)]);
2219 store2.add_fact("temps", vec![Value::Float(37.2)]);
2220 store2.create_relation("result2");
2221
2222 let mut interpreter2 = Interpreter::new(&ir2, store2 as Box<dyn Store>);
2223 let count2 = interpreter2.execute(&op).unwrap();
2224
2225 assert_eq!(count2, 2);
2227
2228 let results: Vec<_> = interpreter2
2230 .store()
2231 .scan_next_delta("result2")
2232 .unwrap()
2233 .collect();
2234 assert_eq!(results.len(), 2);
2235
2236 let mut output: Vec<(f64, f64)> = results
2237 .iter()
2238 .map(|t| match (&t[0], &t[1]) {
2239 (Value::Float(a), Value::Float(b)) => (*a, *b),
2240 _ => panic!("expected floats"),
2241 })
2242 .collect();
2243 output.sort_by(|a, b| a.0.total_cmp(&b.0));
2244
2245 assert_eq!(output[0], (36.5, 37.0));
2246 assert_eq!(output[1], (37.2, 37.7));
2247 }
2248
2249 #[test]
2250 fn test_float_in_memstore() {
2251 let mut store = MemStore::new();
2253 store.add_fact("data", vec![Value::Float(1.5), Value::Number(10)]);
2254 store.add_fact("data", vec![Value::Float(2.5), Value::Number(20)]);
2255 store.add_fact("data", vec![Value::Float(1.5), Value::Number(10)]);
2257
2258 let facts = store.get_facts("data");
2259 assert_eq!(facts.len(), 2);
2260
2261 let results: Vec<_> = store
2263 .scan_index("data", 0, &Value::Float(1.5))
2264 .unwrap()
2265 .collect();
2266 assert_eq!(results.len(), 1);
2267 assert_eq!(results[0][1], Value::Number(10));
2268 }
2269
2270 #[test]
2273 fn test_fn_plus_variadic() {
2274 assert_eq!(
2275 eval_function("fn:plus", &[Value::Number(1), Value::Number(2), Value::Number(3)]).unwrap(),
2276 Value::Number(6)
2277 );
2278 assert_eq!(eval_function("fn:plus", &[]).unwrap(), Value::Number(0));
2280 }
2281
2282 #[test]
2283 fn test_fn_minus_variadic() {
2284 assert_eq!(
2286 eval_function("fn:minus", &[Value::Number(5)]).unwrap(),
2287 Value::Number(-5)
2288 );
2289 assert_eq!(
2291 eval_function("fn:minus", &[Value::Number(10), Value::Number(3)]).unwrap(),
2292 Value::Number(7)
2293 );
2294 assert_eq!(
2296 eval_function("fn:minus", &[Value::Number(100), Value::Number(10), Value::Number(20)]).unwrap(),
2297 Value::Number(70)
2298 );
2299 assert!(eval_function("fn:minus", &[]).is_err());
2301 }
2302
2303 #[test]
2304 fn test_fn_mult_variadic() {
2305 assert_eq!(
2306 eval_function("fn:mult", &[Value::Number(2), Value::Number(3), Value::Number(4)]).unwrap(),
2307 Value::Number(24)
2308 );
2309 assert_eq!(eval_function("fn:mult", &[]).unwrap(), Value::Number(1));
2311 }
2312
2313 #[test]
2314 fn test_fn_div() {
2315 assert_eq!(
2317 eval_function("fn:div", &[Value::Number(10), Value::Number(3)]).unwrap(),
2318 Value::Number(3)
2319 );
2320 assert_eq!(
2322 eval_function("fn:div", &[Value::Number(5)]).unwrap(),
2323 Value::Number(0)
2324 );
2325 assert_eq!(
2326 eval_function("fn:div", &[Value::Number(1)]).unwrap(),
2327 Value::Number(1)
2328 );
2329 assert!(eval_function("fn:div", &[Value::Number(1), Value::Number(0)]).is_err());
2331 assert!(eval_function("fn:div", &[Value::Number(0)]).is_err());
2332 }
2333
2334 #[test]
2335 fn test_fn_float_promotion() {
2336 assert_eq!(
2338 eval_function("fn:float:plus", &[Value::Float(1.5), Value::Number(2)]).unwrap(),
2339 Value::Float(3.5)
2340 );
2341 assert_eq!(
2343 eval_function("fn:sqrt", &[Value::Number(9)]).unwrap(),
2344 Value::Float(3.0)
2345 );
2346 }
2347
2348 #[test]
2349 fn test_fn_string_concat() {
2350 assert_eq!(
2351 eval_function(
2352 "fn:string:concat",
2353 &[Value::String("a".into()), Value::String("b".into()), Value::String("c".into())]
2354 ).unwrap(),
2355 Value::String("abc".to_string())
2356 );
2357 assert_eq!(
2359 eval_function(
2360 "fn:string:concat",
2361 &[Value::String("n=".into()), Value::Number(42)]
2362 ).unwrap(),
2363 Value::String("n=42".to_string())
2364 );
2365 }
2366
2367 #[test]
2368 fn test_fn_string_replace() {
2369 assert_eq!(
2371 eval_function(
2372 "fn:string:replace",
2373 &[Value::String("a-b-c".into()), Value::String("-".into()), Value::String("_".into()), Value::Number(-1)]
2374 ).unwrap(),
2375 Value::String("a_b_c".to_string())
2376 );
2377 assert_eq!(
2379 eval_function(
2380 "fn:string:replace",
2381 &[Value::String("a-b-c".into()), Value::String("-".into()), Value::String("_".into()), Value::Number(1)]
2382 ).unwrap(),
2383 Value::String("a_b-c".to_string())
2384 );
2385 }
2386
2387 #[test]
2388 fn test_fn_to_string() {
2389 assert_eq!(
2390 eval_function("fn:number:to_string", &[Value::Number(42)]).unwrap(),
2391 Value::String("42".to_string())
2392 );
2393 assert_eq!(
2394 eval_function("fn:float64:to_string", &[Value::Float(3.14)]).unwrap(),
2395 Value::String("3.14".to_string())
2396 );
2397 assert_eq!(
2398 eval_function("fn:name:to_string", &[Value::Name("/role/admin".into())]).unwrap(),
2399 Value::String("/role/admin".to_string())
2400 );
2401 }
2402
2403 fn date_nanos(year: i64, month: u32, day: u32) -> i64 {
2409 let m = month;
2411 let y = if m <= 2 { year - 1 } else { year };
2412 let era = (if y >= 0 { y } else { y - 399 }) / 400;
2413 let yoe = (y - era * 400) as u32;
2414 let m_adj = if m > 2 { m - 3 } else { m + 9 };
2415 let doy = (153 * m_adj + 2) / 5 + day - 1;
2416 let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
2417 let days = era * 146097 + doe as i64 - 719468;
2418 days * 86400 * 1_000_000_000
2419 }
2420
2421 fn datetime_nanos(year: i64, month: u32, day: u32, h: u32, m: u32, s: u32, ns: i64) -> i64 {
2422 date_nanos(year, month, day) + (h as i64) * 3_600_000_000_000
2423 + (m as i64) * 60_000_000_000 + (s as i64) * 1_000_000_000 + ns
2424 }
2425
2426 #[test]
2428 fn test_coalesce_overlapping() {
2429 let mut store = MemStore::new();
2430 let jan1 = date_nanos(2024, 1, 1);
2431 let jan15 = date_nanos(2024, 1, 15);
2432 let jan10 = date_nanos(2024, 1, 10);
2433 let jan25 = date_nanos(2024, 1, 25);
2434 let jan20 = date_nanos(2024, 1, 20);
2435 let jan31 = date_nanos(2024, 1, 31);
2436
2437 store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan1), Value::Time(jan15)]);
2438 store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan10), Value::Time(jan25)]);
2439 store.add_fact("active", vec![Value::String("/service".into()), Value::Time(jan20), Value::Time(jan31)]);
2440
2441 assert_eq!(store.get_facts("active").len(), 3);
2442
2443 store.coalesce_temporal("active");
2444
2445 let facts = store.get_facts("active");
2446 assert_eq!(facts.len(), 1, "after coalesce: expected 1, got {:?}", facts);
2447 assert_eq!(facts[0][1], Value::Time(jan1), "start should be Jan 1");
2448 assert_eq!(facts[0][2], Value::Time(jan31), "end should be Jan 31");
2449 }
2450
2451 #[test]
2453 fn test_coalesce_adjacent() {
2454 let mut store = MemStore::new();
2455 let shift1_start = datetime_nanos(2024, 1, 1, 8, 0, 0, 0);
2456 let shift1_end = datetime_nanos(2024, 1, 1, 16, 0, 0, 0);
2457 let shift2_start = datetime_nanos(2024, 1, 1, 16, 0, 0, 1); let shift2_end = date_nanos(2024, 1, 2);
2459
2460 store.add_fact("shift", vec![Value::String("/worker".into()), Value::Time(shift1_start), Value::Time(shift1_end)]);
2461 store.add_fact("shift", vec![Value::String("/worker".into()), Value::Time(shift2_start), Value::Time(shift2_end)]);
2462
2463 store.coalesce_temporal("shift");
2464
2465 let facts = store.get_facts("shift");
2466 assert_eq!(facts.len(), 1, "adjacent intervals should coalesce");
2467 assert_eq!(facts[0][1], Value::Time(shift1_start));
2468 assert_eq!(facts[0][2], Value::Time(shift2_end));
2469 }
2470
2471 #[test]
2473 fn test_coalesce_non_overlapping() {
2474 let mut store = MemStore::new();
2475 let jan1 = date_nanos(2024, 1, 1);
2476 let jan7 = date_nanos(2024, 1, 7);
2477 let jun1 = date_nanos(2024, 6, 1);
2478 let jun14 = date_nanos(2024, 6, 14);
2479
2480 store.add_fact("vacation", vec![Value::String("/alice".into()), Value::Time(jan1), Value::Time(jan7)]);
2481 store.add_fact("vacation", vec![Value::String("/alice".into()), Value::Time(jun1), Value::Time(jun14)]);
2482
2483 store.coalesce_temporal("vacation");
2484
2485 let facts = store.get_facts("vacation");
2486 assert_eq!(facts.len(), 2, "non-overlapping intervals should stay separate");
2487 }
2488
2489 #[test]
2491 fn test_coalesce_mixed_granularity() {
2492 let mut store = MemStore::new();
2493 let t1_start = datetime_nanos(2024, 1, 1, 10, 0, 0, 0);
2495 let t1_end = datetime_nanos(2024, 1, 1, 10, 0, 5, 0);
2496 let t2_start = datetime_nanos(2024, 1, 1, 10, 0, 4, 500_000_000);
2498 let t2_end = datetime_nanos(2024, 1, 1, 10, 0, 6, 0);
2499 let t3_start = datetime_nanos(2024, 1, 1, 10, 0, 6, 1);
2501 let t3_end = datetime_nanos(2024, 1, 1, 10, 0, 7, 0);
2502
2503 store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t1_start), Value::Time(t1_end)]);
2504 store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t2_start), Value::Time(t2_end)]);
2505 store.add_fact("event", vec![Value::String("/sensor".into()), Value::Time(t3_start), Value::Time(t3_end)]);
2506
2507 store.coalesce_temporal("event");
2508
2509 let facts = store.get_facts("event");
2510 assert_eq!(facts.len(), 1, "mixed granularity should coalesce to 1");
2511 assert_eq!(facts[0][1], Value::Time(t1_start), "start: 10:00:00");
2512 assert_eq!(facts[0][2], Value::Time(t3_end), "end: 10:00:07");
2513 }
2514
2515 #[test]
2517 fn test_coalesce_multiple_keys() {
2518 let mut store = MemStore::new();
2519 let jan1 = date_nanos(2024, 1, 1);
2520 let jan10 = date_nanos(2024, 1, 10);
2521 let jan5 = date_nanos(2024, 1, 5);
2522 let jan15 = date_nanos(2024, 1, 15);
2523
2524 store.add_fact("employed", vec![Value::String("/alice".into()), Value::Time(jan1), Value::Time(jan10)]);
2526 store.add_fact("employed", vec![Value::String("/alice".into()), Value::Time(jan5), Value::Time(jan15)]);
2527 store.add_fact("employed", vec![Value::String("/bob".into()), Value::Time(jan1), Value::Time(jan15)]);
2529
2530 store.coalesce_temporal("employed");
2531
2532 let facts = store.get_facts("employed");
2533 assert_eq!(facts.len(), 2, "expected 2 facts after coalesce, got {:?}", facts);
2535 }
2536
2537 fn nested_loop_two_way(
2547 ir: &mangle_ir::Ir,
2548 a: NameId,
2549 b: NameId,
2550 result: NameId,
2551 x: NameId,
2552 z: NameId,
2553 y: NameId,
2554 z_right: NameId,
2555 ) -> Op {
2556 use mangle_ir::physical::{CmpOp, Condition, DataSource, Operand};
2557 let _ = ir;
2558 Op::Iterate {
2559 source: DataSource::Scan {
2560 relation: a,
2561 vars: vec![x, z],
2562 },
2563 body: Box::new(Op::Iterate {
2564 source: DataSource::Scan {
2565 relation: b,
2566 vars: vec![z_right, y],
2567 },
2568 body: Box::new(Op::Filter {
2569 cond: Condition::Cmp {
2570 op: CmpOp::Eq,
2571 left: Operand::Var(z),
2572 right: Operand::Var(z_right),
2573 },
2574 body: Box::new(Op::Insert {
2575 relation: result,
2576 args: vec![Operand::Var(x), Operand::Var(y)],
2577 }),
2578 }),
2579 }),
2580 }
2581 }
2582
2583 fn hash_join_two_way(
2585 a: NameId,
2586 b: NameId,
2587 result: NameId,
2588 x: NameId,
2589 z: NameId,
2590 y: NameId,
2591 ) -> Op {
2592 use mangle_ir::physical::{DataSource, Operand};
2593 Op::HashJoin {
2594 build_source: DataSource::Scan {
2595 relation: a,
2596 vars: vec![x, z],
2597 },
2598 probe_source: DataSource::Scan {
2599 relation: b,
2600 vars: vec![z, y],
2601 },
2602 join_keys: vec![z],
2603 body: Box::new(Op::Insert {
2604 relation: result,
2605 args: vec![Operand::Var(x), Operand::Var(y)],
2606 }),
2607 }
2608 }
2609
2610 fn run_plan(ir: &mangle_ir::Ir, facts: &[(&str, Vec<Value>)], op: &Op) -> Vec<Vec<Value>> {
2611 let mut store = Box::new(MemStore::new());
2612 for (rel, t) in facts {
2613 store.add_fact(rel, t.clone());
2614 }
2615 store.create_relation("result");
2616 let mut interp = Interpreter::new(ir, store as Box<dyn Store>);
2617 interp.execute(op).unwrap();
2618 let mut store = interp.into_store();
2620 store.merge_deltas();
2621 store.merge_deltas();
2622 store.scan("result").unwrap().collect()
2623 }
2624
2625 fn sorted(mut v: Vec<Vec<Value>>) -> Vec<Vec<Value>> {
2626 v.sort();
2627 v
2628 }
2629
2630 fn setup_two_way_ir()
2632 -> (mangle_ir::Ir, NameId, NameId, NameId, NameId, NameId, NameId, NameId) {
2633 let mut ir = mangle_ir::Ir::new();
2634 let a = ir.intern_name("a");
2635 let b = ir.intern_name("b");
2636 let result = ir.intern_name("result");
2637 let x = ir.intern_name("X");
2638 let z = ir.intern_name("Z");
2639 let y = ir.intern_name("Y");
2640 let z_right = ir.intern_name("Z_right");
2643 (ir, a, b, result, x, z, y, z_right)
2644 }
2645
2646 #[test]
2647 fn test_hashjoin_matches_nested_loop_basic() {
2648 let (ir, a, b, result, x, z, y, z_right) = setup_two_way_ir();
2649
2650 let facts: Vec<(&str, Vec<Value>)> = vec![
2651 ("a", vec![Value::Number(1), Value::Number(10)]),
2652 ("a", vec![Value::Number(2), Value::Number(20)]),
2653 ("a", vec![Value::Number(3), Value::Number(10)]),
2654 ("b", vec![Value::Number(10), Value::Number(100)]),
2655 ("b", vec![Value::Number(10), Value::Number(101)]),
2656 ("b", vec![Value::Number(20), Value::Number(200)]),
2657 ("b", vec![Value::Number(30), Value::Number(300)]),
2658 ];
2659
2660 let baseline = run_plan(
2661 &ir,
2662 &facts,
2663 &nested_loop_two_way(&ir, a, b, result, x, z, y, z_right),
2664 );
2665 let via_hash = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2666
2667 assert_eq!(
2668 sorted(baseline.clone()),
2669 sorted(via_hash.clone()),
2670 "HashJoin output must match nested-loop baseline"
2671 );
2672 assert_eq!(sorted(via_hash).len(), 5);
2674 }
2675
2676 #[test]
2677 fn test_hashjoin_empty_build() {
2678 let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2679 let facts: Vec<(&str, Vec<Value>)> = vec![
2680 ("b", vec![Value::Number(10), Value::Number(100)]),
2681 ("b", vec![Value::Number(20), Value::Number(200)]),
2682 ];
2683 let out = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2684 assert!(out.is_empty());
2685 }
2686
2687 #[test]
2688 fn test_hashjoin_empty_probe() {
2689 let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2690 let facts: Vec<(&str, Vec<Value>)> = vec![
2691 ("a", vec![Value::Number(1), Value::Number(10)]),
2692 ("a", vec![Value::Number(2), Value::Number(20)]),
2693 ];
2694 let out = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2695 assert!(out.is_empty());
2696 }
2697
2698 #[test]
2699 fn test_hashjoin_no_matches() {
2700 let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2701 let facts: Vec<(&str, Vec<Value>)> = vec![
2702 ("a", vec![Value::Number(1), Value::Number(10)]),
2703 ("b", vec![Value::Number(99), Value::Number(200)]),
2704 ];
2705 let out = run_plan(&ir, &facts, &hash_join_two_way(a, b, result, x, z, y));
2706 assert!(out.is_empty());
2707 }
2708
2709 #[test]
2710 fn test_hashjoin_value_variants_as_key() {
2711 let mut ir = mangle_ir::Ir::new();
2714 let a = ir.intern_name("a");
2715 let b = ir.intern_name("b");
2716 let result = ir.intern_name("result");
2717 let x = ir.intern_name("X");
2718 let k = ir.intern_name("K");
2719 let y = ir.intern_name("Y");
2720
2721 let facts: Vec<(&str, Vec<Value>)> = vec![
2722 ("a", vec![Value::Number(1), Value::String("hello".into())]),
2723 ("a", vec![Value::Number(2), Value::Name("/foo".into())]),
2724 ("b", vec![Value::String("hello".into()), Value::Number(100)]),
2725 ("b", vec![Value::Name("/foo".into()), Value::Number(200)]),
2726 (
2728 "b",
2729 vec![Value::String("/foo".into()), Value::Number(999)],
2730 ),
2731 ];
2732 let op = hash_join_two_way(a, b, result, x, k, y);
2733 let out = sorted(run_plan(&ir, &facts, &op));
2734 assert_eq!(
2735 out,
2736 sorted(vec![
2737 vec![Value::Number(1), Value::Number(100)],
2738 vec![Value::Number(2), Value::Number(200)],
2739 ])
2740 );
2741 }
2742
2743 #[test]
2744 fn test_hashjoin_multi_key() {
2745 let mut ir = mangle_ir::Ir::new();
2748 let a = ir.intern_name("a");
2749 let b = ir.intern_name("b");
2750 let result = ir.intern_name("result");
2751 let x = ir.intern_name("X");
2752 let k1 = ir.intern_name("K1");
2753 let k2 = ir.intern_name("K2");
2754 let w = ir.intern_name("W");
2755
2756 let op = Op::HashJoin {
2757 build_source: DataSource::Scan {
2758 relation: a,
2759 vars: vec![x, k1, k2],
2760 },
2761 probe_source: DataSource::Scan {
2762 relation: b,
2763 vars: vec![k1, k2, w],
2764 },
2765 join_keys: vec![k1, k2],
2766 body: Box::new(Op::Insert {
2767 relation: result,
2768 args: vec![Operand::Var(x), Operand::Var(w)],
2769 }),
2770 };
2771
2772 let facts: Vec<(&str, Vec<Value>)> = vec![
2773 (
2774 "a",
2775 vec![Value::Number(1), Value::Number(10), Value::Number(100)],
2776 ),
2777 (
2778 "a",
2779 vec![Value::Number(2), Value::Number(10), Value::Number(200)],
2780 ),
2781 (
2783 "a",
2784 vec![Value::Number(3), Value::Number(10), Value::Number(999)],
2785 ),
2786 (
2787 "b",
2788 vec![Value::Number(10), Value::Number(100), Value::Number(1000)],
2789 ),
2790 (
2791 "b",
2792 vec![Value::Number(10), Value::Number(200), Value::Number(2000)],
2793 ),
2794 ];
2795
2796 let out = sorted(run_plan(&ir, &facts, &op));
2797 assert_eq!(
2798 out,
2799 sorted(vec![
2800 vec![Value::Number(1), Value::Number(1000)],
2801 vec![Value::Number(2), Value::Number(2000)],
2802 ])
2803 );
2804 }
2805
2806 #[test]
2807 fn test_hashjoin_duplicate_build_keys() {
2808 let (ir, a, b, result, x, z, y, _z_right) = setup_two_way_ir();
2811 let facts: Vec<(&str, Vec<Value>)> = vec![
2812 ("a", vec![Value::Number(1), Value::Number(10)]),
2813 ("a", vec![Value::Number(2), Value::Number(10)]),
2814 ("a", vec![Value::Number(3), Value::Number(10)]),
2815 ("b", vec![Value::Number(10), Value::Number(99)]),
2816 ];
2817 let op = hash_join_two_way(a, b, result, x, z, y);
2818 let out = sorted(run_plan(&ir, &facts, &op));
2819 assert_eq!(
2820 out,
2821 sorted(vec![
2822 vec![Value::Number(1), Value::Number(99)],
2823 vec![Value::Number(2), Value::Number(99)],
2824 vec![Value::Number(3), Value::Number(99)],
2825 ])
2826 );
2827 }
2828}