1use anyhow::{Context, Result};
2use chrono::Local;
3use serde::{Serialize, de::DeserializeOwned};
4use std::{
5 fs::{self, OpenOptions},
6 io::{BufRead, BufReader, Write},
7 path::{Path, PathBuf},
8 time::{Duration, Instant},
9};
10
/// Append-only event log stored as JSON-lines files, one file per local
/// calendar day, inside a single directory.
///
/// `E` is the event type; it is only serialized on append and deserialized on
/// scan, never stored in the ledger value itself.
#[derive(Debug)]
pub struct Ledger<E> {
    /// Directory that holds the per-day `YYYY-MM-DD.jsonl` files.
    base_dir: PathBuf,
    /// When the ledger last synced data to disk (or when it was constructed,
    /// if no sync has happened yet). Used to throttle syncs on append.
    last_fsync: Instant,
    /// Ties the ledger to its event type without holding a value of it.
    _marker: std::marker::PhantomData<E>,
}
21
22impl<E: Serialize + DeserializeOwned> Ledger<E> {
23 pub fn open(base_dir: &Path) -> Result<Self> {
25 fs::create_dir_all(base_dir).context("create ledger dir")?;
26 Ok(Self {
27 base_dir: base_dir.to_path_buf(),
28 last_fsync: Instant::now(),
29 _marker: std::marker::PhantomData,
30 })
31 }
32
33 pub fn append(&mut self, event: &E) -> Result<()> {
35 let today = Local::now().date_naive().format("%Y-%m-%d").to_string();
36 let path = self.base_dir.join(format!("{today}.jsonl"));
37 let mut f = OpenOptions::new()
38 .create(true)
39 .append(true)
40 .open(&path)
41 .with_context(|| format!("open ledger {}", path.display()))?;
42 let line = serde_json::to_string(event).context("serialize event")?;
43 writeln!(f, "{line}").context("write event")?;
44
45 if self.last_fsync.elapsed() > Duration::from_secs(1) {
46 f.sync_data().context("fsync ledger")?;
47 self.last_fsync = Instant::now();
48 }
49 Ok(())
50 }
51
52 pub fn base_dir(&self) -> &Path {
53 &self.base_dir
54 }
55
56 pub fn flush(&mut self) -> Result<()> {
57 let today = Local::now().date_naive().format("%Y-%m-%d").to_string();
58 let path = self.base_dir.join(format!("{today}.jsonl"));
59 if path.exists() {
60 let f = OpenOptions::new()
61 .write(true)
62 .open(&path)
63 .with_context(|| format!("open for fsync {}", path.display()))?;
64 f.sync_data().context("fsync ledger")?;
65 }
66 self.last_fsync = Instant::now();
67 Ok(())
68 }
69
70 pub fn scan_days(base_dir: &Path, days: u32) -> Vec<Result<E>> {
73 let mut out = Vec::new();
74 if !base_dir.exists() {
75 return out;
76 }
77 let today = Local::now().date_naive();
78 let mut dates: Vec<_> = (0..days as i64)
79 .map(|i| today - chrono::Duration::days(i))
80 .collect();
81 dates.sort();
82 for date in dates {
83 let p = base_dir.join(format!("{}.jsonl", date.format("%Y-%m-%d")));
84 if !p.exists() {
85 continue;
86 }
87 let f = match std::fs::File::open(&p) {
88 Ok(f) => f,
89 Err(_) => continue,
90 };
91 for (i, line) in BufReader::new(f).lines().enumerate() {
92 let line = match line {
93 Ok(l) => l,
94 Err(_) => continue,
95 };
96 if line.trim().is_empty() {
97 continue;
98 }
99 match serde_json::from_str::<E>(&line) {
100 Ok(e) => out.push(Ok(e)),
101 Err(err) => {
102 tracing::warn!(
103 "ledger {}:{} skip malformed line: {err}",
104 p.display(),
105 i + 1
106 );
107 }
108 }
109 }
110 }
111 out
112 }
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;
    use tempfile::TempDir;

    /// Small payload type used to exercise serialization in both directions.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct TestEvent {
        msg: String,
        n: u32,
    }

    /// Scans `dir` over `days` days and keeps only the successfully decoded events.
    fn scan_ok(dir: &Path, days: u32) -> Vec<TestEvent> {
        let mut decoded = Vec::new();
        for entry in Ledger::<TestEvent>::scan_days(dir, days) {
            if let Some(event) = entry.ok() {
                decoded.push(event);
            }
        }
        decoded
    }

    /// Events appended today are returned by a one-day scan in append order.
    #[test]
    fn append_and_scan_roundtrip() {
        let dir = TempDir::new().unwrap();
        let mut ledger = Ledger::<TestEvent>::open(dir.path()).unwrap();
        for (msg, n) in [("hello", 1), ("world", 2)] {
            let event = TestEvent {
                msg: msg.into(),
                n,
            };
            ledger.append(&event).unwrap();
        }
        ledger.flush().unwrap();
        drop(ledger);

        let events = scan_ok(dir.path(), 1);
        assert_eq!(events.len(), 2);
        let msgs: Vec<&str> = events.iter().map(|e| e.msg.as_str()).collect();
        assert_eq!(msgs[0], "hello");
        assert_eq!(msgs[1], "world");
    }

    /// Opening a ledger creates its directory when it is missing.
    #[test]
    fn open_creates_missing_dir() {
        let root = TempDir::new().unwrap();
        let nested = root.path().join("sub");
        let _ledger = Ledger::<TestEvent>::open(&nested).unwrap();
        assert!(nested.exists());
    }

    /// A directory without any day files yields no results.
    #[test]
    fn scan_empty_dir_returns_empty() {
        let dir = TempDir::new().unwrap();
        assert!(Ledger::<TestEvent>::scan_days(dir.path(), 7).is_empty());
    }

    /// An undecodable line between two valid ones is dropped; the valid ones survive.
    #[test]
    fn scan_skips_malformed_lines() {
        let dir = TempDir::new().unwrap();
        let file_name = format!("{}.jsonl", Local::now().date_naive().format("%Y-%m-%d"));
        let contents = concat!(
            r#"{"msg":"ok","n":1}"#,
            "\n",
            "garbage\n",
            r#"{"msg":"also ok","n":3}"#,
            "\n",
        );
        std::fs::write(dir.path().join(file_name), contents).unwrap();

        let events = scan_ok(dir.path(), 1);
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].msg, "ok");
        assert_eq!(events[1].msg, "also ok");
    }
}