// mimir_core/pipeline.rs
//! Librarian pipeline — end-to-end write path per
//! `docs/concepts/librarian-pipeline.md`.
//!
//! The pipeline compiles agent-emitted Lisp S-expression input into
//! canonical bytecode records by chaining the five stages:
//!
//! ```text
//! &str ─► [Parse] ─► [Bind] ─► [Semantic] ─► [Emit] ─► Vec<CanonicalRecord>
//! ```
//!
//! (Lex is internal to `parse::parse`, so the public entry points are
//! unified into a single `Pipeline::compile_batch` call.)
//!
//! Per invariant § 11.3 (batch atomicity), a batch either commits in
//! full or leaves no trace. `Pipeline` holds the workspace's live
//! `SymbolTable` and a monotonic memory-ID counter; both are cloned at
//! the start of each batch and swapped back only on full-stage success.
//!
//! The pipeline emits:
//!
//! - The four memory record kinds (`Sem`, `Epi`, `Pro`, `Inf`).
//! - `SymbolAlloc` / `SymbolRename` / `SymbolAlias` / `SymbolRetire`
//!   derived from the `bind` mutation journal (spec § 3.4). Symbol
//!   events precede the memory records in the output so log replay
//!   sees allocations before the memory records that reference them.
//!
//! Unsupported forms (tracked under the matching spec):
//!
//! - `correct` and `promote` return `EmitError::Unsupported` pending
//!   the correction + ephemeral-promotion work on the
//!   `temporal-model` / `episode-semantics` tracks.
//! - `query` returns `EmitError::Unsupported` pending the
//!   `read-protocol` track.
//! - ML-callout interface (spec § 6) is not wired; v1 is noop-ML.
36use thiserror::Error;
37
38use std::collections::{BTreeMap, BTreeSet};
39
40use crate::bind::{self, BindError, SymbolMutation, SymbolTable};
41use crate::canonical::{
42 CanonicalRecord, Clocks, EdgeRecord, EpiRecord, InfFlags, InfRecord, ProRecord, SemFlags,
43 SemRecord, SymbolEventRecord,
44};
45use crate::clock::ClockTime;
46use crate::dag::{Edge as DagEdge, EdgeKind, SupersessionDag};
47use crate::parse::{self, ParseError};
48use crate::semantic::{self, SemanticError, ValidatedForm};
49use crate::symbol::{SymbolId, SymbolKind};
50
51/// The librarian pipeline — single-writer compiler from Lisp S-expression
52/// input to canonical bytecode.
53///
54/// Holds the workspace's symbol table, memory-ID counter, monotonic
55/// `committed_at` watermark, supersession DAG, and current-state
56/// supersession index; mutations to all five are batch-atomic
57/// (invariant § 11.3, plus `temporal-model.md` § 12 #1 and § 6.2 #1).
58///
59/// Does not derive `Eq` because the stored record vectors carry
60/// `Value`, which contains `f64` and therefore is only `PartialEq`.
61#[derive(Clone, Debug, Default, PartialEq)]
62pub struct Pipeline {
63 table: SymbolTable,
64 next_memory_counter: u64,
65 /// Highest `committed_at` assigned by this pipeline so far, or
66 /// `None` if no batch has committed yet. Per `temporal-model.md`
67 /// § 9.2, the next commit clock is `max(wall_now, self + 1)`.
68 last_committed_at: Option<ClockTime>,
69 /// Workspace-scoped supersession graph. Extended at emit time by
70 /// auto-supersession writes (temporal-model.md § 5).
71 dag: SupersessionDag,
72 /// Current-state index for supersession detection (§ 5).
73 supersession_index: SupersessionIndex,
74 /// Every Semantic memory ever emitted (or replayed from the log),
75 /// in commit order. Feeds the as-of query resolver
76 /// (`temporal-model.md` § 7).
77 semantic_records: Vec<SemRecord>,
78 /// Secondary index over `semantic_records`: `(s, p) → indices`.
79 /// Per `read-protocol.md` § 3.1 the current-state index makes
80 /// single-predicate Semantic lookups O(k) in the size of the
81 /// `(s, p)` history (typically 1–3) instead of O(n) in the
82 /// whole store. The resolver consults this index when `:s` and
83 /// `:p` are both pinned; other paths still scan.
84 semantic_by_sp_history: BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
85 /// Every Episodic memory ever emitted or replayed, in commit
86 /// order. Episodic records do not currently participate in
87 /// supersession, but retaining them lets downstream tooling
88 /// perform exact duplicate checks and future event-oriented reads.
89 episodic_records: Vec<EpiRecord>,
90 /// Every Procedural memory ever emitted or replayed, in commit
91 /// order. Same contract as `semantic_records`.
92 procedural_records: Vec<ProRecord>,
93 /// Secondary index over `procedural_records`: `rule_id → indices`.
94 /// Powers O(k) lookup for `:kind pro` reads once the `rule_id`
95 /// read predicate is wired; for now the resolver iterates the
96 /// full vec from the `resolve_procedural` entry point.
97 procedural_by_rule_history: BTreeMap<SymbolId, Vec<usize>>,
98 /// Every Inferential memory ever emitted or replayed, in commit
99 /// order. Feeds the Inferential resolver per `temporal-model.md`
100 /// § 5.4 + `read-protocol.md` § 3.1 (Inf keyed by `(s, p)` like
101 /// Sem; re-derivation with same `(s, p)` + later `valid_at`
102 /// supersedes the prior).
103 inferential_records: Vec<InfRecord>,
104 /// Secondary index over `inferential_records`: `(s, p) → indices`.
105 /// Mirrors `semantic_by_sp_history`; lets the resolver walk only
106 /// the records at a specific `(s, p)` on a pinned-subject-and-
107 /// predicate read.
108 inferential_by_sp_history: BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
109 /// Decay parameters used by the read path to compute effective
110 /// confidence (`confidence-decay.md`). Defaults to the librarian's
111 /// v1 parameter table; `Store::open` will eventually load an
112 /// `mimir.toml` override and call [`Pipeline::set_decay_config`].
113 decay_config: crate::decay::DecayConfig,
114 /// Committed-at clock for each Episode the pipeline has seen.
115 /// Populated by [`Pipeline::register_episode`] — `Store` calls
116 /// this after a successful `commit_batch` and during log replay
117 /// when a `Checkpoint` record is seen. Used by the read path's
118 /// `:in_episode` / `:after_episode` / `:before_episode`
119 /// predicates (`read-protocol.md` § 4.1).
120 episode_committed_at: BTreeMap<SymbolId, ClockTime>,
121 /// Parent Episode for each Episode that declared one via
122 /// `(episode :start :parent_episode @E)`. Populated by
123 /// [`Pipeline::register_episode_parent`]. Backs the
124 /// `:episode_chain @E` read predicate
125 /// (`read-protocol.md` § 4.1 / `episode-semantics.md` § 5.1).
126 episode_parent: BTreeMap<SymbolId, SymbolId>,
127 /// Metadata captured from an `(episode :start ...)` form during
128 /// the most recent `compile_batch`. Consumed by the store at
129 /// commit time — see [`Pipeline::take_pending_episode_metadata`].
130 pending_episode_metadata: Option<PendingEpisodeMetadata>,
131 /// Memories currently pinned (`confidence-decay.md` § 7).
132 /// Suspends decay at read time via `DecayFlags::pinned`.
133 pinned_memories: BTreeSet<SymbolId>,
134 /// Memories currently flagged operator-authoritative
135 /// (`confidence-decay.md` § 8). Also suspends decay; populates
136 /// `Framing::Authoritative { set_by: OperatorAuthoritative }`.
137 authoritative_memories: BTreeSet<SymbolId>,
138 /// Reverse parent index for Inferential memories: maps a parent
139 /// memory's `SymbolId` to the list of Inferentials that derived
140 /// from it (`temporal-model.md` § 5.4). Populated by
141 /// [`replay_memory_record`] on Inferential records and by emit.
142 /// Used when a Semantic or Procedural parent gets auto-
143 /// superseded — the index tells us which Inferentials to attach
144 /// `StaleParent` edges to. O(log n) lookup rather than scanning
145 /// `semantic_records` / `procedural_records` for every
146 /// supersession.
147 inferentials_by_parent: BTreeMap<SymbolId, Vec<SymbolId>>,
148}
149
/// Episode metadata captured from an `(episode :start …)` form in
/// the write surface. Flows out of `compile_batch` via
/// [`Pipeline::take_pending_episode_metadata`] so the store layer
/// can attach it to the Episode it's about to emit.
///
/// An `(episode :close)` form carries no metadata; the pipeline
/// still produces `Some(default)` — i.e. every field at its
/// `Default` — to signal "batch carried an explicit Episode
/// directive" separately from the no-directive (`None`) case.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PendingEpisodeMetadata {
    /// Label from `:label`; `None` when the form had no label.
    pub label: Option<String>,
    /// Parent from `:parent_episode`; `None` when undeclared.
    pub parent_episode: Option<SymbolId>,
    /// Retracted Episodes from `:retracts (@E1 …)`; empty when the
    /// form retracts nothing.
    pub retracts: Vec<SymbolId>,
}
168
/// Current-state index used by auto-supersession detection.
///
/// For each memory-type supersession key (per `temporal-model.md` § 5),
/// tracks the currently-authoritative memory so new writes can look up
/// their predecessor in O(log n). The index is rebuilt from the log at
/// `Store::open`.
///
/// Scope: Semantic (§ 5.1), Procedural (§ 5.2), and Inferential
/// (§ 5.4) supersession on re-derivation. Episodic is out of scope
/// (no auto-supersession per § 5.3).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct SupersessionIndex {
    /// `(subject, predicate) -> (memory_id, valid_at)` for the current
    /// Semantic memory at that `(s, p)` — absent key means no live
    /// memory at that pair.
    semantic_by_sp: BTreeMap<(SymbolId, SymbolId), CurrentSemantic>,
    /// `(subject, predicate) -> (memory_id, valid_at)` for the current
    /// Inferential memory at that `(s, p)` per `temporal-model.md`
    /// § 5.4 (Inf re-derivation supersession mirrors Sem § 5.1). Reuses
    /// [`CurrentSemantic`] shape — identical `(memory_id, valid_at)`
    /// record — to avoid a duplicate struct for the same data.
    inferential_by_sp: BTreeMap<(SymbolId, SymbolId), CurrentSemantic>,
    /// `rule_id -> (memory_id, committed_at)` for current Procedural
    /// memories keyed by rule identifier (§ 5.2 primary key).
    procedural_by_rule: BTreeMap<SymbolId, CurrentProcedural>,
    /// `(canonical(trigger), scope) -> (memory_id, committed_at)` for
    /// current Procedural memories keyed by the `(trigger, scope)`
    /// pair (§ 5.2 secondary key). `Value` has no `Ord` impl because
    /// of `f64`, so the trigger is keyed by its canonical-byte
    /// encoding (stable within a process; not persisted).
    procedural_by_trigger_scope: BTreeMap<(Vec<u8>, SymbolId), CurrentProcedural>,
    /// Reverse index from `memory_id` back to the two Procedural
    /// index keys it occupies. Used during supersession to clear out
    /// the OTHER key of a memory that was matched via only one key
    /// (spec § 5.2 invalidates on either key matching).
    procedural_keys_by_memory: BTreeMap<SymbolId, ProceduralKeys>,
}
205
/// Index entry for a currently-authoritative Semantic memory.
/// Also reused for Inferential entries (see
/// `SupersessionIndex::inferential_by_sp`), which carry the same
/// `(memory_id, valid_at)` pair.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct CurrentSemantic {
    /// Id of the memory currently authoritative at its `(s, p)`.
    memory_id: SymbolId,
    /// That memory's `valid_at`; newer `valid_at` wins on replace.
    valid_at: ClockTime,
}
212
/// Safety cap on Episode-chain traversal. Parent cycles cannot
/// form via the write path (parent must already be committed before
/// a child declares it) but replay of a corrupted log could present
/// one; the cap keeps `episode_chain` bounded in pathological cases.
/// 1024 is far above any realistic nesting depth — presumably chosen
/// as a round power of two; confirm against the spec if tightened.
pub const MAX_EPISODE_CHAIN_DEPTH: usize = 1024;
218
/// The two index keys a Procedural memory occupies (see
/// `SupersessionIndex::procedural_keys_by_memory`): the § 5.2
/// primary `rule_id` key and the secondary `(trigger, scope)` key.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ProceduralKeys {
    /// Primary supersession key (§ 5.2).
    rule_id: SymbolId,
    /// Secondary key: canonical-byte-encoded trigger plus scope.
    trigger_scope: (Vec<u8>, SymbolId),
}
225
/// Index entry for a currently-authoritative Procedural memory.
/// Tracks `committed_at` so the emit path can detect intra-batch
/// conflicts — two Pro writes in the same batch with overlapping
/// supersession keys share `committed_at`, producing a
/// zero-duration "supersession" that's almost certainly an agent
/// bug.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct CurrentProcedural {
    /// Id of the memory currently authoritative under its keys.
    memory_id: SymbolId,
    /// Commit clock of that memory; equal clocks signal an
    /// intra-batch conflict on the write path.
    committed_at: ClockTime,
}
237
238impl Pipeline {
239 /// Construct a pipeline with an empty symbol table.
240 #[must_use]
241 pub fn new() -> Self {
242 Self::default()
243 }
244
    /// Read-only view of the workspace symbol table.
    #[must_use]
    pub fn table(&self) -> &SymbolTable {
        &self.table
    }

    /// Read-only view of the decay parameters used by the read path.
    /// Defaults to [`crate::decay::DecayConfig::librarian_defaults`];
    /// the store layer installs per-workspace overrides when an
    /// `mimir.toml` is present.
    #[must_use]
    pub fn decay_config(&self) -> &crate::decay::DecayConfig {
        &self.decay_config
    }

    /// Replace the decay parameters used by the read path. Intended
    /// for the store layer to wire `mimir.toml` overrides in at
    /// `Store::open`; tests may also call this directly. Overwrites
    /// the previous configuration wholesale — no field-wise merge.
    pub fn set_decay_config(&mut self, cfg: crate::decay::DecayConfig) {
        self.decay_config = cfg;
    }
266
    /// Replay a `SYMBOL_ALLOC` record into the pipeline's symbol table.
    /// Thin pass-through to [`SymbolTable::replay_allocate`]; exposed
    /// for [`Store::open`](crate::store::Store::open).
    ///
    /// # Errors
    ///
    /// Propagates [`BindError`] variants from the underlying
    /// `SymbolTable::replay_allocate` call.
    pub fn replay_allocate(
        &mut self,
        id: SymbolId,
        name: String,
        kind: SymbolKind,
    ) -> Result<(), BindError> {
        self.table.replay_allocate(id, name, kind)
    }

    /// Replay a `SYMBOL_ALIAS` record. Thin pass-through to
    /// [`SymbolTable::replay_alias`]; exposed for `Store::open`.
    ///
    /// # Errors
    ///
    /// Propagates [`BindError`] variants from the underlying
    /// `SymbolTable::replay_alias` call.
    pub fn replay_alias(&mut self, id: SymbolId, alias: String) -> Result<(), BindError> {
        self.table.replay_alias(id, alias)
    }

    /// Replay a `SYMBOL_RENAME` record. Thin pass-through to
    /// [`SymbolTable::replay_rename`]; exposed for `Store::open`.
    ///
    /// # Errors
    ///
    /// Propagates [`BindError`] variants from the underlying
    /// `SymbolTable::replay_rename` call.
    pub fn replay_rename(&mut self, id: SymbolId, new_canonical: String) -> Result<(), BindError> {
        self.table.replay_rename(id, new_canonical)
    }

    /// Replay a `SYMBOL_RETIRE` record. Thin pass-through to
    /// [`SymbolTable::replay_retire`]; exposed for `Store::open`.
    ///
    /// # Errors
    ///
    /// Propagates [`BindError`] variants from the underlying
    /// `SymbolTable::replay_retire` call.
    pub fn replay_retire(&mut self, id: SymbolId, name: String) -> Result<(), BindError> {
        self.table.replay_retire(id, name)
    }
313
    /// Set the pipeline's memory-ID counter. Used by
    /// [`Store::open`](crate::store::Store::open) to restore the
    /// counter from durable state. The write is unconditional — the
    /// caller is responsible for passing a value at least as large
    /// as any `__mem_{n}` already durably allocated.
    pub fn set_next_memory_counter(&mut self, counter: u64) {
        self.next_memory_counter = counter;
    }

    /// Read the pipeline's memory-ID counter — the value that the
    /// next `allocate_memory_id` call will synthesise into
    /// `__mem_{n}`. Exposed for tests and for inspection tooling.
    #[must_use]
    pub fn next_memory_counter(&self) -> u64 {
        self.next_memory_counter
    }

    /// The highest `committed_at` this pipeline has assigned, or
    /// `None` before the first successful batch. Per `temporal-model.md`
    /// § 9.2 / § 12 invariant #1, every successful commit must exceed
    /// this value; the pipeline enforces that internally via
    /// `max(wall_now, last_committed_at + 1)`.
    #[must_use]
    pub fn last_committed_at(&self) -> Option<ClockTime> {
        self.last_committed_at
    }
338
339 /// Advance the pipeline's monotonic commit watermark to `at`.
340 ///
341 /// Used by [`Store::open`](crate::store::Store) during replay to
342 /// restore the watermark from the highest `committed_at` durably
343 /// recorded in the log. No-op if `at <= last_committed_at`.
344 pub fn advance_last_committed_at(&mut self, at: ClockTime) {
345 if self.last_committed_at.is_none_or(|prev| at > prev) {
346 self.last_committed_at = Some(at);
347 }
348 }
349
    /// Read-only view of the supersession DAG.
    #[must_use]
    pub fn dag(&self) -> &SupersessionDag {
        &self.dag
    }

    /// Every Semantic memory this pipeline has committed or replayed,
    /// in commit order. Consumed by the as-of resolver
    /// (`crate::resolver`) per `temporal-model.md` § 7.
    #[must_use]
    pub fn semantic_records(&self) -> &[SemRecord] {
        &self.semantic_records
    }

    /// Every Episodic memory this pipeline has committed or replayed,
    /// in commit order. Episodic records do not participate in
    /// supersession; retained for duplicate checks and future reads.
    #[must_use]
    pub fn episodic_records(&self) -> &[EpiRecord] {
        &self.episodic_records
    }

    /// Every Procedural memory this pipeline has committed or
    /// replayed, in commit order.
    #[must_use]
    pub fn procedural_records(&self) -> &[ProRecord] {
        &self.procedural_records
    }
377
378 /// Indices in [`Pipeline::semantic_records`] of every Semantic
379 /// record ever emitted at `(s, p)`. Returns an empty slice when
380 /// the pair has no history. O(log n) lookup; the resolver uses
381 /// this to avoid scanning the full record history.
382 #[must_use]
383 pub fn semantic_history_at(&self, s: SymbolId, p: SymbolId) -> &[usize] {
384 self.semantic_by_sp_history
385 .get(&(s, p))
386 .map_or(&[], Vec::as_slice)
387 }
388
389 /// Indices in [`Pipeline::procedural_records`] of every
390 /// Procedural record ever emitted under `rule_id`. Returns an
391 /// empty slice when the rule has no history.
392 #[must_use]
393 pub fn procedural_history_for(&self, rule_id: SymbolId) -> &[usize] {
394 self.procedural_by_rule_history
395 .get(&rule_id)
396 .map_or(&[], Vec::as_slice)
397 }
398
    /// Every Inferential memory this pipeline has committed or
    /// replayed, in commit order. Consumed by the Inferential resolver
    /// (`crate::resolver::resolve_inferential`) per `temporal-model.md`
    /// § 5.4 — Inf is keyed by `(s, p)` like Sem, with re-derivation
    /// as the auto-supersession trigger.
    #[must_use]
    pub fn inferential_records(&self) -> &[InfRecord] {
        &self.inferential_records
    }
408
409 /// Indices in [`Pipeline::inferential_records`] of every
410 /// Inferential record ever emitted at `(s, p)`. Returns an empty
411 /// slice when the pair has no history. Mirrors
412 /// [`Pipeline::semantic_history_at`].
413 #[must_use]
414 pub fn inferential_history_at(&self, s: SymbolId, p: SymbolId) -> &[usize] {
415 self.inferential_by_sp_history
416 .get(&(s, p))
417 .map_or(&[], Vec::as_slice)
418 }
419
    /// Register that Episode `episode_id` committed at `at`. Called
    /// by [`crate::store::Store`] after a successful commit and
    /// during log replay for every `Checkpoint` record. The mapping
    /// backs the read path's Episode-scoped predicates per
    /// `read-protocol.md` § 4.1 — an `(s, p)` → `(episode_id, at)`
    /// mapping lets `:in_episode @E` resolve by comparing a
    /// candidate's `committed_at` against `@E`'s registered clock.
    ///
    /// Redundant registrations (same id + same clock) are a no-op;
    /// id collisions with a different clock are ignored on the
    /// assumption that replay walks records in order and the first
    /// registration wins (`entry(..).or_insert` never overwrites).
    pub fn register_episode(&mut self, episode_id: SymbolId, at: ClockTime) {
        self.episode_committed_at.entry(episode_id).or_insert(at);
    }
435
    /// The `committed_at` clock Episode `episode_id` was committed at,
    /// or `None` if the pipeline has not seen that Episode. See
    /// [`Pipeline::register_episode`].
    #[must_use]
    pub fn episode_committed_at(&self, episode_id: SymbolId) -> Option<ClockTime> {
        self.episode_committed_at.get(&episode_id).copied()
    }
443
    /// Iterate every Episode the pipeline has registered as
    /// `(episode_id, committed_at)` pairs.
    ///
    /// Iteration order follows the underlying `BTreeMap`: ascending
    /// by `SymbolId`, **not** commit order. Callers that need a
    /// commit-ordered view — e.g. paginated listing for
    /// `mimir-mcp::mimir_list_episodes` — must collect and sort by
    /// `committed_at`, which is the canonical UI choice since it
    /// matches the durability order on disk.
    pub fn iter_episodes(&self) -> impl Iterator<Item = (SymbolId, ClockTime)> + '_ {
        self.episode_committed_at.iter().map(|(id, at)| (*id, *at))
    }
455
    /// Record that Episode `child` has `parent` as its parent Episode.
    /// Called by the store after a batch with `(episode :start
    /// :parent_episode @E)` metadata, and during replay when an
    /// `EpisodeMeta` record carries `parent_episode_id`. Idempotent
    /// on duplicate calls with the same parent; a conflicting parent
    /// on an Episode already registered is ignored (first-write wins
    /// via `entry(..).or_insert`, matching replay's append-only
    /// semantics).
    pub fn register_episode_parent(&mut self, child: SymbolId, parent: SymbolId) {
        self.episode_parent.entry(child).or_insert(parent);
    }
466
    /// The parent Episode of `episode_id`, or `None` if the Episode
    /// has no parent (or the pipeline has not seen it).
    #[must_use]
    pub fn episode_parent(&self, episode_id: SymbolId) -> Option<SymbolId> {
        self.episode_parent.get(&episode_id).copied()
    }

    /// Take the metadata captured from the most recent
    /// `compile_batch`'s `(episode :start …)` form, if any. Clears
    /// the pending slot (via `Option::take`) so a subsequent batch
    /// without a directive doesn't reuse stale metadata.
    pub fn take_pending_episode_metadata(&mut self) -> Option<PendingEpisodeMetadata> {
        self.pending_episode_metadata.take()
    }
481
    /// `true` if `memory_id` is currently pinned
    /// (`confidence-decay.md` § 7). Pinned memories skip decay.
    /// Membership is maintained by [`Pipeline::replay_flag`].
    #[must_use]
    pub fn is_pinned(&self, memory_id: SymbolId) -> bool {
        self.pinned_memories.contains(&memory_id)
    }

    /// `true` if `memory_id` is currently flagged
    /// operator-authoritative (`confidence-decay.md` § 8).
    /// Authoritative memories skip decay. Membership is maintained
    /// by [`Pipeline::replay_flag`].
    #[must_use]
    pub fn is_authoritative(&self, memory_id: SymbolId) -> bool {
        self.authoritative_memories.contains(&memory_id)
    }
496
497 /// Replay a `Pin` / `Unpin` / `AuthoritativeSet` /
498 /// `AuthoritativeClear` flag event from the canonical log.
499 /// Called by [`crate::store::Store::open`] during recovery so
500 /// the pipeline's pin / authoritative sets reflect the durable
501 /// state.
502 pub fn replay_flag(&mut self, record: &CanonicalRecord) {
503 match record {
504 CanonicalRecord::Pin(r) => {
505 self.pinned_memories.insert(r.memory_id);
506 }
507 CanonicalRecord::Unpin(r) => {
508 self.pinned_memories.remove(&r.memory_id);
509 }
510 CanonicalRecord::AuthoritativeSet(r) => {
511 self.authoritative_memories.insert(r.memory_id);
512 }
513 CanonicalRecord::AuthoritativeClear(r) => {
514 self.authoritative_memories.remove(&r.memory_id);
515 }
516 _ => {} // not a flag event — no-op
517 }
518 }
519
520 /// Walk the Episode chain from `episode_id` up through parents.
521 /// Yields `episode_id` first, then its parent, grandparent, and
522 /// so on. Bounded by [`MAX_EPISODE_CHAIN_DEPTH`] to guard
523 /// against pathological or corrupt parent cycles (the binder
524 /// rejects cycles at write time, but replay of a corrupted log
525 /// might still present one).
526 pub fn episode_chain(&self, episode_id: SymbolId) -> impl Iterator<Item = SymbolId> + '_ {
527 let mut current = Some(episode_id);
528 let mut depth = 0_usize;
529 std::iter::from_fn(move || {
530 if depth >= MAX_EPISODE_CHAIN_DEPTH {
531 return None;
532 }
533 let id = current?;
534 depth += 1;
535 current = self.episode_parent.get(&id).copied();
536 Some(id)
537 })
538 }
539
    /// Replay one edge from the canonical log into the supersession
    /// DAG. Called by [`Store::open`](crate::store::Store) during
    /// recovery; the acyclicity check still runs, so a log with a
    /// corruption-introduced cycle surfaces as an error rather than a
    /// silent invariant violation.
    ///
    /// # Errors
    ///
    /// Propagates [`DagError`](crate::dag::DagError) variants from
    /// `SupersessionDag::add_edge`.
    pub fn replay_edge(&mut self, edge: crate::dag::Edge) -> Result<(), crate::dag::DagError> {
        self.dag.add_edge(edge)
    }
553
    /// Replay a canonical record into the pipeline's
    /// supersession-detection indices. Idempotent when records are
    /// applied in log order; a replayed forward-supersession Sem
    /// record replaces the prior `(s, p)` entry, a retroactive Sem
    /// record does not; a Procedural record matching either
    /// supersession key (§ 5.2) clears the prior entries and inserts
    /// under both of its keys.
    ///
    /// Non-memory records are ignored; callers can pass every record
    /// and let this method filter internally.
    ///
    /// Scope: Semantic + Procedural. Inferential staling (§ 5.4) is
    /// tracked in issue #29.
    pub fn replay_memory_record(&mut self, record: &CanonicalRecord) {
        match record {
            CanonicalRecord::Sem(sem) => {
                let key = (sem.s, sem.p);
                let sem_index = self.semantic_records.len();
                self.semantic_records.push(sem.clone());
                self.semantic_by_sp_history
                    .entry(key)
                    .or_default()
                    .push(sem_index);
                // Forward supersession only: a strictly newer
                // `valid_at` becomes current; a retroactive record
                // (older or equal `valid_at`) leaves the entry alone.
                let replace = self
                    .supersession_index
                    .semantic_by_sp
                    .get(&key)
                    .is_none_or(|existing| sem.clocks.valid_at > existing.valid_at);
                if replace {
                    self.supersession_index.semantic_by_sp.insert(
                        key,
                        CurrentSemantic {
                            memory_id: sem.memory_id,
                            valid_at: sem.clocks.valid_at,
                        },
                    );
                }
            }
            CanonicalRecord::Epi(epi) => {
                // Episodic records don't participate in supersession
                // (§ 5.3) — just retain them in commit order.
                self.episodic_records.push(epi.clone());
            }
            CanonicalRecord::Inf(inf) => {
                // Keep the reverse parent index in sync on replay
                // so post-open supersessions can emit StaleParent
                // edges against already-committed Inferentials
                // (temporal-model.md § 5.4).
                for parent in &inf.derived_from {
                    self.inferentials_by_parent
                        .entry(*parent)
                        .or_default()
                        .push(inf.memory_id);
                }
                // Record + history for the resolver. Mirror Sem: push
                // the record in commit order, record its index in the
                // `(s, p)` history, and update the current-state
                // supersession index so post-open writes can auto-
                // supersede. Replay skips the conflict check — the
                // original write time rejected any such conflict and
                // could not have reached the log.
                let inf_index = self.inferential_records.len();
                let key = (inf.s, inf.p);
                let valid_at = inf.clocks.valid_at;
                self.inferential_records.push(inf.clone());
                self.inferential_by_sp_history
                    .entry(key)
                    .or_default()
                    .push(inf_index);
                let replace = self
                    .supersession_index
                    .inferential_by_sp
                    .get(&key)
                    .is_none_or(|existing| valid_at > existing.valid_at);
                if replace {
                    self.supersession_index.inferential_by_sp.insert(
                        key,
                        CurrentSemantic {
                            memory_id: inf.memory_id,
                            valid_at,
                        },
                    );
                }
            }
            CanonicalRecord::Pro(pro) => {
                let pro_index = self.procedural_records.len();
                self.procedural_records.push(pro.clone());
                self.procedural_by_rule_history
                    .entry(pro.rule_id)
                    .or_default()
                    .push(pro_index);
                // Log replay path — skip the intra-batch-conflict
                // check because any such conflict was already
                // rejected at the original write time (and therefore
                // couldn't have reached the log). Returned list of
                // superseded memories is discarded: replay doesn't
                // emit edges, the log already carries them as
                // separate `Supersedes` records that `replay_edge`
                // handles.
                replay_procedural_supersession(
                    &mut self.supersession_index,
                    pro.memory_id,
                    pro.clocks.committed_at,
                    pro.rule_id,
                    &pro.trigger,
                    pro.scope,
                );
            }
            // Not a memory record — no index maintenance needed.
            _ => {}
        }
    }
663
664 /// Allocate a fresh `__ep_{counter}` symbol for use as a
665 /// `CHECKPOINT` record's `episode_id`. The synthesized name follows
666 /// the same reserved-prefix convention as `__mem_{n}` (see
667 /// `allocate_memory_id`'s collision-handling note). Used by the
668 /// [`Store`](crate::store::Store) commit path; exposed as `pub`
669 /// because `Store` composes `Pipeline` rather than inheriting from
670 /// it.
671 ///
672 /// # Errors
673 ///
674 /// - [`EmitError::MemoryIdAllocation`] if the synthesized name
675 /// collides with an existing symbol (same pathological-agent
676 /// case as memory-ID allocation).
677 pub fn allocate_episode_symbol(&mut self, counter: u64) -> Result<SymbolId, EmitError> {
678 let name = format!("__ep_{counter}");
679 self.table
680 .allocate(name.clone(), SymbolKind::Memory)
681 .map_err(|cause| EmitError::MemoryIdAllocation { name, cause })
682 }
683
684 /// Compile one batch of agent input into canonical records.
685 ///
686 /// `wall_now` is the librarian's host wall clock at the start of
687 /// this batch; injected for determinism (tests pass a fixed
688 /// `ClockTime`). The batch's `committed_at` is derived from
689 /// `wall_now` via the monotonic rule in `temporal-model.md` § 9.2:
690 /// `effective_now = max(wall_now, last_committed_at + 1)`. All
691 /// memory records within the batch share that same `effective_now`
692 /// as their `committed_at` and — for non-Episodic kinds — their
693 /// `observed_at`.
694 ///
695 /// Future-validity rejection (semantic stage) uses the raw
696 /// `wall_now`, not the monotonic watermark, so a transiently
697 /// inflated watermark cannot relax the "no future writes without
698 /// `:projected true`" rule.
699 ///
700 /// # Errors
701 ///
702 /// Any stage may return a [`PipelineError`]; on error the workspace
703 /// state (symbol table, memory counter, and commit watermark) is
704 /// untouched. [`PipelineError::ClockExhausted`] fires if the
705 /// monotonic bump would reach the reserved `u64::MAX` sentinel.
706 ///
707 /// # Example
708 ///
709 /// ```
710 /// # #![allow(clippy::unwrap_used)]
711 /// use mimir_core::pipeline::Pipeline;
712 /// use mimir_core::ClockTime;
713 ///
714 /// let mut pipe = Pipeline::new();
715 /// let now = ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel");
716 /// let input = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
717 /// let records = pipe.compile_batch(input, now).unwrap();
718 /// // The batch emits a `Sem` memory record preceded by `SymbolAlloc`
719 /// // records for every first-use symbol name.
720 /// assert!(records.iter().any(|r| matches!(r, mimir_core::canonical::CanonicalRecord::Sem(_))));
721 /// ```
    pub fn compile_batch(
        &mut self,
        input: &str,
        wall_now: ClockTime,
    ) -> Result<Vec<CanonicalRecord>, PipelineError> {
        // observability.md: `mimir.pipeline.compile_batch` span. Fields
        // are `Empty` until the emit stage finishes so the span still
        // records timing on error paths (counts stay unset).
        let span = tracing::info_span!(
            "mimir.pipeline.compile_batch",
            input_len = input.len(),
            record_count = tracing::field::Empty,
            memory_count = tracing::field::Empty,
            edge_count = tracing::field::Empty,
        );
        let _enter = span.enter();

        // Stage: parse (lex is internal to `parse::parse`). Stateless,
        // so it may run before the working-state clones below.
        let forms = parse::parse(input).map_err(PipelineError::Parse)?;

        // Compute the batch's `committed_at` before running stateful
        // stages so a clock-exhaustion error cannot leave partial work.
        let effective_now = monotonic_commit_clock(wall_now, self.last_committed_at)?;

        // Clone live state so mid-batch failures cannot leak partial
        // mutations. Every state field the emit stage may touch lands
        // here so full-batch rollback is trivial — drop the working
        // copies and leave `self` untouched.
        let mut working_table = self.table.clone();
        let mut working_counter = self.next_memory_counter;
        let mut working_dag = self.dag.clone();
        let mut working_index = self.supersession_index.clone();
        let mut working_sem_records = self.semantic_records.clone();
        let mut working_sem_by_sp = self.semantic_by_sp_history.clone();
        let mut working_epi_records = self.episodic_records.clone();
        let mut working_pro_records = self.procedural_records.clone();
        let mut working_pro_by_rule = self.procedural_by_rule_history.clone();
        let mut working_inf_records = self.inferential_records.clone();
        let mut working_inf_by_sp = self.inferential_by_sp_history.clone();

        // Stage: bind — mutates only the working table; the journal
        // feeds the SYMBOL_* records in the emit stage.
        let (bound, journal) =
            bind::bind(forms, &mut working_table).map_err(PipelineError::Bind)?;

        // Semantic validation (future-validity rejection) uses the raw
        // wall clock, which is the librarian's estimate of "now" for
        // judging agent clock skew — not the monotonic watermark, which
        // may transiently lead the wall clock after a regression.
        let validated =
            semantic::validate(bound, &working_table, wall_now).map_err(PipelineError::Semantic)?;

        // Remaining working copies, plus the emit-stage view over all
        // of them. `EmitState` borrows mutably so `emit` can thread a
        // single argument instead of a dozen.
        let mut working_pending_meta: Option<PendingEpisodeMetadata> = None;
        let mut working_pinned = self.pinned_memories.clone();
        let mut working_authoritative = self.authoritative_memories.clone();
        let mut working_infs_by_parent = self.inferentials_by_parent.clone();
        let mut emit_state = EmitState {
            table: &mut working_table,
            counter: &mut working_counter,
            dag: &mut working_dag,
            index: &mut working_index,
            semantic_records: &mut working_sem_records,
            semantic_by_sp: &mut working_sem_by_sp,
            episodic_records: &mut working_epi_records,
            procedural_records: &mut working_pro_records,
            procedural_by_rule: &mut working_pro_by_rule,
            inferential_records: &mut working_inf_records,
            inferential_by_sp: &mut working_inf_by_sp,
            pending_episode: &mut working_pending_meta,
            pinned: &mut working_pinned,
            authoritative: &mut working_authoritative,
            inferentials_by_parent: &mut working_infs_by_parent,
            now: effective_now,
        };
        // Stage: emit — the last fallible step before commit.
        let records = emit(&validated, &journal, &mut emit_state).map_err(PipelineError::Emit)?;

        // All stages succeeded — commit.
        self.table = working_table;
        self.next_memory_counter = working_counter;
        self.last_committed_at = Some(effective_now);
        self.dag = working_dag;
        self.supersession_index = working_index;
        self.semantic_records = working_sem_records;
        self.semantic_by_sp_history = working_sem_by_sp;
        self.episodic_records = working_epi_records;
        self.procedural_records = working_pro_records;
        self.procedural_by_rule_history = working_pro_by_rule;
        self.inferential_records = working_inf_records;
        self.inferential_by_sp_history = working_inf_by_sp;
        self.pending_episode_metadata = working_pending_meta;
        self.pinned_memories = working_pinned;
        self.authoritative_memories = working_authoritative;
        self.inferentials_by_parent = working_infs_by_parent;

        let (memory_count, edge_count) = count_memory_and_edge_records(&records);
        span.record("record_count", records.len());
        span.record("memory_count", memory_count);
        span.record("edge_count", edge_count);

        Ok(records)
    }
820}
821
822/// `(memory_count, edge_count)` tally over a record batch — identifiers
823/// only, no payload inspection. Consumed by the
824/// `mimir.pipeline.compile_batch` span.
825fn count_memory_and_edge_records(records: &[CanonicalRecord]) -> (usize, usize) {
826 let mut memory = 0_usize;
827 let mut edge = 0_usize;
828 for r in records {
829 match r {
830 CanonicalRecord::Sem(_)
831 | CanonicalRecord::Epi(_)
832 | CanonicalRecord::Pro(_)
833 | CanonicalRecord::Inf(_) => memory += 1,
834 CanonicalRecord::Supersedes(_)
835 | CanonicalRecord::Corrects(_)
836 | CanonicalRecord::StaleParent(_)
837 | CanonicalRecord::Reconfirms(_) => edge += 1,
838 _ => {}
839 }
840 }
841 (memory, edge)
842}
843
844/// Mutable write-time state threaded through the emit stage.
845///
846/// Bundles every field that `emit` / `emit_form` may mutate so the
847/// function signatures don't grow a parameter for each. Individual
848/// helpers borrow what they need.
849struct EmitState<'a> {
850 table: &'a mut SymbolTable,
851 counter: &'a mut u64,
852 dag: &'a mut SupersessionDag,
853 index: &'a mut SupersessionIndex,
854 semantic_records: &'a mut Vec<SemRecord>,
855 /// `(s, p) -> indices` over `semantic_records`; see
856 /// [`Pipeline::semantic_by_sp_history`].
857 semantic_by_sp: &'a mut BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
858 episodic_records: &'a mut Vec<EpiRecord>,
859 procedural_records: &'a mut Vec<ProRecord>,
860 /// `rule_id -> indices` over `procedural_records`; see
861 /// [`Pipeline::procedural_by_rule_history`].
862 procedural_by_rule: &'a mut BTreeMap<SymbolId, Vec<usize>>,
863 inferential_records: &'a mut Vec<InfRecord>,
864 /// `(s, p) -> indices` over `inferential_records`; see
865 /// [`Pipeline::inferential_history_at`].
866 inferential_by_sp: &'a mut BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
867 /// Pending batch-level Episode metadata from an
868 /// `(episode :start …)` form, threaded through so emit can
869 /// populate and `compile_batch` can commit. Rolled back with the
870 /// rest of working state on failure.
871 pending_episode: &'a mut Option<PendingEpisodeMetadata>,
872 /// Pinned-memory set (`confidence-decay.md` § 7).
873 pinned: &'a mut BTreeSet<SymbolId>,
874 /// Operator-authoritative memory set
875 /// (`confidence-decay.md` § 8).
876 authoritative: &'a mut BTreeSet<SymbolId>,
877 /// Reverse parent index for Inferential staling
878 /// (`temporal-model.md` § 5.4).
879 inferentials_by_parent: &'a mut BTreeMap<SymbolId, Vec<SymbolId>>,
880 now: ClockTime,
881}
882
883/// Compute the batch commit clock per `temporal-model.md` § 9.2:
884/// `max(wall_now, last_committed_at + 1)`. Returns
885/// [`PipelineError::ClockExhausted`] if bumping past `last_committed_at`
886/// would reach the `u64::MAX` sentinel that [`ClockTime`] refuses.
887fn monotonic_commit_clock(
888 wall_now: ClockTime,
889 last_committed_at: Option<ClockTime>,
890) -> Result<ClockTime, PipelineError> {
891 let Some(prev) = last_committed_at else {
892 return Ok(wall_now);
893 };
894 if wall_now > prev {
895 return Ok(wall_now);
896 }
897 // Wall clock did not advance past the previous commit. Bump by 1ms.
898 let next_raw = prev
899 .as_millis()
900 .checked_add(1)
901 .ok_or(PipelineError::ClockExhausted {
902 last_committed_at: prev,
903 })?;
904 ClockTime::try_from_millis(next_raw).map_err(|_| PipelineError::ClockExhausted {
905 last_committed_at: prev,
906 })
907}
908
909fn emit(
910 forms: &[ValidatedForm],
911 journal: &[SymbolMutation],
912 state: &mut EmitState,
913) -> Result<Vec<CanonicalRecord>, EmitError> {
914 // Symbol events first, so replay sees allocations before the memory
915 // records that reference their IDs.
916 let mut out = Vec::with_capacity(journal.len() + forms.len());
917 for mutation in journal {
918 out.push(emit_symbol_mutation(mutation, state.now));
919 }
920 for form in forms {
921 // Alias / Rename / Retire produce no memory-level record —
922 // their durability comes via the SYMBOL_* canonical records
923 // emitted above from the bind journal.
924 if matches!(
925 form,
926 ValidatedForm::Alias { .. }
927 | ValidatedForm::Rename { .. }
928 | ValidatedForm::Retire { .. }
929 ) {
930 continue;
931 }
932 emit_form(form, state, &mut out)?;
933 }
934 Ok(out)
935}
936
937fn emit_symbol_mutation(mutation: &SymbolMutation, now: ClockTime) -> CanonicalRecord {
938 match mutation {
939 SymbolMutation::Allocate { id, name, kind } => {
940 CanonicalRecord::SymbolAlloc(SymbolEventRecord {
941 symbol_id: *id,
942 name: name.clone(),
943 symbol_kind: *kind,
944 at: now,
945 })
946 }
947 SymbolMutation::Rename {
948 id,
949 new_canonical,
950 kind,
951 } => CanonicalRecord::SymbolRename(SymbolEventRecord {
952 symbol_id: *id,
953 name: new_canonical.clone(),
954 symbol_kind: *kind,
955 at: now,
956 }),
957 SymbolMutation::Alias { id, alias, kind } => {
958 CanonicalRecord::SymbolAlias(SymbolEventRecord {
959 symbol_id: *id,
960 name: alias.clone(),
961 symbol_kind: *kind,
962 at: now,
963 })
964 }
965 SymbolMutation::Retire { id, name, kind } => {
966 CanonicalRecord::SymbolRetire(SymbolEventRecord {
967 symbol_id: *id,
968 name: name.clone(),
969 symbol_kind: *kind,
970 at: now,
971 })
972 }
973 }
974}
975
/// Lower a single validated form into its canonical record(s),
/// updating the batch-local working state (record histories, `(s, p)`
/// / rule indices, supersession DAG, pin/auth sets, pending Episode
/// slot) as a side effect. Called from `emit` for every form that is
/// not a pure symbol event.
#[allow(clippy::too_many_lines)]
fn emit_form(
    form: &ValidatedForm,
    state: &mut EmitState,
    out: &mut Vec<CanonicalRecord>,
) -> Result<(), EmitError> {
    match form {
        ValidatedForm::Sem {
            s,
            p,
            o,
            source,
            confidence,
            valid_at,
            projected,
            ..
        } => {
            let memory_id = allocate_memory_id(state, out)?;
            // Auto-supersession per temporal-model.md § 5.1: look up
            // an existing Semantic memory at `(s, p)` and decide
            // forward / retroactive / conflict.
            let (record_invalid_at, supersession) =
                resolve_semantic_supersession(state.index, memory_id, *s, *p, *valid_at)?;
            let sem = SemRecord {
                memory_id,
                s: *s,
                p: *p,
                o: o.clone(),
                source: *source,
                confidence: *confidence,
                clocks: Clocks {
                    valid_at: *valid_at,
                    observed_at: state.now,
                    committed_at: state.now,
                    invalid_at: record_invalid_at,
                },
                flags: SemFlags {
                    projected: *projected,
                },
            };
            // History + (s, p) index updates happen before the output
            // push so batch-local lookups stay consistent.
            let sem_index = state.semantic_records.len();
            state.semantic_records.push(sem.clone());
            state
                .semantic_by_sp
                .entry((*s, *p))
                .or_default()
                .push(sem_index);
            out.push(CanonicalRecord::Sem(sem));
            // Emit the Supersedes edge *after* the new memory so a
            // log reader sees the memory first, then the edge that
            // refers to it.
            if let Some(target) = supersession {
                emit_supersedes_edge(state, out, memory_id, target)?;
            }
        }
        ValidatedForm::Epi {
            event_id,
            kind,
            participants,
            location,
            at_time,
            observed_at,
            source,
            confidence,
            ..
        } => {
            let memory_id = allocate_memory_id(state, out)?;
            // No auto-supersession path for Episodic here — the record
            // is appended with an open (`None`) invalid_at.
            let epi = EpiRecord {
                memory_id,
                event_id: *event_id,
                kind: *kind,
                participants: participants.clone(),
                location: *location,
                at_time: *at_time,
                observed_at: *observed_at,
                source: *source,
                confidence: *confidence,
                committed_at: state.now,
                invalid_at: None,
            };
            state.episodic_records.push(epi.clone());
            out.push(CanonicalRecord::Epi(epi));
        }
        ValidatedForm::Pro {
            rule_id,
            trigger,
            action,
            precondition,
            scope,
            source,
            confidence,
            ..
        } => {
            let memory_id = allocate_memory_id(state, out)?;
            let pro = ProRecord {
                memory_id,
                rule_id: *rule_id,
                trigger: trigger.clone(),
                action: action.clone(),
                precondition: precondition.clone(),
                scope: *scope,
                source: *source,
                confidence: *confidence,
                clocks: Clocks {
                    valid_at: state.now,
                    observed_at: state.now,
                    committed_at: state.now,
                    invalid_at: None,
                },
            };
            let pro_index = state.procedural_records.len();
            state.procedural_records.push(pro.clone());
            state
                .procedural_by_rule
                .entry(*rule_id)
                .or_default()
                .push(pro_index);
            out.push(CanonicalRecord::Pro(pro));
            // Auto-supersession per § 5.2: dedup by `rule_id` OR
            // `(trigger, scope)`. Either match triggers supersession;
            // if both match distinct prior memories, both get edges.
            let superseded = apply_procedural_supersession(
                state.index,
                memory_id,
                state.now,
                *rule_id,
                trigger,
                *scope,
            )?;
            for old in superseded {
                emit_supersedes_edge(state, out, memory_id, old)?;
            }
        }
        ValidatedForm::Inf {
            s,
            p,
            o,
            derived_from,
            method,
            confidence,
            valid_at,
            projected,
        } => {
            let memory_id = allocate_memory_id(state, out)?;
            // Write-time stale flag (`temporal-model.md` § 5.4):
            // an Inferential derived from an already-superseded
            // parent is born stale. A parent is superseded iff the
            // DAG carries an incoming `Supersedes` edge on it.
            let born_stale = derived_from
                .iter()
                .any(|parent| parent_is_superseded(state.dag, *parent));
            // Auto-supersession per temporal-model.md § 5.4 ("auto-
            // supersession rule as if Inferential were Semantic —
            // same (s, p) later valid_at"). Shares the Sem § 5.1
            // forward / retroactive / conflict logic.
            let (record_invalid_at, supersession) =
                resolve_inferential_supersession(state.index, memory_id, *s, *p, *valid_at)?;
            let inf = InfRecord {
                memory_id,
                s: *s,
                p: *p,
                o: o.clone(),
                derived_from: derived_from.clone(),
                method: *method,
                confidence: *confidence,
                clocks: Clocks {
                    valid_at: *valid_at,
                    observed_at: state.now,
                    committed_at: state.now,
                    invalid_at: record_invalid_at,
                },
                flags: InfFlags {
                    projected: *projected,
                    stale: born_stale,
                },
            };
            let inf_index = state.inferential_records.len();
            state.inferential_records.push(inf.clone());
            state
                .inferential_by_sp
                .entry((*s, *p))
                .or_default()
                .push(inf_index);
            out.push(CanonicalRecord::Inf(inf));
            // Register in the reverse-parent index so future
            // supersessions on any parent can emit `StaleParent`
            // edges against this Inferential without scanning the
            // full history.
            for parent in derived_from {
                state
                    .inferentials_by_parent
                    .entry(*parent)
                    .or_default()
                    .push(memory_id);
            }
            // Emit the Supersedes edge *after* the new memory so a
            // log reader sees the memory first, then the edge that
            // refers to it. Mirrors Sem § 5.1 ordering.
            if let Some(target) = supersession {
                emit_supersedes_edge(state, out, memory_id, target)?;
            }
        }
        // Alias / Rename / Retire are filtered out by emit() before
        // reaching here; their canonical form is the SYMBOL_* record
        // from the bind journal.
        ValidatedForm::Alias { .. }
        | ValidatedForm::Rename { .. }
        | ValidatedForm::Retire { .. } => {
            return Err(EmitError::Unsupported {
                form: "symbol-event-form-without-journal",
            })
        }
        ValidatedForm::Correct { .. } => return Err(EmitError::Unsupported { form: "correct" }),
        ValidatedForm::Promote { .. } => return Err(EmitError::Unsupported { form: "promote" }),
        ValidatedForm::Query { .. } => return Err(EmitError::Unsupported { form: "query" }),
        ValidatedForm::Episode {
            action,
            label,
            parent_episode,
            retracts,
        } => {
            // Episode forms emit no canonical record at this layer.
            // `:start` deposits metadata in the batch's pending slot
            // for Store to attach to the CHECKPOINT; `:close` is a
            // no-op under the single-batch-per-Episode model (spec
            // § 3.1 — the compile_batch return implicitly closes).
            // The semantic stage guarantees at most one Episode
            // form per batch, so we can unconditionally overwrite.
            if matches!(action, crate::parse::EpisodeAction::Start) {
                *state.pending_episode = Some(PendingEpisodeMetadata {
                    label: label.clone(),
                    parent_episode: *parent_episode,
                    retracts: retracts.clone(),
                });
            } else {
                // `:close` — still clear any stale pending metadata
                // so the batch signals "no new Episode metadata."
                *state.pending_episode = None;
            }
        }
        ValidatedForm::Flag {
            action,
            memory,
            actor,
        } => {
            let record = crate::canonical::FlagEventRecord {
                memory_id: *memory,
                at: state.now,
                actor_symbol: *actor,
            };
            // Flip the pipeline's pin / auth sets alongside the
            // canonical emission so the read path picks up the new
            // state without needing a round-trip through replay.
            match action {
                crate::parse::FlagAction::Pin => {
                    state.pinned.insert(*memory);
                    out.push(CanonicalRecord::Pin(record));
                }
                crate::parse::FlagAction::Unpin => {
                    state.pinned.remove(memory);
                    out.push(CanonicalRecord::Unpin(record));
                }
                crate::parse::FlagAction::AuthoritativeSet => {
                    state.authoritative.insert(*memory);
                    out.push(CanonicalRecord::AuthoritativeSet(record));
                }
                crate::parse::FlagAction::AuthoritativeClear => {
                    state.authoritative.remove(memory);
                    out.push(CanonicalRecord::AuthoritativeClear(record));
                }
            }
        }
    }
    Ok(())
}
1251
1252/// Resolve the auto-supersession decision for a new Semantic memory
1253/// against the current-state index. Per `temporal-model.md` § 5.1:
1254///
1255/// - No prior at `(s, p)`: insert, no edge.
1256/// - New `valid_at > old.valid_at` (forward): replace the index
1257/// entry; the caller emits a Supersedes edge `new → old`.
1258/// - New `valid_at < old.valid_at` (retroactive correction): the new
1259/// memory is valid only for the period up to `old.valid_at`, so
1260/// its `invalid_at` is set at write time and the index entry for
1261/// `(s, p)` stays pointed at `old`. A Supersedes edge still records
1262/// the temporal relationship.
1263/// - Equal `valid_at`: two memories at the same `(s, p)` claiming
1264/// identical validity start cannot both be authoritative under the
1265/// single-writer-per-workspace invariant. Surface as an emit-time
1266/// error so the agent picks a distinct `valid_at` and re-batches.
1267fn resolve_semantic_supersession(
1268 index: &mut SupersessionIndex,
1269 new_memory_id: SymbolId,
1270 s: SymbolId,
1271 p: SymbolId,
1272 new_valid_at: ClockTime,
1273) -> Result<(Option<ClockTime>, Option<SymbolId>), EmitError> {
1274 let key = (s, p);
1275 let Some(old) = index.semantic_by_sp.get(&key).copied() else {
1276 index.semantic_by_sp.insert(
1277 key,
1278 CurrentSemantic {
1279 memory_id: new_memory_id,
1280 valid_at: new_valid_at,
1281 },
1282 );
1283 return Ok((None, None));
1284 };
1285 match new_valid_at.cmp(&old.valid_at) {
1286 std::cmp::Ordering::Greater => {
1287 // Forward supersession.
1288 index.semantic_by_sp.insert(
1289 key,
1290 CurrentSemantic {
1291 memory_id: new_memory_id,
1292 valid_at: new_valid_at,
1293 },
1294 );
1295 tracing::info!(
1296 target: "mimir.supersession",
1297 kind = "semantic",
1298 direction = "forward",
1299 s = %s,
1300 p = %p,
1301 old_memory_id = %old.memory_id,
1302 new_memory_id = %new_memory_id,
1303 "semantic auto-supersession",
1304 );
1305 Ok((None, Some(old.memory_id)))
1306 }
1307 std::cmp::Ordering::Less => {
1308 // Retroactive: new memory's validity closes at old's start.
1309 // Index entry preserves `old` as current.
1310 //
1311 // Note the asymmetry vs forward supersession: § 6.2 #4
1312 // says `to.invalid_at = from.valid_at` for every
1313 // Supersedes edge, but § 5.1 retroactive instead sets
1314 // `from.invalid_at = to.valid_at` (the NEW memory's
1315 // invalid_at, not the old one's). We follow § 5.1 here;
1316 // 6.4's as-of resolver must not derive invalid_at from
1317 // edges alone — it has to read it from the memory record
1318 // directly.
1319 tracing::info!(
1320 target: "mimir.supersession",
1321 kind = "semantic",
1322 direction = "retroactive",
1323 s = %s,
1324 p = %p,
1325 old_memory_id = %old.memory_id,
1326 new_memory_id = %new_memory_id,
1327 "semantic auto-supersession",
1328 );
1329 Ok((Some(old.valid_at), Some(old.memory_id)))
1330 }
1331 std::cmp::Ordering::Equal => Err(EmitError::SemanticSupersessionConflict {
1332 s,
1333 p,
1334 valid_at: new_valid_at,
1335 existing: old.memory_id,
1336 }),
1337 }
1338}
1339
1340/// Resolve auto-supersession for a new Inferential memory against
1341/// the current-state index per `temporal-model.md` § 5.4. Mirrors
1342/// [`resolve_semantic_supersession`]: a re-derivation with the same
1343/// `(s, p)` and a later `valid_at` forward-supersedes; an earlier
1344/// `valid_at` is retroactive and closes the *new* memory's validity
1345/// at the existing record's `valid_at`; equal `valid_at` is a
1346/// conflict under the single-writer invariant.
1347fn resolve_inferential_supersession(
1348 index: &mut SupersessionIndex,
1349 new_memory_id: SymbolId,
1350 s: SymbolId,
1351 p: SymbolId,
1352 new_valid_at: ClockTime,
1353) -> Result<(Option<ClockTime>, Option<SymbolId>), EmitError> {
1354 let key = (s, p);
1355 let Some(old) = index.inferential_by_sp.get(&key).copied() else {
1356 index.inferential_by_sp.insert(
1357 key,
1358 CurrentSemantic {
1359 memory_id: new_memory_id,
1360 valid_at: new_valid_at,
1361 },
1362 );
1363 return Ok((None, None));
1364 };
1365 match new_valid_at.cmp(&old.valid_at) {
1366 std::cmp::Ordering::Greater => {
1367 index.inferential_by_sp.insert(
1368 key,
1369 CurrentSemantic {
1370 memory_id: new_memory_id,
1371 valid_at: new_valid_at,
1372 },
1373 );
1374 tracing::info!(
1375 target: "mimir.supersession",
1376 kind = "inferential",
1377 direction = "forward",
1378 s = %s,
1379 p = %p,
1380 old_memory_id = %old.memory_id,
1381 new_memory_id = %new_memory_id,
1382 "inferential auto-supersession",
1383 );
1384 Ok((None, Some(old.memory_id)))
1385 }
1386 std::cmp::Ordering::Less => {
1387 // Retroactive: new memory's validity closes at old's
1388 // start. Index entry preserves `old` as current.
1389 tracing::info!(
1390 target: "mimir.supersession",
1391 kind = "inferential",
1392 direction = "retroactive",
1393 s = %s,
1394 p = %p,
1395 old_memory_id = %old.memory_id,
1396 new_memory_id = %new_memory_id,
1397 "inferential auto-supersession",
1398 );
1399 Ok((Some(old.valid_at), Some(old.memory_id)))
1400 }
1401 std::cmp::Ordering::Equal => Err(EmitError::InferentialSupersessionConflict {
1402 s,
1403 p,
1404 valid_at: new_valid_at,
1405 existing: old.memory_id,
1406 }),
1407 }
1408}
1409
1410/// Resolve auto-supersession for a new Procedural memory against
1411/// the current-state index per `temporal-model.md` § 5.2.
1412///
1413/// Either a `rule_id` match or a `(trigger, scope)` match triggers
1414/// supersession. If both keys match distinct existing memories (i.e.
1415/// the new memory's `rule_id` points to memory A and its
1416/// `(trigger, scope)` points to memory B ≠ A), BOTH are superseded
1417/// and the caller emits a `Supersedes` edge for each. The case
1418/// where both lookups converge to the same memory (a write that
1419/// duplicates an existing Pro on both keys) dedupes to a single
1420/// edge.
1421///
1422/// Side effects: removes the superseded memories' index entries
1423/// under both of their keys (once superseded, a memory is no longer
1424/// current on *any* key) and inserts the new memory under both of
1425/// its keys.
1426///
1427/// # Errors
1428///
1429/// [`EmitError::ProceduralSupersessionConflict`] if any matched
1430/// predecessor shares `committed_at` with the new memory — i.e.,
1431/// both were emitted in the same batch. This is the Pro analog of
1432/// Semantic's equal-`valid_at` rejection: silently accepting the
1433/// edge would produce a zero-duration "supersession" that's almost
1434/// certainly an agent bug.
1435fn apply_procedural_supersession(
1436 index: &mut SupersessionIndex,
1437 new_memory_id: SymbolId,
1438 new_committed_at: ClockTime,
1439 rule_id: SymbolId,
1440 trigger: &crate::Value,
1441 scope: SymbolId,
1442) -> Result<Vec<SymbolId>, EmitError> {
1443 let (superseded, trigger_scope_key) = procedural_lookup(index, rule_id, trigger, scope);
1444
1445 // Reject intra-batch conflict before mutating the index — equal
1446 // committed_at means NEW and OLD were both produced in the same
1447 // batch's emit pass, which would yield a zero-duration
1448 // supersession.
1449 for old in &superseded {
1450 if old.committed_at == new_committed_at {
1451 return Err(EmitError::ProceduralSupersessionConflict {
1452 rule_id,
1453 existing: old.memory_id,
1454 });
1455 }
1456 }
1457
1458 procedural_install(
1459 index,
1460 &superseded,
1461 new_memory_id,
1462 new_committed_at,
1463 rule_id,
1464 trigger_scope_key,
1465 );
1466 if !superseded.is_empty() {
1467 tracing::info!(
1468 target: "mimir.supersession",
1469 kind = "procedural",
1470 rule_id = %rule_id,
1471 new_memory_id = %new_memory_id,
1472 superseded_count = superseded.len(),
1473 "procedural auto-supersession",
1474 );
1475 }
1476 Ok(superseded.into_iter().map(|c| c.memory_id).collect())
1477}
1478
1479/// Replay entrypoint — applies the same index mutations as the emit
1480/// path but skips the intra-batch-conflict check. Safe because the
1481/// emit path rejected any such conflict at original write time, so
1482/// no log record can contain one.
1483fn replay_procedural_supersession(
1484 index: &mut SupersessionIndex,
1485 new_memory_id: SymbolId,
1486 new_committed_at: ClockTime,
1487 rule_id: SymbolId,
1488 trigger: &crate::Value,
1489 scope: SymbolId,
1490) {
1491 let (superseded, trigger_scope_key) = procedural_lookup(index, rule_id, trigger, scope);
1492 procedural_install(
1493 index,
1494 &superseded,
1495 new_memory_id,
1496 new_committed_at,
1497 rule_id,
1498 trigger_scope_key,
1499 );
1500}
1501
1502/// Look up the (up to two) prior Procedural memories matched by
1503/// `rule_id` and `(trigger, scope)`. Returns the canonical
1504/// `(trigger_bytes, scope)` key alongside the matches so callers can
1505/// reuse it for insertion without re-encoding.
1506fn procedural_lookup(
1507 index: &SupersessionIndex,
1508 rule_id: SymbolId,
1509 trigger: &crate::Value,
1510 scope: SymbolId,
1511) -> (Vec<CurrentProcedural>, (Vec<u8>, SymbolId)) {
1512 let trigger_scope_key = (trigger.index_key_bytes(), scope);
1513 let by_rule = index.procedural_by_rule.get(&rule_id).copied();
1514 let by_ts = index
1515 .procedural_by_trigger_scope
1516 .get(&trigger_scope_key)
1517 .copied();
1518
1519 let mut superseded: Vec<CurrentProcedural> = Vec::new();
1520 if let Some(old) = by_rule {
1521 superseded.push(old);
1522 }
1523 if let Some(old) = by_ts {
1524 if !superseded
1525 .iter()
1526 .any(|existing| existing.memory_id == old.memory_id)
1527 {
1528 superseded.push(old);
1529 }
1530 }
1531 (superseded, trigger_scope_key)
1532}
1533
1534/// Clear every superseded memory's BOTH keys and insert NEW under
1535/// both of its keys. Shared by emit + replay.
1536fn procedural_install(
1537 index: &mut SupersessionIndex,
1538 superseded: &[CurrentProcedural],
1539 new_memory_id: SymbolId,
1540 new_committed_at: ClockTime,
1541 rule_id: SymbolId,
1542 trigger_scope_key: (Vec<u8>, SymbolId),
1543) {
1544 for old in superseded {
1545 if let Some(keys) = index.procedural_keys_by_memory.remove(&old.memory_id) {
1546 index.procedural_by_rule.remove(&keys.rule_id);
1547 index
1548 .procedural_by_trigger_scope
1549 .remove(&keys.trigger_scope);
1550 }
1551 }
1552 let new_entry = CurrentProcedural {
1553 memory_id: new_memory_id,
1554 committed_at: new_committed_at,
1555 };
1556 let new_keys = ProceduralKeys {
1557 rule_id,
1558 trigger_scope: trigger_scope_key.clone(),
1559 };
1560 index.procedural_by_rule.insert(rule_id, new_entry);
1561 index
1562 .procedural_by_trigger_scope
1563 .insert(trigger_scope_key, new_entry);
1564 index
1565 .procedural_keys_by_memory
1566 .insert(new_memory_id, new_keys);
1567}
1568
1569/// Push a `Supersedes` edge record into the output stream and add
1570/// the matching in-memory edge to the working DAG. Also emits a
1571/// `StaleParent` edge from each Inferential derived from `to` —
1572/// per `temporal-model.md` § 5.4, a superseded parent invalidates
1573/// every dependent Inferential at the supersession instant.
1574fn emit_supersedes_edge(
1575 state: &mut EmitState,
1576 out: &mut Vec<CanonicalRecord>,
1577 from: SymbolId,
1578 to: SymbolId,
1579) -> Result<(), EmitError> {
1580 out.push(CanonicalRecord::Supersedes(EdgeRecord {
1581 from,
1582 to,
1583 at: state.now,
1584 }));
1585 state
1586 .dag
1587 .add_edge(DagEdge {
1588 kind: EdgeKind::Supersedes,
1589 from,
1590 to,
1591 at: state.now,
1592 })
1593 .map_err(EmitError::SupersessionDag)?;
1594 // Inferential staling: every Inferential that derived from the
1595 // now-superseded `to` gets a `StaleParent` edge committed in the
1596 // same batch. The reverse index is populated at Inf emit + log
1597 // replay so this lookup is O(log n) + O(k) in the dependent
1598 // count, typically ≤ 3.
1599 if let Some(dependents) = state.inferentials_by_parent.get(&to).cloned() {
1600 for inf_id in dependents {
1601 emit_stale_parent_edge(state, out, inf_id, to)?;
1602 }
1603 }
1604 Ok(())
1605}
1606
1607/// Emit a `StaleParent` edge (`inf → parent`) as part of the
1608/// Inferential-staling retroactive propagation. Added to both the
1609/// output stream (for durability) and the working DAG (so further
1610/// batch-local logic sees it).
1611fn emit_stale_parent_edge(
1612 state: &mut EmitState,
1613 out: &mut Vec<CanonicalRecord>,
1614 inf_id: SymbolId,
1615 parent_id: SymbolId,
1616) -> Result<(), EmitError> {
1617 out.push(CanonicalRecord::StaleParent(EdgeRecord {
1618 from: inf_id,
1619 to: parent_id,
1620 at: state.now,
1621 }));
1622 state
1623 .dag
1624 .add_edge(DagEdge {
1625 kind: EdgeKind::StaleParent,
1626 from: inf_id,
1627 to: parent_id,
1628 at: state.now,
1629 })
1630 .map_err(EmitError::SupersessionDag)?;
1631 Ok(())
1632}
1633
1634/// True if `parent` has an incoming `Supersedes` edge in `dag` —
1635/// i.e. it has been superseded. Used at Inf emit time for the
1636/// write-time `stale` flag per `temporal-model.md` § 5.4.
1637fn parent_is_superseded(dag: &SupersessionDag, parent: SymbolId) -> bool {
1638 dag.edges_to(parent)
1639 .any(|e| matches!(e.kind, EdgeKind::Supersedes))
1640}
1641
1642fn allocate_memory_id(
1643 state: &mut EmitState,
1644 out: &mut Vec<CanonicalRecord>,
1645) -> Result<SymbolId, EmitError> {
1646 // `__mem_{n}` is the librarian's conventional memory-ID name; the
1647 // identifier grammar (ir-write-surface.md § 3.1) does not formally
1648 // reserve the `__` prefix, so a pathological agent could in
1649 // principle land a collision — if so, bind surfaces it as
1650 // `EmitError::MemoryIdAllocation` (the collision does not silently
1651 // overwrite an existing symbol).
1652 let name = format!("__mem_{}", *state.counter);
1653 *state.counter += 1;
1654 let id = state
1655 .table
1656 .allocate(name.clone(), SymbolKind::Memory)
1657 .map_err(|cause| EmitError::MemoryIdAllocation {
1658 name: name.clone(),
1659 cause,
1660 })?;
1661 // Emit a SymbolAlloc canonical record so the memory symbol is
1662 // recoverable on log replay (same constraint that applies to the
1663 // bind-journal-derived SymbolAlloc records).
1664 out.push(CanonicalRecord::SymbolAlloc(SymbolEventRecord {
1665 symbol_id: id,
1666 name,
1667 symbol_kind: SymbolKind::Memory,
1668 at: state.now,
1669 }));
1670 Ok(id)
1671}
1672
/// Pipeline-level error — tags the stage at which compilation failed.
/// Every variant means the batch did **not** commit; the workspace
/// state is unchanged.
//
// The `#[from]` attributes derive `From<StageError>` conversions for
// each wrapped stage-error type via `thiserror`.
#[derive(Debug, Error, PartialEq)]
pub enum PipelineError {
    /// Lex/parse stage failure.
    #[error("parse error: {0}")]
    Parse(#[from] ParseError),

    /// Bind stage failure.
    #[error("bind error: {0}")]
    Bind(#[from] BindError),

    /// Semantic stage failure.
    #[error("semantic error: {0}")]
    Semantic(#[from] SemanticError),

    /// Emit stage failure.
    #[error("emit error: {0}")]
    Emit(#[from] EmitError),

    /// The monotonic `committed_at` rule would advance into the
    /// reserved `u64::MAX` sentinel that `ClockTime` refuses to
    /// represent. Per `temporal-model.md` § 9.1, `ClockTime::MAX - 1`
    /// reaches year ≈584 000 000, so this error is effectively a guard
    /// against pathological inputs rather than a real-world condition.
    #[error(
        "committed_at clock exhausted: monotonic advance past {last_committed_at} would hit reserved sentinel"
    )]
    ClockExhausted {
        /// The previous commit clock before the attempted advance.
        last_committed_at: ClockTime,
    },
}
1707
/// Errors produced by the Emit stage. An `EmitError` always means an
/// invariant should have been caught earlier or the form is not yet
/// supported at this milestone.
///
/// Like every `PipelineError`, an `EmitError` aborts the batch — the
/// batch-atomicity rule (§ 11.3) means no partial state survives it.
#[derive(Debug, Error, PartialEq)]
pub enum EmitError {
    /// A form shape is not yet wired to a canonical record. See the
    /// module-level scope notes for which forms emit records in this
    /// milestone.
    #[error("form {form} is not yet emitted by this pipeline milestone")]
    Unsupported {
        /// Form name (`alias`, `rename`, `retire`, `correct`, `promote`, `query`).
        form: &'static str,
    },

    /// Allocating the synthesized `__mem_{n}` symbol failed. The only
    /// realistic cause is a name collision with an agent-emitted symbol
    /// that used the reserved `__mem_` prefix, which would itself be a
    /// prior bug; preserved as a typed variant for diagnosability.
    #[error("memory-id allocation failed for {name}: {cause}")]
    MemoryIdAllocation {
        /// The synthesized name that collided.
        name: String,
        /// Underlying bind error.
        cause: BindError,
    },

    /// Two Semantic writes land at the same `(s, p)` with identical
    /// `valid_at`. Per `temporal-model.md` § 5.1 two memories cannot
    /// both be authoritative at the same conflict key and identical
    /// validity start under the single-writer invariant — surface as
    /// a deterministic emit-time error so the agent can correct the
    /// `valid_at` or choose a different `(s, p)`.
    #[error(
        "semantic supersession conflict at (s={s:?}, p={p:?}) valid_at={valid_at}: new memory has the same valid_at as existing memory {existing:?}"
    )]
    SemanticSupersessionConflict {
        /// Subject of the conflicting write.
        s: SymbolId,
        /// Predicate of the conflicting write.
        p: SymbolId,
        /// Shared `valid_at` that triggered the conflict.
        valid_at: ClockTime,
        /// Memory ID of the existing current-state memory.
        existing: SymbolId,
    },

    /// Two Inferential writes land at the same `(s, p)` with identical
    /// `valid_at`. Per `temporal-model.md` § 5.4 Inferential
    /// supersession mirrors Semantic § 5.1 — equal `valid_at` against
    /// the same `(s, p)` is a deterministic write conflict the agent
    /// must resolve by choosing a distinct `valid_at` or re-keying.
    #[error(
        "inferential supersession conflict at (s={s:?}, p={p:?}) valid_at={valid_at}: new memory has the same valid_at as existing memory {existing:?}"
    )]
    InferentialSupersessionConflict {
        /// Subject of the conflicting write.
        s: SymbolId,
        /// Predicate of the conflicting write.
        p: SymbolId,
        /// Shared `valid_at` that triggered the conflict.
        valid_at: ClockTime,
        /// Memory ID of the existing current-state memory.
        existing: SymbolId,
    },

    /// An auto-supersession edge failed the DAG's acyclicity check.
    /// Cannot happen from auto-supersession alone (new memories have
    /// fresh IDs that cannot appear as ancestors), but the DAG's
    /// contract surfaces any violation rather than silently accepting.
    #[error("supersession DAG rejected edge: {0}")]
    SupersessionDag(#[from] crate::dag::DagError),

    /// Two Procedural writes in the same batch land on overlapping
    /// supersession keys (same `rule_id` or same `(trigger, scope)`).
    /// They would share `committed_at` per § 9.2 monotonic-batch
    /// semantics, producing a zero-duration Supersedes edge. Per the
    /// Semantic analog at § 5.1 this is a deterministic write
    /// conflict rather than a silent accept — the agent should split
    /// the batch or choose distinct keys.
    #[error(
        "procedural supersession conflict: batch contains two Pro writes at the same supersession key (rule_id={rule_id:?}), first bound to {existing:?}"
    )]
    ProceduralSupersessionConflict {
        /// The `rule_id` under conflict.
        rule_id: SymbolId,
        /// The first in-batch Pro memory that occupies the key.
        existing: SymbolId,
    },
}
1797
1798#[cfg(test)]
1799mod tests {
1800 use super::*;
1801 use crate::canonical::Opcode;
1802
1803 fn now() -> ClockTime {
1804 ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel")
1805 }
1806
1807 const SEM_OK: &str = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
1808
1809 fn memory_records(records: &[CanonicalRecord]) -> Vec<&CanonicalRecord> {
1810 records
1811 .iter()
1812 .filter(|r| {
1813 matches!(
1814 r.opcode(),
1815 Opcode::Sem | Opcode::Epi | Opcode::Pro | Opcode::Inf
1816 )
1817 })
1818 .collect()
1819 }
1820
1821 #[test]
1822 fn pathological_agent_collides_with_mem_counter() {
1823 // A pathological agent mentions `@__mem_0` as a symbol in
1824 // the same batch that produces a librarian-synthesised
1825 // `__mem_0` memory-id. `allocate_memory_id` surfaces the
1826 // collision as `EmitError::MemoryIdAllocation` and rolls
1827 // back the batch — no silent skip, no overwrite.
1828 let mut pipe = Pipeline::new();
1829 let input = "(sem @alice @knows @__mem_0 :src @observation :c 0.8 :v 2024-01-15)";
1830 let err = pipe
1831 .compile_batch(input, now())
1832 .expect_err("__mem_0 collision");
1833 let PipelineError::Emit(EmitError::MemoryIdAllocation { name, .. }) = &err else {
1834 panic!("expected MemoryIdAllocation error, got {err:?}");
1835 };
1836 assert_eq!(name, "__mem_0");
1837 // Batch rolled back: neither the agent's @__mem_0 symbol
1838 // nor the other batch allocations committed.
1839 assert!(pipe.table.lookup("__mem_0").is_none());
1840 assert!(pipe.table.lookup("alice").is_none());
1841 assert_eq!(pipe.next_memory_counter, 0);
1842 }
1843
1844 #[test]
1845 fn single_sem_form_roundtrips_through_pipeline() {
1846 let mut pipe = Pipeline::new();
1847 let records = pipe.compile_batch(SEM_OK, now()).expect("compile");
1848 // SYMBOL_ALLOC records for @alice, @knows, @bob, @observation,
1849 // and __mem_0 precede the memory record.
1850 let mems = memory_records(&records);
1851 assert_eq!(mems.len(), 1);
1852 assert_eq!(mems[0].opcode(), Opcode::Sem);
1853 // Every non-memory record in this batch is a SymbolAlloc.
1854 for r in &records {
1855 assert!(matches!(r.opcode(), Opcode::Sem | Opcode::SymbolAlloc));
1856 }
1857 }
1858
1859 #[test]
1860 fn epi_records_are_retained_after_compile() {
1861 let mut pipe = Pipeline::new();
1862 let input = "(epi @evt_001 @rename (@old @new) @github \
1863 :at 2024-01-15T10:00:00Z :obs 2024-01-15T10:00:05Z \
1864 :src @observation :c 0.9)";
1865
1866 let records = pipe.compile_batch(input, now()).expect("compile");
1867
1868 let mems = memory_records(&records);
1869 assert_eq!(mems.len(), 1);
1870 assert_eq!(mems[0].opcode(), Opcode::Epi);
1871 assert_eq!(pipe.episodic_records().len(), 1);
1872 let retained = &pipe.episodic_records()[0];
1873 assert_eq!(
1874 retained.event_id,
1875 pipe.table.lookup("evt_001").expect("evt")
1876 );
1877 assert_eq!(retained.kind, pipe.table.lookup("rename").expect("kind"));
1878 assert_eq!(retained.participants.len(), 2);
1879 }
1880
1881 #[test]
1882 fn multi_form_batch_emits_in_input_order() {
1883 let mut pipe = Pipeline::new();
1884 let input = "
1885 (sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)
1886 (sem @alice @knows @carol :src @observation :c 0.7 :v 2024-01-16)
1887 ";
1888 let records = pipe.compile_batch(input, now()).expect("compile");
1889 let mems = memory_records(&records);
1890 assert_eq!(mems.len(), 2);
1891 for r in &mems {
1892 assert_eq!(r.opcode(), Opcode::Sem);
1893 }
1894 }
1895
1896 #[test]
1897 fn empty_input_is_a_no_op_batch() {
1898 // The semantics choice: `compile_batch("")` parses as zero
1899 // forms and produces zero records. It does NOT error. No
1900 // symbols allocated, no memory-id counter advancement, no
1901 // records emitted. This locks in the "empty is valid" path
1902 // so later callers (e.g. batched wire transport) don't
1903 // accidentally introduce a different behavior.
1904 let mut pipe = Pipeline::new();
1905 let records = pipe.compile_batch("", now()).expect("empty compiles");
1906 assert!(records.is_empty());
1907 assert_eq!(pipe.next_memory_counter, 0);
1908 // Empty batches DO advance the commit watermark — the batch
1909 // still ran, and a subsequent batch submitted with the same
1910 // wall clock must bump past it. `Store::commit_batch` relies
1911 // on this: it reads `pipeline.last_committed_at()` to stamp
1912 // the CHECKPOINT + episode alloc, so an empty batch that
1913 // silently skipped the advance would let the checkpoint
1914 // regress under a repeated wall clock.
1915 assert_eq!(pipe.last_committed_at(), Some(now()));
1916 // Whitespace-only input is equivalent.
1917 let records = pipe
1918 .compile_batch(" \n\t ", now())
1919 .expect("whitespace compiles");
1920 assert!(records.is_empty());
1921 assert_eq!(pipe.next_memory_counter, 0);
1922 // Second empty batch bumped the watermark by 1 ms.
1923 assert_eq!(
1924 pipe.last_committed_at().expect("set").as_millis(),
1925 now().as_millis() + 1
1926 );
1927 }
1928
1929 #[test]
1930 fn parse_error_does_not_mutate_table() {
1931 let mut pipe = Pipeline::new();
1932 let before_table = pipe.table.clone();
1933 let err = pipe.compile_batch("(sem @a", now()).expect_err("malformed");
1934 assert!(matches!(err, PipelineError::Parse(_)));
1935 assert_eq!(pipe.table, before_table);
1936 assert_eq!(pipe.next_memory_counter, 0);
1937 }
1938
1939 #[test]
1940 fn bind_error_in_mid_batch_rolls_back_all_prior_allocations() {
1941 let mut pipe = Pipeline::new();
1942 // First form allocates @x as Agent (subject slot) and @rel as
1943 // Predicate. Second form reuses @x in the predicate slot, where
1944 // it must be Predicate — kind mismatch.
1945 let input = "
1946 (sem @x @rel @y :src @observation :c 0.8 :v 2024-01-15)
1947 (sem @alice @x @z :src @observation :c 0.8 :v 2024-01-16)
1948 ";
1949 let err = pipe.compile_batch(input, now()).expect_err("kind mismatch");
1950 assert!(matches!(err, PipelineError::Bind(_)));
1951 // Batch rollback: no symbol from this batch committed.
1952 assert!(pipe.table.lookup("x").is_none());
1953 assert!(pipe.table.lookup("rel").is_none());
1954 assert!(pipe.table.lookup("y").is_none());
1955 assert_eq!(pipe.next_memory_counter, 0);
1956 }
1957
1958 #[test]
1959 fn semantic_error_rolls_back_all_prior_allocations() {
1960 let mut pipe = Pipeline::new();
1961 // Registry bound is 0.98; request 0.99 — semantic rejects.
1962 let input = "(sem @a @knows @b :src @registry :c 0.99 :v 2024-01-15)";
1963 let err = pipe
1964 .compile_batch(input, now())
1965 .expect_err("conf over bound");
1966 assert!(matches!(err, PipelineError::Semantic(_)));
1967 assert!(pipe.table.lookup("a").is_none());
1968 assert!(pipe.table.lookup("b").is_none());
1969 }
1970
1971 #[test]
1972 fn successful_batch_commits_table_and_counter() {
1973 let mut pipe = Pipeline::new();
1974 let _ = pipe.compile_batch(SEM_OK, now()).expect("first");
1975 assert!(pipe.table.lookup("alice").is_some());
1976 assert_eq!(pipe.next_memory_counter, 1);
1977
1978 let input2 = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
1979 let _ = pipe.compile_batch(input2, now()).expect("second");
1980 // Alice from first batch persisted — reused, not reallocated.
1981 assert_eq!(pipe.next_memory_counter, 2);
1982 }
1983
1984 #[test]
1985 fn successive_calls_produce_distinct_memory_ids() {
1986 let mut pipe = Pipeline::new();
1987 let r1 = pipe.compile_batch(SEM_OK, now()).expect("first");
1988 let r2 = pipe
1989 .compile_batch(
1990 "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-01-16)",
1991 now(),
1992 )
1993 .expect("second");
1994 let Some(CanonicalRecord::Sem(s1)) = memory_records(&r1).first().copied() else {
1995 panic!("expected Sem in first batch");
1996 };
1997 let Some(CanonicalRecord::Sem(s2)) = memory_records(&r2).first().copied() else {
1998 panic!("expected Sem in second batch");
1999 };
2000 assert_ne!(s1.memory_id, s2.memory_id);
2001 }
2002
2003 #[test]
2004 fn retire_form_emits_symbol_retire_not_error() {
2005 // A retire form commits as a SymbolRetire canonical record via
2006 // the bind journal; it does NOT surface a memory-level record
2007 // and does NOT return `EmitError::Unsupported`.
2008 let mut pipe = Pipeline::new();
2009 let _ = pipe.compile_batch(SEM_OK, now()).expect("first");
2010 let records = pipe
2011 .compile_batch("(retire @alice)", now())
2012 .expect("retire supported");
2013 // Exactly one SymbolRetire; no memory records.
2014 assert!(records.iter().any(|r| r.opcode() == Opcode::SymbolRetire));
2015 assert!(memory_records(&records).is_empty());
2016 }
2017
2018 #[test]
2019 fn same_input_produces_byte_identical_records() {
2020 let input = "
2021 (sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)
2022 (sem @alice @knows @carol :src @observation :c 0.7 :v 2024-01-16)
2023 ";
2024 let fixed_now = now();
2025 let mut pipe_a = Pipeline::new();
2026 let mut pipe_b = Pipeline::new();
2027 let a = pipe_a.compile_batch(input, fixed_now).expect("a");
2028 let b = pipe_b.compile_batch(input, fixed_now).expect("b");
2029 assert_eq!(a, b);
2030 }
2031
2032 #[test]
2033 fn clocks_populated_from_now_parameter() {
2034 let mut pipe = Pipeline::new();
2035 // `now` must be >= the form's `:v` date or semantic rejects as
2036 // future-validity; set both consistently.
2037 let t = now();
2038 let records = pipe.compile_batch(SEM_OK, t).expect("compile");
2039 let Some(CanonicalRecord::Sem(sem)) = memory_records(&records).first().copied() else {
2040 panic!("expected Sem");
2041 };
2042 assert_eq!(sem.clocks.observed_at, t);
2043 assert_eq!(sem.clocks.committed_at, t);
2044 assert_eq!(sem.clocks.invalid_at, None);
2045 }
2046
2047 #[test]
2048 fn first_batch_uses_wall_clock_as_committed_at() {
2049 let mut pipe = Pipeline::new();
2050 assert_eq!(pipe.last_committed_at(), None);
2051 let t = now();
2052 let _ = pipe.compile_batch(SEM_OK, t).expect("compile");
2053 assert_eq!(pipe.last_committed_at(), Some(t));
2054 }
2055
2056 #[test]
2057 fn monotonic_commit_clock_bumps_past_regressing_wall_clock() {
2058 // temporal-model.md § 9.2 / § 12 #1: committed_at must be
2059 // strictly monotonic per workspace even when the host wall
2060 // clock regresses (NTP correction, VM clock warp, etc.).
2061 let mut pipe = Pipeline::new();
2062 let t1 = ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel");
2063 let t_regressed = ClockTime::try_from_millis(1_713_350_300_000).expect("non-sentinel");
2064
2065 let first = pipe
2066 .compile_batch(SEM_OK, t1)
2067 .expect("first batch commits at t1");
2068 let first_sem = first.iter().find_map(|r| match r {
2069 CanonicalRecord::Sem(s) => Some(s),
2070 _ => None,
2071 });
2072 assert_eq!(first_sem.expect("sem").clocks.committed_at, t1);
2073
2074 // Second batch submitted with wall clock that regressed; the
2075 // librarian must bump committed_at to t1 + 1 ms. Use a
2076 // distinct predicate so auto-supersession doesn't interfere.
2077 let second = pipe
2078 .compile_batch(
2079 "(sem @alice @likes @carol :src @observation :c 0.8 :v 2024-01-15)",
2080 t_regressed,
2081 )
2082 .expect("second batch");
2083 let second_sem = second.iter().find_map(|r| match r {
2084 CanonicalRecord::Sem(s) => Some(s),
2085 _ => None,
2086 });
2087 let expected = ClockTime::try_from_millis(t1.as_millis() + 1).expect("non-sentinel");
2088 assert_eq!(second_sem.expect("sem").clocks.committed_at, expected);
2089 assert_eq!(pipe.last_committed_at(), Some(expected));
2090 }
2091
2092 #[test]
2093 fn identical_wall_clock_across_batches_still_bumps_committed_at() {
2094 let mut pipe = Pipeline::new();
2095 let t = now();
2096 let _ = pipe.compile_batch(SEM_OK, t).expect("first");
2097 // Same `t` on purpose — two batches in the same millisecond.
2098 // Use a distinct predicate so Semantic auto-supersession
2099 // doesn't reject this as a (s, p, valid_at) conflict — we're
2100 // testing the commit-clock bump, not supersession.
2101 let second = pipe
2102 .compile_batch(
2103 "(sem @alice @likes @dave :src @observation :c 0.8 :v 2024-01-15)",
2104 t,
2105 )
2106 .expect("second");
2107 let second_sem = second.iter().find_map(|r| match r {
2108 CanonicalRecord::Sem(s) => Some(s),
2109 _ => None,
2110 });
2111 assert_eq!(
2112 second_sem.expect("sem").clocks.committed_at.as_millis(),
2113 t.as_millis() + 1
2114 );
2115 }
2116
2117 #[test]
2118 fn failed_batch_does_not_advance_commit_watermark() {
2119 let mut pipe = Pipeline::new();
2120 let t1 = now();
2121 let _ = pipe.compile_batch(SEM_OK, t1).expect("seed");
2122 let watermark_before = pipe.last_committed_at();
2123 // This batch fails in semantic (confidence > registry bound).
2124 let t2 = ClockTime::try_from_millis(t1.as_millis() + 10_000).expect("non-sentinel");
2125 let err = pipe
2126 .compile_batch(
2127 "(sem @alice @knows @bob :src @registry :c 0.99 :v 2024-01-15)",
2128 t2,
2129 )
2130 .expect_err("semantic reject");
2131 assert!(matches!(err, PipelineError::Semantic(_)));
2132 // Watermark must not have advanced — the monotonic commit
2133 // clock is part of the batch-atomic state per § 11.3.
2134 assert_eq!(pipe.last_committed_at(), watermark_before);
2135 }
2136
2137 #[test]
2138 fn monotonic_commit_clock_helper_returns_wall_clock_when_unset() {
2139 let t = now();
2140 assert_eq!(monotonic_commit_clock(t, None).expect("fresh"), t);
2141 }
2142
2143 #[test]
2144 fn monotonic_commit_clock_helper_bumps_when_wall_clock_not_ahead() {
2145 let prev = ClockTime::try_from_millis(1_000_000).expect("non-sentinel");
2146 // Wall clock exactly at prev.
2147 let at = monotonic_commit_clock(prev, Some(prev)).expect("bump");
2148 assert_eq!(at.as_millis(), 1_000_001);
2149 // Wall clock behind prev.
2150 let behind = ClockTime::try_from_millis(500_000).expect("non-sentinel");
2151 let at = monotonic_commit_clock(behind, Some(prev)).expect("bump");
2152 assert_eq!(at.as_millis(), 1_000_001);
2153 }
2154
2155 #[test]
2156 fn clock_exhaustion_returns_typed_error() {
2157 // `ClockTime::try_from_millis` refuses `u64::MAX` (reserved
2158 // sentinel), so the bump rule at `MAX - 1` must surface the
2159 // typed error rather than panic.
2160 let max_valid = ClockTime::try_from_millis(u64::MAX - 1).expect("non-sentinel");
2161 let err = monotonic_commit_clock(max_valid, Some(max_valid))
2162 .expect_err("must exhaust past MAX - 1");
2163 let PipelineError::ClockExhausted { last_committed_at } = err else {
2164 panic!("expected ClockExhausted, got {err:?}");
2165 };
2166 assert_eq!(last_committed_at, max_valid);
2167 }
2168
    #[test]
    fn semantic_future_validity_uses_wall_clock_not_monotonic_watermark() {
        // Discriminating scenario: place `valid_at` strictly between
        // the regressed wall clock and the inflated monotonic
        // watermark.
        //
        //     wall_now < valid_at < watermark = effective_now
        //
        // Under the wall-clock choice (the spec's intent per § 9.3 —
        // "is this future relative to the librarian's estimate of
        // current time?"), semantic must reject with `FutureValidity`.
        // Under an (incorrect) monotonic-watermark choice, the check
        // would pass because valid_at < watermark. This test fails if
        // a future refactor silently swaps to the watermark.
        let mut pipe = Pipeline::new();
        // Inflate the watermark to 2024-01-15T00:10:00Z via a seed
        // batch whose valid_at is safely in the past.
        let seed_wall = ClockTime::try_from_millis(1_705_277_400_000).expect("non-sentinel");
        let _ = pipe
            .compile_batch(
                "(sem @seed_a @seed_r @seed_b :src @observation :c 0.8 :v 2024-01-14)",
                seed_wall,
            )
            .expect("seed");
        let watermark = pipe.last_committed_at().expect("set");

        // Regress wall clock to 2024-01-15T00:00:00Z — strictly
        // behind the watermark just established.
        let regressed_wall = ClockTime::try_from_millis(1_705_276_800_000).expect("non-sentinel");
        assert!(regressed_wall < watermark);

        // valid_at = 2024-01-15T00:05:00Z — future relative to
        // regressed_wall but past relative to watermark. Wall-clock
        // semantic must reject.
        let err = pipe
            .compile_batch(
                "(sem @alice @knows @emma :src @observation :c 0.8 :v 2024-01-15T00:05:00Z)",
                regressed_wall,
            )
            .expect_err("must reject future valid_at under regressed wall clock");
        assert!(
            matches!(
                err,
                PipelineError::Semantic(SemanticError::FutureValidity { .. })
            ),
            "expected FutureValidity, got {err:?}"
        );

        // Sanity: the same form with a past valid_at must still be
        // accepted, and committed_at must advance past the watermark
        // (monotonic bump despite the regressed wall clock).
        let records = pipe
            .compile_batch(
                "(sem @alice @knows @emma :src @observation :c 0.8 :v 2024-01-14)",
                regressed_wall,
            )
            .expect("past valid_at under regressed wall clock must succeed");
        let sem = records.iter().find_map(|r| match r {
            CanonicalRecord::Sem(s) => Some(s),
            _ => None,
        });
        assert!(sem.expect("sem").clocks.committed_at > watermark);
    }
2229
2230 // ----------------------------------------------------------------
2231 // 6.3a — Semantic auto-supersession
2232 // ----------------------------------------------------------------
2233
2234 fn sem_records(records: &[CanonicalRecord]) -> Vec<&SemRecord> {
2235 records
2236 .iter()
2237 .filter_map(|r| match r {
2238 CanonicalRecord::Sem(s) => Some(s),
2239 _ => None,
2240 })
2241 .collect()
2242 }
2243
2244 fn supersedes_edges(records: &[CanonicalRecord]) -> Vec<&EdgeRecord> {
2245 records
2246 .iter()
2247 .filter_map(|r| match r {
2248 CanonicalRecord::Supersedes(e) => Some(e),
2249 _ => None,
2250 })
2251 .collect()
2252 }
2253
2254 fn stale_parent_edges(records: &[CanonicalRecord]) -> Vec<&EdgeRecord> {
2255 records
2256 .iter()
2257 .filter_map(|r| match r {
2258 CanonicalRecord::StaleParent(e) => Some(e),
2259 _ => None,
2260 })
2261 .collect()
2262 }
2263
2264 #[test]
2265 fn inf_against_current_parent_is_not_born_stale() {
2266 let mut pipe = Pipeline::new();
2267 pipe.compile_batch(SEM_OK, now()).expect("sem");
2268 let inf_src = "(inf @alice @likes @coffee (@__mem_0) @majority_vote :c 0.7 :v 2024-01-15)";
2269 let records = pipe
2270 .compile_batch(inf_src, later_now())
2271 .expect("inferential");
2272 let inf = records
2273 .iter()
2274 .find_map(|r| match r {
2275 CanonicalRecord::Inf(i) => Some(i),
2276 _ => None,
2277 })
2278 .expect("inf emitted");
2279 assert!(!inf.flags.stale, "parent still current — not born stale");
2280 }
2281
2282 #[test]
2283 fn supersession_emits_stale_parent_edges_to_dependent_inferentials() {
2284 let mut pipe = Pipeline::new();
2285 pipe.compile_batch(SEM_OK, now()).expect("first sem");
2286 // Inferential derived from the first sem (__mem_0).
2287 pipe.compile_batch(
2288 "(inf @alice @likes @coffee (@__mem_0) @majority_vote :c 0.7 :v 2024-01-15)",
2289 later_now(),
2290 )
2291 .expect("inf");
2292 // Supersede the first Sem with a newer one at same (s, p).
2293 let records = pipe
2294 .compile_batch(
2295 "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-02-01)",
2296 even_later_now(),
2297 )
2298 .expect("supersede");
2299 let stales = stale_parent_edges(&records);
2300 assert_eq!(
2301 stales.len(),
2302 1,
2303 "one dependent Inferential should receive a StaleParent edge"
2304 );
2305 let inf_id = pipe.table().lookup("__mem_1").expect("inf id");
2306 let old_parent = pipe.table().lookup("__mem_0").expect("parent id");
2307 assert_eq!(stales[0].from, inf_id);
2308 assert_eq!(stales[0].to, old_parent);
2309 }
2310
2311 #[test]
2312 fn inf_born_stale_when_parent_already_superseded() {
2313 let mut pipe = Pipeline::new();
2314 pipe.compile_batch(SEM_OK, now()).expect("first sem");
2315 pipe.compile_batch(
2316 "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-02-01)",
2317 later_now(),
2318 )
2319 .expect("supersede — __mem_0 is now superseded");
2320 // Now create an Inf against the already-superseded parent.
2321 let records = pipe
2322 .compile_batch(
2323 "(inf @alice @likes @coffee (@__mem_0) @majority_vote :c 0.7 :v 2024-01-15)",
2324 even_later_now(),
2325 )
2326 .expect("inf");
2327 let inf = records
2328 .iter()
2329 .find_map(|r| match r {
2330 CanonicalRecord::Inf(i) => Some(i),
2331 _ => None,
2332 })
2333 .expect("inf");
2334 assert!(
2335 inf.flags.stale,
2336 "Inferential born from already-superseded parent must carry stale=true"
2337 );
2338 }
2339
2340 fn later_now() -> ClockTime {
2341 ClockTime::try_from_millis(1_713_350_400_000 + 1_000).expect("non-sentinel")
2342 }
2343
2344 fn even_later_now() -> ClockTime {
2345 ClockTime::try_from_millis(1_713_350_400_000 + 2_000).expect("non-sentinel")
2346 }
2347
2348 #[test]
2349 fn sem_with_fresh_sp_does_not_emit_supersedes_edge() {
2350 let mut pipe = Pipeline::new();
2351 let records = pipe.compile_batch(SEM_OK, now()).expect("first");
2352 assert!(
2353 supersedes_edges(&records).is_empty(),
2354 "first write at (s, p) has nothing to supersede"
2355 );
2356 assert_eq!(pipe.dag().len(), 0);
2357 }
2358
2359 #[test]
2360 fn forward_sem_emits_supersedes_edge_and_updates_index() {
2361 // First write `@alice @knows @bob` at 2024-01-15; second at
2362 // 2024-03-01 with same (s, p) supersedes forward. Both
2363 // valid_ats are before `now()` (2024-04-17) so semantic
2364 // future-validity doesn't reject.
2365 let mut pipe = Pipeline::new();
2366 let first = pipe.compile_batch(SEM_OK, now()).expect("first");
2367 let first_mem = sem_records(&first)[0].memory_id;
2368
2369 let second_input = "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-01)";
2370 let second = pipe.compile_batch(second_input, now()).expect("second");
2371 let sems = sem_records(&second);
2372 assert_eq!(sems.len(), 1);
2373 let second_mem = sems[0].memory_id;
2374 // Forward supersession: new's invalid_at stays None.
2375 assert_eq!(sems[0].clocks.invalid_at, None);
2376
2377 let edges = supersedes_edges(&second);
2378 assert_eq!(edges.len(), 1, "exactly one Supersedes edge");
2379 assert_eq!(edges[0].from, second_mem);
2380 assert_eq!(edges[0].to, first_mem);
2381 // DAG mirrors the edge.
2382 assert_eq!(pipe.dag().len(), 1);
2383 }
2384
    #[test]
    fn retroactive_sem_sets_invalid_at_and_preserves_existing_as_current() {
        // First write with valid_at 2024-03-01. Second with valid_at
        // 2024-01-15 (earlier) is a retroactive correction: it's
        // valid only up to 2024-03-01 (`new.invalid_at = old.valid_at`),
        // and the (s, p) index keeps pointing to the memory with the
        // newer valid_at as the current state.
        let mut pipe = Pipeline::new();
        let first_input = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-03-01)";
        let first = pipe.compile_batch(first_input, now()).expect("first");
        let first_mem = sem_records(&first)[0].memory_id;
        let first_valid_at = sem_records(&first)[0].clocks.valid_at;

        let retro_input = "(sem @alice @knows @zoe :src @observation :c 0.8 :v 2024-01-15)";
        let retro = pipe.compile_batch(retro_input, now()).expect("retro");
        let sems = sem_records(&retro);
        let retro_mem = sems[0].memory_id;
        assert_eq!(
            sems[0].clocks.invalid_at,
            Some(first_valid_at),
            "retroactive new memory's invalid_at closes at old's valid_at"
        );

        // The Supersedes edge still runs retro → first.
        let edges = supersedes_edges(&retro);
        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].from, retro_mem);
        assert_eq!(edges[0].to, first_mem);

        // A third forward write at 2024-04-01 must still supersede
        // `first` (not `retro`) — the index preserved `first` as
        // current after the retroactive insert.
        let third_input = "(sem @alice @knows @dan :src @observation :c 0.8 :v 2024-04-01)";
        let third = pipe.compile_batch(third_input, now()).expect("third");
        let third_mem = sem_records(&third)[0].memory_id;
        let third_edges = supersedes_edges(&third);
        assert_eq!(third_edges.len(), 1);
        assert_eq!(third_edges[0].from, third_mem);
        assert_eq!(
            third_edges[0].to, first_mem,
            "forward supersession targets the most-recent-valid_at memory, not the retroactive one"
        );
    }
2426
2427 #[test]
2428 fn equal_valid_at_at_same_sp_returns_supersession_conflict() {
2429 let mut pipe = Pipeline::new();
2430 let _ = pipe.compile_batch(SEM_OK, now()).expect("first");
2431 // Same (@alice, @knows) and same valid_at 2024-01-15 — single
2432 // writer per workspace, so this is a deterministic error.
2433 let err = pipe
2434 .compile_batch(SEM_OK, now())
2435 .expect_err("equal valid_at conflicts");
2436 assert!(
2437 matches!(
2438 err,
2439 PipelineError::Emit(EmitError::SemanticSupersessionConflict { .. })
2440 ),
2441 "expected SemanticSupersessionConflict, got {err:?}"
2442 );
2443 }
2444
2445 #[test]
2446 fn disjoint_sp_pairs_do_not_supersede_each_other() {
2447 let mut pipe = Pipeline::new();
2448 let _ = pipe.compile_batch(SEM_OK, now()).expect("first");
2449 // Different predicate — no supersession.
2450 let other = pipe
2451 .compile_batch(
2452 "(sem @alice @likes @bob :src @observation :c 0.8 :v 2024-01-15)",
2453 now(),
2454 )
2455 .expect("disjoint");
2456 assert!(supersedes_edges(&other).is_empty());
2457 assert_eq!(pipe.dag().len(), 0);
2458 }
2459
2460 #[test]
2461 fn forward_chain_produces_edge_per_link() {
2462 let mut pipe = Pipeline::new();
2463 let vs = ["2024-01-15", "2024-02-15", "2024-03-15", "2024-04-15"];
2464 for v in vs {
2465 let input = format!("(sem @alice @knows @bob :src @observation :c 0.8 :v {v})");
2466 let _ = pipe.compile_batch(&input, now()).expect("compile");
2467 }
2468 // Three supersession events across four writes.
2469 assert_eq!(pipe.dag().len(), 3);
2470 }
2471
    #[test]
    fn failed_batch_does_not_leak_edge_or_index_mutation() {
        // Discriminating test for the 6.2 clone-on-write contract as
        // applied to 6.3a's DAG + index mutations: form 1 passes all
        // stages and mutates both `working_dag` (edge) and
        // `working_index` (new (s, p) entry); form 2 passes parse +
        // bind + semantic but fails at EMIT because it lands on the
        // same (s, p, valid_at) that form 1 just wrote into
        // `working_index`. Per batch atomicity, BOTH mutations must
        // be dropped — neither the edge nor the index update can
        // survive into `self`.
        //
        // Without the DAG + index clones in `compile_batch`, form
        // 1's edge and index entry would leak past form 2's failure.
        let mut pipe = Pipeline::new();
        let _ = pipe.compile_batch(SEM_OK, now()).expect("seed");
        assert_eq!(pipe.dag().len(), 0, "seed did not supersede");

        // Both forms pass semantic (confidence ≤ bound, valid_at is
        // past). Form 1 forward-supersedes the seed; form 2 hits the
        // same (s, p, valid_at) form 1 just wrote into working_index.
        let two_forms = "\
            (sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-01)\n\
            (sem @alice @knows @dan :src @observation :c 0.7 :v 2024-03-01)";
        let err = pipe
            .compile_batch(two_forms, now())
            .expect_err("emit conflict");
        assert!(
            matches!(
                err,
                PipelineError::Emit(EmitError::SemanticSupersessionConflict { .. })
            ),
            "expected SemanticSupersessionConflict from form 2, got {err:?}"
        );

        // Form 1's edge did NOT land in self.dag.
        assert_eq!(pipe.dag().len(), 0, "failed batch must not leak edge");

        // Form 1's index update did NOT land either — a fresh
        // follow-up commit at (alice, knows, 2024-03-01) must target
        // the SEED as its predecessor (not carol, which form 1 would
        // have made current had the batch committed).
        let clean = pipe
            .compile_batch(
                "(sem @alice @knows @eve :src @observation :c 0.8 :v 2024-03-01)",
                now(),
            )
            .expect("clean follow-up");
        assert_eq!(pipe.dag().len(), 1);
        let seed_memory = pipe.table.lookup("__mem_0").expect("seed mem alloc");
        assert_eq!(
            pipe.dag().edges()[0].to,
            seed_memory,
            "post-rollback commit still sees SEED as predecessor"
        );
        assert_eq!(sem_records(&clean)[0].memory_id, pipe.dag().edges()[0].from);
    }
2529
2530 // ----------------------------------------------------------------
2531 // 6.3b — Procedural auto-supersession
2532 // ----------------------------------------------------------------
2533
    // Canonical well-formed Pro fixture — rule @rule_route, trigger
    // "agent_write", scope @mimir. Most tests in this section seed the
    // pipeline with this form and then supersede it by one of the two
    // § 5.2 keys (rule_id, or (trigger, scope)).
    const PRO_OK: &str = r#"(pro @rule_route "agent_write" "route_via_librarian"
        :scp @mimir :src @policy :c 1.0)"#;
2536
2537 fn pro_records(records: &[CanonicalRecord]) -> Vec<&ProRecord> {
2538 records
2539 .iter()
2540 .filter_map(|r| match r {
2541 CanonicalRecord::Pro(p) => Some(p),
2542 _ => None,
2543 })
2544 .collect()
2545 }
2546
2547 #[test]
2548 fn pro_fresh_rule_does_not_supersede() {
2549 let mut pipe = Pipeline::new();
2550 let records = pipe.compile_batch(PRO_OK, now()).expect("first pro");
2551 assert_eq!(pro_records(&records).len(), 1);
2552 assert!(supersedes_edges(&records).is_empty());
2553 assert_eq!(pipe.dag().len(), 0);
2554 }
2555
2556 #[test]
2557 fn pro_same_rule_id_triggers_supersession() {
2558 let mut pipe = Pipeline::new();
2559 let first = pipe.compile_batch(PRO_OK, now()).expect("first");
2560 let first_mem = pro_records(&first)[0].memory_id;
2561
2562 let second_input = r#"(pro @rule_route "other_trigger" "other_action"
2563 :scp @other_scope :src @policy :c 0.9)"#;
2564 let second = pipe.compile_batch(second_input, now()).expect("second");
2565 let second_mem = pro_records(&second)[0].memory_id;
2566
2567 let edges = supersedes_edges(&second);
2568 assert_eq!(edges.len(), 1, "same rule_id → one Supersedes edge");
2569 assert_eq!(edges[0].from, second_mem);
2570 assert_eq!(edges[0].to, first_mem);
2571 assert_eq!(pipe.dag().len(), 1);
2572 }
2573
2574 #[test]
2575 fn pro_same_trigger_scope_triggers_supersession() {
2576 let mut pipe = Pipeline::new();
2577 let first = pipe.compile_batch(PRO_OK, now()).expect("first");
2578 let first_mem = pro_records(&first)[0].memory_id;
2579
2580 // Different rule_id but identical trigger + scope ("agent_write" + @mimir) — § 5.2 secondary key.
2581 let second_input = r#"(pro @rule_other "agent_write" "different_action"
2582 :scp @mimir :src @policy :c 0.9)"#;
2583 let second = pipe.compile_batch(second_input, now()).expect("second");
2584 let second_mem = pro_records(&second)[0].memory_id;
2585
2586 let edges = supersedes_edges(&second);
2587 assert_eq!(edges.len(), 1, "same (trigger, scope) → one edge");
2588 assert_eq!(edges[0].from, second_mem);
2589 assert_eq!(edges[0].to, first_mem);
2590 }
2591
2592 #[test]
2593 fn pro_dual_key_match_supersedes_both_distinct_olds() {
2594 // OLD1: rule @rule_a, trigger "t_a", scope @scope_a
2595 // OLD2: rule @rule_b, trigger "t_b", scope @scope_b
2596 // NEW: rule @rule_a, trigger "t_b", scope @scope_b
2597 // NEW matches OLD1 by rule_id AND OLD2 by (trigger, scope) — both must be superseded.
2598 let mut pipe = Pipeline::new();
2599 let old1 = pipe
2600 .compile_batch(
2601 r#"(pro @rule_a "t_a" "act_a" :scp @scope_a :src @policy :c 1.0)"#,
2602 now(),
2603 )
2604 .expect("old1");
2605 let old1_mem = pro_records(&old1)[0].memory_id;
2606 let old2 = pipe
2607 .compile_batch(
2608 r#"(pro @rule_b "t_b" "act_b" :scp @scope_b :src @policy :c 1.0)"#,
2609 now(),
2610 )
2611 .expect("old2");
2612 let old2_mem = pro_records(&old2)[0].memory_id;
2613
2614 let new_input = r#"(pro @rule_a "t_b" "act_new" :scp @scope_b :src @policy :c 1.0)"#;
2615 let new = pipe.compile_batch(new_input, now()).expect("new");
2616 let new_mem = pro_records(&new)[0].memory_id;
2617
2618 let edges = supersedes_edges(&new);
2619 assert_eq!(edges.len(), 2, "dual-key match → two Supersedes edges");
2620 let targets: std::collections::BTreeSet<_> = edges.iter().map(|e| e.to).collect();
2621 assert!(targets.contains(&old1_mem));
2622 assert!(targets.contains(&old2_mem));
2623 for e in &edges {
2624 assert_eq!(e.from, new_mem);
2625 }
2626 }
2627
2628 #[test]
2629 fn pro_duplicate_cross_batch_commit_emits_one_edge() {
2630 // Commit the exact same Pro in two successive batches. OLD
2631 // sits at both indices (rule_id + (trigger, scope)); NEW's
2632 // two lookups converge to OLD. The dedup branch collapses
2633 // them to a single Supersedes edge — the only observable
2634 // shape of "both keys converge to same old," since non-dup
2635 // Pros are inserted under both their own keys and a second
2636 // Pro can only converge both lookups onto a single
2637 // predecessor by duplicating all of its keys.
2638 let mut pipe = Pipeline::new();
2639 let _ = pipe.compile_batch(PRO_OK, now()).expect("seed");
2640 let second = pipe.compile_batch(PRO_OK, now()).expect("same again");
2641 let edges = supersedes_edges(&second);
2642 assert_eq!(edges.len(), 1, "same memory matched twice → one edge");
2643 }
2644
2645 #[test]
2646 fn pro_intra_batch_same_rule_id_is_rejected() {
2647 // Two Pro forms in the same batch with identical rule_id
2648 // would share `committed_at`, producing a zero-duration
2649 // supersession. Per the Semantic analog (equal valid_at →
2650 // SemanticSupersessionConflict), v1 surfaces this
2651 // deterministically rather than silently accepting.
2652 let mut pipe = Pipeline::new();
2653 let two_forms = r#"
2654 (pro @rule_a "t_a" "act_a" :scp @scope_a :src @policy :c 1.0)
2655 (pro @rule_a "t_b" "act_b" :scp @scope_b :src @policy :c 1.0)
2656 "#;
2657 let err = pipe
2658 .compile_batch(two_forms, now())
2659 .expect_err("intra-batch rule_id conflict");
2660 assert!(
2661 matches!(
2662 err,
2663 PipelineError::Emit(EmitError::ProceduralSupersessionConflict { .. })
2664 ),
2665 "expected ProceduralSupersessionConflict, got {err:?}"
2666 );
2667 // Atomicity — form 1's index + DAG mutations must have rolled back.
2668 assert_eq!(pipe.dag().len(), 0);
2669 }
2670
2671 #[test]
2672 fn pro_intra_batch_same_trigger_scope_is_rejected() {
2673 // Same sanity check for the secondary key — two Pros in one
2674 // batch sharing (trigger, scope) but with distinct rule_ids.
2675 let mut pipe = Pipeline::new();
2676 let two_forms = r#"
2677 (pro @rule_a "shared_t" "act_a" :scp @shared_scope :src @policy :c 1.0)
2678 (pro @rule_b "shared_t" "act_b" :scp @shared_scope :src @policy :c 1.0)
2679 "#;
2680 let err = pipe
2681 .compile_batch(two_forms, now())
2682 .expect_err("intra-batch (trigger, scope) conflict");
2683 assert!(matches!(
2684 err,
2685 PipelineError::Emit(EmitError::ProceduralSupersessionConflict { .. })
2686 ));
2687 assert_eq!(pipe.dag().len(), 0);
2688 }
2689
2690 #[test]
2691 fn pro_supersession_clears_old_from_both_keys() {
2692 // After NEW supersedes OLD via rule_id match, OLD must no
2693 // longer be reachable via its OTHER key either.
2694 let mut pipe = Pipeline::new();
2695 let old = pipe
2696 .compile_batch(
2697 r#"(pro @rule_a "t_a" "act_a" :scp @scope_a :src @policy :c 1.0)"#,
2698 now(),
2699 )
2700 .expect("old");
2701 let old_mem = pro_records(&old)[0].memory_id;
2702
2703 // Supersede by rule_id.
2704 let _ = pipe
2705 .compile_batch(
2706 r#"(pro @rule_a "different" "new_act" :scp @different :src @policy :c 1.0)"#,
2707 now(),
2708 )
2709 .expect("super by rule");
2710
2711 // A third write that matches OLD's (trigger, scope) only.
2712 // OLD was cleared from both indices when superseded, so no
2713 // edge from this write should point to OLD.
2714 let third = pipe
2715 .compile_batch(
2716 r#"(pro @rule_fresh "t_a" "act_x" :scp @scope_a :src @policy :c 1.0)"#,
2717 now(),
2718 )
2719 .expect("third");
2720 for edge in supersedes_edges(&third) {
2721 assert_ne!(
2722 edge.to, old_mem,
2723 "already-superseded OLD must not be superseded again"
2724 );
2725 }
2726 }
2727}