Skip to main content

mimir_core/
store.rs

1//! Workspace store — the durable write path per
2//! `docs/concepts/write-protocol.md`.
3//!
4//! [`Store`] owns a [`Pipeline`] and the
5//! workspace's [`CanonicalLog`], and exposes
6//! one public operation: [`Store::commit_batch`]. The commit runs the
7//! full pipeline (parse → bind → semantic → emit), appends the emitted
8//! records plus a `CHECKPOINT` marker to the log, and fsyncs. Any
9//! failure at any phase is rolled back — the in-memory pipeline state
10//! and the log both revert to their pre-batch values.
11//!
12//! Durability surface:
13//!
14//! - Two-phase commit per spec § 4 (append records + `CHECKPOINT`,
15//!   fsync, ack).
16//! - Mid-batch rollback via log truncation + pipeline-state restore.
17//! - Recovery at open truncates crash-shaped orphan records past the
18//!   last committed `CHECKPOINT` (spec § 10) and rejects
19//!   non-recoverable corrupt tails without truncating them.
20//! - Symbol-table replay: `SYMBOL_*` records emitted by the bind
21//!   mutation journal (spec § 3.4) and the librarian-synthesized
22//!   `__mem_{n}` / `__ep_{n}` allocations are decoded on `Store::open`
23//!   and replayed into the pipeline's `SymbolTable`, restoring
24//!   durably-committed state across process restarts. The monotonic
25//!   memory and episode counters advance past the highest-numbered
26//!   reserved-prefix symbol in the log.
27//! - The `LogBackend` trait abstracts the filesystem primitives so
28//!   tests can inject faults on `append` / `sync` / `truncate`; see
29//!   the `FaultyLog` test backend in this module's tests.
30//! - The spec § 7 failure-mode matrix is covered: rows 3 / 6 / 7
31//!   directly (orphan memory record without `CHECKPOINT`; disk-full
32//!   on append; fsync returns error). Rows 1 / 2 / 5 / 8 collapse to
33//!   the recovery-on-next-open path already exercised by the reopen
34//!   tests. Row 4 (crash between `CHECKPOINT` append and fsync) is
35//!   physically untestable in user-space — its two possible outcomes
36//!   collapse to row 3 (bytes not durable → orphan) or row 5 (bytes
37//!   durable → committed).
38
39use std::path::Path;
40
41use thiserror::Error;
42
43use crate::canonical::{
44    decode_all, decode_record, encode_record, CanonicalRecord, CheckpointRecord, DecodeError,
45    EpisodeMetaRecord, SymbolEventRecord,
46};
47use crate::clock::ClockTime;
48use crate::log::{CanonicalLog, LogBackend, LogError};
49use crate::pipeline::{Pipeline, PipelineError};
50use crate::symbol::{SymbolId, SymbolKind};
51
52/// Identifier for one committed Episode. Wraps the [`SymbolId`] stored
53/// in the Episode's `CHECKPOINT` record.
54#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
55pub struct EpisodeId(SymbolId);
56
57impl EpisodeId {
58    /// The underlying symbol ID assigned to this Episode's `CHECKPOINT`
59    /// record.
60    #[must_use]
61    pub const fn as_symbol(self) -> SymbolId {
62        self.0
63    }
64}
65
/// The workspace store — a [`LogBackend`] plus the `Pipeline` that
/// produces its records. The default backend is [`CanonicalLog`] (real
/// filesystem); tests and crash-injection harnesses parameterize with
/// their own `LogBackend` implementation.
pub struct Store<L: LogBackend = CanonicalLog> {
    /// Durable append-only record log; fsynced once per committed batch.
    log: L,
    /// Compiler pipeline whose in-memory state mirrors the committed log.
    pipeline: Pipeline,
    /// Monotonic counter for the next `__ep_{n}` Episode symbol.
    /// Restored on open by scanning replayed `SymbolAlloc` names.
    next_episode_counter: u64,
}
75
76impl Store<CanonicalLog> {
77    /// Open or create a workspace at `path`. Convenience constructor
78    /// that wires a real filesystem-backed [`CanonicalLog`].
79    ///
80    /// # Errors
81    ///
82    /// - [`StoreError::Log`] on any filesystem / I/O failure during
83    ///   open, scan, or truncate.
84    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
85        let log = CanonicalLog::open(path).map_err(StoreError::Log)?;
86        Self::from_backend(log)
87    }
88
89    /// Open or create a workspace-partitioned store under a shared
90    /// `data_root`. The log lands at
91    /// `data_root/<workspace_hex>/canonical.log` per
92    /// `workspace-model.md` § 4.2. Parent directories are created on
93    /// demand.
94    ///
95    /// Two `Store`s opened under the same `data_root` but different
96    /// [`WorkspaceId`](crate::WorkspaceId) values land in disjoint
97    /// directories — per spec § 2 the partition is **structural**, not
98    /// policy-enforced.
99    ///
100    /// # Errors
101    ///
102    /// - [`StoreError::Log`] on any filesystem / I/O failure.
103    pub fn open_in_workspace(
104        data_root: impl AsRef<Path>,
105        workspace_id: crate::WorkspaceId,
106    ) -> Result<Self, StoreError> {
107        // Use the full 32-byte hex digest for the directory name —
108        // `Display` only shows 8 bytes, which could collide on large
109        // workspace counts. The directory name is a filesystem path,
110        // not a human-facing identifier.
111        use std::fmt::Write;
112        let mut hex = String::with_capacity(workspace_id.as_bytes().len() * 2);
113        for b in workspace_id.as_bytes() {
114            // Writing to a String cannot fail; the result is ignored.
115            let _ = write!(hex, "{b:02x}");
116        }
117        let workspace_dir = data_root.as_ref().join(&hex);
118        std::fs::create_dir_all(&workspace_dir)
119            .map_err(|e| StoreError::Log(crate::log::LogError::Io(e)))?;
120        let log_path = workspace_dir.join("canonical.log");
121        let log = CanonicalLog::open(log_path).map_err(StoreError::Log)?;
122        Self::from_backend(log)
123    }
124}
125
126impl<L: LogBackend> Store<L> {
    /// Construct a `Store` over an arbitrary [`LogBackend`]. On open,
    /// crash-shaped orphan bytes past the last durable `CHECKPOINT`
    /// are truncated (spec § 10 recovery step), non-recoverable tail
    /// corruption is rejected without truncation, and `SYMBOL_*`
    /// events from the committed log are replayed into the pipeline's
    /// symbol table so workspace state fully reconstructs across
    /// process restarts.
    ///
    /// # Errors
    ///
    /// - [`StoreError::Log`] on any backend I/O failure.
    /// - [`StoreError::CorruptTail`] if non-recoverable bytes are
    ///   found past the last durable `CHECKPOINT`.
    /// - [`StoreError::Pipeline`] if replay of a `SYMBOL_*` record
    ///   fails (log corruption).
    pub fn from_backend(mut log: L) -> Result<Self, StoreError> {
        let log_len_on_open = log.len();
        let bytes_on_open = log.read_all().map_err(StoreError::Log)?;
        let committed_end = Self::committed_end_for_open(&bytes_on_open)?;
        // Tail recovery: anything past the last decodable CHECKPOINT is
        // a torn write from a crash mid-commit — discard it.
        // NOTE(review): `orphan_bytes` assumes `log.len()` equals
        // `read_all()`'s byte count — confirm that is part of the
        // `LogBackend` contract.
        if committed_end < bytes_on_open.len() {
            let committed_end_u64 =
                u64::try_from(committed_end).map_err(|_| StoreError::Log(LogError::LogOverflow))?;
            let orphan_bytes = log_len_on_open - committed_end_u64;
            log.truncate(committed_end_u64).map_err(StoreError::Log)?;
            tracing::warn!(
                target: "mimir.recovery.orphan_truncated",
                log_len_before = log_len_on_open,
                committed_end = committed_end_u64,
                orphan_bytes,
                "truncated orphan bytes past last CHECKPOINT on open",
            );
        }
        // Replay SYMBOL_* events so the pipeline's table reflects
        // durably-committed state. Counters are advanced past the
        // highest-numbered `__mem_{n}` / `__ep_{n}` symbol seen.
        // Tail recovery already discarded orphan bytes; a decode
        // failure below this point is genuine corruption in the
        // committed region of the log and routes to a distinct error
        // variant so callers can distinguish it from I/O, truncation,
        // or tail-corruption errors.
        let records = decode_all(&bytes_on_open[..committed_end])?;
        let mut pipeline = Pipeline::new();
        let mut next_memory_counter = 0_u64;
        let mut next_episode_counter = 0_u64;
        // Replay statistics, reported once at the end for observability.
        let mut symbol_alloc_count = 0_u64;
        let mut symbol_mutation_count = 0_u64;
        let mut checkpoint_count = 0_u64;
        for record in records {
            // Restore the monotonic commit watermark from every
            // replayed record's commit time so post-reopen batches
            // keep the `committed_at` monotonicity invariant
            // (temporal-model.md § 9.2 / § 12 #1).
            pipeline.advance_last_committed_at(record.committed_at());
            // Replay supersession edges into the DAG, checking
            // acyclicity (§ 6.2 #1). If an edge appears before the
            // first batch has advanced the DAG, this is trivially OK.
            if let Some(edge) = crate::dag::Edge::try_from_record(&record) {
                pipeline.replay_edge(edge)?;
            }
            // Replay memory records into the supersession-detection
            // indices so post-open batches can auto-supersede (§ 5).
            pipeline.replay_memory_record(&record);
            // Flag events update the pinned / authoritative sets.
            pipeline.replay_flag(&record);
            match record {
                CanonicalRecord::SymbolAlloc(event) => {
                    pipeline
                        .replay_allocate(event.symbol_id, event.name.clone(), event.symbol_kind)
                        .map_err(|e| StoreError::Pipeline(PipelineError::Bind(e)))?;
                    // Reserved-prefix names carry the monotonic counters;
                    // advance past whichever prefix (if any) matches.
                    Self::advance_reserved_counter("__mem_", &event.name, &mut next_memory_counter);
                    Self::advance_reserved_counter("__ep_", &event.name, &mut next_episode_counter);
                    symbol_alloc_count += 1;
                }
                CanonicalRecord::SymbolAlias(event) => {
                    pipeline
                        .replay_alias(event.symbol_id, event.name)
                        .map_err(|e| StoreError::Pipeline(PipelineError::Bind(e)))?;
                    symbol_mutation_count += 1;
                }
                CanonicalRecord::SymbolRename(event) => {
                    pipeline
                        .replay_rename(event.symbol_id, event.name)
                        .map_err(|e| StoreError::Pipeline(PipelineError::Bind(e)))?;
                    symbol_mutation_count += 1;
                }
                CanonicalRecord::SymbolRetire(event) => {
                    pipeline
                        .replay_retire(event.symbol_id, event.name)
                        .map_err(|e| StoreError::Pipeline(PipelineError::Bind(e)))?;
                    symbol_mutation_count += 1;
                }
                CanonicalRecord::Checkpoint(cp) => {
                    // Register the replayed Episode with the pipeline
                    // so post-open Episode-scoped reads see it.
                    pipeline.register_episode(cp.episode_id, cp.at);
                    checkpoint_count += 1;
                }
                CanonicalRecord::EpisodeMeta(meta) => {
                    // Restore the Episode index too — register_episode
                    // is idempotent if the following Checkpoint
                    // re-registers with the same clock.
                    pipeline.register_episode(meta.episode_id, meta.at);
                    if let Some(parent) = meta.parent_episode_id {
                        pipeline.register_episode_parent(meta.episode_id, parent);
                    }
                }
                _ => {}
            }
        }
        // The memory counter lives inside the pipeline; the episode
        // counter lives on the `Store` itself (see struct field).
        pipeline.set_next_memory_counter(next_memory_counter);
        // Emit a recovery summary only when there's actually committed
        // state to report — a fresh store should stay silent.
        if symbol_alloc_count > 0 || symbol_mutation_count > 0 || checkpoint_count > 0 {
            tracing::info!(
                target: "mimir.recovery.symbol_replay",
                symbol_alloc_count,
                symbol_mutation_count,
                checkpoint_count,
                next_memory_counter,
                next_episode_counter,
                "replayed committed log into pipeline on open",
            );
        }
        Ok(Self {
            log,
            pipeline,
            next_episode_counter,
        })
    }
256
257    fn committed_end_for_open(bytes: &[u8]) -> Result<usize, StoreError> {
258        let mut pos = 0_usize;
259        let mut last_checkpoint_end = 0_usize;
260        while pos < bytes.len() {
261            match decode_record(&bytes[pos..]) {
262                Ok((record, consumed)) => {
263                    pos += consumed;
264                    if matches!(record, CanonicalRecord::Checkpoint(_)) {
265                        last_checkpoint_end = pos;
266                    }
267                }
268                Err(source) if Self::is_recoverable_tail_decode_error(&source) => {
269                    return Ok(last_checkpoint_end);
270                }
271                Err(source) => {
272                    let offset =
273                        u64::try_from(pos).map_err(|_| StoreError::Log(LogError::LogOverflow))?;
274                    return Err(StoreError::CorruptTail { offset, source });
275                }
276            }
277        }
278        Ok(last_checkpoint_end)
279    }
280
281    const fn is_recoverable_tail_decode_error(error: &DecodeError) -> bool {
282        matches!(
283            error,
284            DecodeError::Truncated { .. } | DecodeError::LengthMismatch { .. }
285        )
286    }
287
288    fn advance_reserved_counter(prefix: &str, name: &str, counter: &mut u64) {
289        if let Some(suffix) = name.strip_prefix(prefix) {
290            if let Ok(n) = suffix.parse::<u64>() {
291                if n + 1 > *counter {
292                    *counter = n + 1;
293                }
294            }
295        }
296    }
297
    /// Committed log length in bytes.
    ///
    /// After a successful commit this includes the batch's records and
    /// its `CHECKPOINT`; after a rolled-back commit it equals the
    /// pre-batch length again.
    #[must_use]
    pub fn log_len(&self) -> u64 {
        self.log.len()
    }

    /// Read-only view of the underlying pipeline. Used by callers
    /// that want to issue read-path queries (`execute_query`) or
    /// inspect pipeline state without owning the whole store.
    #[must_use]
    pub fn pipeline(&self) -> &Pipeline {
        &self.pipeline
    }

    /// Mutable view of the pipeline. Exposed so tests and
    /// downstream callers can call `execute_query` (which needs
    /// `&self`, not `&mut self`, but the mut accessor keeps the
    /// door open for future read-path methods that do require
    /// exclusive borrow).
    ///
    /// Callers must not mutate pipeline state in ways that diverge
    /// from the committed log — durability comes only from
    /// [`Self::commit_batch`].
    pub fn pipeline_mut(&mut self) -> &mut Pipeline {
        &mut self.pipeline
    }
320
321    /// Compile a batch of agent input and commit it atomically.
322    ///
323    /// The two phases run under the workspace's single-writer invariant:
324    ///
325    /// 1. Pipeline compiles the input into a `Vec<CanonicalRecord>`. On
326    ///    pipeline error the pipeline's in-memory state is already
327    ///    auto-rolled-back (per `Pipeline::compile_batch`'s clone-on-
328    ///    write contract) and no log bytes have been written.
329    /// 2. Records + a `CHECKPOINT` marker are appended to the log.
330    /// 3. The log is fsynced. On success the batch is durable and the
331    ///    new Episode ID is returned; on fsync failure the log is
332    ///    truncated to its pre-batch offset and the pipeline's
333    ///    in-memory state is restored from a snapshot taken before
334    ///    step 1.
335    ///
336    /// # Errors
337    ///
338    /// - [`StoreError::Pipeline`] if parse / bind / semantic / emit
339    ///   rejected the batch. In-memory state is unchanged; log is
340    ///   untouched.
341    /// - [`StoreError::Log`] if the append / sync / truncate sequence
342    ///   failed at any step. In-memory pipeline state is restored to
343    ///   its pre-batch snapshot; log is truncated back to pre-batch.
344    pub fn commit_batch(&mut self, input: &str, now: ClockTime) -> Result<EpisodeId, StoreError> {
345        self.commit_batch_with_metadata(input, now, &EpisodeMetadata::default())
346    }
347
    /// Commit a batch and attach agent-visible Episode metadata
    /// (label, `parent_episode`, retracts) per `episode-semantics.md`
    /// § 4.2 / § 5. Same commit semantics as [`Self::commit_batch`];
    /// when `metadata` is non-empty, an `EpisodeMeta` canonical
    /// record is emitted immediately before the `CHECKPOINT`.
    ///
    /// # Errors
    ///
    /// Same as [`Self::commit_batch`]. If `metadata.label` exceeds
    /// the 256-byte cap (spec § 4.3) the commit fails with a
    /// [`StoreError::InvalidEpisodeMetadata`] before any log writes.
    pub fn commit_batch_with_metadata(
        &mut self,
        input: &str,
        now: ClockTime,
        metadata: &EpisodeMetadata,
    ) -> Result<EpisodeId, StoreError> {
        // observability.md: `mimir.commit.batch` span wraps the full
        // commit. Fields recorded after each phase so timing stays
        // attached even on error paths.
        let span = tracing::info_span!(
            "mimir.commit.batch",
            log_offset_before = self.log.len(),
            log_offset_after = tracing::field::Empty,
            record_count = tracing::field::Empty,
            episode_id = tracing::field::Empty,
            fsync_micros = tracing::field::Empty,
        );
        let _enter = span.enter();

        // Reject oversized labels before taking snapshots or writing
        // anything.
        metadata.validate()?;
        // Rollback snapshots: any failure after compile restores these
        // and truncates the log back to `log_len_before`.
        let pipeline_snapshot = self.pipeline.clone();
        let episode_counter_snapshot = self.next_episode_counter;
        let log_len_before = self.log.len();

        // Phase 0: compile. compile_batch's internal clone-on-write
        // means a pipeline error leaves self.pipeline untouched; a
        // successful compile auto-applies the working state.
        let records = self.pipeline.compile_batch(input, now)?;

        // If the batch carried an `(episode :start …)` form, the
        // pipeline captured its metadata. Merge with the explicit
        // `metadata` arg — form-level metadata wins on conflict
        // because the agent wrote it directly into the batch.
        let pending = self.pipeline.take_pending_episode_metadata();
        let mut resolved_meta = metadata.clone();
        if let Some(p) = pending {
            if p.label.is_some() {
                resolved_meta.label = p.label;
            }
            if p.parent_episode.is_some() {
                resolved_meta.parent_episode = p.parent_episode;
            }
            if !p.retracts.is_empty() {
                resolved_meta.retracts = p.retracts;
            }
            // Re-validate since form-level label may exceed cap
            // (bind already checks, but defence-in-depth).
            resolved_meta.validate()?;
        }

        // The pipeline monotonically advances `committed_at` past any
        // previous batch (temporal-model.md § 9.2). The checkpoint and
        // episode-alloc records must use that same advanced clock —
        // stamping them with raw wall-clock `now` would violate the
        // per-workspace monotonicity invariant on a regressed clock.
        let effective_now = self.pipeline.last_committed_at().unwrap_or(now);

        // Phase 1: append each record plus a closing CHECKPOINT.
        let episode_id = self
            .pipeline
            .allocate_episode_symbol(self.next_episode_counter)
            .map_err(|e| {
                // Compile succeeded and mutated the pipeline; roll back.
                // The log is still untouched here, so no truncation is
                // needed.
                self.pipeline = pipeline_snapshot.clone();
                self.next_episode_counter = episode_counter_snapshot;
                StoreError::Pipeline(PipelineError::Emit(e))
            })?;
        self.next_episode_counter += 1;

        let checkpoint = CheckpointRecord {
            episode_id,
            at: effective_now,
            memory_count: memory_record_count(&records),
        };

        // Emit a SymbolAlloc record for the synthesized __ep_{n}
        // episode symbol so replay can reconstruct it. This sits
        // between the pipeline's journal-derived SymbolAlloc records
        // and the memory records; replay treats it like any other
        // SymbolAlloc.
        // NOTE(review): the episode symbol is tagged SymbolKind::Memory
        // — confirm there is intentionally no dedicated Episode kind.
        let episode_alloc = CanonicalRecord::SymbolAlloc(SymbolEventRecord {
            symbol_id: episode_id,
            name: format!("__ep_{episode_counter_snapshot}"),
            symbol_kind: SymbolKind::Memory,
            at: effective_now,
        });

        // None when resolved_meta is empty — bare commits emit no
        // EpisodeMeta record.
        let episode_meta = resolved_meta.to_record(episode_id, effective_now);

        // Encode the whole batch into one buffer so the backend sees a
        // single append call.
        let mut buf = Vec::new();
        encode_record(&episode_alloc, &mut buf);
        for r in &records {
            encode_record(r, &mut buf);
        }
        if let Some(ref meta_rec) = episode_meta {
            encode_record(&CanonicalRecord::EpisodeMeta(meta_rec.clone()), &mut buf);
        }
        encode_record(&CanonicalRecord::Checkpoint(checkpoint), &mut buf);

        if let Err(e) = self.log.append(&buf) {
            self.rollback(&pipeline_snapshot, episode_counter_snapshot, log_len_before)?;
            return Err(StoreError::Log(e));
        }

        // Phase 2: fsync. Per spec § 7, an fsync failure is treated as
        // uncommitted — roll back log + pipeline.
        let fsync_start = std::time::Instant::now();
        if let Err(e) = self.log.sync() {
            self.rollback(&pipeline_snapshot, episode_counter_snapshot, log_len_before)?;
            return Err(StoreError::Log(e));
        }
        let fsync_micros = u64::try_from(fsync_start.elapsed().as_micros()).unwrap_or(u64::MAX);

        // Post-commit: register the Episode's metadata with the
        // pipeline so Episode-scoped reads (`read-protocol.md`
        // § 4.1) can resolve `:in_episode` / `:after_episode` /
        // `:before_episode` against this commit's clock.
        self.pipeline.register_episode(episode_id, effective_now);
        if let Some(ref meta_rec) = episode_meta {
            if let Some(parent) = meta_rec.parent_episode_id {
                self.pipeline.register_episode_parent(episode_id, parent);
            }
        }

        span.record("log_offset_after", self.log.len());
        span.record("record_count", records.len());
        span.record("episode_id", tracing::field::display(episode_id));
        span.record("fsync_micros", fsync_micros);

        Ok(EpisodeId(episode_id))
    }
490
491    /// Restore pipeline + episode-counter snapshot and truncate the log
492    /// back to `log_len_before`. Helper used by `commit_batch` on any
493    /// Phase 1 / Phase 2 failure.
494    fn rollback(
495        &mut self,
496        pipeline_snapshot: &Pipeline,
497        episode_counter_snapshot: u64,
498        log_len_before: u64,
499    ) -> Result<(), StoreError> {
500        self.pipeline = pipeline_snapshot.clone();
501        self.next_episode_counter = episode_counter_snapshot;
502        // Best-effort log truncation. If this fails too, the log has
503        // orphan bytes past log_len_before; recovery on the next open
504        // will truncate them via last_checkpoint_end(). Propagate the
505        // secondary error for diagnosability.
506        if self.log.len() > log_len_before {
507            self.log.truncate(log_len_before).map_err(StoreError::Log)?;
508        }
509        Ok(())
510    }
511}
512
513fn memory_record_count(records: &[CanonicalRecord]) -> u64 {
514    records
515        .iter()
516        .filter(|record| {
517            matches!(
518                record,
519                CanonicalRecord::Sem(_)
520                    | CanonicalRecord::Epi(_)
521                    | CanonicalRecord::Pro(_)
522                    | CanonicalRecord::Inf(_)
523            )
524        })
525        .count() as u64
526}
527
/// Errors produced by [`Store`].
#[derive(Debug, Error)]
pub enum StoreError {
    /// A pipeline stage (parse / bind / semantic / emit) rejected the
    /// batch. In-memory state and log are both untouched.
    ///
    /// The `#[from]` impl lets `?` convert [`PipelineError`] directly.
    #[error("pipeline error: {0}")]
    Pipeline(#[from] PipelineError),

    /// A filesystem / I/O failure during append, sync, or truncate.
    /// On commit-time failures the pipeline and log are rolled back to
    /// their pre-batch state before this error is returned.
    #[error("log error: {0}")]
    Log(#[from] LogError),

    /// Non-recoverable bytes were found after the last durable
    /// `CHECKPOINT` during `Store::open`. Unlike crash-shaped orphan
    /// tails (`Truncated` / `LengthMismatch`) or valid uncommitted
    /// records, these bytes are preserved for inspection or restore
    /// rather than silently truncated.
    #[error("corrupt canonical log tail at offset {offset}: {source}")]
    CorruptTail {
        /// Logical byte offset where corrupt tail decoding failed.
        offset: u64,
        /// The underlying [`DecodeError`] from `canonical::decode_record`.
        source: DecodeError,
    },

    /// The committed portion of the log (bytes before the last
    /// `CHECKPOINT` fsync) failed to decode during `Store::open`. This
    /// is distinct from tail recovery and indicates genuine
    /// corruption in the durable store.
    ///
    /// Because this variant (not `CorruptTail`) carries the `#[from]`
    /// impl, a bare `?` on a [`DecodeError`] routes here.
    #[error("committed canonical log failed to decode: {source}")]
    CorruptCommittedLog {
        /// The underlying [`DecodeError`] from `canonical::decode_all`.
        #[from]
        source: crate::canonical::DecodeError,
    },

    /// A supersession edge replayed from the committed log failed its
    /// acyclicity check. The on-disk edges are expected to satisfy
    /// `temporal-model.md` § 6.2 invariant #1; surfacing as a typed
    /// error on open keeps silent invariant violations out of the
    /// reopened store.
    #[error("supersession DAG replay failed: {source}")]
    DagReplay {
        /// The underlying [`DagError`](crate::dag::DagError).
        #[from]
        source: crate::dag::DagError,
    },

    /// Supplied [`EpisodeMetadata`] violates a
    /// `episode-semantics.md` constraint — e.g. a `label` exceeding
    /// the 256-byte cap (§ 4.3). Returned before any log bytes are
    /// written.
    #[error("invalid episode metadata: {reason}")]
    InvalidEpisodeMetadata {
        /// Human-readable description of the failed constraint.
        reason: &'static str,
    },
}
587
/// Agent-supplied Episode metadata. Passed into
/// [`Store::commit_batch_with_metadata`] to attach a label / parent /
/// retracts to the next committed Episode. See
/// `episode-semantics.md` § 3.2 / § 4.2 / § 5.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct EpisodeMetadata {
    /// Optional human-readable label. Capped at
    /// [`Self::MAX_LABEL_BYTES`] bytes; an empty string is treated
    /// the same as `None` (see `is_empty` / `to_record`).
    pub label: Option<String>,
    /// Optional parent Episode.
    pub parent_episode: Option<SymbolId>,
    /// Episodes this Episode retracts. Empty when none.
    pub retracts: Vec<SymbolId>,
}
601
602impl EpisodeMetadata {
603    /// Spec § 4.3 cap.
604    pub const MAX_LABEL_BYTES: usize = 256;
605
606    /// True if no metadata is attached; the commit path skips
607    /// emitting an `EpisodeMeta` record in this case.
608    #[must_use]
609    pub fn is_empty(&self) -> bool {
610        self.label.as_deref().is_none_or(str::is_empty)
611            && self.parent_episode.is_none()
612            && self.retracts.is_empty()
613    }
614
615    /// Spec § 4.3: labels cap at 256 bytes.
616    fn validate(&self) -> Result<(), StoreError> {
617        if let Some(label) = self.label.as_deref() {
618            if label.len() > Self::MAX_LABEL_BYTES {
619                return Err(StoreError::InvalidEpisodeMetadata {
620                    reason: "label exceeds 256-byte cap",
621                });
622            }
623        }
624        Ok(())
625    }
626
627    /// Convert to a canonical `EpisodeMetaRecord` for the given
628    /// Episode and commit time. Returns `None` when
629    /// [`Self::is_empty`] — no metadata record is emitted for bare
630    /// (implicit-Episode) commits.
631    fn to_record(&self, episode_id: SymbolId, at: ClockTime) -> Option<EpisodeMetaRecord> {
632        if self.is_empty() {
633            return None;
634        }
635        Some(EpisodeMetaRecord {
636            episode_id,
637            at,
638            label: self.label.clone().filter(|s| !s.is_empty()),
639            parent_episode_id: self.parent_episode,
640            retracts: self.retracts.clone(),
641        })
642    }
643}
644
645#[cfg(test)]
646mod tests {
647    use super::*;
648    use crate::canonical::{decode_all, decode_record, CanonicalRecord};
649    use crate::read::{Framing, FramingSource, ReadFlags};
650    use tempfile::TempDir;
651
652    const SEM_OK: &str = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
653    const SEM_OK_2: &str = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
654
655    fn fixed_now() -> ClockTime {
656        ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel")
657    }
658
659    fn open_fresh(dir: &TempDir) -> Store {
660        Store::open(dir.path().join("canonical.log")).expect("open")
661    }
662
663    // ----------------------------------------------------------
664    // FaultyLog — in-memory LogBackend with armable failure hooks.
665    // ----------------------------------------------------------
666
    /// In-memory [`LogBackend`] whose `append` / `sync` / `truncate`
    /// can each be armed to fail exactly once with a chosen
    /// `io::ErrorKind`.
    #[derive(Default)]
    struct FaultyLog {
        /// Full log contents.
        bytes: Vec<u8>,
        /// When `Some`, the next `append` call fails once with this kind.
        fail_next_append: Option<std::io::ErrorKind>,
        /// When `Some`, the next `sync` call fails once with this kind.
        fail_next_sync: Option<std::io::ErrorKind>,
        /// When `Some`, the next `truncate` call fails once with this kind.
        fail_next_truncate: Option<std::io::ErrorKind>,
    }
674
    impl FaultyLog {
        /// Fresh in-memory log with no armed faults.
        fn new() -> Self {
            Self::default()
        }

        /// Make the next `append` fail once with `kind`.
        fn arm_append_failure(&mut self, kind: std::io::ErrorKind) {
            self.fail_next_append = Some(kind);
        }

        /// Make the next `sync` fail once with `kind`.
        fn arm_sync_failure(&mut self, kind: std::io::ErrorKind) {
            self.fail_next_sync = Some(kind);
        }

        /// Make the next `truncate` fail once with `kind`.
        fn arm_truncate_failure(&mut self, kind: std::io::ErrorKind) {
            self.fail_next_truncate = Some(kind);
        }
    }
692
693    impl LogBackend for FaultyLog {
694        fn append(&mut self, bytes: &[u8]) -> Result<(), LogError> {
695            if let Some(kind) = self.fail_next_append.take() {
696                return Err(LogError::Io(std::io::Error::from(kind)));
697            }
698            self.bytes.extend_from_slice(bytes);
699            Ok(())
700        }
701
702        fn sync(&mut self) -> Result<(), LogError> {
703            if let Some(kind) = self.fail_next_sync.take() {
704                return Err(LogError::Io(std::io::Error::from(kind)));
705            }
706            Ok(())
707        }
708
709        fn truncate(&mut self, new_len: u64) -> Result<(), LogError> {
710            if let Some(kind) = self.fail_next_truncate.take() {
711                return Err(LogError::Io(std::io::Error::from(kind)));
712            }
713            let current = self.bytes.len() as u64;
714            if new_len > current {
715                return Err(LogError::TruncateBeyondEnd {
716                    requested: new_len,
717                    current,
718                });
719            }
720            let new_len_usize = usize::try_from(new_len).unwrap_or(self.bytes.len());
721            self.bytes.truncate(new_len_usize);
722            Ok(())
723        }
724
725        fn read_all(&mut self) -> Result<Vec<u8>, LogError> {
726            Ok(self.bytes.clone())
727        }
728
729        fn len(&self) -> u64 {
730            self.bytes.len() as u64
731        }
732
733        fn last_checkpoint_end(&mut self) -> Result<u64, LogError> {
734            let mut pos: usize = 0;
735            let mut last_checkpoint_end: u64 = 0;
736            while pos < self.bytes.len() {
737                match decode_record(&self.bytes[pos..]) {
738                    Ok((record, consumed)) => {
739                        pos += consumed;
740                        if matches!(record, CanonicalRecord::Checkpoint(_)) {
741                            last_checkpoint_end = pos as u64;
742                        }
743                    }
744                    Err(_) => break,
745                }
746            }
747            Ok(last_checkpoint_end)
748        }
749    }
750
751    #[test]
752    fn commit_single_batch_persists_records_and_checkpoint() {
753        let dir = TempDir::new().expect("tmp");
754        let mut store = open_fresh(&dir);
755        let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
756
757        // Log content: [SymbolAlloc...][Sem][Checkpoint]. Last record
758        // must be a Checkpoint; exactly one memory record (Sem).
759        let bytes = store.log.read_all().expect("read");
760        let records = decode_all(&bytes).expect("decode");
761        assert!(matches!(
762            records.last(),
763            Some(CanonicalRecord::Checkpoint(_))
764        ));
765        let checkpoint = records
766            .iter()
767            .find_map(|r| match r {
768                CanonicalRecord::Checkpoint(c) => Some(c),
769                _ => None,
770            })
771            .expect("checkpoint");
772        assert_eq!(
773            checkpoint.memory_count, 1,
774            "checkpoint memory_count must count memory records, not symbol events"
775        );
776        let mem_count = records
777            .iter()
778            .filter(|r| matches!(r, CanonicalRecord::Sem(_)))
779            .count();
780        assert_eq!(mem_count, 1);
781    }
782
783    #[test]
784    fn commit_registers_episode_with_pipeline() {
785        let dir = TempDir::new().expect("tmp");
786        let mut store = open_fresh(&dir);
787        let first = store.commit_batch(SEM_OK, fixed_now()).expect("first");
788        let second = store.commit_batch(SEM_OK_2, fixed_now()).expect("second");
789
790        // Both Episodes must be registered with the Pipeline for
791        // post-commit `:in_episode` / `:after_episode` reads to
792        // resolve. Query via the real `(query ...)` form.
793        let got1 = store
794            .pipeline_mut()
795            .execute_query("(query :in_episode @__ep_0)")
796            .expect("q1");
797        assert_eq!(got1.records.len(), 1, "first Episode holds SEM_OK");
798
799        let got2 = store
800            .pipeline_mut()
801            .execute_query("(query :after_episode @__ep_0)")
802            .expect("q2");
803        assert_eq!(got2.records.len(), 1, "SEM_OK_2 commits after __ep_0");
804
805        // Episode IDs are sequential synthesized symbols.
806        assert_ne!(first, second);
807    }
808
809    #[test]
810    fn replay_registers_episodes_with_pipeline() {
811        // Round-trip through `Store::open` — reopening should
812        // restore the episodes-by-committed-at index so post-reopen
813        // Episode-scoped reads still work.
814        let dir = TempDir::new().expect("tmp");
815        {
816            let mut store = open_fresh(&dir);
817            store.commit_batch(SEM_OK, fixed_now()).expect("first");
818        }
819        let mut reopened = open_fresh(&dir);
820        let got = reopened
821            .pipeline_mut()
822            .execute_query("(query :in_episode @__ep_0)")
823            .expect("query");
824        assert_eq!(
825            got.records.len(),
826            1,
827            "replay must re-register Episodes with the pipeline"
828        );
829    }
830
831    #[test]
832    fn commit_with_metadata_emits_episode_meta_record() {
833        let dir = TempDir::new().expect("tmp");
834        let mut store = open_fresh(&dir);
835        let first = store.commit_batch(SEM_OK, fixed_now()).expect("first");
836        let meta = EpisodeMetadata {
837            label: Some("design-session".into()),
838            parent_episode: Some(first.0),
839            retracts: Vec::new(),
840        };
841        store
842            .commit_batch_with_metadata(SEM_OK_2, fixed_now(), &meta)
843            .expect("second with metadata");
844
845        let bytes = store.log.read_all().expect("read");
846        let records = decode_all(&bytes).expect("decode");
847        let meta_count = records
848            .iter()
849            .filter(|r| matches!(r, CanonicalRecord::EpisodeMeta(_)))
850            .count();
851        assert_eq!(
852            meta_count, 1,
853            "only the metadata-carrying commit should emit an EpisodeMeta"
854        );
855        // Find the metadata record and inspect it.
856        let meta_rec = records
857            .iter()
858            .find_map(|r| match r {
859                CanonicalRecord::EpisodeMeta(m) => Some(m),
860                _ => None,
861            })
862            .expect("EpisodeMeta present");
863        assert_eq!(meta_rec.label.as_deref(), Some("design-session"));
864        assert_eq!(meta_rec.parent_episode_id, Some(first.0));
865    }
866
867    #[test]
868    fn episode_chain_walks_parent_links_after_replay() {
869        let dir = TempDir::new().expect("tmp");
870        // Commit three Episodes linked parent → child → grandchild.
871        let (first, second, third);
872        {
873            let mut store = open_fresh(&dir);
874            first = store.commit_batch(SEM_OK, fixed_now()).expect("first");
875            second = store
876                .commit_batch_with_metadata(
877                    "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)",
878                    fixed_now(),
879                    &EpisodeMetadata {
880                        label: None,
881                        parent_episode: Some(first.0),
882                        retracts: Vec::new(),
883                    },
884                )
885                .expect("second");
886            third = store
887                .commit_batch_with_metadata(
888                    "(sem @charlie @knows @dana :src @observation :c 0.7 :v 2024-01-17)",
889                    fixed_now(),
890                    &EpisodeMetadata {
891                        label: None,
892                        parent_episode: Some(second.0),
893                        retracts: Vec::new(),
894                    },
895                )
896                .expect("third");
897        }
898
899        // Reopen and query `:episode_chain @third` — should return
900        // memories from all three Episodes (third walks back to
901        // second walks back to first).
902        let mut reopened = open_fresh(&dir);
903        let _ = (first, second, third);
904        let got = reopened
905            .pipeline_mut()
906            .execute_query("(query :episode_chain @__ep_2)")
907            .expect("query");
908        assert_eq!(
909            got.records.len(),
910            3,
911            "episode_chain over three linked Episodes returns all three memories"
912        );
913    }
914
915    #[test]
916    fn label_exceeding_cap_rejects() {
917        let dir = TempDir::new().expect("tmp");
918        let mut store = open_fresh(&dir);
919        let bad_label = "x".repeat(EpisodeMetadata::MAX_LABEL_BYTES + 1);
920        let err = store
921            .commit_batch_with_metadata(
922                SEM_OK,
923                fixed_now(),
924                &EpisodeMetadata {
925                    label: Some(bad_label),
926                    parent_episode: None,
927                    retracts: Vec::new(),
928                },
929            )
930            .expect_err("label too long");
931        assert!(matches!(
932            err,
933            StoreError::InvalidEpisodeMetadata { reason } if reason.contains("256")
934        ));
935    }
936
937    #[test]
938    fn episode_start_form_writes_episode_meta_end_to_end() {
939        // `(episode :start :label …)` in the batch produces an
940        // `EpisodeMeta` record in the log.
941        let dir = TempDir::new().expect("tmp");
942        let mut store = open_fresh(&dir);
943        let input = r#"(episode :start :label "design-session")
944                       (sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)"#;
945        store.commit_batch(input, fixed_now()).expect("commit");
946
947        let bytes = store.log.read_all().expect("read");
948        let records = decode_all(&bytes).expect("decode");
949        let meta = records
950            .iter()
951            .find_map(|r| match r {
952                CanonicalRecord::EpisodeMeta(m) => Some(m),
953                _ => None,
954            })
955            .expect("EpisodeMeta present");
956        assert_eq!(meta.label.as_deref(), Some("design-session"));
957    }
958
959    #[test]
960    fn episode_close_form_is_accepted_no_op() {
961        // `(episode :close)` parses valid and commits without
962        // emitting an EpisodeMeta record (no metadata to carry).
963        let dir = TempDir::new().expect("tmp");
964        let mut store = open_fresh(&dir);
965        let input = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)
966                       (episode :close)";
967        store.commit_batch(input, fixed_now()).expect("commit");
968
969        let bytes = store.log.read_all().expect("read");
970        let records = decode_all(&bytes).expect("decode");
971        let meta_count = records
972            .iter()
973            .filter(|r| matches!(r, CanonicalRecord::EpisodeMeta(_)))
974            .count();
975        assert_eq!(
976            meta_count, 0,
977            ":close alone carries no metadata; no EpisodeMeta record"
978        );
979    }
980
    #[test]
    fn episode_start_with_parent_links_chain() {
        // End-to-end: the `(episode :start :parent_episode …)`
        // write-surface form links Episodes into a chain that
        // `:episode_chain` can walk.
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let first = store
            .commit_batch(
                r#"(episode :start :label "parent")
                   (sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)"#,
                fixed_now(),
            )
            .expect("parent");
        // Second batch references the first Episode via the
        // write-surface form.
        let second_input = "(episode :start :parent_episode @__ep_0)\n\
             (sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
        store
            .commit_batch(second_input, fixed_now())
            .expect("child");

        // `:episode_chain @__ep_1` should return records from both
        // Episodes.
        let got = store
            .pipeline_mut()
            .execute_query("(query :episode_chain @__ep_1)")
            .expect("query");
        assert_eq!(
            got.records.len(),
            2,
            "chain walk returns both linked Episodes (got {first:?})"
        );
    }
1012
1013    #[test]
1014    fn episode_start_with_retracts_records_metadata() {
1015        let dir = TempDir::new().expect("tmp");
1016        let mut store = open_fresh(&dir);
1017        let _bad = store
1018            .commit_batch(SEM_OK, fixed_now())
1019            .expect("bad episode");
1020        // Next batch retracts the first Episode via the write surface.
1021        // Distinct valid_at avoids the equal-valid_at auto-supersession
1022        // conflict (spec § 5.1 — two memories at the same `(s, p)` can't
1023        // share valid_at under the single-writer invariant).
1024        let input = r#"(episode :start :label "correction" :retracts (@__ep_0))
1025                       (sem @alice @knows @charlie :src @observation :c 0.95 :v 2024-01-16)"#;
1026        store.commit_batch(input, fixed_now()).expect("correction");
1027
1028        let bytes = store.log.read_all().expect("read");
1029        let records = decode_all(&bytes).expect("decode");
1030        let meta = records
1031            .iter()
1032            .find_map(|r| match r {
1033                CanonicalRecord::EpisodeMeta(m) => Some(m),
1034                _ => None,
1035            })
1036            .expect("EpisodeMeta present on the correction batch");
1037        assert_eq!(meta.retracts.len(), 1);
1038        assert_eq!(meta.label.as_deref(), Some("correction"));
1039    }
1040
    #[test]
    fn two_episode_directives_in_one_batch_reject() {
        // Only one `(episode ...)` directive is allowed per batch; a
        // second must reject at the semantic phase with the directive
        // count surfaced in the error.
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let input = r#"(episode :start :label "a")
                       (episode :start :label "b")
                       (sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)"#;
        let err = store
            .commit_batch(input, fixed_now())
            .expect_err("multiple episode directives must reject");
        assert!(matches!(
            err,
            StoreError::Pipeline(PipelineError::Semantic(
                crate::semantic::SemanticError::MultipleEpisodeDirectives { count: 2 }
            ))
        ));
    }
1058
1059    #[test]
1060    fn pin_suspends_decay_and_flags_authoritative() {
1061        // Ancient Sem that would normally decay below 0.5 effective;
1062        // pinning it should lift effective back to stored and surface
1063        // Framing::Authoritative { set_by: AgentPinned }.
1064        let dir = TempDir::new().expect("tmp");
1065        let mut store = open_fresh(&dir);
1066        let old_sem = "(sem @mira @saw @kilroy :src @observation :c 0.8 :v 2023-12-01)";
1067        let _ = store.commit_batch(old_sem, fixed_now()).expect("old sem");
1068
1069        // Before pin — LOW_CONFIDENCE should fire (decay applies).
1070        let before = store
1071            .pipeline_mut()
1072            .execute_query("(query)")
1073            .expect("before");
1074        assert!(
1075            before.flags.contains(ReadFlags::LOW_CONFIDENCE),
1076            "decayed stored 0.8 should be < 0.5 before pin"
1077        );
1078
1079        // Pin the memory via the write surface.
1080        store
1081            .commit_batch("(pin @__mem_0 :actor @mira)", fixed_now())
1082            .expect("pin");
1083
1084        // After pin — decay suspended, flag clears, framing surfaces as Authoritative.
1085        let after = store
1086            .pipeline_mut()
1087            .execute_query("(query :show_framing true)")
1088            .expect("after");
1089        assert!(
1090            !after.flags.contains(ReadFlags::LOW_CONFIDENCE),
1091            "pin must suspend decay"
1092        );
1093        assert_eq!(after.framings.len(), 1);
1094        assert_eq!(
1095            after.framings[0],
1096            Framing::Authoritative {
1097                set_by: FramingSource::AgentPinned
1098            }
1099        );
1100    }
1101
1102    #[test]
1103    fn unpin_restores_decay() {
1104        let dir = TempDir::new().expect("tmp");
1105        let mut store = open_fresh(&dir);
1106        let old_sem = "(sem @mira @saw @kilroy :src @observation :c 0.8 :v 2023-12-01)";
1107        store.commit_batch(old_sem, fixed_now()).expect("old sem");
1108        store
1109            .commit_batch("(pin @__mem_0 :actor @mira)", fixed_now())
1110            .expect("pin");
1111        store
1112            .commit_batch("(unpin @__mem_0 :actor @mira)", fixed_now())
1113            .expect("unpin");
1114
1115        let got = store
1116            .pipeline_mut()
1117            .execute_query("(query)")
1118            .expect("query");
1119        assert!(
1120            got.flags.contains(ReadFlags::LOW_CONFIDENCE),
1121            "unpin should restore decay"
1122        );
1123    }
1124
1125    #[test]
1126    fn authoritative_set_surfaces_operator_framing() {
1127        let dir = TempDir::new().expect("tmp");
1128        let mut store = open_fresh(&dir);
1129        let sem = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
1130        store.commit_batch(sem, fixed_now()).expect("sem");
1131        store
1132            .commit_batch("(authoritative_set @__mem_0 :actor @operator)", fixed_now())
1133            .expect("auth-set");
1134
1135        let got = store
1136            .pipeline_mut()
1137            .execute_query("(query :show_framing true)")
1138            .expect("query");
1139        assert_eq!(got.framings.len(), 1);
1140        assert_eq!(
1141            got.framings[0],
1142            Framing::Authoritative {
1143                set_by: FramingSource::OperatorAuthoritative
1144            }
1145        );
1146    }
1147
1148    #[test]
1149    fn authoritative_clear_resumes_decay() {
1150        let dir = TempDir::new().expect("tmp");
1151        let mut store = open_fresh(&dir);
1152        let old_sem = "(sem @mira @saw @kilroy :src @observation :c 0.8 :v 2023-12-01)";
1153        store.commit_batch(old_sem, fixed_now()).expect("sem");
1154        store
1155            .commit_batch("(authoritative_set @__mem_0 :actor @operator)", fixed_now())
1156            .expect("set");
1157        store
1158            .commit_batch(
1159                "(authoritative_clear @__mem_0 :actor @operator)",
1160                fixed_now(),
1161            )
1162            .expect("clear");
1163
1164        let got = store
1165            .pipeline_mut()
1166            .execute_query("(query)")
1167            .expect("query");
1168        assert!(
1169            got.flags.contains(ReadFlags::LOW_CONFIDENCE),
1170            "clear should restore decay"
1171        );
1172    }
1173
1174    #[test]
1175    fn pin_replay_survives_reopen() {
1176        let dir = TempDir::new().expect("tmp");
1177        let old_sem = "(sem @mira @saw @kilroy :src @observation :c 0.8 :v 2023-12-01)";
1178        {
1179            let mut store = open_fresh(&dir);
1180            store.commit_batch(old_sem, fixed_now()).expect("sem");
1181            store
1182                .commit_batch("(pin @__mem_0 :actor @mira)", fixed_now())
1183                .expect("pin");
1184        }
1185        let mut reopened = open_fresh(&dir);
1186        let got = reopened
1187            .pipeline_mut()
1188            .execute_query("(query :show_framing true)")
1189            .expect("reopened query");
1190        // Pin state must survive replay.
1191        assert_eq!(got.framings.len(), 1);
1192        assert_eq!(
1193            got.framings[0],
1194            Framing::Authoritative {
1195                set_by: FramingSource::AgentPinned
1196            }
1197        );
1198    }
1199
1200    #[test]
1201    fn multiple_commits_accumulate_in_log() {
1202        let dir = TempDir::new().expect("tmp");
1203        let mut store = open_fresh(&dir);
1204        let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
1205        let input2 = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
1206        let _ = store.commit_batch(input2, fixed_now()).expect("second");
1207
1208        let bytes = store.log.read_all().expect("read");
1209        let records = decode_all(&bytes).expect("decode");
1210        // Two checkpoints and two Sems, intermingled with SymbolAlloc
1211        // records at the start of each batch.
1212        let checkpoints = records
1213            .iter()
1214            .filter(|r| matches!(r, CanonicalRecord::Checkpoint(_)))
1215            .count();
1216        assert_eq!(checkpoints, 2);
1217        let sems = records
1218            .iter()
1219            .filter(|r| matches!(r, CanonicalRecord::Sem(_)))
1220            .count();
1221        assert_eq!(sems, 2);
1222    }
1223
    #[test]
    fn pipeline_error_does_not_write_log() {
        // A parse failure must abort before the durable write path:
        // the commit surfaces a Pipeline error and the log stays
        // completely empty — nothing was appended, so nothing needs
        // rolling back.
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let err = store
            .commit_batch("(sem @a", fixed_now())
            .expect_err("malformed");
        assert!(matches!(err, StoreError::Pipeline(_)));
        assert_eq!(store.log.len(), 0);
    }
1234
1235    #[test]
1236    fn commits_assign_distinct_episode_ids() {
1237        let dir = TempDir::new().expect("tmp");
1238        let mut store = open_fresh(&dir);
1239        let a = store.commit_batch(SEM_OK, fixed_now()).expect("a");
1240        let input2 = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
1241        let b = store.commit_batch(input2, fixed_now()).expect("b");
1242        assert_ne!(a.as_symbol(), b.as_symbol());
1243    }
1244
    #[test]
    fn reopen_truncates_orphans_past_last_checkpoint() {
        // Spec § 10 recovery: on open, anything after the last durable
        // CHECKPOINT is crash residue and must be truncated away.
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("canonical.log");
        let committed_len;
        {
            let mut store = Store::open(&path).expect("open");
            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
            committed_len = store.log.len();
        }
        // Simulate a crash mid-batch: append orphan bytes that are
        // neither a valid record nor terminated by a CHECKPOINT.
        {
            let mut raw = CanonicalLog::open(&path).expect("reopen raw");
            raw.append(&[0x01, 0x42, 0xFF, 0xFF]).expect("append");
            raw.sync().expect("sync");
            assert!(raw.len() > committed_len);
        }
        // Reopening the store must truncate the orphan bytes.
        let store = Store::open(&path).expect("reopen store");
        assert_eq!(store.log.len(), committed_len);
    }
1267
    #[test]
    fn reopen_on_empty_workspace_is_clean() {
        // Opening a path with no prior log is a clean, zero-length
        // start — no recovery work, no spurious truncation.
        let dir = TempDir::new().expect("tmp");
        let store = Store::open(dir.path().join("canonical.log")).expect("open");
        assert_eq!(store.log_len(), 0);
    }
1274
    #[test]
    fn episode_allocation_collision_restores_pipeline_state() {
        // Covers the commit-path rollback branch where
        // `allocate_episode_symbol` fails after the pipeline already
        // auto-applied its compile mutations. We force a collision by
        // rewinding the episode counter to a value whose `__ep_{n}`
        // name is already in the table from a prior commit.
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
        assert_eq!(store.next_episode_counter, 1);
        // Reference copy of the pipeline to diff against after rollback.
        let snapshot = store.pipeline.clone();
        let log_len_after_first = store.log.len();

        // Force the collision.
        store.next_episode_counter = 0;

        let input2 = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
        let err = store
            .commit_batch(input2, fixed_now())
            .expect_err("collision");
        assert!(matches!(err, StoreError::Pipeline(_)));

        // Rollback verification: pipeline + counter + log all restored
        // to their pre-second-commit state. In particular the pipeline
        // must NOT contain the new @carol symbol that compile_batch
        // allocated before the episode-collision fired.
        assert_eq!(store.next_episode_counter, 0);
        assert_eq!(store.pipeline, snapshot);
        assert_eq!(store.log.len(), log_len_after_first);
    }
1306
1307    #[test]
1308    fn reopen_restores_symbol_table_from_log() {
1309        let dir = TempDir::new().expect("tmp");
1310        let path = dir.path().join("canonical.log");
1311        let alice_id;
1312        {
1313            let mut store = Store::open(&path).expect("open");
1314            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
1315            alice_id = store
1316                .pipeline
1317                .table()
1318                .lookup("alice")
1319                .expect("alice allocated");
1320        }
1321        // Reopen: replay must restore the table such that @alice is
1322        // still allocated with the SAME SymbolId.
1323        let store = Store::open(&path).expect("reopen");
1324        assert_eq!(store.pipeline.table().lookup("alice"), Some(alice_id));
1325        assert!(store.pipeline.table().lookup("knows").is_some());
1326        assert!(store.pipeline.table().lookup("bob").is_some());
1327    }
1328
1329    #[test]
1330    fn reopen_restores_table_from_epi_batch() {
1331        let dir = TempDir::new().expect("tmp");
1332        let path = dir.path().join("canonical.log");
1333        let evt_id;
1334        let alice_id;
1335        {
1336            let mut store = Store::open(&path).expect("open");
1337            let input = "(epi @evt_001 @rename (@old @new) @github \
1338                         :at 2024-01-15T10:00:00Z :obs 2024-01-15T10:00:05Z \
1339                         :src @alice :c 0.9)";
1340            let _ = store.commit_batch(input, fixed_now()).expect("commit");
1341            evt_id = store
1342                .pipeline
1343                .table()
1344                .lookup("evt_001")
1345                .expect("event id allocated");
1346            alice_id = store
1347                .pipeline
1348                .table()
1349                .lookup("alice")
1350                .expect("witness allocated");
1351        }
1352        let store = Store::open(&path).expect("reopen");
1353        assert_eq!(store.pipeline.table().lookup("evt_001"), Some(evt_id));
1354        assert_eq!(store.pipeline.table().lookup("alice"), Some(alice_id));
1355        assert!(store.pipeline.table().lookup("old").is_some());
1356        assert!(store.pipeline.table().lookup("new").is_some());
1357        assert!(store.pipeline.table().lookup("github").is_some());
1358        assert_eq!(store.pipeline.episodic_records().len(), 1);
1359        assert_eq!(store.pipeline.episodic_records()[0].event_id, evt_id);
1360        assert_eq!(store.pipeline.episodic_records()[0].source, alice_id);
1361    }
1362
1363    #[test]
1364    fn reopen_restores_table_from_pro_batch() {
1365        let dir = TempDir::new().expect("tmp");
1366        let path = dir.path().join("canonical.log");
1367        let rule_id;
1368        {
1369            let mut store = Store::open(&path).expect("open");
1370            let input = r#"(pro @rule_1 "trigger text" "action text" :scp @mimir :src @agent_instruction :c 0.9)"#;
1371            let _ = store.commit_batch(input, fixed_now()).expect("commit");
1372            rule_id = store
1373                .pipeline
1374                .table()
1375                .lookup("rule_1")
1376                .expect("rule allocated");
1377        }
1378        let store = Store::open(&path).expect("reopen");
1379        assert_eq!(store.pipeline.table().lookup("rule_1"), Some(rule_id));
1380        assert!(store.pipeline.table().lookup("mimir").is_some());
1381        assert!(store.pipeline.table().lookup("agent_instruction").is_some());
1382    }
1383
1384    #[test]
1385    fn reopen_restores_table_from_inf_batch() {
1386        let dir = TempDir::new().expect("tmp");
1387        let path = dir.path().join("canonical.log");
1388        let method_id;
1389        {
1390            let mut store = Store::open(&path).expect("open");
1391            let input = "(inf @alice @friend_of @carol (@m0 @m1) @citation_link \
1392                         :c 0.6 :v 2024-01-15)";
1393            let _ = store.commit_batch(input, fixed_now()).expect("commit");
1394            method_id = store
1395                .pipeline
1396                .table()
1397                .lookup("citation_link")
1398                .expect("method allocated");
1399        }
1400        let store = Store::open(&path).expect("reopen");
1401        assert_eq!(
1402            store.pipeline.table().lookup("citation_link"),
1403            Some(method_id)
1404        );
1405        for name in ["alice", "friend_of", "carol", "m0", "m1"] {
1406            assert!(
1407                store.pipeline.table().lookup(name).is_some(),
1408                "{name} lost on reopen"
1409            );
1410        }
1411    }
1412
    #[test]
    fn reopen_advances_memory_and_episode_counters() {
        // Replay must advance the monotonic __mem_{n} / __ep_{n}
        // counters past the highest reserved-prefix symbol in the log,
        // so a post-reopen commit cannot collide with committed state.
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("canonical.log");
        {
            let mut store = Store::open(&path).expect("open");
            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
        }
        let mut store = Store::open(&path).expect("reopen");
        assert_eq!(store.next_episode_counter, 1);
        // A follow-up commit must not collide on __mem_0 or __ep_0 —
        // replay advanced both counters past their pre-crash values.
        let input2 = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
        let _ = store.commit_batch(input2, fixed_now()).expect("second");
    }
1428
    #[test]
    fn checkpoint_and_episode_alloc_use_monotonic_clock_under_regressed_wall_clock() {
        // Store-side contract: the CHECKPOINT record and the
        // synthetic __ep_{n} SymbolAlloc both carry `effective_now`
        // (the pipeline's monotonic-enforced clock) in their `at`
        // field, not the raw wall clock passed into `commit_batch`.
        // Without this, a regressed wall clock would let the
        // checkpoint's `at` sit below the prior batch's committed_at,
        // violating the per-workspace monotonicity invariant.
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("canonical.log");
        let high = ClockTime::try_from_millis(2_000_000_000_000).expect("non-sentinel");
        let regressed = ClockTime::try_from_millis(1_800_000_000_000).expect("non-sentinel");
        {
            let mut store = Store::open(&path).expect("open");
            let _ = store.commit_batch(SEM_OK, high).expect("high");
            // Distinct predicate so auto-supersession doesn't
            // interfere — this test is about clock monotonicity, not
            // (s, p) supersession detection.
            let _ = store
                .commit_batch(
                    "(sem @alice @likes @dan :src @observation :c 0.8 :v 2024-01-15)",
                    regressed,
                )
                .expect("regressed");
        }

        // Decode the log and pull the second batch's checkpoint and
        // __ep_1 alloc. Both must sit at `high + 1` — the monotonic
        // bump — not at `regressed` (which is < `high`).
        // Skip the 8-byte canonical-log header (magic + format version,
        // see `log::LOG_HEADER_SIZE`) before decoding the record stream.
        let raw = std::fs::read(&path).expect("read log");
        let header_size = usize::try_from(crate::log::LOG_HEADER_SIZE).expect("header fits");
        let bytes = &raw[header_size..];
        let records = decode_all(bytes).expect("decode");

        // Find the __ep_1 SymbolAlloc.
        let ep1_alloc = records
            .iter()
            .find(|r| matches!(r, CanonicalRecord::SymbolAlloc(ev) if ev.name == "__ep_1"))
            .expect("__ep_1 alloc present");
        let CanonicalRecord::SymbolAlloc(ep1) = ep1_alloc else {
            unreachable!();
        };
        // `high + 1` is the monotonic bump applied to the regressed batch.
        let expected = ClockTime::try_from_millis(high.as_millis() + 1).expect("non-sentinel");
        assert_eq!(ep1.at, expected, "__ep_1 alloc must use monotonic clock");

        // There are two checkpoints — the second (last) corresponds
        // to the regressed batch.
        let checkpoints: Vec<_> = records
            .iter()
            .filter_map(|r| match r {
                CanonicalRecord::Checkpoint(c) => Some(c),
                _ => None,
            })
            .collect();
        assert_eq!(checkpoints.len(), 2, "two batches → two checkpoints");
        assert_eq!(
            checkpoints[1].at, expected,
            "second checkpoint.at must use monotonic clock, not regressed wall clock"
        );
    }
1492
1493    #[test]
1494    fn reopen_restores_monotonic_commit_watermark() {
1495        // temporal-model.md § 9.2 / § 12 #1: committed_at must be
1496        // strictly monotonic per workspace even across reopen. On
1497        // open, the pipeline's watermark is restored from the
1498        // highest `committed_at` seen in the log — so a follow-up
1499        // batch submitted with a regressed wall clock is still
1500        // bumped past the last durably-committed record.
1501        let dir = TempDir::new().expect("tmp");
1502        let path = dir.path().join("canonical.log");
1503        let high = ClockTime::try_from_millis(2_000_000_000_000).expect("non-sentinel");
1504        {
1505            let mut store = Store::open(&path).expect("open");
1506            let _ = store.commit_batch(SEM_OK, high).expect("commit at high");
1507            assert_eq!(store.pipeline.last_committed_at(), Some(high));
1508        }
1509        // Reopen and check the watermark survived.
1510        let mut store = Store::open(&path).expect("reopen");
1511        assert_eq!(store.pipeline.last_committed_at(), Some(high));
1512
1513        // Commit with a regressed wall clock; pipeline must bump
1514        // past `high`. `low` sits after 2024-01-15 so semantic does
1515        // not reject the form for future-validity.
1516        let low = ClockTime::try_from_millis(1_800_000_000_000).expect("non-sentinel");
1517        let _ = store
1518            .commit_batch(
1519                "(sem @alice @likes @dan :src @observation :c 0.8 :v 2024-01-15)",
1520                low,
1521            )
1522            .expect("regressed commit");
1523        let watermark = store
1524            .pipeline
1525            .last_committed_at()
1526            .expect("watermark set after commit");
1527        assert_eq!(watermark.as_millis(), high.as_millis() + 1);
1528    }
1529
1530    #[test]
1531    fn reopen_replays_rename_and_retire() {
1532        let dir = TempDir::new().expect("tmp");
1533        let path = dir.path().join("canonical.log");
1534        {
1535            let mut store = Store::open(&path).expect("open");
1536            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
1537            let _ = store
1538                .commit_batch("(rename @alice @alice_v2)", fixed_now())
1539                .expect("rename");
1540            let _ = store
1541                .commit_batch("(retire @bob)", fixed_now())
1542                .expect("retire");
1543        }
1544        let store = Store::open(&path).expect("reopen");
1545        let alice_id = store
1546            .pipeline
1547            .table()
1548            .lookup("alice_v2")
1549            .expect("canonical rotated");
1550        assert_eq!(
1551            store
1552                .pipeline
1553                .table()
1554                .entry(alice_id)
1555                .expect("entry")
1556                .canonical_name,
1557            "alice_v2"
1558        );
1559        let bob_id = store.pipeline.table().lookup("bob").expect("bob");
1560        assert!(store.pipeline.table().is_retired(bob_id));
1561    }
1562
1563    // ----------------------------------------------------------
1564    // Crash-injection matrix per write-protocol.md § 7.
1565    // ----------------------------------------------------------
1566
1567    #[test]
1568    fn row_3_orphan_memory_record_without_checkpoint_truncated_on_reopen() {
1569        // Spec § 7 row: "Crash between last record and CHECKPOINT
1570        // append". Simulate by committing one batch (durable), then
1571        // appending an orphan memory record whose batch never reached
1572        // CHECKPOINT. Reopen must truncate to the last CHECKPOINT.
1573        let dir = TempDir::new().expect("tmp");
1574        let path = dir.path().join("canonical.log");
1575        let committed_len;
1576        {
1577            let mut store = Store::open(&path).expect("open");
1578            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
1579            committed_len = store.log_len();
1580        }
1581        // Append a valid but uncommitted SymbolAlloc — represents an
1582        // in-progress batch that crashed before Phase 2.
1583        {
1584            let mut raw = CanonicalLog::open(&path).expect("raw");
1585            let fake_alloc = CanonicalRecord::SymbolAlloc(SymbolEventRecord {
1586                symbol_id: SymbolId::new(999),
1587                name: "orphan_symbol".into(),
1588                symbol_kind: SymbolKind::Literal,
1589                at: fixed_now(),
1590            });
1591            let mut buf = Vec::new();
1592            encode_record(&fake_alloc, &mut buf);
1593            raw.append(&buf).expect("append");
1594            raw.sync().expect("sync");
1595            assert!(raw.len() > committed_len);
1596        }
1597        let store = Store::open(&path).expect("reopen");
1598        assert_eq!(store.log_len(), committed_len);
1599        // The orphaned symbol must NOT appear in the reconstructed table.
1600        assert!(store.pipeline.table().lookup("orphan_symbol").is_none());
1601    }
1602
1603    #[test]
1604    fn row_6_append_failure_rolls_back_pipeline_and_log() {
1605        // Spec § 7 row: "Disk full during Phase 1". Inject a
1606        // StorageFull on the next append and assert full rollback.
1607        let mut store = Store::from_backend(FaultyLog::new()).expect("open");
1608        let pre_commit_pipeline = store.pipeline.clone();
1609        store
1610            .log
1611            .arm_append_failure(std::io::ErrorKind::StorageFull);
1612
1613        let err = store
1614            .commit_batch(SEM_OK, fixed_now())
1615            .expect_err("append failure");
1616        assert!(matches!(err, StoreError::Log(_)));
1617        // Log, pipeline, and episode counter are all restored.
1618        assert_eq!(store.log.len(), 0);
1619        assert_eq!(store.pipeline, pre_commit_pipeline);
1620        assert_eq!(store.next_episode_counter, 0);
1621    }
1622
1623    #[test]
1624    fn row_7_sync_failure_rolls_back_pipeline_and_log() {
1625        // Spec § 7 row: "fsync fails (hardware error)". Inject an IO
1626        // error on the next sync and assert full rollback. The log
1627        // bytes were appended but must be truncated back.
1628        let mut store = Store::from_backend(FaultyLog::new()).expect("open");
1629        let pre_commit_pipeline = store.pipeline.clone();
1630        store.log.arm_sync_failure(std::io::ErrorKind::Other);
1631
1632        let err = store
1633            .commit_batch(SEM_OK, fixed_now())
1634            .expect_err("sync failure");
1635        assert!(matches!(err, StoreError::Log(_)));
1636        // The appended bytes have been truncated back.
1637        assert_eq!(store.log.len(), 0);
1638        assert_eq!(store.pipeline, pre_commit_pipeline);
1639        assert_eq!(store.next_episode_counter, 0);
1640    }
1641
1642    #[test]
1643    fn rollback_truncate_failure_still_surfaces_an_error() {
1644        // Compound-failure path: sync fails, triggering rollback;
1645        // rollback's truncate also fails. The store must surface
1646        // *an* error (currently the secondary truncate error, with
1647        // the primary sync error lost — documented diagnosability
1648        // limitation noted in the `rollback` helper's "best-effort"
1649        // comment). This test proves the path is reachable and the
1650        // caller is not silently given Ok.
1651        let mut store = Store::from_backend(FaultyLog::new()).expect("open");
1652        let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
1653        let pre_second_pipeline = store.pipeline.clone();
1654        let len_after_first = store.log.len();
1655
1656        store.log.arm_sync_failure(std::io::ErrorKind::Other);
1657        store
1658            .log
1659            .arm_truncate_failure(std::io::ErrorKind::PermissionDenied);
1660
1661        let err = store
1662            .commit_batch(SEM_OK_2, fixed_now())
1663            .expect_err("compound failure");
1664        assert!(matches!(err, StoreError::Log(_)));
1665        // Rollback could not truncate the log, so the bytes appended
1666        // for the second batch remain beyond `len_after_first` — a
1667        // reopen's `last_checkpoint_end` scan will truncate them.
1668        assert!(store.log.len() >= len_after_first);
1669        // Pipeline and counter were restored *before* the truncate
1670        // attempt, so their snapshot semantics hold even when
1671        // truncate fails.
1672        assert_eq!(store.pipeline, pre_second_pipeline);
1673        assert_eq!(store.next_episode_counter, 1);
1674    }
1675
1676    #[test]
1677    fn rollback_preserves_earlier_committed_bytes() {
1678        // Variant of rows 6/7: after one successful commit, a failure
1679        // on the second commit truncates back to the first commit's
1680        // length — not all the way to zero.
1681        let mut store = Store::from_backend(FaultyLog::new()).expect("open");
1682        let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
1683        let len_after_first = store.log.len();
1684        assert!(len_after_first > 0);
1685
1686        store.log.arm_sync_failure(std::io::ErrorKind::Other);
1687        let err = store
1688            .commit_batch(SEM_OK_2, fixed_now())
1689            .expect_err("sync failure");
1690        assert!(matches!(err, StoreError::Log(_)));
1691        assert_eq!(store.log.len(), len_after_first);
1692    }
1693
1694    #[test]
1695    fn orphan_truncation_is_idempotent() {
1696        // Spec § 1 graduation criterion #4 + § 10.2: truncating an
1697        // already-orphan-free log to the same committed offset is a
1698        // no-op. Running recovery multiple times must converge.
1699        let dir = TempDir::new().expect("tmp");
1700        let path = dir.path().join("canonical.log");
1701        {
1702            let mut store = Store::open(&path).expect("open");
1703            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
1704            let _ = store.commit_batch(SEM_OK_2, fixed_now()).expect("second");
1705        }
1706        // Inject a crash-shaped partial record tail.
1707        {
1708            let mut raw = CanonicalLog::open(&path).expect("raw");
1709            raw.append(&[0x01_u8]).expect("append partial frame");
1710            raw.sync().expect("sync");
1711        }
1712        // First recovery truncates.
1713        let len_after_first_recovery = {
1714            let store = Store::open(&path).expect("recover once");
1715            store.log_len()
1716        };
1717        // Second recovery is a no-op — same length.
1718        let store = Store::open(&path).expect("recover twice");
1719        assert_eq!(store.log_len(), len_after_first_recovery);
1720    }
1721
1722    #[test]
1723    fn reopen_rejects_corrupt_tail_after_last_checkpoint() {
1724        let dir = TempDir::new().expect("tmp");
1725        let path = dir.path().join("canonical.log");
1726        let committed_len;
1727        {
1728            let mut store = Store::open(&path).expect("open");
1729            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
1730            committed_len = store.log_len();
1731        }
1732
1733        {
1734            let mut raw = CanonicalLog::open(&path).expect("raw");
1735            raw.append(&[0x05_u8; 7]).expect("append corrupt tail");
1736            raw.sync().expect("sync");
1737            assert!(raw.len() > committed_len);
1738        }
1739
1740        let Err(err) = Store::open(&path) else {
1741            panic!("corrupt tail must not truncate");
1742        };
1743        assert!(
1744            matches!(err, StoreError::CorruptTail { .. }),
1745            "expected corrupt-tail error, got {err:?}"
1746        );
1747
1748        let raw = CanonicalLog::open(&path).expect("raw reopen");
1749        assert!(
1750            raw.len() > committed_len,
1751            "corrupt tail must be preserved for inspection"
1752        );
1753    }
1754
1755    #[test]
1756    fn reopen_rejects_corrupt_log_without_checkpoint() {
1757        let dir = TempDir::new().expect("tmp");
1758        let path = dir.path().join("canonical.log");
1759        {
1760            let mut raw = CanonicalLog::open(&path).expect("raw");
1761            raw.append(&[0x05_u8]).expect("append corrupt log");
1762            raw.sync().expect("sync");
1763        }
1764
1765        let Err(err) = Store::open(&path) else {
1766            panic!("corrupt checkpoint-free log must not truncate to empty");
1767        };
1768        assert!(
1769            matches!(
1770                err,
1771                StoreError::CorruptTail {
1772                    offset: 0,
1773                    source: DecodeError::UnknownOpcode { .. }
1774                }
1775            ),
1776            "expected corrupt-tail unknown-opcode error, got {err:?}"
1777        );
1778
1779        let raw = CanonicalLog::open(&path).expect("raw reopen");
1780        assert_eq!(raw.len(), 1, "corrupt bytes must be preserved");
1781    }
1782
1783    #[test]
1784    fn symbol_table_replay_reproduces_pre_crash_state() {
1785        // Spec § 1 graduation criterion #4: symbol-table replay
1786        // reproduces the exact pre-crash state. Commit a diverse
1787        // batch (alloc + rename + retire), capture every state field
1788        // — `SymbolTable`, `next_episode_counter`, and
1789        // `next_memory_counter` — then close and reopen. The replayed
1790        // triple must be byte-equal to the pre-close triple; asserting
1791        // all three ensures "exact state" isn't tested via a proxy.
1792        let dir = TempDir::new().expect("tmp");
1793        let path = dir.path().join("canonical.log");
1794        let table_before;
1795        let counter_before;
1796        let memory_counter_before;
1797        {
1798            let mut store = Store::open(&path).expect("open");
1799            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
1800            let _ = store
1801                .commit_batch("(rename @alice @alice_v2)", fixed_now())
1802                .expect("rename");
1803            let _ = store
1804                .commit_batch("(retire @bob)", fixed_now())
1805                .expect("retire");
1806            table_before = store.pipeline.table().clone();
1807            counter_before = store.next_episode_counter;
1808            memory_counter_before = store.pipeline.next_memory_counter();
1809        }
1810        let store = Store::open(&path).expect("reopen");
1811        assert_eq!(store.pipeline.table(), &table_before);
1812        assert_eq!(store.next_episode_counter, counter_before);
1813        // `next_memory_counter` must advance past every `__mem_{n}`
1814        // seen in the log so a follow-up commit doesn't collide with
1815        // a pre-crash memory-id allocation.
1816        assert_eq!(store.pipeline.next_memory_counter(), memory_counter_before);
1817    }
1818
1819    #[test]
1820    fn checkpoint_is_atomic_commit_boundary() {
1821        // Spec § 1 graduation criterion #4 + § 12 invariant 1:
1822        // truncating the log to just before a Checkpoint makes the
1823        // batch uncommitted. After truncation to any Checkpoint
1824        // boundary, Store::open must treat the post-checkpoint bytes
1825        // as orphans.
1826        let dir = TempDir::new().expect("tmp");
1827        let path = dir.path().join("canonical.log");
1828        let len_after_first;
1829        {
1830            let mut store = Store::open(&path).expect("open");
1831            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
1832            len_after_first = store.log_len();
1833            let _ = store.commit_batch(SEM_OK_2, fixed_now()).expect("second");
1834        }
1835        // Truncate to the first Checkpoint's end — simulates the
1836        // second batch's Checkpoint never having been durable.
1837        {
1838            let mut raw = CanonicalLog::open(&path).expect("raw");
1839            raw.truncate(len_after_first).expect("truncate");
1840        }
1841        let store = Store::open(&path).expect("reopen");
1842        // The second batch is fully discarded: @carol not present.
1843        assert!(store.pipeline.table().lookup("carol").is_none());
1844        // But the first batch's state is intact: @alice still there.
1845        assert!(store.pipeline.table().lookup("alice").is_some());
1846    }
1847
1848    // ----- workspace partitioning -----
1849
1850    #[test]
1851    fn open_in_workspace_creates_partitioned_directory() {
1852        // `workspace-model.md` § 4.2: different workspaces under the
1853        // same data_root land in different on-disk directories and
1854        // share no state.
1855        use crate::WorkspaceId;
1856        let data_root = TempDir::new().expect("tmp");
1857        let ws_a = WorkspaceId::from_git_remote("https://github.com/foo/mimir").unwrap();
1858        let ws_b = WorkspaceId::from_git_remote("https://github.com/bar/mimir").unwrap();
1859        assert_ne!(ws_a, ws_b);
1860
1861        {
1862            let mut store_a = Store::open_in_workspace(data_root.path(), ws_a).expect("open ws a");
1863            let _ = store_a.commit_batch(SEM_OK, fixed_now()).expect("commit a");
1864        }
1865        {
1866            let mut store_b = Store::open_in_workspace(data_root.path(), ws_b).expect("open ws b");
1867            // Workspace B's Store has no knowledge of workspace A's
1868            // commit — the table is fresh.
1869            assert!(store_b.pipeline.table().lookup("alice").is_none());
1870            let _ = store_b.commit_batch(SEM_OK, fixed_now()).expect("commit b");
1871        }
1872        // Reopen workspace A; its state is intact and independent of B.
1873        let store_a_again = Store::open_in_workspace(data_root.path(), ws_a).expect("reopen ws a");
1874        assert!(store_a_again.pipeline.table().lookup("alice").is_some());
1875    }
1876
1877    #[test]
1878    fn reopen_restores_procedural_supersession_index() {
1879        // 6.3b contract: the Procedural index (rule_id +
1880        // (trigger, scope)) is rebuilt from the log at open, so a
1881        // post-reopen Pro write with the same rule_id or same
1882        // (trigger, scope) correctly auto-supersedes the pre-reopen
1883        // memory.
1884        let dir = TempDir::new().expect("tmp");
1885        let path = dir.path().join("canonical.log");
1886        let pro_seed = r#"(pro @rule_route "agent_write" "route_via_librarian"
1887            :scp @mimir :src @policy :c 1.0)"#;
1888        {
1889            let mut store = Store::open(&path).expect("open");
1890            let _ = store.commit_batch(pro_seed, fixed_now()).expect("seed");
1891        }
1892        let mut store = Store::open(&path).expect("reopen");
1893        // Post-reopen write with the same rule_id — must auto-supersede.
1894        let records = store
1895            .pipeline
1896            .compile_batch(
1897                r#"(pro @rule_route "other_trigger" "other_action"
1898                :scp @other_scope :src @policy :c 0.9)"#,
1899                fixed_now(),
1900            )
1901            .expect("post-reopen compile");
1902        let edges: Vec<_> = records
1903            .iter()
1904            .filter(|r| matches!(r, CanonicalRecord::Supersedes(_)))
1905            .collect();
1906        assert_eq!(
1907            edges.len(),
1908            1,
1909            "post-reopen same-rule_id write must auto-supersede"
1910        );
1911    }
1912
1913    #[test]
1914    fn reopen_restores_supersession_index_so_post_reopen_auto_supersedes() {
1915        // 6.3a contract: the supersession-detection index is rebuilt
1916        // from the log at open, so a post-reopen batch at the same
1917        // (s, p) with a later valid_at correctly auto-supersedes the
1918        // pre-reopen memory. Without index replay, the new batch would
1919        // see an empty index and emit no Supersedes edge.
1920        let dir = TempDir::new().expect("tmp");
1921        let path = dir.path().join("canonical.log");
1922        {
1923            let mut store = Store::open(&path).expect("open");
1924            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("seed");
1925        }
1926        // Reopen and commit a later-valid_at write at the same (s, p).
1927        let mut store = Store::open(&path).expect("reopen");
1928        let records = store
1929            .pipeline
1930            .compile_batch(
1931                "(sem @alice @knows @mallory :src @observation :c 0.8 :v 2024-03-01)",
1932                fixed_now(),
1933            )
1934            .expect("post-reopen compile");
1935        let edges: Vec<_> = records
1936            .iter()
1937            .filter(|r| matches!(r, CanonicalRecord::Supersedes(_)))
1938            .collect();
1939        assert_eq!(
1940            edges.len(),
1941            1,
1942            "post-reopen forward write must auto-supersede the pre-reopen memory"
1943        );
1944    }
1945
1946    #[test]
1947    fn reopen_on_fully_committed_log_preserves_length() {
1948        let dir = TempDir::new().expect("tmp");
1949        let path = dir.path().join("canonical.log");
1950        let committed_len;
1951        {
1952            let mut store = Store::open(&path).expect("open");
1953            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
1954            committed_len = store.log.len();
1955        }
1956        let store = Store::open(&path).expect("reopen");
1957        assert_eq!(store.log.len(), committed_len);
1958    }
1959
1960    /// Append raw bytes bypassing the normal commit path, then close
1961    /// with a `CHECKPOINT` so recovery treats the run as committed.
1962    fn fabricate_committed_segment<L: LogBackend>(log: &mut L, records: &[CanonicalRecord]) {
1963        let mut buf = Vec::new();
1964        for r in records {
1965            encode_record(r, &mut buf);
1966        }
1967        log.append(&buf).expect("append");
1968        log.sync().expect("sync");
1969    }
1970
1971    #[test]
1972    fn reopen_replays_supersession_edges_into_dag() {
1973        // 6.2's replay contract: edge records (`SUPERSEDES` /
1974        // `CORRECTS` / `STALE_PARENT` / `RECONFIRMS`) appearing before
1975        // the last durable `CHECKPOINT` are replayed into
1976        // `Pipeline::dag` with full acyclicity enforcement.
1977        use crate::canonical::{CheckpointRecord, EdgeRecord};
1978        use crate::dag::EdgeKind;
1979
1980        let mut log = FaultyLog::new();
1981        let ep0 = SymbolId::new(100);
1982        let m1 = SymbolId::new(101);
1983        let m2 = SymbolId::new(102);
1984        let m3 = SymbolId::new(103);
1985        let ts = fixed_now();
1986
1987        let records = vec![
1988            // Synthetic episode-alloc + three memory IDs as Memory-kind symbols
1989            // so replay has the referenced IDs in the symbol table (not
1990            // required by the DAG but realistic).
1991            CanonicalRecord::SymbolAlloc(SymbolEventRecord {
1992                symbol_id: ep0,
1993                name: "__ep_0".into(),
1994                symbol_kind: SymbolKind::Memory,
1995                at: ts,
1996            }),
1997            CanonicalRecord::SymbolAlloc(SymbolEventRecord {
1998                symbol_id: m1,
1999                name: "__mem_0".into(),
2000                symbol_kind: SymbolKind::Memory,
2001                at: ts,
2002            }),
2003            CanonicalRecord::SymbolAlloc(SymbolEventRecord {
2004                symbol_id: m2,
2005                name: "__mem_1".into(),
2006                symbol_kind: SymbolKind::Memory,
2007                at: ts,
2008            }),
2009            CanonicalRecord::SymbolAlloc(SymbolEventRecord {
2010                symbol_id: m3,
2011                name: "__mem_2".into(),
2012                symbol_kind: SymbolKind::Memory,
2013                at: ts,
2014            }),
2015            // Two edges, acyclic: m1 -> m2, m2 -> m3.
2016            CanonicalRecord::Supersedes(EdgeRecord {
2017                from: m1,
2018                to: m2,
2019                at: ts,
2020            }),
2021            CanonicalRecord::Corrects(EdgeRecord {
2022                from: m2,
2023                to: m3,
2024                at: ts,
2025            }),
2026            CanonicalRecord::Checkpoint(CheckpointRecord {
2027                episode_id: ep0,
2028                at: ts,
2029                memory_count: 0,
2030            }),
2031        ];
2032        fabricate_committed_segment(&mut log, &records);
2033
2034        let store = Store::from_backend(log).expect("open");
2035        assert_eq!(store.pipeline.dag().len(), 2);
2036        let edges: Vec<_> = store.pipeline.dag().edges().to_vec();
2037        assert_eq!(edges[0].kind, EdgeKind::Supersedes);
2038        assert_eq!(edges[0].from, m1);
2039        assert_eq!(edges[0].to, m2);
2040        assert_eq!(edges[1].kind, EdgeKind::Corrects);
2041    }
2042
2043    #[test]
2044    fn reopen_surfaces_dag_replay_error_on_cyclic_edges() {
2045        // A log whose edges close a cycle must fail open with
2046        // `StoreError::DagReplay`, not a silent invariant break.
2047        use crate::canonical::{CheckpointRecord, EdgeRecord};
2048
2049        let mut log = FaultyLog::new();
2050        let ep0 = SymbolId::new(200);
2051        let m1 = SymbolId::new(201);
2052        let m2 = SymbolId::new(202);
2053        let ts = fixed_now();
2054
2055        let records = vec![
2056            CanonicalRecord::SymbolAlloc(SymbolEventRecord {
2057                symbol_id: ep0,
2058                name: "__ep_0".into(),
2059                symbol_kind: SymbolKind::Memory,
2060                at: ts,
2061            }),
2062            // Cycle: m1 -> m2, m2 -> m1.
2063            CanonicalRecord::Supersedes(EdgeRecord {
2064                from: m1,
2065                to: m2,
2066                at: ts,
2067            }),
2068            CanonicalRecord::Supersedes(EdgeRecord {
2069                from: m2,
2070                to: m1,
2071                at: ts,
2072            }),
2073            CanonicalRecord::Checkpoint(CheckpointRecord {
2074                episode_id: ep0,
2075                at: ts,
2076                memory_count: 0,
2077            }),
2078        ];
2079        fabricate_committed_segment(&mut log, &records);
2080
2081        let Err(err) = Store::from_backend(log) else {
2082            panic!("cyclic edges must not replay cleanly");
2083        };
2084        assert!(
2085            matches!(err, StoreError::DagReplay { .. }),
2086            "expected DagReplay, got {err:?}"
2087        );
2088    }
2089}