// mimir_core/pipeline.rs
1//! Librarian pipeline — end-to-end write path per
2//! `docs/concepts/librarian-pipeline.md`.
3//!
4//! The pipeline compiles agent-emitted Lisp S-expression input into
5//! canonical bytecode records by chaining the five stages:
6//!
7//! ```text
8//! &str ─► [Parse] ─► [Bind] ─► [Semantic] ─► [Emit] ─► Vec<CanonicalRecord>
9//! ```
10//!
11//! (Lex is internal to `parse::parse`, so the public entry points are
12//! unified into a single `Pipeline::compile_batch` call.)
13//!
14//! Per invariant § 11.3 (batch atomicity), a batch either commits in
15//! full or leaves no trace. `Pipeline` holds the workspace's live
16//! `SymbolTable` and a monotonic memory-ID counter; both are cloned at
17//! the start of each batch and swapped back only on full-stage success.
18//!
19//! The pipeline emits:
20//!
21//! - The four memory record kinds (`Sem`, `Epi`, `Pro`, `Inf`).
22//! - `SymbolAlloc` / `SymbolRename` / `SymbolAlias` / `SymbolRetire`
23//!   derived from the `bind` mutation journal (spec § 3.4). Symbol
24//!   events precede the memory records in the output so log replay
25//!   sees allocations before the memory records that reference them.
26//!
27//! Unsupported forms (tracked under the matching spec):
28//!
29//! - `correct` and `promote` return `EmitError::Unsupported` pending
30//!   the correction + ephemeral-promotion work on the
31//!   `temporal-model` / `episode-semantics` tracks.
32//! - `query` returns `EmitError::Unsupported` pending the
33//!   `read-protocol` track.
34//! - ML-callout interface (spec § 6) is not wired; v1 is noop-ML.
35
36use thiserror::Error;
37
38use std::collections::{BTreeMap, BTreeSet};
39
40use crate::bind::{self, BindError, SymbolMutation, SymbolTable};
41use crate::canonical::{
42    CanonicalRecord, Clocks, EdgeRecord, EpiRecord, InfFlags, InfRecord, ProRecord, SemFlags,
43    SemRecord, SymbolEventRecord,
44};
45use crate::clock::ClockTime;
46use crate::dag::{Edge as DagEdge, EdgeKind, SupersessionDag};
47use crate::parse::{self, ParseError};
48use crate::semantic::{self, SemanticError, ValidatedForm};
49use crate::symbol::{SymbolId, SymbolKind};
50
/// The librarian pipeline — single-writer compiler from Lisp S-expression
/// input to canonical bytecode.
///
/// Holds the workspace's symbol table, memory-ID counter, monotonic
/// `committed_at` watermark, supersession DAG, and current-state
/// supersession index; mutations to all five are batch-atomic
/// (invariant § 11.3, plus `temporal-model.md` § 12 #1 and § 6.2 #1).
///
/// Does not derive `Eq` because the stored record vectors carry
/// `Value`, which contains `f64` and therefore is only `PartialEq`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Pipeline {
    /// Live workspace symbol table. Per the module docs it is cloned
    /// at the start of each batch and swapped back only on full-stage
    /// success (invariant § 11.3); replay hooks apply to it directly.
    table: SymbolTable,
    /// Monotonic counter behind `__mem_{n}` memory-ID synthesis; see
    /// [`Pipeline::next_memory_counter`] /
    /// [`Pipeline::set_next_memory_counter`].
    next_memory_counter: u64,
    /// Highest `committed_at` assigned by this pipeline so far, or
    /// `None` if no batch has committed yet. Per `temporal-model.md`
    /// § 9.2, the next commit clock is `max(wall_now, self + 1)`.
    last_committed_at: Option<ClockTime>,
    /// Workspace-scoped supersession graph. Extended at emit time by
    /// auto-supersession writes (temporal-model.md § 5).
    dag: SupersessionDag,
    /// Current-state index for supersession detection (§ 5).
    supersession_index: SupersessionIndex,
    /// Every Semantic memory ever emitted (or replayed from the log),
    /// in commit order. Feeds the as-of query resolver
    /// (`temporal-model.md` § 7).
    semantic_records: Vec<SemRecord>,
    /// Secondary index over `semantic_records`: `(s, p) → indices`.
    /// Per `read-protocol.md` § 3.1 the current-state index makes
    /// single-predicate Semantic lookups O(k) in the size of the
    /// `(s, p)` history (typically 1–3) instead of O(n) in the
    /// whole store. The resolver consults this index when `:s` and
    /// `:p` are both pinned; other paths still scan.
    semantic_by_sp_history: BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
    /// Every Episodic memory ever emitted or replayed, in commit
    /// order. Episodic records do not currently participate in
    /// supersession, but retaining them lets downstream tooling
    /// perform exact duplicate checks and future event-oriented reads.
    episodic_records: Vec<EpiRecord>,
    /// Every Procedural memory ever emitted or replayed, in commit
    /// order. Same contract as `semantic_records`.
    procedural_records: Vec<ProRecord>,
    /// Secondary index over `procedural_records`: `rule_id → indices`.
    /// Powers O(k) lookup for `:kind pro` reads once the `rule_id`
    /// read predicate is wired; for now the resolver iterates the
    /// full vec from the `resolve_procedural` entry point.
    procedural_by_rule_history: BTreeMap<SymbolId, Vec<usize>>,
    /// Every Inferential memory ever emitted or replayed, in commit
    /// order. Feeds the Inferential resolver per `temporal-model.md`
    /// § 5.4 + `read-protocol.md` § 3.1 (Inf keyed by `(s, p)` like
    /// Sem; re-derivation with same `(s, p)` + later `valid_at`
    /// supersedes the prior).
    inferential_records: Vec<InfRecord>,
    /// Secondary index over `inferential_records`: `(s, p) → indices`.
    /// Mirrors `semantic_by_sp_history`; lets the resolver walk only
    /// the records at a specific `(s, p)` on a pinned-subject-and-
    /// predicate read.
    inferential_by_sp_history: BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
    /// Decay parameters used by the read path to compute effective
    /// confidence (`confidence-decay.md`). Defaults to the librarian's
    /// v1 parameter table; `Store::open` will eventually load an
    /// `mimir.toml` override and call [`Pipeline::set_decay_config`].
    decay_config: crate::decay::DecayConfig,
    /// Committed-at clock for each Episode the pipeline has seen.
    /// Populated by [`Pipeline::register_episode`] — `Store` calls
    /// this after a successful `commit_batch` and during log replay
    /// when a `Checkpoint` record is seen. Used by the read path's
    /// `:in_episode` / `:after_episode` / `:before_episode`
    /// predicates (`read-protocol.md` § 4.1).
    episode_committed_at: BTreeMap<SymbolId, ClockTime>,
    /// Parent Episode for each Episode that declared one via
    /// `(episode :start :parent_episode @E)`. Populated by
    /// [`Pipeline::register_episode_parent`]. Backs the
    /// `:episode_chain @E` read predicate
    /// (`read-protocol.md` § 4.1 / `episode-semantics.md` § 5.1).
    episode_parent: BTreeMap<SymbolId, SymbolId>,
    /// Metadata captured from an `(episode :start ...)` form during
    /// the most recent `compile_batch`. Consumed by the store at
    /// commit time — see [`Pipeline::take_pending_episode_metadata`].
    pending_episode_metadata: Option<PendingEpisodeMetadata>,
    /// Memories currently pinned (`confidence-decay.md` § 7).
    /// Suspends decay at read time via `DecayFlags::pinned`.
    pinned_memories: BTreeSet<SymbolId>,
    /// Memories currently flagged operator-authoritative
    /// (`confidence-decay.md` § 8). Also suspends decay; populates
    /// `Framing::Authoritative { set_by: OperatorAuthoritative }`.
    authoritative_memories: BTreeSet<SymbolId>,
    /// Reverse parent index for Inferential memories: maps a parent
    /// memory's `SymbolId` to the list of Inferentials that derived
    /// from it (`temporal-model.md` § 5.4). Populated by
    /// [`replay_memory_record`] on Inferential records and by emit.
    /// Used when a Semantic or Procedural parent gets auto-
    /// superseded — the index tells us which Inferentials to attach
    /// `StaleParent` edges to. O(log n) lookup rather than scanning
    /// `semantic_records` / `procedural_records` for every
    /// supersession.
    inferentials_by_parent: BTreeMap<SymbolId, Vec<SymbolId>>,
}
149
/// Episode metadata captured from an `(episode :start …)` form in
/// the write surface. Flows out of `compile_batch` via
/// [`Pipeline::take_pending_episode_metadata`] so the store layer
/// can attach it to the Episode it's about to emit.
///
/// An `(episode :close)` form carries no metadata; the pipeline
/// still produces `Some(default)` to signal "batch carried an
/// explicit Episode directive" separately from the no-directive
/// case.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PendingEpisodeMetadata {
    /// Label from `:label`; `None` when the form carried none.
    pub label: Option<String>,
    /// Parent from `:parent_episode`; `None` when the form carried none.
    pub parent_episode: Option<SymbolId>,
    /// Retracted Episodes from `:retracts (@E1 …)`; empty when absent.
    pub retracts: Vec<SymbolId>,
}
168
/// Current-state index used by auto-supersession detection.
///
/// For each memory-type supersession key (per `temporal-model.md` § 5),
/// tracks the currently-authoritative memory so new writes can look up
/// their predecessor in O(log n). The index is rebuilt from the log at
/// `Store::open` — see [`Pipeline::replay_memory_record`], which keeps
/// all the maps below in sync during replay.
///
/// Scope: Semantic (§ 5.1), Procedural (§ 5.2), and Inferential
/// (§ 5.4) supersession on re-derivation. Episodic is out of scope
/// (no auto-supersession per § 5.3).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct SupersessionIndex {
    /// `(subject, predicate) -> (memory_id, valid_at)` for the current
    /// Semantic memory at that `(s, p)` — `None` if no live memory.
    semantic_by_sp: BTreeMap<(SymbolId, SymbolId), CurrentSemantic>,
    /// `(subject, predicate) -> (memory_id, valid_at)` for the current
    /// Inferential memory at that `(s, p)` per `temporal-model.md`
    /// § 5.4 (Inf re-derivation supersession mirrors Sem § 5.1). Reuses
    /// [`CurrentSemantic`] shape — identical `(memory_id, valid_at)`
    /// record — to avoid a duplicate struct for the same data.
    inferential_by_sp: BTreeMap<(SymbolId, SymbolId), CurrentSemantic>,
    /// `rule_id -> (memory_id, committed_at)` for current Procedural
    /// memories keyed by rule identifier (§ 5.2 primary key).
    procedural_by_rule: BTreeMap<SymbolId, CurrentProcedural>,
    /// `(canonical(trigger), scope) -> (memory_id, committed_at)` for
    /// current Procedural memories keyed by the `(trigger, scope)`
    /// pair (§ 5.2 secondary key). `Value` has no `Ord` impl because
    /// of `f64`, so the trigger is keyed by its canonical-byte
    /// encoding (stable within a process; not persisted).
    procedural_by_trigger_scope: BTreeMap<(Vec<u8>, SymbolId), CurrentProcedural>,
    /// Reverse index from `memory_id` back to the two Procedural
    /// index keys it occupies. Used during supersession to clear out
    /// the OTHER key of a memory that was matched via only one key
    /// (spec § 5.2 invalidates on either key matching).
    procedural_keys_by_memory: BTreeMap<SymbolId, ProceduralKeys>,
}
205
/// Index entry for a currently-authoritative Semantic memory.
/// Also reused for Inferential `(s, p)` entries — see
/// `SupersessionIndex::inferential_by_sp` — since both need exactly
/// `(memory_id, valid_at)`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct CurrentSemantic {
    // Memory that currently wins at this `(s, p)` key.
    memory_id: SymbolId,
    // Its validity clock; newer `valid_at` replaces this entry.
    valid_at: ClockTime,
}
212
/// Safety cap on Episode-chain traversal, consumed by
/// [`Pipeline::episode_chain`]. Parent cycles cannot
/// form via the write path (parent must already be committed before
/// a child declares it) but replay of a corrupted log could present
/// one; the cap keeps `episode_chain` bounded in pathological cases.
pub const MAX_EPISODE_CHAIN_DEPTH: usize = 1024;
218
/// The two index keys a Procedural memory occupies.
/// Not `Copy` because the trigger key carries its canonical-byte
/// encoding (`Vec<u8>`, see `SupersessionIndex::procedural_by_trigger_scope`).
#[derive(Clone, Debug, PartialEq, Eq)]
struct ProceduralKeys {
    // § 5.2 primary key.
    rule_id: SymbolId,
    // § 5.2 secondary key: (canonical trigger bytes, scope).
    trigger_scope: (Vec<u8>, SymbolId),
}
225
/// Index entry for a currently-authoritative Procedural memory.
/// Tracks `committed_at` so the emit path can detect intra-batch
/// conflicts — two Pro writes in the same batch with overlapping
/// supersession keys share `committed_at`, producing a
/// zero-duration "supersession" that's almost certainly an agent
/// bug.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct CurrentProcedural {
    // Memory that currently wins under this rule / trigger-scope key.
    memory_id: SymbolId,
    // Commit clock of that memory; equal clocks flag an intra-batch conflict.
    committed_at: ClockTime,
}
237
238impl Pipeline {
239    /// Construct a pipeline with an empty symbol table.
240    #[must_use]
241    pub fn new() -> Self {
242        Self::default()
243    }
244
    /// Read-only view of the workspace symbol table. Mutations flow
    /// through `compile_batch` and the `replay_*` hooks, never through
    /// this accessor.
    #[must_use]
    pub fn table(&self) -> &SymbolTable {
        &self.table
    }
250
    /// Read-only view of the decay parameters used by the read path.
    /// Defaults to [`crate::decay::DecayConfig::librarian_defaults`];
    /// the store layer installs per-workspace overrides when an
    /// `mimir.toml` is present — see [`Pipeline::set_decay_config`].
    #[must_use]
    pub fn decay_config(&self) -> &crate::decay::DecayConfig {
        &self.decay_config
    }
259
    /// Replace the decay parameters used by the read path. Intended
    /// for the store layer to wire `mimir.toml` overrides in at
    /// `Store::open`; tests may also call this directly. Replaces the
    /// previous configuration wholesale — subsequent reads observe
    /// only the new parameters.
    pub fn set_decay_config(&mut self, cfg: crate::decay::DecayConfig) {
        self.decay_config = cfg;
    }
266
    /// Replay a `SYMBOL_ALLOC` record into the pipeline's symbol table.
    /// Thin pass-through to [`SymbolTable::replay_allocate`]; exposed
    /// for [`Store::open`](crate::store::Store::open). Applies to the
    /// live table directly — replay is not batch-staged.
    ///
    /// # Errors
    ///
    /// Propagates [`BindError`] variants from the underlying
    /// `SymbolTable::replay_allocate` call.
    pub fn replay_allocate(
        &mut self,
        id: SymbolId,
        name: String,
        kind: SymbolKind,
    ) -> Result<(), BindError> {
        self.table.replay_allocate(id, name, kind)
    }
283
    /// Replay a `SYMBOL_ALIAS` record. Thin pass-through to
    /// `SymbolTable::replay_alias`, like [`Pipeline::replay_allocate`].
    ///
    /// # Errors
    ///
    /// Propagates [`BindError`] variants from the underlying
    /// `SymbolTable::replay_alias` call.
    pub fn replay_alias(&mut self, id: SymbolId, alias: String) -> Result<(), BindError> {
        self.table.replay_alias(id, alias)
    }
293
    /// Replay a `SYMBOL_RENAME` record. Thin pass-through to
    /// `SymbolTable::replay_rename`, like [`Pipeline::replay_allocate`].
    ///
    /// # Errors
    ///
    /// Propagates [`BindError`] variants from the underlying
    /// `SymbolTable::replay_rename` call.
    pub fn replay_rename(&mut self, id: SymbolId, new_canonical: String) -> Result<(), BindError> {
        self.table.replay_rename(id, new_canonical)
    }
303
    /// Replay a `SYMBOL_RETIRE` record. Thin pass-through to
    /// `SymbolTable::replay_retire`, like [`Pipeline::replay_allocate`].
    ///
    /// # Errors
    ///
    /// Propagates [`BindError`] variants from the underlying
    /// `SymbolTable::replay_retire` call.
    pub fn replay_retire(&mut self, id: SymbolId, name: String) -> Result<(), BindError> {
        self.table.replay_retire(id, name)
    }
313
    /// Set the pipeline's memory-ID counter. Used by
    /// [`Store::open`](crate::store::Store::open) to restore the
    /// counter from durable state. No validation is performed — the
    /// caller is responsible for passing the durably-recorded value.
    pub fn set_next_memory_counter(&mut self, counter: u64) {
        self.next_memory_counter = counter;
    }
320
    /// Read the pipeline's memory-ID counter — the value that the
    /// next `allocate_memory_id` call will synthesise into
    /// `__mem_{n}`. Exposed for tests and for inspection tooling;
    /// counterpart of [`Pipeline::set_next_memory_counter`].
    #[must_use]
    pub fn next_memory_counter(&self) -> u64 {
        self.next_memory_counter
    }
328
    /// The highest `committed_at` this pipeline has assigned, or
    /// `None` before the first successful batch. Per `temporal-model.md`
    /// § 9.2 / § 12 invariant #1, every successful commit must exceed
    /// this value; the pipeline enforces that internally via
    /// `max(wall_now, last_committed_at + 1)`. Advanced during replay
    /// by [`Pipeline::advance_last_committed_at`].
    #[must_use]
    pub fn last_committed_at(&self) -> Option<ClockTime> {
        self.last_committed_at
    }
338
339    /// Advance the pipeline's monotonic commit watermark to `at`.
340    ///
341    /// Used by [`Store::open`](crate::store::Store) during replay to
342    /// restore the watermark from the highest `committed_at` durably
343    /// recorded in the log. No-op if `at <= last_committed_at`.
344    pub fn advance_last_committed_at(&mut self, at: ClockTime) {
345        if self.last_committed_at.is_none_or(|prev| at > prev) {
346            self.last_committed_at = Some(at);
347        }
348    }
349
    /// Read-only view of the supersession DAG. Extended during replay
    /// via [`Pipeline::replay_edge`] and at emit time by
    /// auto-supersession writes.
    #[must_use]
    pub fn dag(&self) -> &SupersessionDag {
        &self.dag
    }
355
    /// Every Semantic memory this pipeline has committed or replayed,
    /// in commit order. Consumed by the as-of resolver
    /// (`crate::resolver`) per `temporal-model.md` § 7; indexed by
    /// [`Pipeline::semantic_history_at`].
    #[must_use]
    pub fn semantic_records(&self) -> &[SemRecord] {
        &self.semantic_records
    }
363
    /// Every Episodic memory this pipeline has committed or replayed,
    /// in commit order. Episodic records carry no secondary index —
    /// they do not participate in supersession.
    #[must_use]
    pub fn episodic_records(&self) -> &[EpiRecord] {
        &self.episodic_records
    }
370
    /// Every Procedural memory this pipeline has committed or
    /// replayed, in commit order. Indexed by
    /// [`Pipeline::procedural_history_for`].
    #[must_use]
    pub fn procedural_records(&self) -> &[ProRecord] {
        &self.procedural_records
    }
377
378    /// Indices in [`Pipeline::semantic_records`] of every Semantic
379    /// record ever emitted at `(s, p)`. Returns an empty slice when
380    /// the pair has no history. O(log n) lookup; the resolver uses
381    /// this to avoid scanning the full record history.
382    #[must_use]
383    pub fn semantic_history_at(&self, s: SymbolId, p: SymbolId) -> &[usize] {
384        self.semantic_by_sp_history
385            .get(&(s, p))
386            .map_or(&[], Vec::as_slice)
387    }
388
389    /// Indices in [`Pipeline::procedural_records`] of every
390    /// Procedural record ever emitted under `rule_id`. Returns an
391    /// empty slice when the rule has no history.
392    #[must_use]
393    pub fn procedural_history_for(&self, rule_id: SymbolId) -> &[usize] {
394        self.procedural_by_rule_history
395            .get(&rule_id)
396            .map_or(&[], Vec::as_slice)
397    }
398
    /// Every Inferential memory this pipeline has committed or
    /// replayed, in commit order. Consumed by the Inferential resolver
    /// (`crate::resolver::resolve_inferential`) per `temporal-model.md`
    /// § 5.4 — Inf is keyed by `(s, p)` like Sem, with re-derivation
    /// as the auto-supersession trigger. Indexed by
    /// [`Pipeline::inferential_history_at`].
    #[must_use]
    pub fn inferential_records(&self) -> &[InfRecord] {
        &self.inferential_records
    }
408
409    /// Indices in [`Pipeline::inferential_records`] of every
410    /// Inferential record ever emitted at `(s, p)`. Returns an empty
411    /// slice when the pair has no history. Mirrors
412    /// [`Pipeline::semantic_history_at`].
413    #[must_use]
414    pub fn inferential_history_at(&self, s: SymbolId, p: SymbolId) -> &[usize] {
415        self.inferential_by_sp_history
416            .get(&(s, p))
417            .map_or(&[], Vec::as_slice)
418    }
419
420    /// Register that Episode `episode_id` committed at `at`. Called
421    /// by [`crate::store::Store`] after a successful commit and
422    /// during log replay for every `Checkpoint` record. The mapping
423    /// backs the read path's Episode-scoped predicates per
424    /// `read-protocol.md` § 4.1 — an `(s, p)` → `(episode_id, at)`
425    /// mapping lets `:in_episode @E` resolve by comparing a
426    /// candidate's `committed_at` against `@E`'s registered clock.
427    ///
428    /// Redundant registrations (same id + same clock) are a no-op;
429    /// id collisions with a different clock are ignored on the
430    /// assumption that replay walks records in order and the first
431    /// registration wins.
432    pub fn register_episode(&mut self, episode_id: SymbolId, at: ClockTime) {
433        self.episode_committed_at.entry(episode_id).or_insert(at);
434    }
435
    /// The `committed_at` clock Episode `episode_id` was committed at,
    /// or `None` if the pipeline has not seen that Episode. O(log n)
    /// lookup in the backing `BTreeMap`. See
    /// [`Pipeline::register_episode`].
    #[must_use]
    pub fn episode_committed_at(&self, episode_id: SymbolId) -> Option<ClockTime> {
        self.episode_committed_at.get(&episode_id).copied()
    }
443
444    /// Iterate every Episode the pipeline has registered as
445    /// `(episode_id, committed_at)` pairs.
446    ///
447    /// Iteration order is **unspecified** (the underlying storage is a
448    /// `HashMap`); callers that need a stable order — e.g. paginated
449    /// listing for `mimir-mcp::mimir_list_episodes` — must collect
450    /// and sort. Sorting by `committed_at` is the canonical UI choice
451    /// since it matches the durability-order on disk.
452    pub fn iter_episodes(&self) -> impl Iterator<Item = (SymbolId, ClockTime)> + '_ {
453        self.episode_committed_at.iter().map(|(id, at)| (*id, *at))
454    }
455
456    /// Record that Episode `child` has `parent` as its parent Episode.
457    /// Called by the store after a batch with `(episode :start
458    /// :parent_episode @E)` metadata, and during replay when an
459    /// `EpisodeMeta` record carries `parent_episode_id`. Idempotent
460    /// on duplicate calls with the same parent; a conflicting parent
461    /// on an Episode already registered is ignored (first-write wins,
462    /// matching replay's append-only semantics).
463    pub fn register_episode_parent(&mut self, child: SymbolId, parent: SymbolId) {
464        self.episode_parent.entry(child).or_insert(parent);
465    }
466
    /// The parent Episode of `episode_id`, or `None` if the Episode
    /// has no parent (or the pipeline has not seen it). See
    /// [`Pipeline::register_episode_parent`].
    #[must_use]
    pub fn episode_parent(&self, episode_id: SymbolId) -> Option<SymbolId> {
        self.episode_parent.get(&episode_id).copied()
    }
473
    /// Take the metadata captured from the most recent
    /// `compile_batch`'s `(episode :start …)` form, if any. Clears
    /// the pending slot so a subsequent batch without a directive
    /// doesn't reuse stale metadata. Returns `None` when the slot is
    /// empty — no directive in the last batch, or already consumed.
    pub fn take_pending_episode_metadata(&mut self) -> Option<PendingEpisodeMetadata> {
        self.pending_episode_metadata.take()
    }
481
    /// `true` if `memory_id` is currently pinned
    /// (`confidence-decay.md` § 7). Pinned memories skip decay.
    /// Backed by the pin set kept in sync by [`Pipeline::replay_flag`].
    #[must_use]
    pub fn is_pinned(&self, memory_id: SymbolId) -> bool {
        self.pinned_memories.contains(&memory_id)
    }
488
    /// `true` if `memory_id` is currently flagged
    /// operator-authoritative (`confidence-decay.md` § 8).
    /// Authoritative memories skip decay. Backed by the set kept in
    /// sync by [`Pipeline::replay_flag`].
    #[must_use]
    pub fn is_authoritative(&self, memory_id: SymbolId) -> bool {
        self.authoritative_memories.contains(&memory_id)
    }
496
497    /// Replay a `Pin` / `Unpin` / `AuthoritativeSet` /
498    /// `AuthoritativeClear` flag event from the canonical log.
499    /// Called by [`crate::store::Store::open`] during recovery so
500    /// the pipeline's pin / authoritative sets reflect the durable
501    /// state.
502    pub fn replay_flag(&mut self, record: &CanonicalRecord) {
503        match record {
504            CanonicalRecord::Pin(r) => {
505                self.pinned_memories.insert(r.memory_id);
506            }
507            CanonicalRecord::Unpin(r) => {
508                self.pinned_memories.remove(&r.memory_id);
509            }
510            CanonicalRecord::AuthoritativeSet(r) => {
511                self.authoritative_memories.insert(r.memory_id);
512            }
513            CanonicalRecord::AuthoritativeClear(r) => {
514                self.authoritative_memories.remove(&r.memory_id);
515            }
516            _ => {} // not a flag event — no-op
517        }
518    }
519
520    /// Walk the Episode chain from `episode_id` up through parents.
521    /// Yields `episode_id` first, then its parent, grandparent, and
522    /// so on. Bounded by [`MAX_EPISODE_CHAIN_DEPTH`] to guard
523    /// against pathological or corrupt parent cycles (the binder
524    /// rejects cycles at write time, but replay of a corrupted log
525    /// might still present one).
526    pub fn episode_chain(&self, episode_id: SymbolId) -> impl Iterator<Item = SymbolId> + '_ {
527        let mut current = Some(episode_id);
528        let mut depth = 0_usize;
529        std::iter::from_fn(move || {
530            if depth >= MAX_EPISODE_CHAIN_DEPTH {
531                return None;
532            }
533            let id = current?;
534            depth += 1;
535            current = self.episode_parent.get(&id).copied();
536            Some(id)
537        })
538    }
539
    /// Replay one edge from the canonical log into the supersession
    /// DAG. Thin pass-through to `SupersessionDag::add_edge`, called
    /// by [`Store::open`](crate::store::Store) during
    /// recovery; the acyclicity check still runs, so a log with a
    /// corruption-introduced cycle surfaces as an error rather than a
    /// silent invariant violation.
    ///
    /// # Errors
    ///
    /// Propagates [`DagError`](crate::dag::DagError) variants from
    /// `SupersessionDag::add_edge`.
    pub fn replay_edge(&mut self, edge: crate::dag::Edge) -> Result<(), crate::dag::DagError> {
        self.dag.add_edge(edge)
    }
553
    /// Replay a canonical record into the pipeline's
    /// supersession-detection indices. Idempotent when records are
    /// applied in log order; a replayed forward-supersession Sem
    /// record replaces the prior `(s, p)` entry, a retroactive Sem
    /// record does not; a Procedural record matching either
    /// supersession key (§ 5.2) clears the prior entries and inserts
    /// under both of its keys.
    ///
    /// Non-memory records are ignored; callers can pass every record
    /// and let this method filter internally.
    ///
    /// Scope: maintains the Semantic (§ 5.1), Procedural (§ 5.2), and
    /// Inferential (§ 5.4) records, histories, and current-state
    /// indices; Episodic records are retained in commit order only
    /// (no supersession index per § 5.3).
    pub fn replay_memory_record(&mut self, record: &CanonicalRecord) {
        match record {
            CanonicalRecord::Sem(sem) => {
                let key = (sem.s, sem.p);
                let sem_index = self.semantic_records.len();
                self.semantic_records.push(sem.clone());
                self.semantic_by_sp_history
                    .entry(key)
                    .or_default()
                    .push(sem_index);
                // Forward supersession only: a replayed record with a
                // strictly later `valid_at` takes over the `(s, p)`
                // entry; a retroactive record leaves it in place.
                let replace = self
                    .supersession_index
                    .semantic_by_sp
                    .get(&key)
                    .is_none_or(|existing| sem.clocks.valid_at > existing.valid_at);
                if replace {
                    self.supersession_index.semantic_by_sp.insert(
                        key,
                        CurrentSemantic {
                            memory_id: sem.memory_id,
                            valid_at: sem.clocks.valid_at,
                        },
                    );
                }
            }
            CanonicalRecord::Epi(epi) => {
                // Retention only — Episodic has no supersession index.
                self.episodic_records.push(epi.clone());
            }
            CanonicalRecord::Inf(inf) => {
                // Keep the reverse parent index in sync on replay
                // so post-open supersessions can emit StaleParent
                // edges against already-committed Inferentials
                // (temporal-model.md § 5.4).
                for parent in &inf.derived_from {
                    self.inferentials_by_parent
                        .entry(*parent)
                        .or_default()
                        .push(inf.memory_id);
                }
                // Record + history for the resolver. Mirror Sem: push
                // the record in commit order, record its index in the
                // `(s, p)` history, and update the current-state
                // supersession index so post-open writes can auto-
                // supersede. Replay skips the conflict check — the
                // original write time rejected any such conflict and
                // could not have reached the log.
                let inf_index = self.inferential_records.len();
                let key = (inf.s, inf.p);
                let valid_at = inf.clocks.valid_at;
                self.inferential_records.push(inf.clone());
                self.inferential_by_sp_history
                    .entry(key)
                    .or_default()
                    .push(inf_index);
                let replace = self
                    .supersession_index
                    .inferential_by_sp
                    .get(&key)
                    .is_none_or(|existing| valid_at > existing.valid_at);
                if replace {
                    self.supersession_index.inferential_by_sp.insert(
                        key,
                        CurrentSemantic {
                            memory_id: inf.memory_id,
                            valid_at,
                        },
                    );
                }
            }
            CanonicalRecord::Pro(pro) => {
                let pro_index = self.procedural_records.len();
                self.procedural_records.push(pro.clone());
                self.procedural_by_rule_history
                    .entry(pro.rule_id)
                    .or_default()
                    .push(pro_index);
                // Log replay path — skip the intra-batch-conflict
                // check because any such conflict was already
                // rejected at the original write time (and therefore
                // couldn't have reached the log). Returned list of
                // superseded memories is discarded: replay doesn't
                // emit edges, the log already carries them as
                // separate `Supersedes` records that `replay_edge`
                // handles.
                replay_procedural_supersession(
                    &mut self.supersession_index,
                    pro.memory_id,
                    pro.clocks.committed_at,
                    pro.rule_id,
                    &pro.trigger,
                    pro.scope,
                );
            }
            // Non-memory record — filtered out by contract.
            _ => {}
        }
    }
663
    /// Allocate a fresh `__ep_{counter}` symbol for use as a
    /// `CHECKPOINT` record's `episode_id`. The synthesized name follows
    /// the same reserved-prefix convention as `__mem_{n}` (see
    /// `allocate_memory_id`'s collision-handling note), but — unlike
    /// memory IDs — the counter value is supplied by the caller. Used
    /// by the [`Store`](crate::store::Store) commit path; exposed as
    /// `pub` because `Store` composes `Pipeline` rather than
    /// inheriting from it.
    ///
    /// # Errors
    ///
    /// - [`EmitError::MemoryIdAllocation`] if the synthesized name
    ///   collides with an existing symbol (same pathological-agent
    ///   case as memory-ID allocation).
    pub fn allocate_episode_symbol(&mut self, counter: u64) -> Result<SymbolId, EmitError> {
        let name = format!("__ep_{counter}");
        self.table
            .allocate(name.clone(), SymbolKind::Memory)
            .map_err(|cause| EmitError::MemoryIdAllocation { name, cause })
    }
683
    /// Compile one batch of agent input into canonical records.
    ///
    /// `wall_now` is the librarian's host wall clock at the start of
    /// this batch; injected for determinism (tests pass a fixed
    /// `ClockTime`). The batch's `committed_at` is derived from
    /// `wall_now` via the monotonic rule in `temporal-model.md` § 9.2:
    /// `effective_now = max(wall_now, last_committed_at + 1)`. All
    /// memory records within the batch share that same `effective_now`
    /// as their `committed_at` and — for non-Episodic kinds — their
    /// `observed_at`.
    ///
    /// Future-validity rejection (semantic stage) uses the raw
    /// `wall_now`, not the monotonic watermark, so a transiently
    /// inflated watermark cannot relax the "no future writes without
    /// `:projected true`" rule.
    ///
    /// # Errors
    ///
    /// Any stage may return a [`PipelineError`]; on error the workspace
    /// state (symbol table, memory counter, and commit watermark) is
    /// untouched. [`PipelineError::ClockExhausted`] fires if the
    /// monotonic bump would reach the reserved `u64::MAX` sentinel.
    ///
    /// # Example
    ///
    /// ```
    /// # #![allow(clippy::unwrap_used)]
    /// use mimir_core::pipeline::Pipeline;
    /// use mimir_core::ClockTime;
    ///
    /// let mut pipe = Pipeline::new();
    /// let now = ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel");
    /// let input = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
    /// let records = pipe.compile_batch(input, now).unwrap();
    /// // The batch emits a `Sem` memory record preceded by `SymbolAlloc`
    /// // records for every first-use symbol name.
    /// assert!(records.iter().any(|r| matches!(r, mimir_core::canonical::CanonicalRecord::Sem(_))));
    /// ```
    pub fn compile_batch(
        &mut self,
        input: &str,
        wall_now: ClockTime,
    ) -> Result<Vec<CanonicalRecord>, PipelineError> {
        // observability.md: `mimir.pipeline.compile_batch` span. Fields
        // are `Empty` until the emit stage finishes so the span still
        // records timing on error paths (counts stay unset).
        let span = tracing::info_span!(
            "mimir.pipeline.compile_batch",
            input_len = input.len(),
            record_count = tracing::field::Empty,
            memory_count = tracing::field::Empty,
            edge_count = tracing::field::Empty,
        );
        let _enter = span.enter();

        // Parse is stateless, so it runs before any working-state clones.
        let forms = parse::parse(input).map_err(PipelineError::Parse)?;

        // Compute the batch's `committed_at` before running stateful
        // stages so a clock-exhaustion error cannot leave partial work.
        let effective_now = monotonic_commit_clock(wall_now, self.last_committed_at)?;

        // Clone live state so mid-batch failures cannot leak partial
        // mutations. Every state field the emit stage may touch lands
        // here so full-batch rollback is trivial — drop the working
        // copies and leave `self` untouched.
        let mut working_table = self.table.clone();
        let mut working_counter = self.next_memory_counter;
        let mut working_dag = self.dag.clone();
        let mut working_index = self.supersession_index.clone();
        let mut working_sem_records = self.semantic_records.clone();
        let mut working_sem_by_sp = self.semantic_by_sp_history.clone();
        let mut working_epi_records = self.episodic_records.clone();
        let mut working_pro_records = self.procedural_records.clone();
        let mut working_pro_by_rule = self.procedural_by_rule_history.clone();
        let mut working_inf_records = self.inferential_records.clone();
        let mut working_inf_by_sp = self.inferential_by_sp_history.clone();

        let (bound, journal) =
            bind::bind(forms, &mut working_table).map_err(PipelineError::Bind)?;

        // Semantic validation (future-validity rejection) uses the raw
        // wall clock, which is the librarian's estimate of "now" for
        // judging agent clock skew — not the monotonic watermark, which
        // may transiently lead the wall clock after a regression.
        let validated =
            semantic::validate(bound, &working_table, wall_now).map_err(PipelineError::Semantic)?;

        // Pending Episode metadata is per-batch: it starts empty rather
        // than cloned from `self`, so a batch that carries no Episode
        // `:start` form commits `None` and clears any previous slot.
        let mut working_pending_meta: Option<PendingEpisodeMetadata> = None;
        let mut working_pinned = self.pinned_memories.clone();
        let mut working_authoritative = self.authoritative_memories.clone();
        let mut working_infs_by_parent = self.inferentials_by_parent.clone();
        let mut emit_state = EmitState {
            table: &mut working_table,
            counter: &mut working_counter,
            dag: &mut working_dag,
            index: &mut working_index,
            semantic_records: &mut working_sem_records,
            semantic_by_sp: &mut working_sem_by_sp,
            episodic_records: &mut working_epi_records,
            procedural_records: &mut working_pro_records,
            procedural_by_rule: &mut working_pro_by_rule,
            inferential_records: &mut working_inf_records,
            inferential_by_sp: &mut working_inf_by_sp,
            pending_episode: &mut working_pending_meta,
            pinned: &mut working_pinned,
            authoritative: &mut working_authoritative,
            inferentials_by_parent: &mut working_infs_by_parent,
            now: effective_now,
        };
        let records = emit(&validated, &journal, &mut emit_state).map_err(PipelineError::Emit)?;

        // All stages succeeded — commit.
        self.table = working_table;
        self.next_memory_counter = working_counter;
        self.last_committed_at = Some(effective_now);
        self.dag = working_dag;
        self.supersession_index = working_index;
        self.semantic_records = working_sem_records;
        self.semantic_by_sp_history = working_sem_by_sp;
        self.episodic_records = working_epi_records;
        self.procedural_records = working_pro_records;
        self.procedural_by_rule_history = working_pro_by_rule;
        self.inferential_records = working_inf_records;
        self.inferential_by_sp_history = working_inf_by_sp;
        self.pending_episode_metadata = working_pending_meta;
        self.pinned_memories = working_pinned;
        self.authoritative_memories = working_authoritative;
        self.inferentials_by_parent = working_infs_by_parent;

        let (memory_count, edge_count) = count_memory_and_edge_records(&records);
        span.record("record_count", records.len());
        span.record("memory_count", memory_count);
        span.record("edge_count", edge_count);

        Ok(records)
    }
820}
821
822/// `(memory_count, edge_count)` tally over a record batch — identifiers
823/// only, no payload inspection. Consumed by the
824/// `mimir.pipeline.compile_batch` span.
825fn count_memory_and_edge_records(records: &[CanonicalRecord]) -> (usize, usize) {
826    let mut memory = 0_usize;
827    let mut edge = 0_usize;
828    for r in records {
829        match r {
830            CanonicalRecord::Sem(_)
831            | CanonicalRecord::Epi(_)
832            | CanonicalRecord::Pro(_)
833            | CanonicalRecord::Inf(_) => memory += 1,
834            CanonicalRecord::Supersedes(_)
835            | CanonicalRecord::Corrects(_)
836            | CanonicalRecord::StaleParent(_)
837            | CanonicalRecord::Reconfirms(_) => edge += 1,
838            _ => {}
839        }
840    }
841    (memory, edge)
842}
843
/// Mutable write-time state threaded through the emit stage.
///
/// Bundles every field that `emit` / `emit_form` may mutate so the
/// function signatures don't grow a parameter for each. Individual
/// helpers borrow what they need.
struct EmitState<'a> {
    /// Batch-working symbol table (clone of `Pipeline::table`);
    /// fresh memory/episode symbols are allocated here.
    table: &'a mut SymbolTable,
    /// Working copy of `Pipeline::next_memory_counter`, consumed by
    /// `allocate_memory_id`.
    counter: &'a mut u64,
    /// Supersession DAG; consulted to decide whether an Inferential's
    /// parent is already superseded (born-stale check).
    dag: &'a mut SupersessionDag,
    /// Current-state supersession index for the per-kind
    /// forward/retroactive/conflict decisions.
    index: &'a mut SupersessionIndex,
    /// Append-only Semantic record history.
    semantic_records: &'a mut Vec<SemRecord>,
    /// `(s, p) -> indices` over `semantic_records`; see
    /// [`Pipeline::semantic_by_sp_history`].
    semantic_by_sp: &'a mut BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
    /// Append-only Episodic record history.
    episodic_records: &'a mut Vec<EpiRecord>,
    /// Append-only Procedural record history.
    procedural_records: &'a mut Vec<ProRecord>,
    /// `rule_id -> indices` over `procedural_records`; see
    /// [`Pipeline::procedural_by_rule_history`].
    procedural_by_rule: &'a mut BTreeMap<SymbolId, Vec<usize>>,
    /// Append-only Inferential record history.
    inferential_records: &'a mut Vec<InfRecord>,
    /// `(s, p) -> indices` over `inferential_records`; see
    /// [`Pipeline::inferential_history_at`].
    inferential_by_sp: &'a mut BTreeMap<(SymbolId, SymbolId), Vec<usize>>,
    /// Pending batch-level Episode metadata from an
    /// `(episode :start …)` form, threaded through so emit can
    /// populate and `compile_batch` can commit. Rolled back with the
    /// rest of working state on failure.
    pending_episode: &'a mut Option<PendingEpisodeMetadata>,
    /// Pinned-memory set (`confidence-decay.md` § 7).
    pinned: &'a mut BTreeSet<SymbolId>,
    /// Operator-authoritative memory set
    /// (`confidence-decay.md` § 8).
    authoritative: &'a mut BTreeSet<SymbolId>,
    /// Reverse parent index for Inferential staling
    /// (`temporal-model.md` § 5.4).
    inferentials_by_parent: &'a mut BTreeMap<SymbolId, Vec<SymbolId>>,
    /// The batch's `effective_now`: stamped as `committed_at` (and,
    /// for non-Episodic kinds, `observed_at`) on every emitted record.
    now: ClockTime,
}
882
883/// Compute the batch commit clock per `temporal-model.md` § 9.2:
884/// `max(wall_now, last_committed_at + 1)`. Returns
885/// [`PipelineError::ClockExhausted`] if bumping past `last_committed_at`
886/// would reach the `u64::MAX` sentinel that [`ClockTime`] refuses.
887fn monotonic_commit_clock(
888    wall_now: ClockTime,
889    last_committed_at: Option<ClockTime>,
890) -> Result<ClockTime, PipelineError> {
891    let Some(prev) = last_committed_at else {
892        return Ok(wall_now);
893    };
894    if wall_now > prev {
895        return Ok(wall_now);
896    }
897    // Wall clock did not advance past the previous commit. Bump by 1ms.
898    let next_raw = prev
899        .as_millis()
900        .checked_add(1)
901        .ok_or(PipelineError::ClockExhausted {
902            last_committed_at: prev,
903        })?;
904    ClockTime::try_from_millis(next_raw).map_err(|_| PipelineError::ClockExhausted {
905        last_committed_at: prev,
906    })
907}
908
909fn emit(
910    forms: &[ValidatedForm],
911    journal: &[SymbolMutation],
912    state: &mut EmitState,
913) -> Result<Vec<CanonicalRecord>, EmitError> {
914    // Symbol events first, so replay sees allocations before the memory
915    // records that reference their IDs.
916    let mut out = Vec::with_capacity(journal.len() + forms.len());
917    for mutation in journal {
918        out.push(emit_symbol_mutation(mutation, state.now));
919    }
920    for form in forms {
921        // Alias / Rename / Retire produce no memory-level record —
922        // their durability comes via the SYMBOL_* canonical records
923        // emitted above from the bind journal.
924        if matches!(
925            form,
926            ValidatedForm::Alias { .. }
927                | ValidatedForm::Rename { .. }
928                | ValidatedForm::Retire { .. }
929        ) {
930            continue;
931        }
932        emit_form(form, state, &mut out)?;
933    }
934    Ok(out)
935}
936
937fn emit_symbol_mutation(mutation: &SymbolMutation, now: ClockTime) -> CanonicalRecord {
938    match mutation {
939        SymbolMutation::Allocate { id, name, kind } => {
940            CanonicalRecord::SymbolAlloc(SymbolEventRecord {
941                symbol_id: *id,
942                name: name.clone(),
943                symbol_kind: *kind,
944                at: now,
945            })
946        }
947        SymbolMutation::Rename {
948            id,
949            new_canonical,
950            kind,
951        } => CanonicalRecord::SymbolRename(SymbolEventRecord {
952            symbol_id: *id,
953            name: new_canonical.clone(),
954            symbol_kind: *kind,
955            at: now,
956        }),
957        SymbolMutation::Alias { id, alias, kind } => {
958            CanonicalRecord::SymbolAlias(SymbolEventRecord {
959                symbol_id: *id,
960                name: alias.clone(),
961                symbol_kind: *kind,
962                at: now,
963            })
964        }
965        SymbolMutation::Retire { id, name, kind } => {
966            CanonicalRecord::SymbolRetire(SymbolEventRecord {
967                symbol_id: *id,
968                name: name.clone(),
969                symbol_kind: *kind,
970                at: now,
971            })
972        }
973    }
974}
975
/// Lower one validated form into canonical records, appending to `out`
/// and updating the batch's write-time state in `state`.
///
/// Ordering invariant: a new memory record is always pushed before any
/// `Supersedes` edge that refers to it, so a log reader never
/// encounters an edge whose endpoint it has not yet seen.
///
/// # Errors
///
/// - [`EmitError::Unsupported`] for `correct` / `promote` / `query`
///   (pending their tracks) and for Alias/Rename/Retire forms, which
///   `emit` filters out before dispatching here — reaching that arm
///   indicates a pipeline bug.
/// - Supersession conflicts surfaced by the resolve/apply helpers.
#[allow(clippy::too_many_lines)]
fn emit_form(
    form: &ValidatedForm,
    state: &mut EmitState,
    out: &mut Vec<CanonicalRecord>,
) -> Result<(), EmitError> {
    match form {
        ValidatedForm::Sem {
            s,
            p,
            o,
            source,
            confidence,
            valid_at,
            projected,
            ..
        } => {
            let memory_id = allocate_memory_id(state, out)?;
            // Auto-supersession per temporal-model.md § 5.1: look up
            // an existing Semantic memory at `(s, p)` and decide
            // forward / retroactive / conflict.
            let (record_invalid_at, supersession) =
                resolve_semantic_supersession(state.index, memory_id, *s, *p, *valid_at)?;
            let sem = SemRecord {
                memory_id,
                s: *s,
                p: *p,
                o: o.clone(),
                source: *source,
                confidence: *confidence,
                clocks: Clocks {
                    valid_at: *valid_at,
                    observed_at: state.now,
                    committed_at: state.now,
                    invalid_at: record_invalid_at,
                },
                flags: SemFlags {
                    projected: *projected,
                },
            };
            let sem_index = state.semantic_records.len();
            state.semantic_records.push(sem.clone());
            state
                .semantic_by_sp
                .entry((*s, *p))
                .or_default()
                .push(sem_index);
            out.push(CanonicalRecord::Sem(sem));
            // Emit the Supersedes edge *after* the new memory so a
            // log reader sees the memory first, then the edge that
            // refers to it.
            if let Some(target) = supersession {
                emit_supersedes_edge(state, out, memory_id, target)?;
            }
        }
        ValidatedForm::Epi {
            event_id,
            kind,
            participants,
            location,
            at_time,
            observed_at,
            source,
            confidence,
            ..
        } => {
            let memory_id = allocate_memory_id(state, out)?;
            // Episodic keeps the agent-supplied `at_time` /
            // `observed_at`; only `committed_at` comes from the batch
            // clock. No supersession resolution runs for this kind.
            let epi = EpiRecord {
                memory_id,
                event_id: *event_id,
                kind: *kind,
                participants: participants.clone(),
                location: *location,
                at_time: *at_time,
                observed_at: *observed_at,
                source: *source,
                confidence: *confidence,
                committed_at: state.now,
                invalid_at: None,
            };
            state.episodic_records.push(epi.clone());
            out.push(CanonicalRecord::Epi(epi));
        }
        ValidatedForm::Pro {
            rule_id,
            trigger,
            action,
            precondition,
            scope,
            source,
            confidence,
            ..
        } => {
            let memory_id = allocate_memory_id(state, out)?;
            let pro = ProRecord {
                memory_id,
                rule_id: *rule_id,
                trigger: trigger.clone(),
                action: action.clone(),
                precondition: precondition.clone(),
                scope: *scope,
                source: *source,
                confidence: *confidence,
                clocks: Clocks {
                    valid_at: state.now,
                    observed_at: state.now,
                    committed_at: state.now,
                    invalid_at: None,
                },
            };
            let pro_index = state.procedural_records.len();
            state.procedural_records.push(pro.clone());
            state
                .procedural_by_rule
                .entry(*rule_id)
                .or_default()
                .push(pro_index);
            out.push(CanonicalRecord::Pro(pro));
            // Auto-supersession per § 5.2: dedup by `rule_id` OR
            // `(trigger, scope)`. Either match triggers supersession;
            // if both match distinct prior memories, both get edges.
            let superseded = apply_procedural_supersession(
                state.index,
                memory_id,
                state.now,
                *rule_id,
                trigger,
                *scope,
            )?;
            for old in superseded {
                emit_supersedes_edge(state, out, memory_id, old)?;
            }
        }
        ValidatedForm::Inf {
            s,
            p,
            o,
            derived_from,
            method,
            confidence,
            valid_at,
            projected,
        } => {
            let memory_id = allocate_memory_id(state, out)?;
            // Write-time stale flag (`temporal-model.md` § 5.4):
            // an Inferential derived from an already-superseded
            // parent is born stale. A parent is superseded iff the
            // DAG carries an incoming `Supersedes` edge on it.
            let born_stale = derived_from
                .iter()
                .any(|parent| parent_is_superseded(state.dag, *parent));
            // Auto-supersession per temporal-model.md § 5.4 ("auto-
            // supersession rule as if Inferential were Semantic —
            // same (s, p) later valid_at"). Shares the Sem § 5.1
            // forward / retroactive / conflict logic.
            let (record_invalid_at, supersession) =
                resolve_inferential_supersession(state.index, memory_id, *s, *p, *valid_at)?;
            let inf = InfRecord {
                memory_id,
                s: *s,
                p: *p,
                o: o.clone(),
                derived_from: derived_from.clone(),
                method: *method,
                confidence: *confidence,
                clocks: Clocks {
                    valid_at: *valid_at,
                    observed_at: state.now,
                    committed_at: state.now,
                    invalid_at: record_invalid_at,
                },
                flags: InfFlags {
                    projected: *projected,
                    stale: born_stale,
                },
            };
            let inf_index = state.inferential_records.len();
            state.inferential_records.push(inf.clone());
            state
                .inferential_by_sp
                .entry((*s, *p))
                .or_default()
                .push(inf_index);
            out.push(CanonicalRecord::Inf(inf));
            // Register in the reverse-parent index so future
            // supersessions on any parent can emit `StaleParent`
            // edges against this Inferential without scanning the
            // full history.
            for parent in derived_from {
                state
                    .inferentials_by_parent
                    .entry(*parent)
                    .or_default()
                    .push(memory_id);
            }
            // Emit the Supersedes edge *after* the new memory so a
            // log reader sees the memory first, then the edge that
            // refers to it. Mirrors Sem § 5.1 ordering.
            if let Some(target) = supersession {
                emit_supersedes_edge(state, out, memory_id, target)?;
            }
        }
        // Alias / Rename / Retire are filtered out by emit() before
        // reaching here; their canonical form is the SYMBOL_* record
        // from the bind journal.
        ValidatedForm::Alias { .. }
        | ValidatedForm::Rename { .. }
        | ValidatedForm::Retire { .. } => {
            return Err(EmitError::Unsupported {
                form: "symbol-event-form-without-journal",
            })
        }
        ValidatedForm::Correct { .. } => return Err(EmitError::Unsupported { form: "correct" }),
        ValidatedForm::Promote { .. } => return Err(EmitError::Unsupported { form: "promote" }),
        ValidatedForm::Query { .. } => return Err(EmitError::Unsupported { form: "query" }),
        ValidatedForm::Episode {
            action,
            label,
            parent_episode,
            retracts,
        } => {
            // Episode forms emit no canonical record at this layer.
            // `:start` deposits metadata in the batch's pending slot
            // for Store to attach to the CHECKPOINT; `:close` is a
            // no-op under the single-batch-per-Episode model (spec
            // § 3.1 — the compile_batch return implicitly closes).
            // The semantic stage guarantees at most one Episode
            // form per batch, so we can unconditionally overwrite.
            if matches!(action, crate::parse::EpisodeAction::Start) {
                *state.pending_episode = Some(PendingEpisodeMetadata {
                    label: label.clone(),
                    parent_episode: *parent_episode,
                    retracts: retracts.clone(),
                });
            } else {
                // `:close` — still clear any stale pending metadata
                // so the batch signals "no new Episode metadata."
                *state.pending_episode = None;
            }
        }
        ValidatedForm::Flag {
            action,
            memory,
            actor,
        } => {
            let record = crate::canonical::FlagEventRecord {
                memory_id: *memory,
                at: state.now,
                actor_symbol: *actor,
            };
            // Flip the pipeline's pin / auth sets alongside the
            // canonical emission so the read path picks up the new
            // state without needing a round-trip through replay.
            match action {
                crate::parse::FlagAction::Pin => {
                    state.pinned.insert(*memory);
                    out.push(CanonicalRecord::Pin(record));
                }
                crate::parse::FlagAction::Unpin => {
                    state.pinned.remove(memory);
                    out.push(CanonicalRecord::Unpin(record));
                }
                crate::parse::FlagAction::AuthoritativeSet => {
                    state.authoritative.insert(*memory);
                    out.push(CanonicalRecord::AuthoritativeSet(record));
                }
                crate::parse::FlagAction::AuthoritativeClear => {
                    state.authoritative.remove(memory);
                    out.push(CanonicalRecord::AuthoritativeClear(record));
                }
            }
        }
    }
    Ok(())
}
1251
1252/// Resolve the auto-supersession decision for a new Semantic memory
1253/// against the current-state index. Per `temporal-model.md` § 5.1:
1254///
1255/// - No prior at `(s, p)`: insert, no edge.
1256/// - New `valid_at > old.valid_at` (forward): replace the index
1257///   entry; the caller emits a Supersedes edge `new → old`.
1258/// - New `valid_at < old.valid_at` (retroactive correction): the new
1259///   memory is valid only for the period up to `old.valid_at`, so
1260///   its `invalid_at` is set at write time and the index entry for
1261///   `(s, p)` stays pointed at `old`. A Supersedes edge still records
1262///   the temporal relationship.
1263/// - Equal `valid_at`: two memories at the same `(s, p)` claiming
1264///   identical validity start cannot both be authoritative under the
1265///   single-writer-per-workspace invariant. Surface as an emit-time
1266///   error so the agent picks a distinct `valid_at` and re-batches.
1267fn resolve_semantic_supersession(
1268    index: &mut SupersessionIndex,
1269    new_memory_id: SymbolId,
1270    s: SymbolId,
1271    p: SymbolId,
1272    new_valid_at: ClockTime,
1273) -> Result<(Option<ClockTime>, Option<SymbolId>), EmitError> {
1274    let key = (s, p);
1275    let Some(old) = index.semantic_by_sp.get(&key).copied() else {
1276        index.semantic_by_sp.insert(
1277            key,
1278            CurrentSemantic {
1279                memory_id: new_memory_id,
1280                valid_at: new_valid_at,
1281            },
1282        );
1283        return Ok((None, None));
1284    };
1285    match new_valid_at.cmp(&old.valid_at) {
1286        std::cmp::Ordering::Greater => {
1287            // Forward supersession.
1288            index.semantic_by_sp.insert(
1289                key,
1290                CurrentSemantic {
1291                    memory_id: new_memory_id,
1292                    valid_at: new_valid_at,
1293                },
1294            );
1295            tracing::info!(
1296                target: "mimir.supersession",
1297                kind = "semantic",
1298                direction = "forward",
1299                s = %s,
1300                p = %p,
1301                old_memory_id = %old.memory_id,
1302                new_memory_id = %new_memory_id,
1303                "semantic auto-supersession",
1304            );
1305            Ok((None, Some(old.memory_id)))
1306        }
1307        std::cmp::Ordering::Less => {
1308            // Retroactive: new memory's validity closes at old's start.
1309            // Index entry preserves `old` as current.
1310            //
1311            // Note the asymmetry vs forward supersession: § 6.2 #4
1312            // says `to.invalid_at = from.valid_at` for every
1313            // Supersedes edge, but § 5.1 retroactive instead sets
1314            // `from.invalid_at = to.valid_at` (the NEW memory's
1315            // invalid_at, not the old one's). We follow § 5.1 here;
1316            // 6.4's as-of resolver must not derive invalid_at from
1317            // edges alone — it has to read it from the memory record
1318            // directly.
1319            tracing::info!(
1320                target: "mimir.supersession",
1321                kind = "semantic",
1322                direction = "retroactive",
1323                s = %s,
1324                p = %p,
1325                old_memory_id = %old.memory_id,
1326                new_memory_id = %new_memory_id,
1327                "semantic auto-supersession",
1328            );
1329            Ok((Some(old.valid_at), Some(old.memory_id)))
1330        }
1331        std::cmp::Ordering::Equal => Err(EmitError::SemanticSupersessionConflict {
1332            s,
1333            p,
1334            valid_at: new_valid_at,
1335            existing: old.memory_id,
1336        }),
1337    }
1338}
1339
1340/// Resolve auto-supersession for a new Inferential memory against
1341/// the current-state index per `temporal-model.md` § 5.4. Mirrors
1342/// [`resolve_semantic_supersession`]: a re-derivation with the same
1343/// `(s, p)` and a later `valid_at` forward-supersedes; an earlier
1344/// `valid_at` is retroactive and closes the *new* memory's validity
1345/// at the existing record's `valid_at`; equal `valid_at` is a
1346/// conflict under the single-writer invariant.
1347fn resolve_inferential_supersession(
1348    index: &mut SupersessionIndex,
1349    new_memory_id: SymbolId,
1350    s: SymbolId,
1351    p: SymbolId,
1352    new_valid_at: ClockTime,
1353) -> Result<(Option<ClockTime>, Option<SymbolId>), EmitError> {
1354    let key = (s, p);
1355    let Some(old) = index.inferential_by_sp.get(&key).copied() else {
1356        index.inferential_by_sp.insert(
1357            key,
1358            CurrentSemantic {
1359                memory_id: new_memory_id,
1360                valid_at: new_valid_at,
1361            },
1362        );
1363        return Ok((None, None));
1364    };
1365    match new_valid_at.cmp(&old.valid_at) {
1366        std::cmp::Ordering::Greater => {
1367            index.inferential_by_sp.insert(
1368                key,
1369                CurrentSemantic {
1370                    memory_id: new_memory_id,
1371                    valid_at: new_valid_at,
1372                },
1373            );
1374            tracing::info!(
1375                target: "mimir.supersession",
1376                kind = "inferential",
1377                direction = "forward",
1378                s = %s,
1379                p = %p,
1380                old_memory_id = %old.memory_id,
1381                new_memory_id = %new_memory_id,
1382                "inferential auto-supersession",
1383            );
1384            Ok((None, Some(old.memory_id)))
1385        }
1386        std::cmp::Ordering::Less => {
1387            // Retroactive: new memory's validity closes at old's
1388            // start. Index entry preserves `old` as current.
1389            tracing::info!(
1390                target: "mimir.supersession",
1391                kind = "inferential",
1392                direction = "retroactive",
1393                s = %s,
1394                p = %p,
1395                old_memory_id = %old.memory_id,
1396                new_memory_id = %new_memory_id,
1397                "inferential auto-supersession",
1398            );
1399            Ok((Some(old.valid_at), Some(old.memory_id)))
1400        }
1401        std::cmp::Ordering::Equal => Err(EmitError::InferentialSupersessionConflict {
1402            s,
1403            p,
1404            valid_at: new_valid_at,
1405            existing: old.memory_id,
1406        }),
1407    }
1408}
1409
1410/// Resolve auto-supersession for a new Procedural memory against
1411/// the current-state index per `temporal-model.md` § 5.2.
1412///
1413/// Either a `rule_id` match or a `(trigger, scope)` match triggers
1414/// supersession. If both keys match distinct existing memories (i.e.
1415/// the new memory's `rule_id` points to memory A and its
1416/// `(trigger, scope)` points to memory B ≠ A), BOTH are superseded
1417/// and the caller emits a `Supersedes` edge for each. The case
1418/// where both lookups converge to the same memory (a write that
1419/// duplicates an existing Pro on both keys) dedupes to a single
1420/// edge.
1421///
1422/// Side effects: removes the superseded memories' index entries
1423/// under both of their keys (once superseded, a memory is no longer
1424/// current on *any* key) and inserts the new memory under both of
1425/// its keys.
1426///
1427/// # Errors
1428///
1429/// [`EmitError::ProceduralSupersessionConflict`] if any matched
1430/// predecessor shares `committed_at` with the new memory — i.e.,
1431/// both were emitted in the same batch. This is the Pro analog of
1432/// Semantic's equal-`valid_at` rejection: silently accepting the
1433/// edge would produce a zero-duration "supersession" that's almost
1434/// certainly an agent bug.
1435fn apply_procedural_supersession(
1436    index: &mut SupersessionIndex,
1437    new_memory_id: SymbolId,
1438    new_committed_at: ClockTime,
1439    rule_id: SymbolId,
1440    trigger: &crate::Value,
1441    scope: SymbolId,
1442) -> Result<Vec<SymbolId>, EmitError> {
1443    let (superseded, trigger_scope_key) = procedural_lookup(index, rule_id, trigger, scope);
1444
1445    // Reject intra-batch conflict before mutating the index — equal
1446    // committed_at means NEW and OLD were both produced in the same
1447    // batch's emit pass, which would yield a zero-duration
1448    // supersession.
1449    for old in &superseded {
1450        if old.committed_at == new_committed_at {
1451            return Err(EmitError::ProceduralSupersessionConflict {
1452                rule_id,
1453                existing: old.memory_id,
1454            });
1455        }
1456    }
1457
1458    procedural_install(
1459        index,
1460        &superseded,
1461        new_memory_id,
1462        new_committed_at,
1463        rule_id,
1464        trigger_scope_key,
1465    );
1466    if !superseded.is_empty() {
1467        tracing::info!(
1468            target: "mimir.supersession",
1469            kind = "procedural",
1470            rule_id = %rule_id,
1471            new_memory_id = %new_memory_id,
1472            superseded_count = superseded.len(),
1473            "procedural auto-supersession",
1474        );
1475    }
1476    Ok(superseded.into_iter().map(|c| c.memory_id).collect())
1477}
1478
1479/// Replay entrypoint — applies the same index mutations as the emit
1480/// path but skips the intra-batch-conflict check. Safe because the
1481/// emit path rejected any such conflict at original write time, so
1482/// no log record can contain one.
1483fn replay_procedural_supersession(
1484    index: &mut SupersessionIndex,
1485    new_memory_id: SymbolId,
1486    new_committed_at: ClockTime,
1487    rule_id: SymbolId,
1488    trigger: &crate::Value,
1489    scope: SymbolId,
1490) {
1491    let (superseded, trigger_scope_key) = procedural_lookup(index, rule_id, trigger, scope);
1492    procedural_install(
1493        index,
1494        &superseded,
1495        new_memory_id,
1496        new_committed_at,
1497        rule_id,
1498        trigger_scope_key,
1499    );
1500}
1501
1502/// Look up the (up to two) prior Procedural memories matched by
1503/// `rule_id` and `(trigger, scope)`. Returns the canonical
1504/// `(trigger_bytes, scope)` key alongside the matches so callers can
1505/// reuse it for insertion without re-encoding.
1506fn procedural_lookup(
1507    index: &SupersessionIndex,
1508    rule_id: SymbolId,
1509    trigger: &crate::Value,
1510    scope: SymbolId,
1511) -> (Vec<CurrentProcedural>, (Vec<u8>, SymbolId)) {
1512    let trigger_scope_key = (trigger.index_key_bytes(), scope);
1513    let by_rule = index.procedural_by_rule.get(&rule_id).copied();
1514    let by_ts = index
1515        .procedural_by_trigger_scope
1516        .get(&trigger_scope_key)
1517        .copied();
1518
1519    let mut superseded: Vec<CurrentProcedural> = Vec::new();
1520    if let Some(old) = by_rule {
1521        superseded.push(old);
1522    }
1523    if let Some(old) = by_ts {
1524        if !superseded
1525            .iter()
1526            .any(|existing| existing.memory_id == old.memory_id)
1527        {
1528            superseded.push(old);
1529        }
1530    }
1531    (superseded, trigger_scope_key)
1532}
1533
1534/// Clear every superseded memory's BOTH keys and insert NEW under
1535/// both of its keys. Shared by emit + replay.
1536fn procedural_install(
1537    index: &mut SupersessionIndex,
1538    superseded: &[CurrentProcedural],
1539    new_memory_id: SymbolId,
1540    new_committed_at: ClockTime,
1541    rule_id: SymbolId,
1542    trigger_scope_key: (Vec<u8>, SymbolId),
1543) {
1544    for old in superseded {
1545        if let Some(keys) = index.procedural_keys_by_memory.remove(&old.memory_id) {
1546            index.procedural_by_rule.remove(&keys.rule_id);
1547            index
1548                .procedural_by_trigger_scope
1549                .remove(&keys.trigger_scope);
1550        }
1551    }
1552    let new_entry = CurrentProcedural {
1553        memory_id: new_memory_id,
1554        committed_at: new_committed_at,
1555    };
1556    let new_keys = ProceduralKeys {
1557        rule_id,
1558        trigger_scope: trigger_scope_key.clone(),
1559    };
1560    index.procedural_by_rule.insert(rule_id, new_entry);
1561    index
1562        .procedural_by_trigger_scope
1563        .insert(trigger_scope_key, new_entry);
1564    index
1565        .procedural_keys_by_memory
1566        .insert(new_memory_id, new_keys);
1567}
1568
1569/// Push a `Supersedes` edge record into the output stream and add
1570/// the matching in-memory edge to the working DAG. Also emits a
1571/// `StaleParent` edge from each Inferential derived from `to` —
1572/// per `temporal-model.md` § 5.4, a superseded parent invalidates
1573/// every dependent Inferential at the supersession instant.
1574fn emit_supersedes_edge(
1575    state: &mut EmitState,
1576    out: &mut Vec<CanonicalRecord>,
1577    from: SymbolId,
1578    to: SymbolId,
1579) -> Result<(), EmitError> {
1580    out.push(CanonicalRecord::Supersedes(EdgeRecord {
1581        from,
1582        to,
1583        at: state.now,
1584    }));
1585    state
1586        .dag
1587        .add_edge(DagEdge {
1588            kind: EdgeKind::Supersedes,
1589            from,
1590            to,
1591            at: state.now,
1592        })
1593        .map_err(EmitError::SupersessionDag)?;
1594    // Inferential staling: every Inferential that derived from the
1595    // now-superseded `to` gets a `StaleParent` edge committed in the
1596    // same batch. The reverse index is populated at Inf emit + log
1597    // replay so this lookup is O(log n) + O(k) in the dependent
1598    // count, typically ≤ 3.
1599    if let Some(dependents) = state.inferentials_by_parent.get(&to).cloned() {
1600        for inf_id in dependents {
1601            emit_stale_parent_edge(state, out, inf_id, to)?;
1602        }
1603    }
1604    Ok(())
1605}
1606
1607/// Emit a `StaleParent` edge (`inf → parent`) as part of the
1608/// Inferential-staling retroactive propagation. Added to both the
1609/// output stream (for durability) and the working DAG (so further
1610/// batch-local logic sees it).
1611fn emit_stale_parent_edge(
1612    state: &mut EmitState,
1613    out: &mut Vec<CanonicalRecord>,
1614    inf_id: SymbolId,
1615    parent_id: SymbolId,
1616) -> Result<(), EmitError> {
1617    out.push(CanonicalRecord::StaleParent(EdgeRecord {
1618        from: inf_id,
1619        to: parent_id,
1620        at: state.now,
1621    }));
1622    state
1623        .dag
1624        .add_edge(DagEdge {
1625            kind: EdgeKind::StaleParent,
1626            from: inf_id,
1627            to: parent_id,
1628            at: state.now,
1629        })
1630        .map_err(EmitError::SupersessionDag)?;
1631    Ok(())
1632}
1633
1634/// True if `parent` has an incoming `Supersedes` edge in `dag` —
1635/// i.e. it has been superseded. Used at Inf emit time for the
1636/// write-time `stale` flag per `temporal-model.md` § 5.4.
1637fn parent_is_superseded(dag: &SupersessionDag, parent: SymbolId) -> bool {
1638    dag.edges_to(parent)
1639        .any(|e| matches!(e.kind, EdgeKind::Supersedes))
1640}
1641
1642fn allocate_memory_id(
1643    state: &mut EmitState,
1644    out: &mut Vec<CanonicalRecord>,
1645) -> Result<SymbolId, EmitError> {
1646    // `__mem_{n}` is the librarian's conventional memory-ID name; the
1647    // identifier grammar (ir-write-surface.md § 3.1) does not formally
1648    // reserve the `__` prefix, so a pathological agent could in
1649    // principle land a collision — if so, bind surfaces it as
1650    // `EmitError::MemoryIdAllocation` (the collision does not silently
1651    // overwrite an existing symbol).
1652    let name = format!("__mem_{}", *state.counter);
1653    *state.counter += 1;
1654    let id = state
1655        .table
1656        .allocate(name.clone(), SymbolKind::Memory)
1657        .map_err(|cause| EmitError::MemoryIdAllocation {
1658            name: name.clone(),
1659            cause,
1660        })?;
1661    // Emit a SymbolAlloc canonical record so the memory symbol is
1662    // recoverable on log replay (same constraint that applies to the
1663    // bind-journal-derived SymbolAlloc records).
1664    out.push(CanonicalRecord::SymbolAlloc(SymbolEventRecord {
1665        symbol_id: id,
1666        name,
1667        symbol_kind: SymbolKind::Memory,
1668        at: state.now,
1669    }));
1670    Ok(id)
1671}
1672
/// Pipeline-level error — tags the stage at which compilation failed.
/// Every variant means the batch did **not** commit; the workspace
/// state is unchanged.
///
/// The stage variants wrap the stage's own error type via `#[from]`,
/// so `?` converts them automatically inside the pipeline.
#[derive(Debug, Error, PartialEq)]
pub enum PipelineError {
    /// Lex/parse stage failure (malformed S-expression input).
    #[error("parse error: {0}")]
    Parse(#[from] ParseError),

    /// Bind stage failure (e.g. symbol kind mismatch).
    #[error("bind error: {0}")]
    Bind(#[from] BindError),

    /// Semantic stage failure (e.g. confidence above a source's
    /// bound, or future validity relative to the wall clock).
    #[error("semantic error: {0}")]
    Semantic(#[from] SemanticError),

    /// Emit stage failure.
    #[error("emit error: {0}")]
    Emit(#[from] EmitError),

    /// The monotonic `committed_at` rule would advance into the
    /// reserved `u64::MAX` sentinel that `ClockTime` refuses to
    /// represent. Per `temporal-model.md` § 9.1, `ClockTime::MAX - 1`
    /// reaches year ≈584 000 000, so this error is effectively a guard
    /// against pathological inputs rather than a real-world condition.
    #[error(
        "committed_at clock exhausted: monotonic advance past {last_committed_at} would hit reserved sentinel"
    )]
    ClockExhausted {
        /// The previous commit clock before the attempted advance.
        last_committed_at: ClockTime,
    },
}
1707
/// Errors produced by the Emit stage. An `EmitError` always means an
/// invariant should have been caught earlier or the form is not yet
/// supported at this milestone.
#[derive(Debug, Error, PartialEq)]
pub enum EmitError {
    /// A form shape is not yet wired to a canonical record. See the
    /// module-level scope notes for which forms emit records in this
    /// milestone.
    #[error("form {form} is not yet emitted by this pipeline milestone")]
    Unsupported {
        /// Form name (`alias`, `rename`, `retire`, `correct`, `promote`, `query`).
        form: &'static str,
    },

    /// Allocating the synthesized `__mem_{n}` symbol failed. The only
    /// realistic cause is a name collision with an agent-emitted symbol
    /// that used the reserved `__mem_` prefix, which would itself be a
    /// prior bug; preserved as a typed variant for diagnosability.
    #[error("memory-id allocation failed for {name}: {cause}")]
    MemoryIdAllocation {
        /// The synthesized name that collided.
        name: String,
        /// Underlying bind error.
        cause: BindError,
    },

    /// Two Semantic writes land at the same `(s, p)` with identical
    /// `valid_at`. Per `temporal-model.md` § 5.1 two memories cannot
    /// both be authoritative at the same conflict key and identical
    /// validity start under the single-writer invariant — surface as
    /// a deterministic emit-time error so the agent can correct the
    /// `valid_at` or choose a different `(s, p)`.
    #[error(
        "semantic supersession conflict at (s={s:?}, p={p:?}) valid_at={valid_at}: new memory has the same valid_at as existing memory {existing:?}"
    )]
    SemanticSupersessionConflict {
        /// Subject of the conflicting write.
        s: SymbolId,
        /// Predicate of the conflicting write.
        p: SymbolId,
        /// Shared `valid_at` that triggered the conflict.
        valid_at: ClockTime,
        /// Memory ID of the existing current-state memory.
        existing: SymbolId,
    },

    /// Two Inferential writes land at the same `(s, p)` with identical
    /// `valid_at`. Per `temporal-model.md` § 5.4 Inferential
    /// supersession mirrors Semantic § 5.1 — equal `valid_at` against
    /// the same `(s, p)` is a deterministic write conflict the agent
    /// must resolve by choosing a distinct `valid_at` or re-keying.
    #[error(
        "inferential supersession conflict at (s={s:?}, p={p:?}) valid_at={valid_at}: new memory has the same valid_at as existing memory {existing:?}"
    )]
    InferentialSupersessionConflict {
        /// Subject of the conflicting write.
        s: SymbolId,
        /// Predicate of the conflicting write.
        p: SymbolId,
        /// Shared `valid_at` that triggered the conflict.
        valid_at: ClockTime,
        /// Memory ID of the existing current-state memory.
        existing: SymbolId,
    },

    /// An auto-supersession edge failed the DAG's acyclicity check.
    /// Cannot happen from auto-supersession alone (new memories have
    /// fresh IDs that cannot appear as ancestors), but the DAG's
    /// contract surfaces any violation rather than silently accepting.
    #[error("supersession DAG rejected edge: {0}")]
    SupersessionDag(#[from] crate::dag::DagError),

    /// Two Procedural writes in the same batch land on overlapping
    /// supersession keys (same `rule_id` or same `(trigger, scope)`).
    /// They would share `committed_at` per § 9.2 monotonic-batch
    /// semantics, producing a zero-duration Supersedes edge. Per the
    /// Semantic analog at § 5.1 this is a deterministic write
    /// conflict rather than a silent accept — the agent should split
    /// the batch or choose distinct keys.
    ///
    /// Note: the reported `rule_id` is always the new write's rule
    /// key, even when the conflicting match came in via the
    /// `(trigger, scope)` key.
    #[error(
        "procedural supersession conflict: batch contains two Pro writes at the same supersession key (rule_id={rule_id:?}), first bound to {existing:?}"
    )]
    ProceduralSupersessionConflict {
        /// The `rule_id` under conflict.
        rule_id: SymbolId,
        /// The first in-batch Pro memory that occupies the key.
        existing: SymbolId,
    },
}
1797
1798#[cfg(test)]
1799mod tests {
1800    use super::*;
1801    use crate::canonical::Opcode;
1802
1803    fn now() -> ClockTime {
1804        ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel")
1805    }
1806
    // Minimal well-formed `sem` write reused as a seed batch by many
    // tests below.
    const SEM_OK: &str = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
1808
1809    fn memory_records(records: &[CanonicalRecord]) -> Vec<&CanonicalRecord> {
1810        records
1811            .iter()
1812            .filter(|r| {
1813                matches!(
1814                    r.opcode(),
1815                    Opcode::Sem | Opcode::Epi | Opcode::Pro | Opcode::Inf
1816                )
1817            })
1818            .collect()
1819    }
1820
1821    #[test]
1822    fn pathological_agent_collides_with_mem_counter() {
1823        // A pathological agent mentions `@__mem_0` as a symbol in
1824        // the same batch that produces a librarian-synthesised
1825        // `__mem_0` memory-id. `allocate_memory_id` surfaces the
1826        // collision as `EmitError::MemoryIdAllocation` and rolls
1827        // back the batch — no silent skip, no overwrite.
1828        let mut pipe = Pipeline::new();
1829        let input = "(sem @alice @knows @__mem_0 :src @observation :c 0.8 :v 2024-01-15)";
1830        let err = pipe
1831            .compile_batch(input, now())
1832            .expect_err("__mem_0 collision");
1833        let PipelineError::Emit(EmitError::MemoryIdAllocation { name, .. }) = &err else {
1834            panic!("expected MemoryIdAllocation error, got {err:?}");
1835        };
1836        assert_eq!(name, "__mem_0");
1837        // Batch rolled back: neither the agent's @__mem_0 symbol
1838        // nor the other batch allocations committed.
1839        assert!(pipe.table.lookup("__mem_0").is_none());
1840        assert!(pipe.table.lookup("alice").is_none());
1841        assert_eq!(pipe.next_memory_counter, 0);
1842    }
1843
1844    #[test]
1845    fn single_sem_form_roundtrips_through_pipeline() {
1846        let mut pipe = Pipeline::new();
1847        let records = pipe.compile_batch(SEM_OK, now()).expect("compile");
1848        // SYMBOL_ALLOC records for @alice, @knows, @bob, @observation,
1849        // and __mem_0 precede the memory record.
1850        let mems = memory_records(&records);
1851        assert_eq!(mems.len(), 1);
1852        assert_eq!(mems[0].opcode(), Opcode::Sem);
1853        // Every non-memory record in this batch is a SymbolAlloc.
1854        for r in &records {
1855            assert!(matches!(r.opcode(), Opcode::Sem | Opcode::SymbolAlloc));
1856        }
1857    }
1858
1859    #[test]
1860    fn epi_records_are_retained_after_compile() {
1861        let mut pipe = Pipeline::new();
1862        let input = "(epi @evt_001 @rename (@old @new) @github \
1863                     :at 2024-01-15T10:00:00Z :obs 2024-01-15T10:00:05Z \
1864                     :src @observation :c 0.9)";
1865
1866        let records = pipe.compile_batch(input, now()).expect("compile");
1867
1868        let mems = memory_records(&records);
1869        assert_eq!(mems.len(), 1);
1870        assert_eq!(mems[0].opcode(), Opcode::Epi);
1871        assert_eq!(pipe.episodic_records().len(), 1);
1872        let retained = &pipe.episodic_records()[0];
1873        assert_eq!(
1874            retained.event_id,
1875            pipe.table.lookup("evt_001").expect("evt")
1876        );
1877        assert_eq!(retained.kind, pipe.table.lookup("rename").expect("kind"));
1878        assert_eq!(retained.participants.len(), 2);
1879    }
1880
1881    #[test]
1882    fn multi_form_batch_emits_in_input_order() {
1883        let mut pipe = Pipeline::new();
1884        let input = "
1885            (sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)
1886            (sem @alice @knows @carol :src @observation :c 0.7 :v 2024-01-16)
1887        ";
1888        let records = pipe.compile_batch(input, now()).expect("compile");
1889        let mems = memory_records(&records);
1890        assert_eq!(mems.len(), 2);
1891        for r in &mems {
1892            assert_eq!(r.opcode(), Opcode::Sem);
1893        }
1894    }
1895
1896    #[test]
1897    fn empty_input_is_a_no_op_batch() {
1898        // The semantics choice: `compile_batch("")` parses as zero
1899        // forms and produces zero records. It does NOT error. No
1900        // symbols allocated, no memory-id counter advancement, no
1901        // records emitted. This locks in the "empty is valid" path
1902        // so later callers (e.g. batched wire transport) don't
1903        // accidentally introduce a different behavior.
1904        let mut pipe = Pipeline::new();
1905        let records = pipe.compile_batch("", now()).expect("empty compiles");
1906        assert!(records.is_empty());
1907        assert_eq!(pipe.next_memory_counter, 0);
1908        // Empty batches DO advance the commit watermark — the batch
1909        // still ran, and a subsequent batch submitted with the same
1910        // wall clock must bump past it. `Store::commit_batch` relies
1911        // on this: it reads `pipeline.last_committed_at()` to stamp
1912        // the CHECKPOINT + episode alloc, so an empty batch that
1913        // silently skipped the advance would let the checkpoint
1914        // regress under a repeated wall clock.
1915        assert_eq!(pipe.last_committed_at(), Some(now()));
1916        // Whitespace-only input is equivalent.
1917        let records = pipe
1918            .compile_batch("   \n\t  ", now())
1919            .expect("whitespace compiles");
1920        assert!(records.is_empty());
1921        assert_eq!(pipe.next_memory_counter, 0);
1922        // Second empty batch bumped the watermark by 1 ms.
1923        assert_eq!(
1924            pipe.last_committed_at().expect("set").as_millis(),
1925            now().as_millis() + 1
1926        );
1927    }
1928
1929    #[test]
1930    fn parse_error_does_not_mutate_table() {
1931        let mut pipe = Pipeline::new();
1932        let before_table = pipe.table.clone();
1933        let err = pipe.compile_batch("(sem @a", now()).expect_err("malformed");
1934        assert!(matches!(err, PipelineError::Parse(_)));
1935        assert_eq!(pipe.table, before_table);
1936        assert_eq!(pipe.next_memory_counter, 0);
1937    }
1938
1939    #[test]
1940    fn bind_error_in_mid_batch_rolls_back_all_prior_allocations() {
1941        let mut pipe = Pipeline::new();
1942        // First form allocates @x as Agent (subject slot) and @rel as
1943        // Predicate. Second form reuses @x in the predicate slot, where
1944        // it must be Predicate — kind mismatch.
1945        let input = "
1946            (sem @x @rel @y :src @observation :c 0.8 :v 2024-01-15)
1947            (sem @alice @x @z :src @observation :c 0.8 :v 2024-01-16)
1948        ";
1949        let err = pipe.compile_batch(input, now()).expect_err("kind mismatch");
1950        assert!(matches!(err, PipelineError::Bind(_)));
1951        // Batch rollback: no symbol from this batch committed.
1952        assert!(pipe.table.lookup("x").is_none());
1953        assert!(pipe.table.lookup("rel").is_none());
1954        assert!(pipe.table.lookup("y").is_none());
1955        assert_eq!(pipe.next_memory_counter, 0);
1956    }
1957
1958    #[test]
1959    fn semantic_error_rolls_back_all_prior_allocations() {
1960        let mut pipe = Pipeline::new();
1961        // Registry bound is 0.98; request 0.99 — semantic rejects.
1962        let input = "(sem @a @knows @b :src @registry :c 0.99 :v 2024-01-15)";
1963        let err = pipe
1964            .compile_batch(input, now())
1965            .expect_err("conf over bound");
1966        assert!(matches!(err, PipelineError::Semantic(_)));
1967        assert!(pipe.table.lookup("a").is_none());
1968        assert!(pipe.table.lookup("b").is_none());
1969    }
1970
1971    #[test]
1972    fn successful_batch_commits_table_and_counter() {
1973        let mut pipe = Pipeline::new();
1974        let _ = pipe.compile_batch(SEM_OK, now()).expect("first");
1975        assert!(pipe.table.lookup("alice").is_some());
1976        assert_eq!(pipe.next_memory_counter, 1);
1977
1978        let input2 = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
1979        let _ = pipe.compile_batch(input2, now()).expect("second");
1980        // Alice from first batch persisted — reused, not reallocated.
1981        assert_eq!(pipe.next_memory_counter, 2);
1982    }
1983
1984    #[test]
1985    fn successive_calls_produce_distinct_memory_ids() {
1986        let mut pipe = Pipeline::new();
1987        let r1 = pipe.compile_batch(SEM_OK, now()).expect("first");
1988        let r2 = pipe
1989            .compile_batch(
1990                "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-01-16)",
1991                now(),
1992            )
1993            .expect("second");
1994        let Some(CanonicalRecord::Sem(s1)) = memory_records(&r1).first().copied() else {
1995            panic!("expected Sem in first batch");
1996        };
1997        let Some(CanonicalRecord::Sem(s2)) = memory_records(&r2).first().copied() else {
1998            panic!("expected Sem in second batch");
1999        };
2000        assert_ne!(s1.memory_id, s2.memory_id);
2001    }
2002
2003    #[test]
2004    fn retire_form_emits_symbol_retire_not_error() {
2005        // A retire form commits as a SymbolRetire canonical record via
2006        // the bind journal; it does NOT surface a memory-level record
2007        // and does NOT return `EmitError::Unsupported`.
2008        let mut pipe = Pipeline::new();
2009        let _ = pipe.compile_batch(SEM_OK, now()).expect("first");
2010        let records = pipe
2011            .compile_batch("(retire @alice)", now())
2012            .expect("retire supported");
2013        // Exactly one SymbolRetire; no memory records.
2014        assert!(records.iter().any(|r| r.opcode() == Opcode::SymbolRetire));
2015        assert!(memory_records(&records).is_empty());
2016    }
2017
2018    #[test]
2019    fn same_input_produces_byte_identical_records() {
2020        let input = "
2021            (sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)
2022            (sem @alice @knows @carol :src @observation :c 0.7 :v 2024-01-16)
2023        ";
2024        let fixed_now = now();
2025        let mut pipe_a = Pipeline::new();
2026        let mut pipe_b = Pipeline::new();
2027        let a = pipe_a.compile_batch(input, fixed_now).expect("a");
2028        let b = pipe_b.compile_batch(input, fixed_now).expect("b");
2029        assert_eq!(a, b);
2030    }
2031
2032    #[test]
2033    fn clocks_populated_from_now_parameter() {
2034        let mut pipe = Pipeline::new();
2035        // `now` must be >= the form's `:v` date or semantic rejects as
2036        // future-validity; set both consistently.
2037        let t = now();
2038        let records = pipe.compile_batch(SEM_OK, t).expect("compile");
2039        let Some(CanonicalRecord::Sem(sem)) = memory_records(&records).first().copied() else {
2040            panic!("expected Sem");
2041        };
2042        assert_eq!(sem.clocks.observed_at, t);
2043        assert_eq!(sem.clocks.committed_at, t);
2044        assert_eq!(sem.clocks.invalid_at, None);
2045    }
2046
2047    #[test]
2048    fn first_batch_uses_wall_clock_as_committed_at() {
2049        let mut pipe = Pipeline::new();
2050        assert_eq!(pipe.last_committed_at(), None);
2051        let t = now();
2052        let _ = pipe.compile_batch(SEM_OK, t).expect("compile");
2053        assert_eq!(pipe.last_committed_at(), Some(t));
2054    }
2055
2056    #[test]
2057    fn monotonic_commit_clock_bumps_past_regressing_wall_clock() {
2058        // temporal-model.md § 9.2 / § 12 #1: committed_at must be
2059        // strictly monotonic per workspace even when the host wall
2060        // clock regresses (NTP correction, VM clock warp, etc.).
2061        let mut pipe = Pipeline::new();
2062        let t1 = ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel");
2063        let t_regressed = ClockTime::try_from_millis(1_713_350_300_000).expect("non-sentinel");
2064
2065        let first = pipe
2066            .compile_batch(SEM_OK, t1)
2067            .expect("first batch commits at t1");
2068        let first_sem = first.iter().find_map(|r| match r {
2069            CanonicalRecord::Sem(s) => Some(s),
2070            _ => None,
2071        });
2072        assert_eq!(first_sem.expect("sem").clocks.committed_at, t1);
2073
2074        // Second batch submitted with wall clock that regressed; the
2075        // librarian must bump committed_at to t1 + 1 ms. Use a
2076        // distinct predicate so auto-supersession doesn't interfere.
2077        let second = pipe
2078            .compile_batch(
2079                "(sem @alice @likes @carol :src @observation :c 0.8 :v 2024-01-15)",
2080                t_regressed,
2081            )
2082            .expect("second batch");
2083        let second_sem = second.iter().find_map(|r| match r {
2084            CanonicalRecord::Sem(s) => Some(s),
2085            _ => None,
2086        });
2087        let expected = ClockTime::try_from_millis(t1.as_millis() + 1).expect("non-sentinel");
2088        assert_eq!(second_sem.expect("sem").clocks.committed_at, expected);
2089        assert_eq!(pipe.last_committed_at(), Some(expected));
2090    }
2091
2092    #[test]
2093    fn identical_wall_clock_across_batches_still_bumps_committed_at() {
2094        let mut pipe = Pipeline::new();
2095        let t = now();
2096        let _ = pipe.compile_batch(SEM_OK, t).expect("first");
2097        // Same `t` on purpose — two batches in the same millisecond.
2098        // Use a distinct predicate so Semantic auto-supersession
2099        // doesn't reject this as a (s, p, valid_at) conflict — we're
2100        // testing the commit-clock bump, not supersession.
2101        let second = pipe
2102            .compile_batch(
2103                "(sem @alice @likes @dave :src @observation :c 0.8 :v 2024-01-15)",
2104                t,
2105            )
2106            .expect("second");
2107        let second_sem = second.iter().find_map(|r| match r {
2108            CanonicalRecord::Sem(s) => Some(s),
2109            _ => None,
2110        });
2111        assert_eq!(
2112            second_sem.expect("sem").clocks.committed_at.as_millis(),
2113            t.as_millis() + 1
2114        );
2115    }
2116
2117    #[test]
2118    fn failed_batch_does_not_advance_commit_watermark() {
2119        let mut pipe = Pipeline::new();
2120        let t1 = now();
2121        let _ = pipe.compile_batch(SEM_OK, t1).expect("seed");
2122        let watermark_before = pipe.last_committed_at();
2123        // This batch fails in semantic (confidence > registry bound).
2124        let t2 = ClockTime::try_from_millis(t1.as_millis() + 10_000).expect("non-sentinel");
2125        let err = pipe
2126            .compile_batch(
2127                "(sem @alice @knows @bob :src @registry :c 0.99 :v 2024-01-15)",
2128                t2,
2129            )
2130            .expect_err("semantic reject");
2131        assert!(matches!(err, PipelineError::Semantic(_)));
2132        // Watermark must not have advanced — the monotonic commit
2133        // clock is part of the batch-atomic state per § 11.3.
2134        assert_eq!(pipe.last_committed_at(), watermark_before);
2135    }
2136
2137    #[test]
2138    fn monotonic_commit_clock_helper_returns_wall_clock_when_unset() {
2139        let t = now();
2140        assert_eq!(monotonic_commit_clock(t, None).expect("fresh"), t);
2141    }
2142
2143    #[test]
2144    fn monotonic_commit_clock_helper_bumps_when_wall_clock_not_ahead() {
2145        let prev = ClockTime::try_from_millis(1_000_000).expect("non-sentinel");
2146        // Wall clock exactly at prev.
2147        let at = monotonic_commit_clock(prev, Some(prev)).expect("bump");
2148        assert_eq!(at.as_millis(), 1_000_001);
2149        // Wall clock behind prev.
2150        let behind = ClockTime::try_from_millis(500_000).expect("non-sentinel");
2151        let at = monotonic_commit_clock(behind, Some(prev)).expect("bump");
2152        assert_eq!(at.as_millis(), 1_000_001);
2153    }
2154
2155    #[test]
2156    fn clock_exhaustion_returns_typed_error() {
2157        // `ClockTime::try_from_millis` refuses `u64::MAX` (reserved
2158        // sentinel), so the bump rule at `MAX - 1` must surface the
2159        // typed error rather than panic.
2160        let max_valid = ClockTime::try_from_millis(u64::MAX - 1).expect("non-sentinel");
2161        let err = monotonic_commit_clock(max_valid, Some(max_valid))
2162            .expect_err("must exhaust past MAX - 1");
2163        let PipelineError::ClockExhausted { last_committed_at } = err else {
2164            panic!("expected ClockExhausted, got {err:?}");
2165        };
2166        assert_eq!(last_committed_at, max_valid);
2167    }
2168
2169    #[test]
2170    fn semantic_future_validity_uses_wall_clock_not_monotonic_watermark() {
2171        // Discriminating scenario: place `valid_at` strictly between
2172        // the regressed wall clock and the inflated monotonic
2173        // watermark.
2174        //
2175        //   wall_now      < valid_at < watermark = effective_now
2176        //
2177        // Under the wall-clock choice (the spec's intent per § 9.3 —
2178        // "is this future relative to the librarian's estimate of
2179        // current time?"), semantic must reject with `FutureValidity`.
2180        // Under an (incorrect) monotonic-watermark choice, the check
2181        // would pass because valid_at < watermark. This test fails if
2182        // a future refactor silently swaps to the watermark.
2183        let mut pipe = Pipeline::new();
2184        // Inflate the watermark to 2024-01-15T00:10:00Z.
2185        let seed_wall = ClockTime::try_from_millis(1_705_277_400_000).expect("non-sentinel");
2186        let _ = pipe
2187            .compile_batch(
2188                "(sem @seed_a @seed_r @seed_b :src @observation :c 0.8 :v 2024-01-14)",
2189                seed_wall,
2190            )
2191            .expect("seed");
2192        let watermark = pipe.last_committed_at().expect("set");
2193
2194        // Regress wall clock to 2024-01-15T00:00:00Z.
2195        let regressed_wall = ClockTime::try_from_millis(1_705_276_800_000).expect("non-sentinel");
2196        assert!(regressed_wall < watermark);
2197
2198        // valid_at = 2024-01-15T00:05:00Z — future relative to
2199        // regressed_wall but past relative to watermark. Wall-clock
2200        // semantic must reject.
2201        let err = pipe
2202            .compile_batch(
2203                "(sem @alice @knows @emma :src @observation :c 0.8 :v 2024-01-15T00:05:00Z)",
2204                regressed_wall,
2205            )
2206            .expect_err("must reject future valid_at under regressed wall clock");
2207        assert!(
2208            matches!(
2209                err,
2210                PipelineError::Semantic(SemanticError::FutureValidity { .. })
2211            ),
2212            "expected FutureValidity, got {err:?}"
2213        );
2214
2215        // Sanity: the same form with a past valid_at must still be
2216        // accepted, and committed_at must advance past the watermark.
2217        let records = pipe
2218            .compile_batch(
2219                "(sem @alice @knows @emma :src @observation :c 0.8 :v 2024-01-14)",
2220                regressed_wall,
2221            )
2222            .expect("past valid_at under regressed wall clock must succeed");
2223        let sem = records.iter().find_map(|r| match r {
2224            CanonicalRecord::Sem(s) => Some(s),
2225            _ => None,
2226        });
2227        assert!(sem.expect("sem").clocks.committed_at > watermark);
2228    }
2229
2230    // ----------------------------------------------------------------
2231    // 6.3a — Semantic auto-supersession
2232    // ----------------------------------------------------------------
2233
2234    fn sem_records(records: &[CanonicalRecord]) -> Vec<&SemRecord> {
2235        records
2236            .iter()
2237            .filter_map(|r| match r {
2238                CanonicalRecord::Sem(s) => Some(s),
2239                _ => None,
2240            })
2241            .collect()
2242    }
2243
2244    fn supersedes_edges(records: &[CanonicalRecord]) -> Vec<&EdgeRecord> {
2245        records
2246            .iter()
2247            .filter_map(|r| match r {
2248                CanonicalRecord::Supersedes(e) => Some(e),
2249                _ => None,
2250            })
2251            .collect()
2252    }
2253
2254    fn stale_parent_edges(records: &[CanonicalRecord]) -> Vec<&EdgeRecord> {
2255        records
2256            .iter()
2257            .filter_map(|r| match r {
2258                CanonicalRecord::StaleParent(e) => Some(e),
2259                _ => None,
2260            })
2261            .collect()
2262    }
2263
2264    #[test]
2265    fn inf_against_current_parent_is_not_born_stale() {
2266        let mut pipe = Pipeline::new();
2267        pipe.compile_batch(SEM_OK, now()).expect("sem");
2268        let inf_src = "(inf @alice @likes @coffee (@__mem_0) @majority_vote :c 0.7 :v 2024-01-15)";
2269        let records = pipe
2270            .compile_batch(inf_src, later_now())
2271            .expect("inferential");
2272        let inf = records
2273            .iter()
2274            .find_map(|r| match r {
2275                CanonicalRecord::Inf(i) => Some(i),
2276                _ => None,
2277            })
2278            .expect("inf emitted");
2279        assert!(!inf.flags.stale, "parent still current — not born stale");
2280    }
2281
2282    #[test]
2283    fn supersession_emits_stale_parent_edges_to_dependent_inferentials() {
2284        let mut pipe = Pipeline::new();
2285        pipe.compile_batch(SEM_OK, now()).expect("first sem");
2286        // Inferential derived from the first sem (__mem_0).
2287        pipe.compile_batch(
2288            "(inf @alice @likes @coffee (@__mem_0) @majority_vote :c 0.7 :v 2024-01-15)",
2289            later_now(),
2290        )
2291        .expect("inf");
2292        // Supersede the first Sem with a newer one at same (s, p).
2293        let records = pipe
2294            .compile_batch(
2295                "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-02-01)",
2296                even_later_now(),
2297            )
2298            .expect("supersede");
2299        let stales = stale_parent_edges(&records);
2300        assert_eq!(
2301            stales.len(),
2302            1,
2303            "one dependent Inferential should receive a StaleParent edge"
2304        );
2305        let inf_id = pipe.table().lookup("__mem_1").expect("inf id");
2306        let old_parent = pipe.table().lookup("__mem_0").expect("parent id");
2307        assert_eq!(stales[0].from, inf_id);
2308        assert_eq!(stales[0].to, old_parent);
2309    }
2310
2311    #[test]
2312    fn inf_born_stale_when_parent_already_superseded() {
2313        let mut pipe = Pipeline::new();
2314        pipe.compile_batch(SEM_OK, now()).expect("first sem");
2315        pipe.compile_batch(
2316            "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-02-01)",
2317            later_now(),
2318        )
2319        .expect("supersede — __mem_0 is now superseded");
2320        // Now create an Inf against the already-superseded parent.
2321        let records = pipe
2322            .compile_batch(
2323                "(inf @alice @likes @coffee (@__mem_0) @majority_vote :c 0.7 :v 2024-01-15)",
2324                even_later_now(),
2325            )
2326            .expect("inf");
2327        let inf = records
2328            .iter()
2329            .find_map(|r| match r {
2330                CanonicalRecord::Inf(i) => Some(i),
2331                _ => None,
2332            })
2333            .expect("inf");
2334        assert!(
2335            inf.flags.stale,
2336            "Inferential born from already-superseded parent must carry stale=true"
2337        );
2338    }
2339
2340    fn later_now() -> ClockTime {
2341        ClockTime::try_from_millis(1_713_350_400_000 + 1_000).expect("non-sentinel")
2342    }
2343
2344    fn even_later_now() -> ClockTime {
2345        ClockTime::try_from_millis(1_713_350_400_000 + 2_000).expect("non-sentinel")
2346    }
2347
2348    #[test]
2349    fn sem_with_fresh_sp_does_not_emit_supersedes_edge() {
2350        let mut pipe = Pipeline::new();
2351        let records = pipe.compile_batch(SEM_OK, now()).expect("first");
2352        assert!(
2353            supersedes_edges(&records).is_empty(),
2354            "first write at (s, p) has nothing to supersede"
2355        );
2356        assert_eq!(pipe.dag().len(), 0);
2357    }
2358
2359    #[test]
2360    fn forward_sem_emits_supersedes_edge_and_updates_index() {
2361        // First write `@alice @knows @bob` at 2024-01-15; second at
2362        // 2024-03-01 with same (s, p) supersedes forward. Both
2363        // valid_ats are before `now()` (2024-04-17) so semantic
2364        // future-validity doesn't reject.
2365        let mut pipe = Pipeline::new();
2366        let first = pipe.compile_batch(SEM_OK, now()).expect("first");
2367        let first_mem = sem_records(&first)[0].memory_id;
2368
2369        let second_input = "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-01)";
2370        let second = pipe.compile_batch(second_input, now()).expect("second");
2371        let sems = sem_records(&second);
2372        assert_eq!(sems.len(), 1);
2373        let second_mem = sems[0].memory_id;
2374        // Forward supersession: new's invalid_at stays None.
2375        assert_eq!(sems[0].clocks.invalid_at, None);
2376
2377        let edges = supersedes_edges(&second);
2378        assert_eq!(edges.len(), 1, "exactly one Supersedes edge");
2379        assert_eq!(edges[0].from, second_mem);
2380        assert_eq!(edges[0].to, first_mem);
2381        // DAG mirrors the edge.
2382        assert_eq!(pipe.dag().len(), 1);
2383    }
2384
2385    #[test]
2386    fn retroactive_sem_sets_invalid_at_and_preserves_existing_as_current() {
2387        // First write with valid_at 2024-03-01. Second with valid_at
2388        // 2024-01-15 (earlier) is a retroactive correction: it's
2389        // valid only up to 2024-03-01 (`new.invalid_at = old.valid_at`),
2390        // and the index keeps pointing to the newer-valid_at memory.
2391        let mut pipe = Pipeline::new();
2392        let first_input = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-03-01)";
2393        let first = pipe.compile_batch(first_input, now()).expect("first");
2394        let first_mem = sem_records(&first)[0].memory_id;
2395        let first_valid_at = sem_records(&first)[0].clocks.valid_at;
2396
2397        let retro_input = "(sem @alice @knows @zoe :src @observation :c 0.8 :v 2024-01-15)";
2398        let retro = pipe.compile_batch(retro_input, now()).expect("retro");
2399        let sems = sem_records(&retro);
2400        let retro_mem = sems[0].memory_id;
2401        assert_eq!(
2402            sems[0].clocks.invalid_at,
2403            Some(first_valid_at),
2404            "retroactive new memory's invalid_at closes at old's valid_at"
2405        );
2406
2407        let edges = supersedes_edges(&retro);
2408        assert_eq!(edges.len(), 1);
2409        assert_eq!(edges[0].from, retro_mem);
2410        assert_eq!(edges[0].to, first_mem);
2411
2412        // A third forward write at 2024-04-01 must still supersede
2413        // `first` (not `retro`) — the index preserved `first` as
2414        // current after the retroactive insert.
2415        let third_input = "(sem @alice @knows @dan :src @observation :c 0.8 :v 2024-04-01)";
2416        let third = pipe.compile_batch(third_input, now()).expect("third");
2417        let third_mem = sem_records(&third)[0].memory_id;
2418        let third_edges = supersedes_edges(&third);
2419        assert_eq!(third_edges.len(), 1);
2420        assert_eq!(third_edges[0].from, third_mem);
2421        assert_eq!(
2422            third_edges[0].to, first_mem,
2423            "forward supersession targets the most-recent-valid_at memory, not the retroactive one"
2424        );
2425    }
2426
2427    #[test]
2428    fn equal_valid_at_at_same_sp_returns_supersession_conflict() {
2429        let mut pipe = Pipeline::new();
2430        let _ = pipe.compile_batch(SEM_OK, now()).expect("first");
2431        // Same (@alice, @knows) and same valid_at 2024-01-15 — single
2432        // writer per workspace, so this is a deterministic error.
2433        let err = pipe
2434            .compile_batch(SEM_OK, now())
2435            .expect_err("equal valid_at conflicts");
2436        assert!(
2437            matches!(
2438                err,
2439                PipelineError::Emit(EmitError::SemanticSupersessionConflict { .. })
2440            ),
2441            "expected SemanticSupersessionConflict, got {err:?}"
2442        );
2443    }
2444
2445    #[test]
2446    fn disjoint_sp_pairs_do_not_supersede_each_other() {
2447        let mut pipe = Pipeline::new();
2448        let _ = pipe.compile_batch(SEM_OK, now()).expect("first");
2449        // Different predicate — no supersession.
2450        let other = pipe
2451            .compile_batch(
2452                "(sem @alice @likes @bob :src @observation :c 0.8 :v 2024-01-15)",
2453                now(),
2454            )
2455            .expect("disjoint");
2456        assert!(supersedes_edges(&other).is_empty());
2457        assert_eq!(pipe.dag().len(), 0);
2458    }
2459
2460    #[test]
2461    fn forward_chain_produces_edge_per_link() {
2462        let mut pipe = Pipeline::new();
2463        let vs = ["2024-01-15", "2024-02-15", "2024-03-15", "2024-04-15"];
2464        for v in vs {
2465            let input = format!("(sem @alice @knows @bob :src @observation :c 0.8 :v {v})");
2466            let _ = pipe.compile_batch(&input, now()).expect("compile");
2467        }
2468        // Three supersession events across four writes.
2469        assert_eq!(pipe.dag().len(), 3);
2470    }
2471
2472    #[test]
2473    fn failed_batch_does_not_leak_edge_or_index_mutation() {
2474        // Discriminating test for the 6.2 clone-on-write contract as
2475        // applied to 6.3a's DAG + index mutations: form 1 passes all
2476        // stages and mutates both `working_dag` (edge) and
2477        // `working_index` (new (s, p) entry); form 2 passes parse +
2478        // bind + semantic but fails at EMIT because it lands on the
2479        // same (s, p, valid_at) that form 1 just wrote into
2480        // `working_index`. Per batch atomicity, BOTH mutations must
2481        // be dropped — neither the edge nor the index update can
2482        // survive into `self`.
2483        //
2484        // Without the DAG + index clones in `compile_batch`, form
2485        // 1's edge and index entry would leak past form 2's failure.
2486        let mut pipe = Pipeline::new();
2487        let _ = pipe.compile_batch(SEM_OK, now()).expect("seed");
2488        assert_eq!(pipe.dag().len(), 0, "seed did not supersede");
2489
2490        // Both forms pass semantic (confidence ≤ bound, valid_at is
2491        // past). Form 1 forward-supersedes the seed; form 2 hits the
2492        // same (s, p, valid_at) form 1 just wrote into working_index.
2493        let two_forms = "\
2494            (sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-01)\n\
2495            (sem @alice @knows @dan :src @observation :c 0.7 :v 2024-03-01)";
2496        let err = pipe
2497            .compile_batch(two_forms, now())
2498            .expect_err("emit conflict");
2499        assert!(
2500            matches!(
2501                err,
2502                PipelineError::Emit(EmitError::SemanticSupersessionConflict { .. })
2503            ),
2504            "expected SemanticSupersessionConflict from form 2, got {err:?}"
2505        );
2506
2507        // Form 1's edge did NOT land in self.dag.
2508        assert_eq!(pipe.dag().len(), 0, "failed batch must not leak edge");
2509
2510        // Form 1's index update did NOT land either — a fresh
2511        // follow-up commit at (alice, knows, 2024-03-01) must target
2512        // the SEED as its predecessor (not carol, which form 1 would
2513        // have made current).
2514        let clean = pipe
2515            .compile_batch(
2516                "(sem @alice @knows @eve :src @observation :c 0.8 :v 2024-03-01)",
2517                now(),
2518            )
2519            .expect("clean follow-up");
2520        assert_eq!(pipe.dag().len(), 1);
2521        let seed_memory = pipe.table.lookup("__mem_0").expect("seed mem alloc");
2522        assert_eq!(
2523            pipe.dag().edges()[0].to,
2524            seed_memory,
2525            "post-rollback commit still sees SEED as predecessor"
2526        );
2527        assert_eq!(sem_records(&clean)[0].memory_id, pipe.dag().edges()[0].from);
2528    }
2529
2530    // ----------------------------------------------------------------
2531    // 6.3b — Procedural auto-supersession
2532    // ----------------------------------------------------------------
2533
    // Baseline Procedural form: rule @rule_route, trigger "agent_write",
    // scope @mimir — the keys the supersession tests below collide with.
    const PRO_OK: &str = r#"(pro @rule_route "agent_write" "route_via_librarian"
        :scp @mimir :src @policy :c 1.0)"#;
2536
2537    fn pro_records(records: &[CanonicalRecord]) -> Vec<&ProRecord> {
2538        records
2539            .iter()
2540            .filter_map(|r| match r {
2541                CanonicalRecord::Pro(p) => Some(p),
2542                _ => None,
2543            })
2544            .collect()
2545    }
2546
2547    #[test]
2548    fn pro_fresh_rule_does_not_supersede() {
2549        let mut pipe = Pipeline::new();
2550        let records = pipe.compile_batch(PRO_OK, now()).expect("first pro");
2551        assert_eq!(pro_records(&records).len(), 1);
2552        assert!(supersedes_edges(&records).is_empty());
2553        assert_eq!(pipe.dag().len(), 0);
2554    }
2555
2556    #[test]
2557    fn pro_same_rule_id_triggers_supersession() {
2558        let mut pipe = Pipeline::new();
2559        let first = pipe.compile_batch(PRO_OK, now()).expect("first");
2560        let first_mem = pro_records(&first)[0].memory_id;
2561
2562        let second_input = r#"(pro @rule_route "other_trigger" "other_action"
2563            :scp @other_scope :src @policy :c 0.9)"#;
2564        let second = pipe.compile_batch(second_input, now()).expect("second");
2565        let second_mem = pro_records(&second)[0].memory_id;
2566
2567        let edges = supersedes_edges(&second);
2568        assert_eq!(edges.len(), 1, "same rule_id → one Supersedes edge");
2569        assert_eq!(edges[0].from, second_mem);
2570        assert_eq!(edges[0].to, first_mem);
2571        assert_eq!(pipe.dag().len(), 1);
2572    }
2573
2574    #[test]
2575    fn pro_same_trigger_scope_triggers_supersession() {
2576        let mut pipe = Pipeline::new();
2577        let first = pipe.compile_batch(PRO_OK, now()).expect("first");
2578        let first_mem = pro_records(&first)[0].memory_id;
2579
2580        // Different rule_id but identical trigger + scope ("agent_write" + @mimir) — § 5.2 secondary key.
2581        let second_input = r#"(pro @rule_other "agent_write" "different_action"
2582            :scp @mimir :src @policy :c 0.9)"#;
2583        let second = pipe.compile_batch(second_input, now()).expect("second");
2584        let second_mem = pro_records(&second)[0].memory_id;
2585
2586        let edges = supersedes_edges(&second);
2587        assert_eq!(edges.len(), 1, "same (trigger, scope) → one edge");
2588        assert_eq!(edges[0].from, second_mem);
2589        assert_eq!(edges[0].to, first_mem);
2590    }
2591
2592    #[test]
2593    fn pro_dual_key_match_supersedes_both_distinct_olds() {
2594        // OLD1: rule @rule_a, trigger "t_a", scope @scope_a
2595        // OLD2: rule @rule_b, trigger "t_b", scope @scope_b
2596        // NEW:  rule @rule_a, trigger "t_b", scope @scope_b
2597        // NEW matches OLD1 by rule_id AND OLD2 by (trigger, scope) — both must be superseded.
2598        let mut pipe = Pipeline::new();
2599        let old1 = pipe
2600            .compile_batch(
2601                r#"(pro @rule_a "t_a" "act_a" :scp @scope_a :src @policy :c 1.0)"#,
2602                now(),
2603            )
2604            .expect("old1");
2605        let old1_mem = pro_records(&old1)[0].memory_id;
2606        let old2 = pipe
2607            .compile_batch(
2608                r#"(pro @rule_b "t_b" "act_b" :scp @scope_b :src @policy :c 1.0)"#,
2609                now(),
2610            )
2611            .expect("old2");
2612        let old2_mem = pro_records(&old2)[0].memory_id;
2613
2614        let new_input = r#"(pro @rule_a "t_b" "act_new" :scp @scope_b :src @policy :c 1.0)"#;
2615        let new = pipe.compile_batch(new_input, now()).expect("new");
2616        let new_mem = pro_records(&new)[0].memory_id;
2617
2618        let edges = supersedes_edges(&new);
2619        assert_eq!(edges.len(), 2, "dual-key match → two Supersedes edges");
2620        let targets: std::collections::BTreeSet<_> = edges.iter().map(|e| e.to).collect();
2621        assert!(targets.contains(&old1_mem));
2622        assert!(targets.contains(&old2_mem));
2623        for e in &edges {
2624            assert_eq!(e.from, new_mem);
2625        }
2626    }
2627
2628    #[test]
2629    fn pro_duplicate_cross_batch_commit_emits_one_edge() {
2630        // Commit the exact same Pro in two successive batches. OLD
2631        // sits at both indices (rule_id + (trigger, scope)); NEW's
2632        // two lookups converge to OLD. The dedup branch collapses
2633        // them to a single Supersedes edge — the only observable
2634        // shape of "both keys converge to same old," since non-dup
2635        // Pros are inserted under both their own keys and a second
2636        // Pro can only converge both lookups onto a single
2637        // predecessor by duplicating all of its keys.
2638        let mut pipe = Pipeline::new();
2639        let _ = pipe.compile_batch(PRO_OK, now()).expect("seed");
2640        let second = pipe.compile_batch(PRO_OK, now()).expect("same again");
2641        let edges = supersedes_edges(&second);
2642        assert_eq!(edges.len(), 1, "same memory matched twice → one edge");
2643    }
2644
2645    #[test]
2646    fn pro_intra_batch_same_rule_id_is_rejected() {
2647        // Two Pro forms in the same batch with identical rule_id
2648        // would share `committed_at`, producing a zero-duration
2649        // supersession. Per the Semantic analog (equal valid_at →
2650        // SemanticSupersessionConflict), v1 surfaces this
2651        // deterministically rather than silently accepting.
2652        let mut pipe = Pipeline::new();
2653        let two_forms = r#"
2654            (pro @rule_a "t_a" "act_a" :scp @scope_a :src @policy :c 1.0)
2655            (pro @rule_a "t_b" "act_b" :scp @scope_b :src @policy :c 1.0)
2656        "#;
2657        let err = pipe
2658            .compile_batch(two_forms, now())
2659            .expect_err("intra-batch rule_id conflict");
2660        assert!(
2661            matches!(
2662                err,
2663                PipelineError::Emit(EmitError::ProceduralSupersessionConflict { .. })
2664            ),
2665            "expected ProceduralSupersessionConflict, got {err:?}"
2666        );
2667        // Atomicity — form 1's index + DAG mutations must have rolled back.
2668        assert_eq!(pipe.dag().len(), 0);
2669    }
2670
2671    #[test]
2672    fn pro_intra_batch_same_trigger_scope_is_rejected() {
2673        // Same sanity check for the secondary key — two Pros in one
2674        // batch sharing (trigger, scope) but with distinct rule_ids.
2675        let mut pipe = Pipeline::new();
2676        let two_forms = r#"
2677            (pro @rule_a "shared_t" "act_a" :scp @shared_scope :src @policy :c 1.0)
2678            (pro @rule_b "shared_t" "act_b" :scp @shared_scope :src @policy :c 1.0)
2679        "#;
2680        let err = pipe
2681            .compile_batch(two_forms, now())
2682            .expect_err("intra-batch (trigger, scope) conflict");
2683        assert!(matches!(
2684            err,
2685            PipelineError::Emit(EmitError::ProceduralSupersessionConflict { .. })
2686        ));
2687        assert_eq!(pipe.dag().len(), 0);
2688    }
2689
2690    #[test]
2691    fn pro_supersession_clears_old_from_both_keys() {
2692        // After NEW supersedes OLD via rule_id match, OLD must no
2693        // longer be reachable via its OTHER key either.
2694        let mut pipe = Pipeline::new();
2695        let old = pipe
2696            .compile_batch(
2697                r#"(pro @rule_a "t_a" "act_a" :scp @scope_a :src @policy :c 1.0)"#,
2698                now(),
2699            )
2700            .expect("old");
2701        let old_mem = pro_records(&old)[0].memory_id;
2702
2703        // Supersede by rule_id.
2704        let _ = pipe
2705            .compile_batch(
2706                r#"(pro @rule_a "different" "new_act" :scp @different :src @policy :c 1.0)"#,
2707                now(),
2708            )
2709            .expect("super by rule");
2710
2711        // A third write that matches OLD's (trigger, scope) only.
2712        // OLD was cleared from both indices when superseded, so no
2713        // edge from this write should point to OLD.
2714        let third = pipe
2715            .compile_batch(
2716                r#"(pro @rule_fresh "t_a" "act_x" :scp @scope_a :src @policy :c 1.0)"#,
2717                now(),
2718            )
2719            .expect("third");
2720        for edge in supersedes_edges(&third) {
2721            assert_ne!(
2722                edge.to, old_mem,
2723                "already-superseded OLD must not be superseded again"
2724            );
2725        }
2726    }
2727}