use anyhow::{Context, Result};
use chrono::Local;
use serde::{Serialize, de::DeserializeOwned};
use std::{
fs::{self, OpenOptions},
io::{BufRead, BufReader, Write},
path::{Path, PathBuf},
time::{Duration, Instant},
};
pub struct Ledger<E> {
base_dir: PathBuf,
last_fsync: Instant,
_marker: std::marker::PhantomData<E>,
}
impl<E: Serialize + DeserializeOwned> Ledger<E> {
pub fn open(base_dir: &Path) -> Result<Self> {
fs::create_dir_all(base_dir).context("create ledger dir")?;
Ok(Self {
base_dir: base_dir.to_path_buf(),
last_fsync: Instant::now(),
_marker: std::marker::PhantomData,
})
}
pub fn append(&mut self, event: &E) -> Result<()> {
let today = Local::now().date_naive().format("%Y-%m-%d").to_string();
let path = self.base_dir.join(format!("{today}.jsonl"));
let mut f = OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.with_context(|| format!("open ledger {}", path.display()))?;
let line = serde_json::to_string(event).context("serialize event")?;
writeln!(f, "{line}").context("write event")?;
if self.last_fsync.elapsed() > Duration::from_secs(1) {
f.sync_data().context("fsync ledger")?;
self.last_fsync = Instant::now();
}
Ok(())
}
pub fn base_dir(&self) -> &Path {
&self.base_dir
}
pub fn flush(&mut self) -> Result<()> {
let today = Local::now().date_naive().format("%Y-%m-%d").to_string();
let path = self.base_dir.join(format!("{today}.jsonl"));
if path.exists() {
let f = OpenOptions::new()
.write(true)
.open(&path)
.with_context(|| format!("open for fsync {}", path.display()))?;
f.sync_data().context("fsync ledger")?;
}
self.last_fsync = Instant::now();
Ok(())
}
pub fn scan_days(base_dir: &Path, days: u32) -> Vec<Result<E>> {
let mut out = Vec::new();
if !base_dir.exists() {
return out;
}
let today = Local::now().date_naive();
let mut dates: Vec<_> = (0..days as i64)
.map(|i| today - chrono::Duration::days(i))
.collect();
dates.sort();
for date in dates {
let p = base_dir.join(format!("{}.jsonl", date.format("%Y-%m-%d")));
if !p.exists() {
continue;
}
let f = match std::fs::File::open(&p) {
Ok(f) => f,
Err(_) => continue,
};
for (i, line) in BufReader::new(f).lines().enumerate() {
let line = match line {
Ok(l) => l,
Err(_) => continue,
};
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<E>(&line) {
Ok(e) => out.push(Ok(e)),
Err(err) => {
tracing::warn!(
"ledger {}:{} skip malformed line: {err}",
p.display(),
i + 1
);
}
}
}
}
out
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde::Deserialize;
use tempfile::TempDir;
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct TestEvent {
msg: String,
n: u32,
}
#[test]
fn append_and_scan_roundtrip() {
let tmp = TempDir::new().unwrap();
let mut ledger = Ledger::<TestEvent>::open(tmp.path()).unwrap();
ledger
.append(&TestEvent {
msg: "hello".into(),
n: 1,
})
.unwrap();
ledger
.append(&TestEvent {
msg: "world".into(),
n: 2,
})
.unwrap();
ledger.flush().unwrap();
drop(ledger);
let results = Ledger::<TestEvent>::scan_days(tmp.path(), 1);
let events: Vec<_> = results.into_iter().filter_map(|r| r.ok()).collect();
assert_eq!(events.len(), 2);
assert_eq!(events[0].msg, "hello");
assert_eq!(events[1].msg, "world");
}
#[test]
fn open_creates_missing_dir() {
let tmp = TempDir::new().unwrap();
let sub = tmp.path().join("sub");
let _ledger = Ledger::<TestEvent>::open(&sub).unwrap();
assert!(sub.exists());
}
#[test]
fn scan_empty_dir_returns_empty() {
let tmp = TempDir::new().unwrap();
let results = Ledger::<TestEvent>::scan_days(tmp.path(), 7);
assert!(results.is_empty());
}
#[test]
fn scan_skips_malformed_lines() {
let tmp = TempDir::new().unwrap();
let today = Local::now().date_naive().format("%Y-%m-%d").to_string();
let path = tmp.path().join(format!("{today}.jsonl"));
std::fs::write(
&path,
r#"{"msg":"ok","n":1}"#.to_string()
+ "\n"
+ "garbage\n"
+ r#"{"msg":"also ok","n":3}"#
+ "\n",
)
.unwrap();
let results = Ledger::<TestEvent>::scan_days(tmp.path(), 1);
let events: Vec<_> = results.into_iter().filter_map(|r| r.ok()).collect();
assert_eq!(events.len(), 2);
assert_eq!(events[0].msg, "ok");
assert_eq!(events[1].msg, "also ok");
}
}