use thiserror::Error;
use std::collections::{BTreeMap, BTreeSet};
use crate::bind::{self, BindError, SymbolMutation, SymbolTable};
use crate::canonical::{
CanonicalRecord, Clocks, EdgeRecord, EpiRecord, InfFlags, InfRecord, ProRecord, SemFlags,
SemRecord, SymbolEventRecord,
};
use crate::clock::ClockTime;
use crate::dag::{Edge as DagEdge, EdgeKind, SupersessionDag};
use crate::parse::{self, ParseError};
use crate::semantic::{self, SemanticError, ValidatedForm};
use crate::symbol::{SymbolId, SymbolKind};
/// Compile pipeline state: the symbol table, supersession bookkeeping, and
/// every memory record retained across successfully committed batches.
///
/// All mutation goes through either `compile_batch` (which stages changes on
/// clones and commits only on success) or the `replay_*` methods (which
/// rebuild state from an already-trusted canonical log).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Pipeline {
    // Canonical-name / alias <-> SymbolId bindings.
    table: SymbolTable,
    // Counter used to mint `__mem_N` memory-id symbols.
    next_memory_counter: u64,
    // High-water mark enforcing strictly monotonic committed_at clocks.
    last_committed_at: Option<ClockTime>,
    // Supersedes / StaleParent edge graph.
    dag: SupersessionDag,
    // Current holder per supersession key (semantic, inferential, procedural).
    supersession_index: SupersessionIndex,
    // Append-only record stores plus per-key history indices into them.
    semantic_records: Vec<SemRecord>,
    semantic_by_sp_history: BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
    episodic_records: Vec<EpiRecord>,
    procedural_records: Vec<ProRecord>,
    procedural_by_rule_history: BTreeMap<SymbolId, Vec<usize>>,
    inferential_records: Vec<InfRecord>,
    inferential_by_sp_history: BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
    decay_config: crate::decay::DecayConfig,
    // Episode registry: commit time per episode and parent links for chains.
    episode_committed_at: BTreeMap<SymbolId, ClockTime>,
    episode_parent: BTreeMap<SymbolId, SymbolId>,
    // Metadata staged by an `(episode start ...)` form, consumed by the caller.
    pending_episode_metadata: Option<PendingEpisodeMetadata>,
    // Flag sets toggled by Pin/Unpin and AuthoritativeSet/Clear events.
    pinned_memories: BTreeSet<SymbolId>,
    authoritative_memories: BTreeSet<SymbolId>,
    // Reverse index: parent memory -> inferential memories derived from it.
    inferentials_by_parent: BTreeMap<SymbolId, Vec<SymbolId>>,
}
/// Metadata captured from an `(episode start ...)` form, held until the host
/// takes it via `Pipeline::take_pending_episode_metadata`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PendingEpisodeMetadata {
    // Optional human-readable episode label.
    pub label: Option<String>,
    // Parent episode for chain construction, if any.
    pub parent_episode: Option<SymbolId>,
    // Memories the new episode retracts.
    pub retracts: Vec<SymbolId>,
}
/// Tracks the *current* (non-superseded) memory per supersession key.
///
/// Semantic and inferential memories are keyed by `(s, p)` and compared by
/// `valid_at`; procedural memories are keyed both by rule id and by
/// `(trigger bytes, scope)` and compared by `committed_at`. The
/// `procedural_keys_by_memory` map remembers which keys a procedural memory
/// holds so it can be fully evicted when displaced.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct SupersessionIndex {
    semantic_by_sp: BTreeMap<(SymbolId, SymbolId), CurrentSemantic>,
    inferential_by_sp: BTreeMap<(SymbolId, SymbolId), CurrentSemantic>,
    procedural_by_rule: BTreeMap<SymbolId, CurrentProcedural>,
    procedural_by_trigger_scope: BTreeMap<(Vec<u8>, SymbolId), CurrentProcedural>,
    procedural_keys_by_memory: BTreeMap<SymbolId, ProceduralKeys>,
}
/// Current holder of a semantic or inferential `(s, p)` supersession key.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct CurrentSemantic {
    memory_id: SymbolId,
    // valid_at of the holder; newer valid_at wins the key.
    valid_at: ClockTime,
}
/// Cap on `episode_chain` traversal, guarding against parent cycles that a
/// corrupted or adversarial log could introduce.
pub const MAX_EPISODE_CHAIN_DEPTH: usize = 1024;
/// The two supersession keys a procedural memory occupies while current.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ProceduralKeys {
    rule_id: SymbolId,
    trigger_scope: (Vec<u8>, SymbolId),
}
/// Current holder of a procedural supersession key.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct CurrentProcedural {
    memory_id: SymbolId,
    // committed_at of the holder; equal committed_at is a conflict.
    committed_at: ClockTime,
}
impl Pipeline {
    /// Creates an empty pipeline with default decay configuration.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Read-only view of the symbol table.
    #[must_use]
    pub fn table(&self) -> &SymbolTable {
        &self.table
    }
    /// Current decay configuration.
    #[must_use]
    pub fn decay_config(&self) -> &crate::decay::DecayConfig {
        &self.decay_config
    }
    /// Replaces the decay configuration.
    pub fn set_decay_config(&mut self, cfg: crate::decay::DecayConfig) {
        self.decay_config = cfg;
    }
    /// Replays a symbol allocation from the canonical log into the table.
    pub fn replay_allocate(
        &mut self,
        id: SymbolId,
        name: String,
        kind: SymbolKind,
    ) -> Result<(), BindError> {
        self.table.replay_allocate(id, name, kind)
    }
    /// Replays a symbol alias event from the canonical log.
    pub fn replay_alias(&mut self, id: SymbolId, alias: String) -> Result<(), BindError> {
        self.table.replay_alias(id, alias)
    }
    /// Replays a symbol rename event from the canonical log.
    pub fn replay_rename(&mut self, id: SymbolId, new_canonical: String) -> Result<(), BindError> {
        self.table.replay_rename(id, new_canonical)
    }
    /// Replays a symbol retirement event from the canonical log.
    pub fn replay_retire(&mut self, id: SymbolId, name: String) -> Result<(), BindError> {
        self.table.replay_retire(id, name)
    }
    /// Restores the `__mem_N` counter (used when rebuilding from a log).
    pub fn set_next_memory_counter(&mut self, counter: u64) {
        self.next_memory_counter = counter;
    }
    /// Next value the `__mem_N` allocator will use.
    #[must_use]
    pub fn next_memory_counter(&self) -> u64 {
        self.next_memory_counter
    }
    /// Most recent committed_at, if any batch has committed.
    #[must_use]
    pub fn last_committed_at(&self) -> Option<ClockTime> {
        self.last_committed_at
    }
    /// Raises the committed_at high-water mark; never moves it backwards.
    pub fn advance_last_committed_at(&mut self, at: ClockTime) {
        if self.last_committed_at.is_none_or(|prev| at > prev) {
            self.last_committed_at = Some(at);
        }
    }
    /// Read-only view of the supersession DAG.
    #[must_use]
    pub fn dag(&self) -> &SupersessionDag {
        &self.dag
    }
    /// All retained semantic records, in commit order.
    #[must_use]
    pub fn semantic_records(&self) -> &[SemRecord] {
        &self.semantic_records
    }
    /// All retained episodic records, in commit order.
    #[must_use]
    pub fn episodic_records(&self) -> &[EpiRecord] {
        &self.episodic_records
    }
    /// All retained procedural records, in commit order.
    #[must_use]
    pub fn procedural_records(&self) -> &[ProRecord] {
        &self.procedural_records
    }
    /// Indices into `semantic_records` for key `(s, p)`, oldest first.
    #[must_use]
    pub fn semantic_history_at(&self, s: SymbolId, p: SymbolId) -> &[usize] {
        self.semantic_by_sp_history
            .get(&(s, p))
            .map_or(&[], Vec::as_slice)
    }
    /// Indices into `procedural_records` for `rule_id`, oldest first.
    #[must_use]
    pub fn procedural_history_for(&self, rule_id: SymbolId) -> &[usize] {
        self.procedural_by_rule_history
            .get(&rule_id)
            .map_or(&[], Vec::as_slice)
    }
    /// All retained inferential records, in commit order.
    #[must_use]
    pub fn inferential_records(&self) -> &[InfRecord] {
        &self.inferential_records
    }
    /// Indices into `inferential_records` for key `(s, p)`, oldest first.
    #[must_use]
    pub fn inferential_history_at(&self, s: SymbolId, p: SymbolId) -> &[usize] {
        self.inferential_by_sp_history
            .get(&(s, p))
            .map_or(&[], Vec::as_slice)
    }
    /// Records an episode's commit time; first registration wins.
    pub fn register_episode(&mut self, episode_id: SymbolId, at: ClockTime) {
        self.episode_committed_at.entry(episode_id).or_insert(at);
    }
    /// Commit time of `episode_id`, if registered.
    #[must_use]
    pub fn episode_committed_at(&self, episode_id: SymbolId) -> Option<ClockTime> {
        self.episode_committed_at.get(&episode_id).copied()
    }
    /// Iterates all registered episodes with their commit times.
    pub fn iter_episodes(&self) -> impl Iterator<Item = (SymbolId, ClockTime)> + '_ {
        self.episode_committed_at.iter().map(|(id, at)| (*id, *at))
    }
    /// Records a child -> parent episode link; first registration wins.
    pub fn register_episode_parent(&mut self, child: SymbolId, parent: SymbolId) {
        self.episode_parent.entry(child).or_insert(parent);
    }
    /// Parent of `episode_id`, if one was registered.
    #[must_use]
    pub fn episode_parent(&self, episode_id: SymbolId) -> Option<SymbolId> {
        self.episode_parent.get(&episode_id).copied()
    }
    /// Consumes metadata staged by the last `(episode start ...)` form.
    pub fn take_pending_episode_metadata(&mut self) -> Option<PendingEpisodeMetadata> {
        self.pending_episode_metadata.take()
    }
    /// Whether `memory_id` is currently pinned.
    #[must_use]
    pub fn is_pinned(&self, memory_id: SymbolId) -> bool {
        self.pinned_memories.contains(&memory_id)
    }
    /// Whether `memory_id` is currently flagged authoritative.
    #[must_use]
    pub fn is_authoritative(&self, memory_id: SymbolId) -> bool {
        self.authoritative_memories.contains(&memory_id)
    }
    /// Replays a pin/authoritative flag event; other record kinds are ignored.
    pub fn replay_flag(&mut self, record: &CanonicalRecord) {
        match record {
            CanonicalRecord::Pin(r) => {
                self.pinned_memories.insert(r.memory_id);
            }
            CanonicalRecord::Unpin(r) => {
                self.pinned_memories.remove(&r.memory_id);
            }
            CanonicalRecord::AuthoritativeSet(r) => {
                self.authoritative_memories.insert(r.memory_id);
            }
            CanonicalRecord::AuthoritativeClear(r) => {
                self.authoritative_memories.remove(&r.memory_id);
            }
            _ => {}
        }
    }
    /// Walks the parent chain starting at (and including) `episode_id`.
    /// Traversal stops after `MAX_EPISODE_CHAIN_DEPTH` steps so a cyclic
    /// parent map cannot hang the caller.
    pub fn episode_chain(&self, episode_id: SymbolId) -> impl Iterator<Item = SymbolId> + '_ {
        let mut current = Some(episode_id);
        let mut depth = 0_usize;
        std::iter::from_fn(move || {
            if depth >= MAX_EPISODE_CHAIN_DEPTH {
                return None;
            }
            let id = current?;
            depth += 1;
            current = self.episode_parent.get(&id).copied();
            Some(id)
        })
    }
    /// Replays a supersession-graph edge from the canonical log.
    pub fn replay_edge(&mut self, edge: crate::dag::Edge) -> Result<(), crate::dag::DagError> {
        self.dag.add_edge(edge)
    }
    /// Replays a memory record (Sem/Epi/Inf/Pro) from the canonical log:
    /// appends it to the matching store, updates history indices, and
    /// re-derives the supersession index. Non-memory records are ignored.
    pub fn replay_memory_record(&mut self, record: &CanonicalRecord) {
        match record {
            CanonicalRecord::Sem(sem) => {
                let key = (sem.s, sem.p);
                let sem_index = self.semantic_records.len();
                self.semantic_records.push(sem.clone());
                self.semantic_by_sp_history
                    .entry(key)
                    .or_default()
                    .push(sem_index);
                // The replayed record becomes current only if its valid_at is
                // strictly newer than the existing holder's.
                let replace = self
                    .supersession_index
                    .semantic_by_sp
                    .get(&key)
                    .is_none_or(|existing| sem.clocks.valid_at > existing.valid_at);
                if replace {
                    self.supersession_index.semantic_by_sp.insert(
                        key,
                        CurrentSemantic {
                            memory_id: sem.memory_id,
                            valid_at: sem.clocks.valid_at,
                        },
                    );
                }
            }
            CanonicalRecord::Epi(epi) => {
                self.episodic_records.push(epi.clone());
            }
            CanonicalRecord::Inf(inf) => {
                // Maintain the parent -> dependents reverse index.
                for parent in &inf.derived_from {
                    self.inferentials_by_parent
                        .entry(*parent)
                        .or_default()
                        .push(inf.memory_id);
                }
                let inf_index = self.inferential_records.len();
                let key = (inf.s, inf.p);
                let valid_at = inf.clocks.valid_at;
                self.inferential_records.push(inf.clone());
                self.inferential_by_sp_history
                    .entry(key)
                    .or_default()
                    .push(inf_index);
                let replace = self
                    .supersession_index
                    .inferential_by_sp
                    .get(&key)
                    .is_none_or(|existing| valid_at > existing.valid_at);
                if replace {
                    self.supersession_index.inferential_by_sp.insert(
                        key,
                        CurrentSemantic {
                            memory_id: inf.memory_id,
                            valid_at,
                        },
                    );
                }
            }
            CanonicalRecord::Pro(pro) => {
                let pro_index = self.procedural_records.len();
                self.procedural_records.push(pro.clone());
                self.procedural_by_rule_history
                    .entry(pro.rule_id)
                    .or_default()
                    .push(pro_index);
                // Trusted log: re-derive procedural supersession without the
                // conflict checks performed at emit time.
                replay_procedural_supersession(
                    &mut self.supersession_index,
                    pro.memory_id,
                    pro.clocks.committed_at,
                    pro.rule_id,
                    &pro.trigger,
                    pro.scope,
                );
            }
            _ => {}
        }
    }
    /// Allocates an `__ep_{counter}` episode symbol in the live table.
    pub fn allocate_episode_symbol(&mut self, counter: u64) -> Result<SymbolId, EmitError> {
        let name = format!("__ep_{counter}");
        self.table
            .allocate(name.clone(), SymbolKind::Memory)
            .map_err(|cause| EmitError::MemoryIdAllocation { name, cause })
    }
    /// Compiles a source batch into canonical records.
    ///
    /// Runs parse -> bind -> validate -> emit against *clones* of all pipeline
    /// state; only if every stage succeeds are the working copies committed
    /// back, so any error leaves the pipeline exactly as it was. The
    /// effective commit clock is `wall_now`, bumped if needed to stay
    /// strictly monotonic.
    ///
    /// # Errors
    /// Returns the first `Parse`/`Bind`/`Semantic`/`Emit` failure, or
    /// `ClockExhausted` if the monotonic clock bump overflows.
    pub fn compile_batch(
        &mut self,
        input: &str,
        wall_now: ClockTime,
    ) -> Result<Vec<CanonicalRecord>, PipelineError> {
        let span = tracing::info_span!(
            "mimir.pipeline.compile_batch",
            input_len = input.len(),
            record_count = tracing::field::Empty,
            memory_count = tracing::field::Empty,
            edge_count = tracing::field::Empty,
        );
        let _enter = span.enter();
        let forms = parse::parse(input).map_err(PipelineError::Parse)?;
        let effective_now = monotonic_commit_clock(wall_now, self.last_committed_at)?;
        // Stage every piece of mutable state on working copies; nothing below
        // touches `self` until all stages have succeeded.
        // NOTE(review): this clones the full record history per batch — O(total
        // records); acceptable until batch volume grows, then revisit.
        let mut working_table = self.table.clone();
        let mut working_counter = self.next_memory_counter;
        let mut working_dag = self.dag.clone();
        let mut working_index = self.supersession_index.clone();
        let mut working_sem_records = self.semantic_records.clone();
        let mut working_sem_by_sp = self.semantic_by_sp_history.clone();
        let mut working_epi_records = self.episodic_records.clone();
        let mut working_pro_records = self.procedural_records.clone();
        let mut working_pro_by_rule = self.procedural_by_rule_history.clone();
        let mut working_inf_records = self.inferential_records.clone();
        let mut working_inf_by_sp = self.inferential_by_sp_history.clone();
        let (bound, journal) =
            bind::bind(forms, &mut working_table).map_err(PipelineError::Bind)?;
        // Validation sees the wall clock (not the bumped commit clock) —
        // presumably so user-supplied timestamps are checked against real
        // time; confirm before changing.
        let validated =
            semantic::validate(bound, &working_table, wall_now).map_err(PipelineError::Semantic)?;
        let mut working_pending_meta: Option<PendingEpisodeMetadata> = None;
        let mut working_pinned = self.pinned_memories.clone();
        let mut working_authoritative = self.authoritative_memories.clone();
        let mut working_infs_by_parent = self.inferentials_by_parent.clone();
        let mut emit_state = EmitState {
            table: &mut working_table,
            counter: &mut working_counter,
            dag: &mut working_dag,
            index: &mut working_index,
            semantic_records: &mut working_sem_records,
            semantic_by_sp: &mut working_sem_by_sp,
            episodic_records: &mut working_epi_records,
            procedural_records: &mut working_pro_records,
            procedural_by_rule: &mut working_pro_by_rule,
            inferential_records: &mut working_inf_records,
            inferential_by_sp: &mut working_inf_by_sp,
            pending_episode: &mut working_pending_meta,
            pinned: &mut working_pinned,
            authoritative: &mut working_authoritative,
            inferentials_by_parent: &mut working_infs_by_parent,
            now: effective_now,
        };
        let records = emit(&validated, &journal, &mut emit_state).map_err(PipelineError::Emit)?;
        // All stages succeeded: commit the working copies atomically.
        self.table = working_table;
        self.next_memory_counter = working_counter;
        self.last_committed_at = Some(effective_now);
        self.dag = working_dag;
        self.supersession_index = working_index;
        self.semantic_records = working_sem_records;
        self.semantic_by_sp_history = working_sem_by_sp;
        self.episodic_records = working_epi_records;
        self.procedural_records = working_pro_records;
        self.procedural_by_rule_history = working_pro_by_rule;
        self.inferential_records = working_inf_records;
        self.inferential_by_sp_history = working_inf_by_sp;
        self.pending_episode_metadata = working_pending_meta;
        self.pinned_memories = working_pinned;
        self.authoritative_memories = working_authoritative;
        self.inferentials_by_parent = working_infs_by_parent;
        let (memory_count, edge_count) = count_memory_and_edge_records(&records);
        span.record("record_count", records.len());
        span.record("memory_count", memory_count);
        span.record("edge_count", edge_count);
        Ok(records)
    }
}
/// Tallies `records` into `(memory_count, edge_count)`: Sem/Epi/Pro/Inf are
/// memory payloads, the four supersession-graph kinds are edges, and every
/// other record kind counts toward neither.
fn count_memory_and_edge_records(records: &[CanonicalRecord]) -> (usize, usize) {
    records
        .iter()
        .fold((0_usize, 0_usize), |(memory, edge), record| match record {
            CanonicalRecord::Sem(_)
            | CanonicalRecord::Epi(_)
            | CanonicalRecord::Pro(_)
            | CanonicalRecord::Inf(_) => (memory + 1, edge),
            CanonicalRecord::Supersedes(_)
            | CanonicalRecord::Corrects(_)
            | CanonicalRecord::StaleParent(_)
            | CanonicalRecord::Reconfirms(_) => (memory, edge + 1),
            _ => (memory, edge),
        })
}
/// Mutable borrows of all working-copy state threaded through emission.
///
/// Every field points at a *clone* owned by `compile_batch`; on any emit
/// error the clones are dropped and the pipeline is untouched.
struct EmitState<'a> {
    table: &'a mut SymbolTable,
    // `__mem_N` allocation counter.
    counter: &'a mut u64,
    dag: &'a mut SupersessionDag,
    index: &'a mut SupersessionIndex,
    semantic_records: &'a mut Vec<SemRecord>,
    semantic_by_sp: &'a mut BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
    episodic_records: &'a mut Vec<EpiRecord>,
    procedural_records: &'a mut Vec<ProRecord>,
    procedural_by_rule: &'a mut BTreeMap<SymbolId, Vec<usize>>,
    inferential_records: &'a mut Vec<InfRecord>,
    inferential_by_sp: &'a mut BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
    pending_episode: &'a mut Option<PendingEpisodeMetadata>,
    pinned: &'a mut BTreeSet<SymbolId>,
    authoritative: &'a mut BTreeSet<SymbolId>,
    inferentials_by_parent: &'a mut BTreeMap<SymbolId, Vec<SymbolId>>,
    // Effective (monotonic) commit clock for the whole batch.
    now: ClockTime,
}
/// Chooses the batch's effective committed_at: the wall clock when it has
/// already advanced past the previous commit (or there is no previous
/// commit), otherwise the previous commit time plus one millisecond.
///
/// # Errors
/// `ClockExhausted` when the one-millisecond bump overflows or lands on the
/// reserved sentinel millisecond value.
fn monotonic_commit_clock(
    wall_now: ClockTime,
    last_committed_at: Option<ClockTime>,
) -> Result<ClockTime, PipelineError> {
    match last_committed_at {
        None => Ok(wall_now),
        Some(prev) if wall_now > prev => Ok(wall_now),
        Some(prev) => {
            // Wall clock stalled or went backwards: force progress by one ms.
            let bumped = prev
                .as_millis()
                .checked_add(1)
                .ok_or(PipelineError::ClockExhausted {
                    last_committed_at: prev,
                })?;
            ClockTime::try_from_millis(bumped).map_err(|_| PipelineError::ClockExhausted {
                last_committed_at: prev,
            })
        }
    }
}
/// Lowers a validated batch into canonical records: first one symbol-event
/// record per binder journal entry, then one memory record (plus any
/// supersession edges) per form. Alias/Rename/Retire forms are skipped here
/// because their effects already appear in the journal.
fn emit(
    forms: &[ValidatedForm],
    journal: &[SymbolMutation],
    state: &mut EmitState,
) -> Result<Vec<CanonicalRecord>, EmitError> {
    let mut records = Vec::with_capacity(journal.len() + forms.len());
    records.extend(
        journal
            .iter()
            .map(|mutation| emit_symbol_mutation(mutation, state.now)),
    );
    for form in forms {
        let journal_covered = matches!(
            form,
            ValidatedForm::Alias { .. }
                | ValidatedForm::Rename { .. }
                | ValidatedForm::Retire { .. }
        );
        if !journal_covered {
            emit_form(form, state, &mut records)?;
        }
    }
    Ok(records)
}
fn emit_symbol_mutation(mutation: &SymbolMutation, now: ClockTime) -> CanonicalRecord {
match mutation {
SymbolMutation::Allocate { id, name, kind } => {
CanonicalRecord::SymbolAlloc(SymbolEventRecord {
symbol_id: *id,
name: name.clone(),
symbol_kind: *kind,
at: now,
})
}
SymbolMutation::Rename {
id,
new_canonical,
kind,
} => CanonicalRecord::SymbolRename(SymbolEventRecord {
symbol_id: *id,
name: new_canonical.clone(),
symbol_kind: *kind,
at: now,
}),
SymbolMutation::Alias { id, alias, kind } => {
CanonicalRecord::SymbolAlias(SymbolEventRecord {
symbol_id: *id,
name: alias.clone(),
symbol_kind: *kind,
at: now,
})
}
SymbolMutation::Retire { id, name, kind } => {
CanonicalRecord::SymbolRetire(SymbolEventRecord {
symbol_id: *id,
name: name.clone(),
symbol_kind: *kind,
at: now,
})
}
}
}
/// Emits the canonical records for a single validated form into `out`,
/// updating the working stores, histories, supersession index, DAG, and flag
/// sets as it goes.
///
/// Ordering is load-bearing: the memory record is pushed before any
/// supersession/stale-parent edges it causes, and inferential parent links
/// are registered before the supersession edge is emitted.
///
/// # Errors
/// `Unsupported` for forms this milestone does not emit, supersession
/// conflicts, memory-id allocation failures, and DAG rejections.
#[allow(clippy::too_many_lines)]
fn emit_form(
    form: &ValidatedForm,
    state: &mut EmitState,
    out: &mut Vec<CanonicalRecord>,
) -> Result<(), EmitError> {
    match form {
        ValidatedForm::Sem {
            s,
            p,
            o,
            source,
            confidence,
            valid_at,
            projected,
            ..
        } => {
            let memory_id = allocate_memory_id(state, out)?;
            // Resolve forward/retroactive supersession for (s, p); a
            // retroactive write is born already closed (invalid_at set).
            let (record_invalid_at, supersession) =
                resolve_semantic_supersession(state.index, memory_id, *s, *p, *valid_at)?;
            let sem = SemRecord {
                memory_id,
                s: *s,
                p: *p,
                o: o.clone(),
                source: *source,
                confidence: *confidence,
                clocks: Clocks {
                    valid_at: *valid_at,
                    observed_at: state.now,
                    committed_at: state.now,
                    invalid_at: record_invalid_at,
                },
                flags: SemFlags {
                    projected: *projected,
                },
            };
            let sem_index = state.semantic_records.len();
            state.semantic_records.push(sem.clone());
            state
                .semantic_by_sp
                .entry((*s, *p))
                .or_default()
                .push(sem_index);
            out.push(CanonicalRecord::Sem(sem));
            if let Some(target) = supersession {
                emit_supersedes_edge(state, out, memory_id, target)?;
            }
        }
        ValidatedForm::Epi {
            event_id,
            kind,
            participants,
            location,
            at_time,
            observed_at,
            source,
            confidence,
            ..
        } => {
            // Episodic records never supersede; they are append-only.
            let memory_id = allocate_memory_id(state, out)?;
            let epi = EpiRecord {
                memory_id,
                event_id: *event_id,
                kind: *kind,
                participants: participants.clone(),
                location: *location,
                at_time: *at_time,
                observed_at: *observed_at,
                source: *source,
                confidence: *confidence,
                committed_at: state.now,
                invalid_at: None,
            };
            state.episodic_records.push(epi.clone());
            out.push(CanonicalRecord::Epi(epi));
        }
        ValidatedForm::Pro {
            rule_id,
            trigger,
            action,
            precondition,
            scope,
            source,
            confidence,
            ..
        } => {
            let memory_id = allocate_memory_id(state, out)?;
            let pro = ProRecord {
                memory_id,
                rule_id: *rule_id,
                trigger: trigger.clone(),
                action: action.clone(),
                precondition: precondition.clone(),
                scope: *scope,
                source: *source,
                confidence: *confidence,
                clocks: Clocks {
                    // Procedural validity starts at commit time.
                    valid_at: state.now,
                    observed_at: state.now,
                    committed_at: state.now,
                    invalid_at: None,
                },
            };
            let pro_index = state.procedural_records.len();
            state.procedural_records.push(pro.clone());
            state
                .procedural_by_rule
                .entry(*rule_id)
                .or_default()
                .push(pro_index);
            out.push(CanonicalRecord::Pro(pro));
            // A Pro write can displace up to two distinct current holders
            // (by rule id and by trigger+scope).
            let superseded = apply_procedural_supersession(
                state.index,
                memory_id,
                state.now,
                *rule_id,
                trigger,
                *scope,
            )?;
            for old in superseded {
                emit_supersedes_edge(state, out, memory_id, old)?;
            }
        }
        ValidatedForm::Inf {
            s,
            p,
            o,
            derived_from,
            method,
            confidence,
            valid_at,
            projected,
        } => {
            let memory_id = allocate_memory_id(state, out)?;
            // Born stale if any parent is already superseded in the DAG.
            let born_stale = derived_from
                .iter()
                .any(|parent| parent_is_superseded(state.dag, *parent));
            let (record_invalid_at, supersession) =
                resolve_inferential_supersession(state.index, memory_id, *s, *p, *valid_at)?;
            let inf = InfRecord {
                memory_id,
                s: *s,
                p: *p,
                o: o.clone(),
                derived_from: derived_from.clone(),
                method: *method,
                confidence: *confidence,
                clocks: Clocks {
                    valid_at: *valid_at,
                    observed_at: state.now,
                    committed_at: state.now,
                    invalid_at: record_invalid_at,
                },
                flags: InfFlags {
                    projected: *projected,
                    stale: born_stale,
                },
            };
            let inf_index = state.inferential_records.len();
            state.inferential_records.push(inf.clone());
            state
                .inferential_by_sp
                .entry((*s, *p))
                .or_default()
                .push(inf_index);
            out.push(CanonicalRecord::Inf(inf));
            // Register parent links before the supersession edge so stale
            // propagation sees this record as a dependent.
            for parent in derived_from {
                state
                    .inferentials_by_parent
                    .entry(*parent)
                    .or_default()
                    .push(memory_id);
            }
            if let Some(target) = supersession {
                emit_supersedes_edge(state, out, memory_id, target)?;
            }
        }
        // Alias/Rename/Retire should have been filtered out by `emit` (their
        // effects come from the journal); reaching here is a caller bug.
        ValidatedForm::Alias { .. }
        | ValidatedForm::Rename { .. }
        | ValidatedForm::Retire { .. } => {
            return Err(EmitError::Unsupported {
                form: "symbol-event-form-without-journal",
            })
        }
        ValidatedForm::Correct { .. } => return Err(EmitError::Unsupported { form: "correct" }),
        ValidatedForm::Promote { .. } => return Err(EmitError::Unsupported { form: "promote" }),
        ValidatedForm::Query { .. } => return Err(EmitError::Unsupported { form: "query" }),
        ValidatedForm::Episode {
            action,
            label,
            parent_episode,
            retracts,
        } => {
            // Episode start stages metadata for the host; end clears it.
            if matches!(action, crate::parse::EpisodeAction::Start) {
                *state.pending_episode = Some(PendingEpisodeMetadata {
                    label: label.clone(),
                    parent_episode: *parent_episode,
                    retracts: retracts.clone(),
                });
            } else {
                *state.pending_episode = None;
            }
        }
        ValidatedForm::Flag {
            action,
            memory,
            actor,
        } => {
            let record = crate::canonical::FlagEventRecord {
                memory_id: *memory,
                at: state.now,
                actor_symbol: *actor,
            };
            match action {
                crate::parse::FlagAction::Pin => {
                    state.pinned.insert(*memory);
                    out.push(CanonicalRecord::Pin(record));
                }
                crate::parse::FlagAction::Unpin => {
                    state.pinned.remove(memory);
                    out.push(CanonicalRecord::Unpin(record));
                }
                crate::parse::FlagAction::AuthoritativeSet => {
                    state.authoritative.insert(*memory);
                    out.push(CanonicalRecord::AuthoritativeSet(record));
                }
                crate::parse::FlagAction::AuthoritativeClear => {
                    state.authoritative.remove(memory);
                    out.push(CanonicalRecord::AuthoritativeClear(record));
                }
            }
        }
    }
    Ok(())
}
/// Resolves auto-supersession for a new semantic record at key `(s, p)`.
///
/// Returns `(invalid_at_for_new_record, superseded_memory_id)`:
/// * first write at the key — `(None, None)`, new record becomes current;
/// * strictly newer `valid_at` — forward supersession: new record becomes
///   current and the displaced memory id is returned for an edge;
/// * strictly older `valid_at` — retroactive supersession: the new record is
///   emitted already closed at the current holder's `valid_at`, index
///   unchanged;
/// * equal `valid_at` — unresolvable, reported as a conflict error.
fn resolve_semantic_supersession(
    index: &mut SupersessionIndex,
    new_memory_id: SymbolId,
    s: SymbolId,
    p: SymbolId,
    new_valid_at: ClockTime,
) -> Result<(Option<ClockTime>, Option<SymbolId>), EmitError> {
    use std::cmp::Ordering;
    let key = (s, p);
    let incoming = CurrentSemantic {
        memory_id: new_memory_id,
        valid_at: new_valid_at,
    };
    let Some(old) = index.semantic_by_sp.get(&key).copied() else {
        index.semantic_by_sp.insert(key, incoming);
        return Ok((None, None));
    };
    match new_valid_at.cmp(&old.valid_at) {
        Ordering::Greater => {
            index.semantic_by_sp.insert(key, incoming);
            tracing::info!(
                target: "mimir.supersession",
                kind = "semantic",
                direction = "forward",
                s = %s,
                p = %p,
                old_memory_id = %old.memory_id,
                new_memory_id = %new_memory_id,
                "semantic auto-supersession",
            );
            Ok((None, Some(old.memory_id)))
        }
        Ordering::Less => {
            tracing::info!(
                target: "mimir.supersession",
                kind = "semantic",
                direction = "retroactive",
                s = %s,
                p = %p,
                old_memory_id = %old.memory_id,
                new_memory_id = %new_memory_id,
                "semantic auto-supersession",
            );
            Ok((Some(old.valid_at), Some(old.memory_id)))
        }
        Ordering::Equal => Err(EmitError::SemanticSupersessionConflict {
            s,
            p,
            valid_at: new_valid_at,
            existing: old.memory_id,
        }),
    }
}
/// Resolves auto-supersession for a new inferential record at key `(s, p)`.
///
/// Mirrors `resolve_semantic_supersession` against the inferential index:
/// first write installs, newer `valid_at` supersedes forward, older
/// `valid_at` supersedes retroactively (new record born closed), and equal
/// `valid_at` is a conflict.
fn resolve_inferential_supersession(
    index: &mut SupersessionIndex,
    new_memory_id: SymbolId,
    s: SymbolId,
    p: SymbolId,
    new_valid_at: ClockTime,
) -> Result<(Option<ClockTime>, Option<SymbolId>), EmitError> {
    use std::cmp::Ordering;
    let key = (s, p);
    let incoming = CurrentSemantic {
        memory_id: new_memory_id,
        valid_at: new_valid_at,
    };
    let Some(old) = index.inferential_by_sp.get(&key).copied() else {
        index.inferential_by_sp.insert(key, incoming);
        return Ok((None, None));
    };
    match new_valid_at.cmp(&old.valid_at) {
        Ordering::Greater => {
            index.inferential_by_sp.insert(key, incoming);
            tracing::info!(
                target: "mimir.supersession",
                kind = "inferential",
                direction = "forward",
                s = %s,
                p = %p,
                old_memory_id = %old.memory_id,
                new_memory_id = %new_memory_id,
                "inferential auto-supersession",
            );
            Ok((None, Some(old.memory_id)))
        }
        Ordering::Less => {
            tracing::info!(
                target: "mimir.supersession",
                kind = "inferential",
                direction = "retroactive",
                s = %s,
                p = %p,
                old_memory_id = %old.memory_id,
                new_memory_id = %new_memory_id,
                "inferential auto-supersession",
            );
            Ok((Some(old.valid_at), Some(old.memory_id)))
        }
        Ordering::Equal => Err(EmitError::InferentialSupersessionConflict {
            s,
            p,
            valid_at: new_valid_at,
            existing: old.memory_id,
        }),
    }
}
/// Applies procedural supersession for a freshly emitted Pro record.
///
/// Looks up the current holders of both supersession keys (rule id, and
/// trigger+scope), rejects a same-`committed_at` write as unorderable,
/// installs the new record as current under both keys, and returns the
/// displaced memory ids so the caller can emit `Supersedes` edges.
fn apply_procedural_supersession(
    index: &mut SupersessionIndex,
    new_memory_id: SymbolId,
    new_committed_at: ClockTime,
    rule_id: SymbolId,
    trigger: &crate::Value,
    scope: SymbolId,
) -> Result<Vec<SymbolId>, EmitError> {
    let (displaced, trigger_scope_key) = procedural_lookup(index, rule_id, trigger, scope);
    // Two Pro writes at the same committed_at on the same key cannot be
    // ordered; reject the batch.
    if let Some(clash) = displaced
        .iter()
        .find(|old| old.committed_at == new_committed_at)
    {
        return Err(EmitError::ProceduralSupersessionConflict {
            rule_id,
            existing: clash.memory_id,
        });
    }
    procedural_install(
        index,
        &displaced,
        new_memory_id,
        new_committed_at,
        rule_id,
        trigger_scope_key,
    );
    if !displaced.is_empty() {
        tracing::info!(
            target: "mimir.supersession",
            kind = "procedural",
            rule_id = %rule_id,
            new_memory_id = %new_memory_id,
            superseded_count = displaced.len(),
            "procedural auto-supersession",
        );
    }
    Ok(displaced.into_iter().map(|entry| entry.memory_id).collect())
}
/// Replay-time twin of `apply_procedural_supersession`: re-derives the
/// procedural supersession index from an already-committed Pro record,
/// without conflict checks (the log is trusted) and without emitting edges
/// or tracing.
fn replay_procedural_supersession(
    index: &mut SupersessionIndex,
    new_memory_id: SymbolId,
    new_committed_at: ClockTime,
    rule_id: SymbolId,
    trigger: &crate::Value,
    scope: SymbolId,
) {
    let (displaced, trigger_scope_key) = procedural_lookup(index, rule_id, trigger, scope);
    procedural_install(
        index,
        &displaced,
        new_memory_id,
        new_committed_at,
        rule_id,
        trigger_scope_key,
    );
}
fn procedural_lookup(
index: &SupersessionIndex,
rule_id: SymbolId,
trigger: &crate::Value,
scope: SymbolId,
) -> (Vec<CurrentProcedural>, (Vec<u8>, SymbolId)) {
let trigger_scope_key = (trigger.index_key_bytes(), scope);
let by_rule = index.procedural_by_rule.get(&rule_id).copied();
let by_ts = index
.procedural_by_trigger_scope
.get(&trigger_scope_key)
.copied();
let mut superseded: Vec<CurrentProcedural> = Vec::new();
if let Some(old) = by_rule {
superseded.push(old);
}
if let Some(old) = by_ts {
if !superseded
.iter()
.any(|existing| existing.memory_id == old.memory_id)
{
superseded.push(old);
}
}
(superseded, trigger_scope_key)
}
/// Installs a procedural memory as the current holder of both its keys,
/// fully evicting each displaced holder first.
fn procedural_install(
    index: &mut SupersessionIndex,
    superseded: &[CurrentProcedural],
    new_memory_id: SymbolId,
    new_committed_at: ClockTime,
    rule_id: SymbolId,
    trigger_scope_key: (Vec<u8>, SymbolId),
) {
    // Evict each displaced holder under *its own* keys, which may differ from
    // the new record's keys when only one of the two lookups matched it.
    for old in superseded {
        let Some(keys) = index.procedural_keys_by_memory.remove(&old.memory_id) else {
            continue;
        };
        index.procedural_by_rule.remove(&keys.rule_id);
        index.procedural_by_trigger_scope.remove(&keys.trigger_scope);
    }
    let entry = CurrentProcedural {
        memory_id: new_memory_id,
        committed_at: new_committed_at,
    };
    index.procedural_keys_by_memory.insert(
        new_memory_id,
        ProceduralKeys {
            rule_id,
            trigger_scope: trigger_scope_key.clone(),
        },
    );
    index.procedural_by_rule.insert(rule_id, entry);
    index.procedural_by_trigger_scope.insert(trigger_scope_key, entry);
}
/// Records a `Supersedes` edge (`from` = new memory, `to` = displaced memory)
/// in both the output stream and the working DAG, then emits a `StaleParent`
/// edge for every inferential memory derived from the displaced parent.
fn emit_supersedes_edge(
    state: &mut EmitState,
    out: &mut Vec<CanonicalRecord>,
    from: SymbolId,
    to: SymbolId,
) -> Result<(), EmitError> {
    let at = state.now;
    out.push(CanonicalRecord::Supersedes(EdgeRecord { from, to, at }));
    let edge = DagEdge {
        kind: EdgeKind::Supersedes,
        from,
        to,
        at,
    };
    state.dag.add_edge(edge).map_err(EmitError::SupersessionDag)?;
    // Cloned so `state` can be mutably re-borrowed inside the loop.
    let dependents = state
        .inferentials_by_parent
        .get(&to)
        .cloned()
        .unwrap_or_default();
    for inf_id in dependents {
        emit_stale_parent_edge(state, out, inf_id, to)?;
    }
    Ok(())
}
/// Records a `StaleParent` edge (inferential memory -> superseded parent) in
/// both the output stream and the working DAG.
fn emit_stale_parent_edge(
    state: &mut EmitState,
    out: &mut Vec<CanonicalRecord>,
    inf_id: SymbolId,
    parent_id: SymbolId,
) -> Result<(), EmitError> {
    let at = state.now;
    out.push(CanonicalRecord::StaleParent(EdgeRecord {
        from: inf_id,
        to: parent_id,
        at,
    }));
    state
        .dag
        .add_edge(DagEdge {
            kind: EdgeKind::StaleParent,
            from: inf_id,
            to: parent_id,
            at,
        })
        .map_err(EmitError::SupersessionDag)
}
/// True when `parent` has already been displaced, i.e. at least one
/// `Supersedes` edge in the DAG points at it.
fn parent_is_superseded(dag: &SupersessionDag, parent: SymbolId) -> bool {
    let mut incoming = dag.edges_to(parent);
    incoming.any(|edge| matches!(edge.kind, EdgeKind::Supersedes))
}
/// Mints the next `__mem_N` symbol in the working table, pushes its
/// SymbolAlloc record, and returns the fresh id. The counter advances even
/// on failure; that is safe because the caller discards all working state on
/// any emit error.
fn allocate_memory_id(
    state: &mut EmitState,
    out: &mut Vec<CanonicalRecord>,
) -> Result<SymbolId, EmitError> {
    let name = format!("__mem_{}", *state.counter);
    *state.counter += 1;
    let id = match state.table.allocate(name.clone(), SymbolKind::Memory) {
        Ok(id) => id,
        // Collision with a user-declared `__mem_N` symbol, or other bind
        // failure; surface the attempted name.
        Err(cause) => return Err(EmitError::MemoryIdAllocation { name, cause }),
    };
    out.push(CanonicalRecord::SymbolAlloc(SymbolEventRecord {
        symbol_id: id,
        name,
        symbol_kind: SymbolKind::Memory,
        at: state.now,
    }));
    Ok(id)
}
/// Batch-level failure from `Pipeline::compile_batch`; each stage's error is
/// wrapped so callers can tell where the batch was rejected. Any variant
/// leaves the pipeline state untouched.
#[derive(Debug, Error, PartialEq)]
pub enum PipelineError {
    /// Source text failed to parse.
    #[error("parse error: {0}")]
    Parse(#[from] ParseError),
    /// Symbol binding failed.
    #[error("bind error: {0}")]
    Bind(#[from] BindError),
    /// Semantic validation failed.
    #[error("semantic error: {0}")]
    Semantic(#[from] SemanticError),
    /// Record emission failed.
    #[error("emit error: {0}")]
    Emit(#[from] EmitError),
    /// The monotonic committed_at bump would overflow / hit the sentinel.
    #[error(
        "committed_at clock exhausted: monotonic advance past {last_committed_at} would hit reserved sentinel"
    )]
    ClockExhausted {
        last_committed_at: ClockTime,
    },
}
/// Failure while lowering validated forms into canonical records.
#[derive(Debug, Error, PartialEq)]
pub enum EmitError {
    /// Form kind not emitted at this pipeline milestone.
    #[error("form {form} is not yet emitted by this pipeline milestone")]
    Unsupported {
        form: &'static str,
    },
    /// Minting a `__mem_N` / `__ep_N` symbol failed (e.g. name collision).
    #[error("memory-id allocation failed for {name}: {cause}")]
    MemoryIdAllocation {
        name: String,
        cause: BindError,
    },
    /// Two semantic writes at the same (s, p) key share a valid_at and
    /// cannot be ordered.
    #[error(
        "semantic supersession conflict at (s={s:?}, p={p:?}) valid_at={valid_at}: new memory has the same valid_at as existing memory {existing:?}"
    )]
    SemanticSupersessionConflict {
        s: SymbolId,
        p: SymbolId,
        valid_at: ClockTime,
        existing: SymbolId,
    },
    /// Two inferential writes at the same (s, p) key share a valid_at.
    #[error(
        "inferential supersession conflict at (s={s:?}, p={p:?}) valid_at={valid_at}: new memory has the same valid_at as existing memory {existing:?}"
    )]
    InferentialSupersessionConflict {
        s: SymbolId,
        p: SymbolId,
        valid_at: ClockTime,
        existing: SymbolId,
    },
    /// The supersession DAG rejected an edge (e.g. it would form a cycle —
    /// TODO confirm against DagError's variants).
    #[error("supersession DAG rejected edge: {0}")]
    SupersessionDag(#[from] crate::dag::DagError),
    /// Two procedural writes at the same supersession key share a
    /// committed_at.
    #[error(
        "procedural supersession conflict: batch contains two Pro writes at the same supersession key (rule_id={rule_id:?}), first bound to {existing:?}"
    )]
    ProceduralSupersessionConflict {
        rule_id: SymbolId,
        existing: SymbolId,
    },
}
#[cfg(test)]
mod tests {
use super::*;
use crate::canonical::Opcode;
    // Fixed, sentinel-free wall clock shared by the tests.
    fn now() -> ClockTime {
        ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel")
    }
    // Minimal well-formed semantic form reused across tests.
    const SEM_OK: &str = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
    // Filters a batch's output down to the memory-payload records.
    fn memory_records(records: &[CanonicalRecord]) -> Vec<&CanonicalRecord> {
        records
            .iter()
            .filter(|r| {
                matches!(
                    r.opcode(),
                    Opcode::Sem | Opcode::Epi | Opcode::Pro | Opcode::Inf
                )
            })
            .collect()
    }
#[test]
fn pathological_agent_collides_with_mem_counter() {
let mut pipe = Pipeline::new();
let input = "(sem @alice @knows @__mem_0 :src @observation :c 0.8 :v 2024-01-15)";
let err = pipe
.compile_batch(input, now())
.expect_err("__mem_0 collision");
let PipelineError::Emit(EmitError::MemoryIdAllocation { name, .. }) = &err else {
panic!("expected MemoryIdAllocation error, got {err:?}");
};
assert_eq!(name, "__mem_0");
assert!(pipe.table.lookup("__mem_0").is_none());
assert!(pipe.table.lookup("alice").is_none());
assert_eq!(pipe.next_memory_counter, 0);
}
#[test]
fn single_sem_form_roundtrips_through_pipeline() {
let mut pipe = Pipeline::new();
let records = pipe.compile_batch(SEM_OK, now()).expect("compile");
let mems = memory_records(&records);
assert_eq!(mems.len(), 1);
assert_eq!(mems[0].opcode(), Opcode::Sem);
for r in &records {
assert!(matches!(r.opcode(), Opcode::Sem | Opcode::SymbolAlloc));
}
}
#[test]
fn epi_records_are_retained_after_compile() {
let mut pipe = Pipeline::new();
let input = "(epi @evt_001 @rename (@old @new) @github \
:at 2024-01-15T10:00:00Z :obs 2024-01-15T10:00:05Z \
:src @observation :c 0.9)";
let records = pipe.compile_batch(input, now()).expect("compile");
let mems = memory_records(&records);
assert_eq!(mems.len(), 1);
assert_eq!(mems[0].opcode(), Opcode::Epi);
assert_eq!(pipe.episodic_records().len(), 1);
let retained = &pipe.episodic_records()[0];
assert_eq!(
retained.event_id,
pipe.table.lookup("evt_001").expect("evt")
);
assert_eq!(retained.kind, pipe.table.lookup("rename").expect("kind"));
assert_eq!(retained.participants.len(), 2);
}
#[test]
fn multi_form_batch_emits_in_input_order() {
let mut pipe = Pipeline::new();
let input = "
(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)
(sem @alice @knows @carol :src @observation :c 0.7 :v 2024-01-16)
";
let records = pipe.compile_batch(input, now()).expect("compile");
let mems = memory_records(&records);
assert_eq!(mems.len(), 2);
for r in &mems {
assert_eq!(r.opcode(), Opcode::Sem);
}
}
#[test]
fn empty_input_is_a_no_op_batch() {
let mut pipe = Pipeline::new();
let records = pipe.compile_batch("", now()).expect("empty compiles");
assert!(records.is_empty());
assert_eq!(pipe.next_memory_counter, 0);
assert_eq!(pipe.last_committed_at(), Some(now()));
let records = pipe
.compile_batch(" \n\t ", now())
.expect("whitespace compiles");
assert!(records.is_empty());
assert_eq!(pipe.next_memory_counter, 0);
assert_eq!(
pipe.last_committed_at().expect("set").as_millis(),
now().as_millis() + 1
);
}
#[test]
fn parse_error_does_not_mutate_table() {
let mut pipe = Pipeline::new();
let before_table = pipe.table.clone();
let err = pipe.compile_batch("(sem @a", now()).expect_err("malformed");
assert!(matches!(err, PipelineError::Parse(_)));
assert_eq!(pipe.table, before_table);
assert_eq!(pipe.next_memory_counter, 0);
}
#[test]
fn bind_error_in_mid_batch_rolls_back_all_prior_allocations() {
let mut pipe = Pipeline::new();
let input = "
(sem @x @rel @y :src @observation :c 0.8 :v 2024-01-15)
(sem @alice @x @z :src @observation :c 0.8 :v 2024-01-16)
";
let err = pipe.compile_batch(input, now()).expect_err("kind mismatch");
assert!(matches!(err, PipelineError::Bind(_)));
assert!(pipe.table.lookup("x").is_none());
assert!(pipe.table.lookup("rel").is_none());
assert!(pipe.table.lookup("y").is_none());
assert_eq!(pipe.next_memory_counter, 0);
}
#[test]
fn semantic_error_rolls_back_all_prior_allocations() {
let mut pipe = Pipeline::new();
let input = "(sem @a @knows @b :src @registry :c 0.99 :v 2024-01-15)";
let err = pipe
.compile_batch(input, now())
.expect_err("conf over bound");
assert!(matches!(err, PipelineError::Semantic(_)));
assert!(pipe.table.lookup("a").is_none());
assert!(pipe.table.lookup("b").is_none());
}
#[test]
fn successful_batch_commits_table_and_counter() {
    // Committed batches persist interned symbols and advance the counter by
    // one per emitted memory.
    let mut pipe = Pipeline::new();
    pipe.compile_batch(SEM_OK, now()).expect("first");
    assert!(pipe.table.lookup("alice").is_some());
    assert_eq!(pipe.next_memory_counter, 1);
    let follow_up = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
    pipe.compile_batch(follow_up, now()).expect("second");
    assert_eq!(pipe.next_memory_counter, 2);
}
#[test]
fn successive_calls_produce_distinct_memory_ids() {
    // Memory ids come from a pipeline-wide counter, so records produced by
    // different batches can never collide.
    let mut pipe = Pipeline::new();
    let batch1 = pipe.compile_batch(SEM_OK, now()).expect("first");
    let batch2 = pipe
        .compile_batch(
            "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-01-16)",
            now(),
        )
        .expect("second");
    let first_id = match memory_records(&batch1).first().copied() {
        Some(CanonicalRecord::Sem(s)) => s.memory_id,
        _ => panic!("expected Sem in first batch"),
    };
    let second_id = match memory_records(&batch2).first().copied() {
        Some(CanonicalRecord::Sem(s)) => s.memory_id,
        _ => panic!("expected Sem in second batch"),
    };
    assert_ne!(first_id, second_id);
}
#[test]
fn retire_form_emits_symbol_retire_not_error() {
    // A `(retire @sym)` form is a symbol-lifecycle operation: it yields a
    // SymbolRetire record and no memory records at all.
    let mut pipe = Pipeline::new();
    pipe.compile_batch(SEM_OK, now()).expect("first");
    let records = pipe
        .compile_batch("(retire @alice)", now())
        .expect("retire supported");
    let has_retire = records.iter().any(|r| r.opcode() == Opcode::SymbolRetire);
    assert!(has_retire, "SymbolRetire record expected");
    assert!(memory_records(&records).is_empty());
}
#[test]
fn same_input_produces_byte_identical_records() {
// Determinism: two fresh pipelines fed identical input at an identical
// clock must produce equal record batches.
let input = "
(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)
(sem @alice @knows @carol :src @observation :c 0.7 :v 2024-01-16)
";
let fixed_now = now();
let mut pipe_a = Pipeline::new();
let mut pipe_b = Pipeline::new();
let a = pipe_a.compile_batch(input, fixed_now).expect("a");
let b = pipe_b.compile_batch(input, fixed_now).expect("b");
assert_eq!(a, b);
}
#[test]
fn clocks_populated_from_now_parameter() {
    // On a fresh pipeline both observed_at and committed_at come from the
    // caller-supplied clock; invalid_at stays open for a first write.
    let mut pipe = Pipeline::new();
    let t = now();
    let records = pipe.compile_batch(SEM_OK, t).expect("compile");
    let sem = match memory_records(&records).first().copied() {
        Some(CanonicalRecord::Sem(s)) => s,
        _ => panic!("expected Sem"),
    };
    assert_eq!(sem.clocks.observed_at, t);
    assert_eq!(sem.clocks.committed_at, t);
    assert_eq!(sem.clocks.invalid_at, None);
}
#[test]
fn first_batch_uses_wall_clock_as_committed_at() {
    // With no prior commit there is no watermark to bump past, so the wall
    // clock is used verbatim.
    let mut pipe = Pipeline::new();
    assert!(pipe.last_committed_at().is_none());
    let wall = now();
    pipe.compile_batch(SEM_OK, wall).expect("compile");
    assert_eq!(pipe.last_committed_at(), Some(wall));
}
#[test]
fn monotonic_commit_clock_bumps_past_regressing_wall_clock() {
// A second batch handed a wall clock EARLIER than the previous commit must
// not regress committed_at; it is bumped to watermark + 1 ms instead.
let mut pipe = Pipeline::new();
let t1 = ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel");
let t_regressed = ClockTime::try_from_millis(1_713_350_300_000).expect("non-sentinel");
let first = pipe
.compile_batch(SEM_OK, t1)
.expect("first batch commits at t1");
let first_sem = first.iter().find_map(|r| match r {
CanonicalRecord::Sem(s) => Some(s),
_ => None,
});
// First commit takes the wall clock verbatim.
assert_eq!(first_sem.expect("sem").clocks.committed_at, t1);
let second = pipe
.compile_batch(
"(sem @alice @likes @carol :src @observation :c 0.8 :v 2024-01-15)",
t_regressed,
)
.expect("second batch");
let second_sem = second.iter().find_map(|r| match r {
CanonicalRecord::Sem(s) => Some(s),
_ => None,
});
// The regressed wall clock is ignored; commit lands at watermark + 1 ms.
let expected = ClockTime::try_from_millis(t1.as_millis() + 1).expect("non-sentinel");
assert_eq!(second_sem.expect("sem").clocks.committed_at, expected);
assert_eq!(pipe.last_committed_at(), Some(expected));
}
#[test]
fn identical_wall_clock_across_batches_still_bumps_committed_at() {
    // Two batches handed the same wall clock must not share a committed_at:
    // the second commit is bumped one millisecond past the watermark.
    let mut pipe = Pipeline::new();
    let t = now();
    pipe.compile_batch(SEM_OK, t).expect("first");
    let second = pipe
        .compile_batch(
            "(sem @alice @likes @dave :src @observation :c 0.8 :v 2024-01-15)",
            t,
        )
        .expect("second");
    let sem = second
        .iter()
        .find_map(|r| match r {
            CanonicalRecord::Sem(s) => Some(s),
            _ => None,
        })
        .expect("sem");
    assert_eq!(sem.clocks.committed_at.as_millis(), t.as_millis() + 1);
}
#[test]
fn failed_batch_does_not_advance_commit_watermark() {
    // A rejected batch must leave last_committed_at where the last successful
    // commit put it, even though a later wall clock was supplied.
    let mut pipe = Pipeline::new();
    let seed_wall = now();
    pipe.compile_batch(SEM_OK, seed_wall).expect("seed");
    let watermark = pipe.last_committed_at();
    let later_wall =
        ClockTime::try_from_millis(seed_wall.as_millis() + 10_000).expect("non-sentinel");
    let err = pipe
        .compile_batch(
            "(sem @alice @knows @bob :src @registry :c 0.99 :v 2024-01-15)",
            later_wall,
        )
        .expect_err("semantic reject");
    assert!(matches!(err, PipelineError::Semantic(_)));
    assert_eq!(pipe.last_committed_at(), watermark);
}
#[test]
fn monotonic_commit_clock_helper_returns_wall_clock_when_unset() {
    // No previous commit: the helper passes the wall clock straight through.
    let wall = now();
    let committed = monotonic_commit_clock(wall, None).expect("fresh");
    assert_eq!(committed, wall);
}
#[test]
fn monotonic_commit_clock_helper_bumps_when_wall_clock_not_ahead() {
    // Whether the wall clock equals or trails the watermark, the helper
    // returns watermark + 1 ms.
    let prev = ClockTime::try_from_millis(1_000_000).expect("non-sentinel");
    let behind = ClockTime::try_from_millis(500_000).expect("non-sentinel");
    for wall in [prev, behind] {
        let at = monotonic_commit_clock(wall, Some(prev)).expect("bump");
        assert_eq!(at.as_millis(), 1_000_001);
    }
}
#[test]
fn clock_exhaustion_returns_typed_error() {
    // MAX - 1 is the largest accepted ("non-sentinel") instant; bumping past
    // it must produce a typed ClockExhausted error, not a wrap or panic.
    let max_valid = ClockTime::try_from_millis(u64::MAX - 1).expect("non-sentinel");
    let err = monotonic_commit_clock(max_valid, Some(max_valid))
        .expect_err("must exhaust past MAX - 1");
    match err {
        PipelineError::ClockExhausted { last_committed_at } => {
            assert_eq!(last_committed_at, max_valid);
        }
        other => panic!("expected ClockExhausted, got {other:?}"),
    }
}
#[test]
fn semantic_future_validity_uses_wall_clock_not_monotonic_watermark() {
// Future-validity must be judged against the caller's WALL clock, not the
// monotonic commit watermark, which may sit ahead of it after regression.
let mut pipe = Pipeline::new();
let seed_wall = ClockTime::try_from_millis(1_705_277_400_000).expect("non-sentinel");
let _ = pipe
.compile_batch(
"(sem @seed_a @seed_r @seed_b :src @observation :c 0.8 :v 2024-01-14)",
seed_wall,
)
.expect("seed");
let watermark = pipe.last_committed_at().expect("set");
// Simulate wall-clock regression below the commit watermark.
let regressed_wall = ClockTime::try_from_millis(1_705_276_800_000).expect("non-sentinel");
assert!(regressed_wall < watermark);
// valid_at 2024-01-15T00:05:00Z is ahead of the regressed wall clock but
// NOT ahead of the watermark — rejection proves the wall clock is used.
let err = pipe
.compile_batch(
"(sem @alice @knows @emma :src @observation :c 0.8 :v 2024-01-15T00:05:00Z)",
regressed_wall,
)
.expect_err("must reject future valid_at under regressed wall clock");
assert!(
matches!(
err,
PipelineError::Semantic(SemanticError::FutureValidity { .. })
),
"expected FutureValidity, got {err:?}"
);
// A past valid_at is accepted, and committed_at is still bumped past the
// watermark — the monotonic commit clock is independent of the wall clock.
let records = pipe
.compile_batch(
"(sem @alice @knows @emma :src @observation :c 0.8 :v 2024-01-14)",
regressed_wall,
)
.expect("past valid_at under regressed wall clock must succeed");
let sem = records.iter().find_map(|r| match r {
CanonicalRecord::Sem(s) => Some(s),
_ => None,
});
assert!(sem.expect("sem").clocks.committed_at > watermark);
}
/// Every `Sem` record in `records`, in emission order.
fn sem_records(records: &[CanonicalRecord]) -> Vec<&SemRecord> {
    let mut sems = Vec::new();
    for record in records {
        if let CanonicalRecord::Sem(s) = record {
            sems.push(s);
        }
    }
    sems
}
/// Every `Supersedes` edge record in `records`, in emission order.
fn supersedes_edges(records: &[CanonicalRecord]) -> Vec<&EdgeRecord> {
    let mut edges = Vec::new();
    for record in records {
        if let CanonicalRecord::Supersedes(e) = record {
            edges.push(e);
        }
    }
    edges
}
/// Every `StaleParent` edge record in `records`, in emission order.
fn stale_parent_edges(records: &[CanonicalRecord]) -> Vec<&EdgeRecord> {
    let mut edges = Vec::new();
    for record in records {
        if let CanonicalRecord::StaleParent(e) = record {
            edges.push(e);
        }
    }
    edges
}
#[test]
fn inf_against_current_parent_is_not_born_stale() {
    // An Inferential derived from a parent that is still current must not
    // carry the stale flag at birth.
    let mut pipe = Pipeline::new();
    pipe.compile_batch(SEM_OK, now()).expect("sem");
    let inf_src = "(inf @alice @likes @coffee (@__mem_0) @majority_vote :c 0.7 :v 2024-01-15)";
    let records = pipe
        .compile_batch(inf_src, later_now())
        .expect("inferential");
    let mut inf = None;
    for record in &records {
        if let CanonicalRecord::Inf(i) = record {
            inf = Some(i);
            break;
        }
    }
    let inf = inf.expect("inf emitted");
    assert!(!inf.flags.stale, "parent still current — not born stale");
}
#[test]
fn supersession_emits_stale_parent_edges_to_dependent_inferentials() {
// Seed @__mem_0, then an Inferential (@__mem_1) derived from it.
let mut pipe = Pipeline::new();
pipe.compile_batch(SEM_OK, now()).expect("first sem");
pipe.compile_batch(
"(inf @alice @likes @coffee (@__mem_0) @majority_vote :c 0.7 :v 2024-01-15)",
later_now(),
)
.expect("inf");
// Superseding the parent must flag the dependent Inferential.
let records = pipe
.compile_batch(
"(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-02-01)",
even_later_now(),
)
.expect("supersede");
let stales = stale_parent_edges(&records);
assert_eq!(
stales.len(),
1,
"one dependent Inferential should receive a StaleParent edge"
);
// Edge direction: dependent inferential -> superseded parent.
let inf_id = pipe.table().lookup("__mem_1").expect("inf id");
let old_parent = pipe.table().lookup("__mem_0").expect("parent id");
assert_eq!(stales[0].from, inf_id);
assert_eq!(stales[0].to, old_parent);
}
#[test]
fn inf_born_stale_when_parent_already_superseded() {
// @__mem_0 is superseded BEFORE the Inferential that cites it is committed,
// so the Inferential must be flagged stale at birth.
let mut pipe = Pipeline::new();
pipe.compile_batch(SEM_OK, now()).expect("first sem");
pipe.compile_batch(
"(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-02-01)",
later_now(),
)
.expect("supersede — __mem_0 is now superseded");
let records = pipe
.compile_batch(
"(inf @alice @likes @coffee (@__mem_0) @majority_vote :c 0.7 :v 2024-01-15)",
even_later_now(),
)
.expect("inf");
let inf = records
.iter()
.find_map(|r| match r {
CanonicalRecord::Inf(i) => Some(i),
_ => None,
})
.expect("inf");
assert!(
inf.flags.stale,
"Inferential born from already-superseded parent must carry stale=true"
);
}
fn later_now() -> ClockTime {
ClockTime::try_from_millis(1_713_350_400_000 + 1_000).expect("non-sentinel")
}
/// A clock instant 2 s after the fixture base timestamp.
fn even_later_now() -> ClockTime {
    let base_plus_2s = 1_713_350_400_000_u64 + 2_000;
    ClockTime::try_from_millis(base_plus_2s).expect("non-sentinel")
}
#[test]
fn sem_with_fresh_sp_does_not_emit_supersedes_edge() {
    // The very first write at an (s, p) pair has no predecessor: no
    // Supersedes edge and no DAG growth.
    let mut pipe = Pipeline::new();
    let records = pipe.compile_batch(SEM_OK, now()).expect("first");
    let edges = supersedes_edges(&records);
    assert!(edges.is_empty(), "first write at (s, p) has nothing to supersede");
    assert_eq!(pipe.dag().len(), 0);
}
#[test]
fn forward_sem_emits_supersedes_edge_and_updates_index() {
    // A later-valid write at the same (s, p) supersedes the earlier one:
    // exactly one NEW -> OLD edge, and the new record stays open-ended.
    let mut pipe = Pipeline::new();
    let first = pipe.compile_batch(SEM_OK, now()).expect("first");
    let old_mem = sem_records(&first)[0].memory_id;
    let second = pipe
        .compile_batch(
            "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-01)",
            now(),
        )
        .expect("second");
    let sems = sem_records(&second);
    assert_eq!(sems.len(), 1);
    let new_mem = sems[0].memory_id;
    assert_eq!(sems[0].clocks.invalid_at, None);
    let edges = supersedes_edges(&second);
    assert_eq!(edges.len(), 1, "exactly one Supersedes edge");
    assert_eq!(edges[0].from, new_mem);
    assert_eq!(edges[0].to, old_mem);
    assert_eq!(pipe.dag().len(), 1);
}
#[test]
fn retroactive_sem_sets_invalid_at_and_preserves_existing_as_current() {
// A retroactive write (earlier valid_at than the current memory) is born
// already-closed, while the existing memory remains the forward-supersession
// target.
let mut pipe = Pipeline::new();
let first_input = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-03-01)";
let first = pipe.compile_batch(first_input, now()).expect("first");
let first_mem = sem_records(&first)[0].memory_id;
let first_valid_at = sem_records(&first)[0].clocks.valid_at;
// Retroactive: 2024-01-15 predates the existing 2024-03-01 memory.
let retro_input = "(sem @alice @knows @zoe :src @observation :c 0.8 :v 2024-01-15)";
let retro = pipe.compile_batch(retro_input, now()).expect("retro");
let sems = sem_records(&retro);
let retro_mem = sems[0].memory_id;
assert_eq!(
sems[0].clocks.invalid_at,
Some(first_valid_at),
"retroactive new memory's invalid_at closes at old's valid_at"
);
// The retroactive memory still records a Supersedes edge to the existing one.
let edges = supersedes_edges(&retro);
assert_eq!(edges.len(), 1);
assert_eq!(edges[0].from, retro_mem);
assert_eq!(edges[0].to, first_mem);
// A later forward write supersedes the most-recent-valid_at memory (the
// first), not the retroactive insert.
let third_input = "(sem @alice @knows @dan :src @observation :c 0.8 :v 2024-04-01)";
let third = pipe.compile_batch(third_input, now()).expect("third");
let third_mem = sem_records(&third)[0].memory_id;
let third_edges = supersedes_edges(&third);
assert_eq!(third_edges.len(), 1);
assert_eq!(third_edges[0].from, third_mem);
assert_eq!(
third_edges[0].to, first_mem,
"forward supersession targets the most-recent-valid_at memory, not the retroactive one"
);
}
#[test]
fn equal_valid_at_at_same_sp_returns_supersession_conflict() {
    // Re-asserting the identical (s, p) with an identical valid_at gives no
    // usable ordering — it is rejected as a supersession conflict.
    let mut pipe = Pipeline::new();
    pipe.compile_batch(SEM_OK, now()).expect("first");
    let err = pipe
        .compile_batch(SEM_OK, now())
        .expect_err("equal valid_at conflicts");
    let is_conflict = matches!(
        err,
        PipelineError::Emit(EmitError::SemanticSupersessionConflict { .. })
    );
    assert!(is_conflict, "expected SemanticSupersessionConflict, got {err:?}");
}
#[test]
fn disjoint_sp_pairs_do_not_supersede_each_other() {
    // @alice/@likes is a different (s, p) key than @alice/@knows, so the
    // second write supersedes nothing.
    let mut pipe = Pipeline::new();
    pipe.compile_batch(SEM_OK, now()).expect("first");
    let other = pipe
        .compile_batch(
            "(sem @alice @likes @bob :src @observation :c 0.8 :v 2024-01-15)",
            now(),
        )
        .expect("disjoint");
    assert!(supersedes_edges(&other).is_empty());
    assert_eq!(pipe.dag().len(), 0);
}
#[test]
fn forward_chain_produces_edge_per_link() {
    // Four successive writes at one (s, p) form a chain with three links.
    let mut pipe = Pipeline::new();
    for v in ["2024-01-15", "2024-02-15", "2024-03-15", "2024-04-15"] {
        let input = format!("(sem @alice @knows @bob :src @observation :c 0.8 :v {v})");
        pipe.compile_batch(&input, now()).expect("compile");
    }
    assert_eq!(pipe.dag().len(), 3);
}
#[test]
fn failed_batch_does_not_leak_edge_or_index_mutation() {
// Form 1 would supersede the seed; form 2 conflicts with form 1 (equal
// valid_at at the same (s, p)). The rejected batch must leave the DAG and
// supersession index untouched so a clean follow-up still targets the seed.
let mut pipe = Pipeline::new();
let _ = pipe.compile_batch(SEM_OK, now()).expect("seed");
assert_eq!(pipe.dag().len(), 0, "seed did not supersede");
let two_forms = "\
(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-01)\n\
(sem @alice @knows @dan :src @observation :c 0.7 :v 2024-03-01)";
let err = pipe
.compile_batch(two_forms, now())
.expect_err("emit conflict");
assert!(
matches!(
err,
PipelineError::Emit(EmitError::SemanticSupersessionConflict { .. })
),
"expected SemanticSupersessionConflict from form 2, got {err:?}"
);
assert_eq!(pipe.dag().len(), 0, "failed batch must not leak edge");
// After rollback, a clean write still supersedes the original seed memory.
let clean = pipe
.compile_batch(
"(sem @alice @knows @eve :src @observation :c 0.8 :v 2024-03-01)",
now(),
)
.expect("clean follow-up");
assert_eq!(pipe.dag().len(), 1);
let seed_memory = pipe.table.lookup("__mem_0").expect("seed mem alloc");
assert_eq!(
pipe.dag().edges()[0].to,
seed_memory,
"post-rollback commit still sees SEED as predecessor"
);
assert_eq!(sem_records(&clean)[0].memory_id, pipe.dag().edges()[0].from);
}
// Baseline well-formed Procedural: rule @rule_route, trigger "agent_write",
// action "route_via_librarian", scoped to @mimir.
const PRO_OK: &str = r#"(pro @rule_route "agent_write" "route_via_librarian"
:scp @mimir :src @policy :c 1.0)"#;
/// Every `Pro` record in `records`, in emission order.
fn pro_records(records: &[CanonicalRecord]) -> Vec<&ProRecord> {
    let mut pros = Vec::new();
    for record in records {
        if let CanonicalRecord::Pro(p) = record {
            pros.push(p);
        }
    }
    pros
}
#[test]
fn pro_fresh_rule_does_not_supersede() {
    // The first Procedural has no predecessor on either supersession key.
    let mut pipe = Pipeline::new();
    let records = pipe.compile_batch(PRO_OK, now()).expect("first pro");
    assert_eq!(pro_records(&records).len(), 1);
    assert!(supersedes_edges(&records).is_empty());
    assert_eq!(pipe.dag().len(), 0);
}
#[test]
fn pro_same_rule_id_triggers_supersession() {
// Same @rule_route with a different trigger/scope: matched via the rule_id
// key, producing exactly one NEW -> OLD Supersedes edge.
let mut pipe = Pipeline::new();
let first = pipe.compile_batch(PRO_OK, now()).expect("first");
let first_mem = pro_records(&first)[0].memory_id;
let second_input = r#"(pro @rule_route "other_trigger" "other_action"
:scp @other_scope :src @policy :c 0.9)"#;
let second = pipe.compile_batch(second_input, now()).expect("second");
let second_mem = pro_records(&second)[0].memory_id;
let edges = supersedes_edges(&second);
assert_eq!(edges.len(), 1, "same rule_id → one Supersedes edge");
assert_eq!(edges[0].from, second_mem);
assert_eq!(edges[0].to, first_mem);
assert_eq!(pipe.dag().len(), 1);
}
#[test]
fn pro_same_trigger_scope_triggers_supersession() {
// Different rule id but the same ("agent_write", @mimir) pair: matched via
// the (trigger, scope) index instead of rule_id.
let mut pipe = Pipeline::new();
let first = pipe.compile_batch(PRO_OK, now()).expect("first");
let first_mem = pro_records(&first)[0].memory_id;
let second_input = r#"(pro @rule_other "agent_write" "different_action"
:scp @mimir :src @policy :c 0.9)"#;
let second = pipe.compile_batch(second_input, now()).expect("second");
let second_mem = pro_records(&second)[0].memory_id;
let edges = supersedes_edges(&second);
assert_eq!(edges.len(), 1, "same (trigger, scope) → one edge");
assert_eq!(edges[0].from, second_mem);
assert_eq!(edges[0].to, first_mem);
}
#[test]
fn pro_dual_key_match_supersedes_both_distinct_olds() {
    // A new Procedural matching one old rule by rule_id and a different old
    // rule by (trigger, scope) supersedes both in one commit.
    let mut pipe = Pipeline::new();
    let by_rule = pipe
        .compile_batch(
            r#"(pro @rule_a "t_a" "act_a" :scp @scope_a :src @policy :c 1.0)"#,
            now(),
        )
        .expect("old1");
    let by_rule_mem = pro_records(&by_rule)[0].memory_id;
    let by_trigger = pipe
        .compile_batch(
            r#"(pro @rule_b "t_b" "act_b" :scp @scope_b :src @policy :c 1.0)"#,
            now(),
        )
        .expect("old2");
    let by_trigger_mem = pro_records(&by_trigger)[0].memory_id;
    let new = pipe
        .compile_batch(
            r#"(pro @rule_a "t_b" "act_new" :scp @scope_b :src @policy :c 1.0)"#,
            now(),
        )
        .expect("new");
    let new_mem = pro_records(&new)[0].memory_id;
    let edges = supersedes_edges(&new);
    assert_eq!(edges.len(), 2, "dual-key match → two Supersedes edges");
    let targets: std::collections::BTreeSet<_> = edges.iter().map(|e| e.to).collect();
    assert!(targets.contains(&by_rule_mem));
    assert!(targets.contains(&by_trigger_mem));
    assert!(edges.iter().all(|e| e.from == new_mem));
}
#[test]
fn pro_duplicate_cross_batch_commit_emits_one_edge() {
    // Re-committing an identical Procedural matches the same OLD via both
    // keys; the match must collapse to a single Supersedes edge.
    let mut pipe = Pipeline::new();
    pipe.compile_batch(PRO_OK, now()).expect("seed");
    let repeat = pipe.compile_batch(PRO_OK, now()).expect("same again");
    assert_eq!(
        supersedes_edges(&repeat).len(),
        1,
        "same memory matched twice → one edge"
    );
}
#[test]
fn pro_intra_batch_same_rule_id_is_rejected() {
// Two Procedurals reusing @rule_a inside ONE batch conflict on the rule_id
// key; the whole batch is rejected and nothing reaches the DAG.
let mut pipe = Pipeline::new();
let two_forms = r#"
(pro @rule_a "t_a" "act_a" :scp @scope_a :src @policy :c 1.0)
(pro @rule_a "t_b" "act_b" :scp @scope_b :src @policy :c 1.0)
"#;
let err = pipe
.compile_batch(two_forms, now())
.expect_err("intra-batch rule_id conflict");
assert!(
matches!(
err,
PipelineError::Emit(EmitError::ProceduralSupersessionConflict { .. })
),
"expected ProceduralSupersessionConflict, got {err:?}"
);
assert_eq!(pipe.dag().len(), 0);
}
#[test]
fn pro_intra_batch_same_trigger_scope_is_rejected() {
// Distinct rule ids but a shared ("shared_t", @shared_scope) key within one
// batch also conflict; the batch is rejected with no DAG mutation.
let mut pipe = Pipeline::new();
let two_forms = r#"
(pro @rule_a "shared_t" "act_a" :scp @shared_scope :src @policy :c 1.0)
(pro @rule_b "shared_t" "act_b" :scp @shared_scope :src @policy :c 1.0)
"#;
let err = pipe
.compile_batch(two_forms, now())
.expect_err("intra-batch (trigger, scope) conflict");
assert!(matches!(
err,
PipelineError::Emit(EmitError::ProceduralSupersessionConflict { .. })
));
assert_eq!(pipe.dag().len(), 0);
}
#[test]
fn pro_supersession_clears_old_from_both_keys() {
    // Once OLD is superseded via its rule_id, it must also drop out of the
    // (trigger, scope) index so a later match there cannot hit it again.
    let mut pipe = Pipeline::new();
    let old = pipe
        .compile_batch(
            r#"(pro @rule_a "t_a" "act_a" :scp @scope_a :src @policy :c 1.0)"#,
            now(),
        )
        .expect("old");
    let old_mem = pro_records(&old)[0].memory_id;
    pipe.compile_batch(
        r#"(pro @rule_a "different" "new_act" :scp @different :src @policy :c 1.0)"#,
        now(),
    )
    .expect("super by rule");
    let third = pipe
        .compile_batch(
            r#"(pro @rule_fresh "t_a" "act_x" :scp @scope_a :src @policy :c 1.0)"#,
            now(),
        )
        .expect("third");
    let hits_old = supersedes_edges(&third).iter().any(|e| e.to == old_mem);
    assert!(!hits_old, "already-superseded OLD must not be superseded again");
}
}