Skip to main content

mur_common/multimodal/
ledger.rs

1//! Append-only provenance ledger for `telemetry/inputs.jsonl`.
2//!
3//! Atomic via flock; readers skip malformed lines so a half-written
4//! entry from a crashed writer can't poison subsequent reads.
5
6use anyhow::{Context, Result};
7use fs2::FileExt;
8use std::fs::OpenOptions;
9use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
10use std::path::PathBuf;
11
12use super::ProvenanceEntry;
13
/// Handle to an append-only JSONL provenance ledger stored at a fixed path.
///
/// The handle only remembers the path; the file is opened (and closed)
/// on every `append` / `read_turn` call, so cloning a handle is cheap and
/// does not share any file descriptor or lock state.
#[derive(Debug, Clone)]
pub struct ProvenanceLedger {
    /// Location of the ledger file (one JSON object per line).
    path: PathBuf,
}
17
18impl ProvenanceLedger {
19    pub fn new(path: impl Into<PathBuf>) -> Self {
20        Self { path: path.into() }
21    }
22
23    /// Append one entry as a single JSON line. Atomic via flock.
24    /// Creates the parent directory if missing.
25    ///
26    /// Note on the open mode: we deliberately use `write(true)` +
27    /// explicit `seek(SeekFrom::End(0))` rather than `append(true)`.
28    /// On Windows, `append(true)` requests `FILE_APPEND_DATA` only,
29    /// which is *not* sufficient for `LockFileEx` (used internally by
30    /// `fs2::FileExt::lock_exclusive`) — Windows demands `GENERIC_WRITE`
31    /// access, otherwise the lock call fails with `ERROR_ACCESS_DENIED`.
32    /// Manual seek-to-end gives us append semantics on every platform
33    /// while keeping the flock contract intact.
34    pub fn append(&self, entry: &ProvenanceEntry) -> Result<()> {
35        if let Some(parent) = self.path.parent() {
36            std::fs::create_dir_all(parent)
37                .with_context(|| format!("create {}", parent.display()))?;
38        }
39        let mut f = OpenOptions::new()
40            .create(true)
41            .read(true)
42            .write(true)
43            .truncate(false)
44            .open(&self.path)
45            .with_context(|| format!("open {}", self.path.display()))?;
46        f.lock_exclusive().context("flock inputs.jsonl")?;
47        f.seek(SeekFrom::End(0))
48            .context("seek to end of inputs.jsonl")?;
49        let line = serde_json::to_string(entry).context("serialize provenance")?;
50        writeln!(f, "{line}").context("append provenance")?;
51        f.unlock().context("unlock inputs.jsonl")?;
52        Ok(())
53    }
54
55    /// Read every entry whose `turn_id == turn`. Malformed lines are
56    /// silently skipped (logged at warn level). Returns Ok(empty)
57    /// when the file does not exist.
58    pub fn read_turn(&self, turn: u64) -> Result<Vec<ProvenanceEntry>> {
59        let f = match OpenOptions::new().read(true).open(&self.path) {
60            Ok(f) => f,
61            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(vec![]),
62            Err(e) => {
63                return Err(anyhow::Error::from(e).context(format!("open {}", self.path.display())));
64            }
65        };
66        let r = BufReader::new(f);
67        let mut out = Vec::new();
68        for (i, line) in r.lines().enumerate() {
69            let Ok(line) = line else { continue };
70            match serde_json::from_str::<ProvenanceEntry>(&line) {
71                Ok(e) if e.turn_id == turn => out.push(e),
72                Ok(_) => continue,
73                Err(e) => {
74                    tracing::warn!(
75                        "ledger {}: skipping malformed line {}: {e}",
76                        self.path.display(),
77                        i + 1
78                    );
79                }
80            }
81        }
82        Ok(out)
83    }
84}