Skip to main content

mur_common/
ledger.rs

1use anyhow::{Context, Result};
2use chrono::Local;
3use serde::{Serialize, de::DeserializeOwned};
4use std::{
5    fs::{self, OpenOptions},
6    io::{BufRead, BufReader, Write},
7    path::{Path, PathBuf},
8    time::{Duration, Instant},
9};
10
/// Append-only JSONL ledger with per-day file rotation.
///
/// Events of type `E` are serialized one JSON object per line into
/// `<base_dir>/YYYY-MM-DD.jsonl`, where the date is the local date at the
/// moment of the append.
///
/// No file locking is performed. Files are opened in append mode, which keeps
/// every write positioned at end-of-file but does not serialize writers.
/// Appending requires `&mut self`, so within one process share a ledger as
/// `Arc<Mutex<Ledger<E>>>`. Coordinating several processes that write to the
/// same directory is left to the caller.
pub struct Ledger<E> {
    /// Directory holding the per-day `.jsonl` files.
    base_dir: PathBuf,
    /// When `sync_data` was last issued; used to debounce fsync calls.
    last_fsync: Instant,
    /// `E` is only (de)serialized, never stored.
    _marker: std::marker::PhantomData<E>,
}
21
22impl<E: Serialize + DeserializeOwned> Ledger<E> {
23    /// Open (or create) a ledger directory.
24    pub fn open(base_dir: &Path) -> Result<Self> {
25        fs::create_dir_all(base_dir).context("create ledger dir")?;
26        Ok(Self {
27            base_dir: base_dir.to_path_buf(),
28            last_fsync: Instant::now(),
29            _marker: std::marker::PhantomData,
30        })
31    }
32
33    /// Append one event to today's JSONL file. Debounced fsync (≤1s).
34    pub fn append(&mut self, event: &E) -> Result<()> {
35        let today = Local::now().date_naive().format("%Y-%m-%d").to_string();
36        let path = self.base_dir.join(format!("{today}.jsonl"));
37        let mut f = OpenOptions::new()
38            .create(true)
39            .append(true)
40            .open(&path)
41            .with_context(|| format!("open ledger {}", path.display()))?;
42        let line = serde_json::to_string(event).context("serialize event")?;
43        writeln!(f, "{line}").context("write event")?;
44
45        if self.last_fsync.elapsed() > Duration::from_secs(1) {
46            f.sync_data().context("fsync ledger")?;
47            self.last_fsync = Instant::now();
48        }
49        Ok(())
50    }
51
52    pub fn base_dir(&self) -> &Path {
53        &self.base_dir
54    }
55
56    pub fn flush(&mut self) -> Result<()> {
57        let today = Local::now().date_naive().format("%Y-%m-%d").to_string();
58        let path = self.base_dir.join(format!("{today}.jsonl"));
59        if path.exists() {
60            let f = OpenOptions::new()
61                .write(true)
62                .open(&path)
63                .with_context(|| format!("open for fsync {}", path.display()))?;
64            f.sync_data().context("fsync ledger")?;
65        }
66        self.last_fsync = Instant::now();
67        Ok(())
68    }
69
70    /// Scan the most recent `days` daily files in chronological order.
71    /// Skips malformed lines and missing files.
72    pub fn scan_days(base_dir: &Path, days: u32) -> Vec<Result<E>> {
73        let mut out = Vec::new();
74        if !base_dir.exists() {
75            return out;
76        }
77        let today = Local::now().date_naive();
78        let mut dates: Vec<_> = (0..days as i64)
79            .map(|i| today - chrono::Duration::days(i))
80            .collect();
81        dates.sort();
82        for date in dates {
83            let p = base_dir.join(format!("{}.jsonl", date.format("%Y-%m-%d")));
84            if !p.exists() {
85                continue;
86            }
87            let f = match std::fs::File::open(&p) {
88                Ok(f) => f,
89                Err(_) => continue,
90            };
91            for (i, line) in BufReader::new(f).lines().enumerate() {
92                let line = match line {
93                    Ok(l) => l,
94                    Err(_) => continue,
95                };
96                if line.trim().is_empty() {
97                    continue;
98                }
99                match serde_json::from_str::<E>(&line) {
100                    Ok(e) => out.push(Ok(e)),
101                    Err(err) => {
102                        tracing::warn!(
103                            "ledger {}:{} skip malformed line: {err}",
104                            p.display(),
105                            i + 1
106                        );
107                    }
108                }
109            }
110        }
111        out
112    }
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;
    use tempfile::TempDir;

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct TestEvent {
        msg: String,
        n: u32,
    }

    /// Path the ledger uses for events on `date`.
    fn day_file(dir: &Path, date: chrono::NaiveDate) -> PathBuf {
        dir.join(format!("{}.jsonl", date.format("%Y-%m-%d")))
    }

    /// Scan `days` days and keep only the successfully parsed events.
    fn scan_ok(dir: &Path, days: u32) -> Vec<TestEvent> {
        Ledger::<TestEvent>::scan_days(dir, days)
            .into_iter()
            .filter_map(Result::ok)
            .collect()
    }

    #[test]
    fn append_and_scan_roundtrip() {
        let tmp = TempDir::new().unwrap();
        let mut ledger = Ledger::<TestEvent>::open(tmp.path()).unwrap();
        ledger
            .append(&TestEvent {
                msg: "hello".into(),
                n: 1,
            })
            .unwrap();
        ledger
            .append(&TestEvent {
                msg: "world".into(),
                n: 2,
            })
            .unwrap();
        // Not needed for visibility (each append closes its handle after
        // writing), but it exercises the explicit fsync path.
        ledger.flush().unwrap();
        drop(ledger);

        let events = scan_ok(tmp.path(), 1);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].msg, "hello");
        assert_eq!(events[1].msg, "world");
    }

    #[test]
    fn append_writes_one_json_line_per_event() {
        let tmp = TempDir::new().unwrap();
        let mut ledger = Ledger::<TestEvent>::open(tmp.path()).unwrap();
        ledger
            .append(&TestEvent {
                msg: "hello".into(),
                n: 1,
            })
            .unwrap();
        let path = day_file(tmp.path(), Local::now().date_naive());
        let contents = std::fs::read_to_string(path).unwrap();
        assert_eq!(contents, "{\"msg\":\"hello\",\"n\":1}\n");
    }

    #[test]
    fn open_creates_missing_dir() {
        let tmp = TempDir::new().unwrap();
        let sub = tmp.path().join("sub");
        let _ledger = Ledger::<TestEvent>::open(&sub).unwrap();
        assert!(sub.exists());
    }

    #[test]
    fn scan_empty_dir_returns_empty() {
        let tmp = TempDir::new().unwrap();
        let results = Ledger::<TestEvent>::scan_days(tmp.path(), 7);
        assert!(results.is_empty());
    }

    #[test]
    fn scan_zero_days_returns_empty() {
        let tmp = TempDir::new().unwrap();
        let path = day_file(tmp.path(), Local::now().date_naive());
        std::fs::write(&path, "{\"msg\":\"ok\",\"n\":1}\n").unwrap();
        assert!(Ledger::<TestEvent>::scan_days(tmp.path(), 0).is_empty());
    }

    #[test]
    fn scan_orders_days_chronologically() {
        let tmp = TempDir::new().unwrap();
        let today = Local::now().date_naive();
        let yesterday = today - chrono::Duration::days(1);
        std::fs::write(
            day_file(tmp.path(), today),
            "{\"msg\":\"today\",\"n\":2}\n",
        )
        .unwrap();
        std::fs::write(
            day_file(tmp.path(), yesterday),
            "{\"msg\":\"yesterday\",\"n\":1}\n",
        )
        .unwrap();

        let events = scan_ok(tmp.path(), 2);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].msg, "yesterday");
        assert_eq!(events[1].msg, "today");
    }

    #[test]
    fn scan_skips_malformed_lines() {
        let tmp = TempDir::new().unwrap();
        let path = day_file(tmp.path(), Local::now().date_naive());
        std::fs::write(
            &path,
            r#"{"msg":"ok","n":1}"#.to_string()
                + "\n"
                + "garbage\n"
                + r#"{"msg":"also ok","n":3}"#
                + "\n",
        )
        .unwrap();

        let events = scan_ok(tmp.path(), 1);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].msg, "ok");
        assert_eq!(events[1].msg, "also ok");
    }
}