Skip to main content

mimir_core/
log.rs

1//! Append-only canonical log per `docs/concepts/write-protocol.md`.
2//!
3//! [`CanonicalLog`] is the single durable file backing a workspace. It
4//! exposes only four operations: append, sync (fsync), truncate, and
5//! scan for the last CHECKPOINT. The write protocol (see `store.rs`)
6//! composes these into two-phase commits.
7//!
8//! # File format
9//!
10//! Every canonical log starts with an 8-byte header:
11//!
12//! ```text
13//! offset 0..4 : ASCII magic `MIMR` (4 bytes)
14//! offset 4..8 : little-endian u32 format version
15//! offset 8..  : record stream (opcode + varint length + body, repeating)
16//! ```
17//!
18//! The header is written eagerly when [`CanonicalLog::open`] is called
19//! against an empty (or non-existent) file. On reopen, the header is
20//! validated and a non-Mimir or wrong-version file is rejected with
21//! [`LogError::IncompatibleFormat`] BEFORE any truncation, append, or
22//! recovery logic runs. This closes the destructive-truncate footgun
23//! where opening `Store` against a misrouted path would zero an
24//! arbitrary file.
25//!
26//! From the [`LogBackend`] trait's perspective the header is invisible:
27//! [`len`](LogBackend::len), [`read_all`](LogBackend::read_all),
28//! [`last_checkpoint_end`](LogBackend::last_checkpoint_end), and
29//! [`truncate`](LogBackend::truncate) all operate in **logical bytes**
30//! (record stream only). [`CanonicalLog`] handles the physical header
31//! transparently; in-memory test backends like `FaultyLog` carry no
32//! header because they never persist.
33//!
34//! Engineering notes:
35//!
36//! - Plain file handle; no mmap, no `O_DIRECT`.
37//! - `sync()` is `fsync` (full metadata + data per spec § 6.2).
38//! - Orphan detection via forward scan from start. Spec § 10.1 suggests
39//!   a backward-scan optimization for healthy logs; deferred until we
40//!   have a realistic benchmark.
41//!
42//! The [`LogBackend`] trait abstracts the four filesystem primitives
43//! so [`Store`](crate::store::Store) can be parameterized over a
44//! fault-injecting test backend. Production code uses [`CanonicalLog`]
45//! (the default).
46
47use std::fs::{File, OpenOptions};
48use std::io::{Read, Seek, SeekFrom, Write};
49use std::path::{Path, PathBuf};
50
51use thiserror::Error;
52
53use crate::canonical::{decode_record, CanonicalRecord};
54
/// Magic prefix (ASCII `MIMR`) that opens every Mimir canonical log.
pub const LOG_MAGIC: [u8; 4] = [b'M', b'I', b'M', b'R'];

/// Wire-format version stamped into new logs. Incremented whenever the
/// format changes in a way the decoder cannot absorb transparently.
pub const LOG_FORMAT_VERSION: u32 = 1;

/// Size in bytes of the physical on-disk header: the magic prefix
/// followed by the `u32` format version.
pub const LOG_HEADER_SIZE: u64 = (LOG_MAGIC.len() + std::mem::size_of::<u32>()) as u64;
64
65/// The filesystem primitives a `Store` needs from its underlying log.
66///
67/// The production implementation is [`CanonicalLog`]; tests and crash-
68/// injection harnesses implement this trait to arm failures on
69/// specific operations (see `store::tests::FaultyLog`).
70pub trait LogBackend {
71    /// Append `bytes` at the current end. No fsync is implied.
72    ///
73    /// # Errors
74    ///
75    /// Implementations return [`LogError`] on failure. A failed append
76    /// may leave partial bytes written; callers are responsible for
77    /// truncating back to their pre-write offset.
78    fn append(&mut self, bytes: &[u8]) -> Result<(), LogError>;
79
80    /// Fsync the log. Spec § 6 — data + metadata.
81    ///
82    /// # Errors
83    ///
84    /// Returns [`LogError`] on failure. Per spec § 7's `fsync-fails`
85    /// row, a sync failure is treated by callers as uncommitted.
86    fn sync(&mut self) -> Result<(), LogError>;
87
88    /// Truncate the log to `new_len` bytes (and fsync the truncation).
89    ///
90    /// # Errors
91    ///
92    /// - [`LogError::TruncateBeyondEnd`] if `new_len > self.len()`.
93    /// - [`LogError::Io`] on other failures.
94    fn truncate(&mut self, new_len: u64) -> Result<(), LogError>;
95
96    /// Read the entire log into a buffer.
97    ///
98    /// # Errors
99    ///
100    /// Returns [`LogError`] on read failure.
101    fn read_all(&mut self) -> Result<Vec<u8>, LogError>;
102
103    /// Byte length of the log.
104    fn len(&self) -> u64;
105
106    /// `true` if the log is empty.
107    fn is_empty(&self) -> bool {
108        self.len() == 0
109    }
110
111    /// Scan forward from offset `0` and return the byte offset after
112    /// the last `Checkpoint` record. Returns `0` if no Checkpoint has
113    /// committed yet.
114    ///
115    /// Decode errors past the last good Checkpoint stop the scan —
116    /// that matches spec § 10's orphan-truncation contract.
117    ///
118    /// # Errors
119    ///
120    /// Returns [`LogError`] on read failure.
121    fn last_checkpoint_end(&mut self) -> Result<u64, LogError>;
122}
123
/// Handle to the on-disk, append-only canonical log.
#[derive(Debug)]
pub struct CanonicalLog {
    /// Open read/write handle to the backing file.
    file: File,
    /// Path the log was opened at; handed back by `path()`.
    path: PathBuf,
    /// Logical length in bytes: the record stream only, not counting the
    /// 8-byte header at the front of the file.
    len: u64,
}
131
132impl CanonicalLog {
133    /// Open or create the canonical log at `path`. File is opened in
134    /// read+write+append-friendly mode; existing content is preserved.
135    ///
136    /// On a freshly-created (or pre-existing zero-byte) file, the
137    /// 8-byte header (`MIMR` + format version `1` LE) is written and
138    /// fsync'd before this returns. On a file that already has bytes,
139    /// the first 8 bytes are validated against the expected header and
140    /// rejected with [`LogError::IncompatibleFormat`] on mismatch —
141    /// **before** any truncation, append, or replay path runs. This
142    /// guards against the destructive-truncate footgun where pointing
143    /// `Store::open` at a misrouted path would silently zero the file.
144    ///
145    /// # Errors
146    ///
147    /// - [`LogError::Io`] if the file cannot be created or opened.
148    /// - [`LogError::IncompatibleFormat`] if the file's first bytes are
149    ///   not a valid Mimir log header (truncated, wrong magic, or
150    ///   wrong format version).
151    pub fn open(path: impl AsRef<Path>) -> Result<Self, LogError> {
152        let path = path.as_ref().to_path_buf();
153        let mut file = OpenOptions::new()
154            .read(true)
155            .write(true)
156            .create(true)
157            .truncate(false)
158            .open(&path)
159            .map_err(LogError::Io)?;
160        let physical_len = file.metadata().map_err(LogError::Io)?.len();
161
162        if physical_len == 0 {
163            // Fresh file: write the header eagerly + fsync so a crash
164            // between open and the first append still leaves a
165            // recognizable Mimir log.
166            file.seek(SeekFrom::Start(0)).map_err(LogError::Io)?;
167            let mut header = [0_u8; 8];
168            header[0..4].copy_from_slice(&LOG_MAGIC);
169            header[4..8].copy_from_slice(&LOG_FORMAT_VERSION.to_le_bytes());
170            file.write_all(&header).map_err(LogError::Io)?;
171            file.sync_all().map_err(LogError::Io)?;
172            return Ok(Self { file, path, len: 0 });
173        }
174
175        if physical_len < LOG_HEADER_SIZE {
176            return Err(LogError::IncompatibleFormat {
177                reason: format!(
178                    "file is {physical_len} bytes; expected at least \
179                     {LOG_HEADER_SIZE}-byte Mimir header"
180                ),
181            });
182        }
183
184        // Read + validate header.
185        file.seek(SeekFrom::Start(0)).map_err(LogError::Io)?;
186        let mut header = [0_u8; 8];
187        file.read_exact(&mut header).map_err(LogError::Io)?;
188        if header[0..4] != LOG_MAGIC {
189            return Err(LogError::IncompatibleFormat {
190                reason: format!(
191                    "magic mismatch: got {:?}, expected {:?} ({:?})",
192                    &header[0..4],
193                    LOG_MAGIC,
194                    std::str::from_utf8(&LOG_MAGIC).unwrap_or("?"),
195                ),
196            });
197        }
198        let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
199        if version != LOG_FORMAT_VERSION {
200            return Err(LogError::IncompatibleFormat {
201                reason: format!(
202                    "format version {version} not supported \
203                     (this build supports version {LOG_FORMAT_VERSION})"
204                ),
205            });
206        }
207
208        // Logical length excludes the header; payload starts at offset
209        // LOG_HEADER_SIZE.
210        let len = physical_len - LOG_HEADER_SIZE;
211        Ok(Self { file, path, len })
212    }
213
214    /// The filesystem path of this log.
215    #[must_use]
216    pub fn path(&self) -> &Path {
217        &self.path
218    }
219}
220
221impl LogBackend for CanonicalLog {
222    fn append(&mut self, bytes: &[u8]) -> Result<(), LogError> {
223        self.file.seek(SeekFrom::End(0)).map_err(LogError::Io)?;
224        self.file.write_all(bytes).map_err(LogError::Io)?;
225        self.len = self
226            .len
227            .checked_add(bytes.len() as u64)
228            .ok_or(LogError::LogOverflow)?;
229        Ok(())
230    }
231
232    fn sync(&mut self) -> Result<(), LogError> {
233        self.file.sync_all().map_err(LogError::Io)
234    }
235
236    fn truncate(&mut self, new_len: u64) -> Result<(), LogError> {
237        if new_len > self.len {
238            return Err(LogError::TruncateBeyondEnd {
239                requested: new_len,
240                current: self.len,
241            });
242        }
243        // Logical truncation: physical file length is `header + new_len`.
244        // The header is never touched, so a `truncate(0)` rollback still
245        // leaves a valid (empty-payload) Mimir log on disk.
246        let physical_new_len = LOG_HEADER_SIZE
247            .checked_add(new_len)
248            .ok_or(LogError::LogOverflow)?;
249        self.file.set_len(physical_new_len).map_err(LogError::Io)?;
250        self.file.sync_all().map_err(LogError::Io)?;
251        self.len = new_len;
252        Ok(())
253    }
254
255    fn read_all(&mut self) -> Result<Vec<u8>, LogError> {
256        // Skip the header — callers see only the logical record stream.
257        self.file
258            .seek(SeekFrom::Start(LOG_HEADER_SIZE))
259            .map_err(LogError::Io)?;
260        let capacity = usize::try_from(self.len).unwrap_or(usize::MAX);
261        let mut buf = Vec::with_capacity(capacity);
262        self.file.read_to_end(&mut buf).map_err(LogError::Io)?;
263        Ok(buf)
264    }
265
266    fn len(&self) -> u64 {
267        self.len
268    }
269
270    fn last_checkpoint_end(&mut self) -> Result<u64, LogError> {
271        let bytes = self.read_all()?;
272        let mut pos: usize = 0;
273        let mut last_checkpoint_end: u64 = 0;
274        while pos < bytes.len() {
275            match decode_record(&bytes[pos..]) {
276                Ok((record, consumed)) => {
277                    pos += consumed;
278                    if matches!(record, CanonicalRecord::Checkpoint(_)) {
279                        last_checkpoint_end = pos as u64;
280                    }
281                }
282                Err(_) => break,
283            }
284        }
285        Ok(last_checkpoint_end)
286    }
287}
288
289/// Errors produced by [`CanonicalLog`].
290#[derive(Debug, Error)]
291pub enum LogError {
292    /// Underlying filesystem / I/O error.
293    #[error("log I/O error: {0}")]
294    Io(#[source] std::io::Error),
295
296    /// The log's byte length would exceed `u64::MAX`. A single-workspace
297    /// log is not expected to hit this — included for completeness.
298    #[error("log length would overflow u64")]
299    LogOverflow,
300
301    /// Truncation target is beyond the current log length.
302    #[error("truncate target {requested} exceeds current length {current}")]
303    TruncateBeyondEnd {
304        /// Requested truncation offset.
305        requested: u64,
306        /// Current log length.
307        current: u64,
308    },
309
310    /// The file at the supplied path is not a recognizable Mimir
311    /// canonical log. Either the magic prefix doesn't match, the
312    /// declared format version isn't supported by this build, or the
313    /// file is too short to carry the 8-byte header. Surfaced
314    /// **before** any truncation or recovery logic so misrouted-path
315    /// opens cannot silently destroy data.
316    #[error("incompatible canonical-log format: {reason}")]
317    IncompatibleFormat {
318        /// Human-readable diagnostic; never include the actual bytes
319        /// of any payload past the header (no PII / value leakage).
320        reason: String,
321    },
322}
323
324impl PartialEq for LogError {
325    fn eq(&self, other: &Self) -> bool {
326        match (self, other) {
327            (Self::Io(a), Self::Io(b)) => a.kind() == b.kind(),
328            (Self::LogOverflow, Self::LogOverflow) => true,
329            (
330                Self::TruncateBeyondEnd {
331                    requested: ra,
332                    current: ca,
333                },
334                Self::TruncateBeyondEnd {
335                    requested: rb,
336                    current: cb,
337                },
338            ) => ra == rb && ca == cb,
339            (Self::IncompatibleFormat { reason: ra }, Self::IncompatibleFormat { reason: rb }) => {
340                ra == rb
341            }
342            _ => false,
343        }
344    }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use crate::canonical::{encode_record, CheckpointRecord};
    use crate::clock::ClockTime;
    use crate::symbol::SymbolId;
    use std::fs;
    use tempfile::TempDir;

    /// File name used for the log inside each test's scratch directory.
    const LOG_NAME: &str = "canonical.log";

    /// Encodes a single Checkpoint record whose fields derive from `seed`.
    fn checkpoint_bytes(seed: u64) -> Vec<u8> {
        let record = CanonicalRecord::Checkpoint(CheckpointRecord {
            episode_id: SymbolId::new(seed),
            at: ClockTime::try_from_millis(seed * 1000).expect("non-sentinel"),
            memory_count: 1,
        });
        let mut encoded = Vec::new();
        encode_record(&record, &mut encoded);
        encoded
    }

    /// Creates a scratch directory and opens a brand-new log inside it.
    /// The directory guard is returned first so it outlives the log.
    fn fresh_log() -> (TempDir, CanonicalLog) {
        let tmp = TempDir::new().expect("tmp");
        let log = CanonicalLog::open(tmp.path().join(LOG_NAME)).expect("open");
        (tmp, log)
    }

    /// Asserts that `err` is the `IncompatibleFormat` variant.
    fn assert_incompatible(err: &LogError) {
        assert!(
            matches!(err, LogError::IncompatibleFormat { .. }),
            "expected IncompatibleFormat, got {err:?}"
        );
    }

    #[test]
    fn open_creates_empty_log_and_writes_header() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join(LOG_NAME);
        let log = CanonicalLog::open(&path).expect("open");
        assert!(
            log.is_empty(),
            "logical length is 0 (header is transparent)"
        );
        assert_eq!(log.len(), 0);

        // On disk the file is exactly the 8-byte header.
        assert_eq!(fs::metadata(&path).expect("stat").len(), LOG_HEADER_SIZE);
        let raw = fs::read(&path).expect("read raw");
        assert_eq!(&raw[0..4], &LOG_MAGIC, "magic prefix written");
        let stored_version = u32::from_le_bytes([raw[4], raw[5], raw[6], raw[7]]);
        assert_eq!(
            stored_version, LOG_FORMAT_VERSION,
            "format version written LE"
        );
    }

    #[test]
    fn open_reopens_existing_log_preserving_length() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join(LOG_NAME);
        let payload = checkpoint_bytes(1);

        let mut writer = CanonicalLog::open(&path).expect("open");
        writer.append(&payload).expect("append");
        writer.sync().expect("sync");
        drop(writer);

        let reopened = CanonicalLog::open(&path).expect("reopen");
        assert_eq!(reopened.len(), payload.len() as u64);
    }

    /// Regression test for **audit finding F1 (P1, 2026-04-19 fresh
    /// assessment)**. Opening `Store` (which composes
    /// `CanonicalLog::open`) on a misrouted path used to truncate the
    /// file to zero: `last_checkpoint_end` returned `0` for a non-Mimir
    /// file and `Store::from_backend` truncated to that offset. With the
    /// magic header, `open` rejects the file with `IncompatibleFormat`
    /// before any truncation runs, so the foreign file survives
    /// byte-for-byte.
    #[test]
    fn open_refuses_to_initialize_non_mimir_file() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join("not-a-mimir-log.cfg");
        let original: &[u8] = b"some_other_format=hello\nimportant_data=42\n";
        fs::write(&path, original).expect("write fixture");

        let err = CanonicalLog::open(&path).expect_err("must reject non-Mimir file");
        assert_incompatible(&err);

        // CRITICAL invariant: a rejected open leaves the file untouched.
        // If this ever fails, the destructive-truncate footgun is back.
        let after = fs::read(&path).expect("read post-open");
        assert_eq!(
            after, original,
            "non-Mimir file must be preserved byte-for-byte on rejected open"
        );
    }

    #[test]
    fn open_refuses_truncated_header() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join(LOG_NAME);
        // Five bytes: shorter than the header, so the version field is
        // unreadable. The magic is correct so this cannot be mistaken
        // for the magic-mismatch case.
        fs::write(&path, b"MIMR\x01").expect("write fixture");

        let err = CanonicalLog::open(&path).expect_err("must reject truncated header");
        assert_incompatible(&err);

        // The short file is left exactly as it was.
        assert_eq!(fs::read(&path).expect("read"), b"MIMR\x01");
    }

    #[test]
    fn open_refuses_wrong_magic() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join(LOG_NAME);
        // Correct size, foreign magic — e.g. an older Mimir-related tool
        // that wrote a different prefix.
        fs::write(&path, b"WICK\x01\x00\x00\x00").expect("write fixture");

        let err = CanonicalLog::open(&path).expect_err("must reject wrong magic");
        assert_incompatible(&err);
    }

    #[test]
    fn open_refuses_unsupported_format_version() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join(LOG_NAME);
        // Correct magic followed by a far-future version number.
        let header = [&LOG_MAGIC[..], &999_u32.to_le_bytes()[..]].concat();
        fs::write(&path, &header).expect("write fixture");

        match CanonicalLog::open(&path).expect_err("must reject unsupported version") {
            LogError::IncompatibleFormat { reason } => assert!(
                reason.contains("999"),
                "diagnostic should name the bad version, got: {reason}"
            ),
            other => panic!("expected IncompatibleFormat, got {other:?}"),
        }
    }

    #[test]
    fn open_idempotent_against_reopen() {
        // Re-opening a v1 log that was just created must succeed and
        // must leave the header bytes alone.
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join(LOG_NAME);

        let _first = CanonicalLog::open(&path).expect("first open");
        let before = fs::read(&path).expect("read 1");
        let _second = CanonicalLog::open(&path).expect("reopen");
        let after = fs::read(&path).expect("read 2");

        assert_eq!(before, after, "reopen does not mutate the header");
        assert_eq!(
            before.len(),
            usize::try_from(LOG_HEADER_SIZE).expect("header fits")
        );
    }

    #[test]
    fn append_sync_roundtrip_preserves_bytes() {
        let (_tmp, mut log) = fresh_log();
        let payload = checkpoint_bytes(42);
        log.append(&payload).expect("append");
        log.sync().expect("sync");
        assert_eq!(log.read_all().expect("read"), payload);
    }

    #[test]
    fn truncate_shrinks_log() {
        let (_tmp, mut log) = fresh_log();
        let first = checkpoint_bytes(1);
        let second = checkpoint_bytes(2);
        log.append(&first).expect("append 1");
        log.append(&second).expect("append 2");
        log.sync().expect("sync");

        let keep = first.len() as u64;
        log.truncate(keep).expect("truncate");
        assert_eq!(log.len(), keep);
        assert_eq!(log.read_all().expect("read"), first);
    }

    #[test]
    fn truncate_beyond_end_errors() {
        let (_tmp, mut log) = fresh_log();
        let err = log.truncate(100).expect_err("beyond");
        assert!(matches!(
            err,
            LogError::TruncateBeyondEnd {
                requested: 100,
                current: 0
            }
        ));
    }

    #[test]
    fn truncate_to_zero_clears_the_log() {
        let (_tmp, mut log) = fresh_log();
        log.append(&checkpoint_bytes(1)).expect("append");
        log.sync().expect("sync");
        assert!(log.len() > 0);

        log.truncate(0).expect("truncate to zero");
        assert_eq!(log.len(), 0);
        assert!(log.is_empty());
        assert!(log.read_all().expect("read").is_empty());
    }

    #[test]
    fn truncate_to_current_length_is_a_noop() {
        let (_tmp, mut log) = fresh_log();
        let payload = checkpoint_bytes(1);
        log.append(&payload).expect("append");
        log.sync().expect("sync");

        let before = log.len();
        log.truncate(before).expect("truncate to current len");
        assert_eq!(log.len(), before);
        assert_eq!(log.read_all().expect("read"), payload);
    }

    #[test]
    fn last_checkpoint_end_returns_zero_for_empty_log() {
        let (_tmp, mut log) = fresh_log();
        assert_eq!(log.last_checkpoint_end().expect("scan"), 0);
    }

    #[test]
    fn last_checkpoint_end_finds_the_final_checkpoint() {
        let (_tmp, mut log) = fresh_log();
        let cp_a = checkpoint_bytes(1);
        let cp_b = checkpoint_bytes(2);
        log.append(&cp_a).expect("append a");
        log.append(&cp_b).expect("append b");
        log.sync().expect("sync");

        let expected = (cp_a.len() + cp_b.len()) as u64;
        assert_eq!(log.last_checkpoint_end().expect("scan"), expected);
    }

    #[test]
    fn last_checkpoint_end_stops_at_corruption() {
        let (_tmp, mut log) = fresh_log();
        let cp = checkpoint_bytes(1);
        log.append(&cp).expect("append");
        // Simulate a half-written orphan: one opcode byte and nothing else.
        log.append(&[0x01_u8]).expect("append garbage");
        log.sync().expect("sync");

        // The scan must stop at the committed CHECKPOINT boundary.
        assert_eq!(log.last_checkpoint_end().expect("scan"), cp.len() as u64);
    }
}