1use crate::event::Event;
2use anyhow::Context;
3use fd_lock::RwLock as FdLock;
4use std::fs::{File, OpenOptions};
5use std::io::Write;
6use std::path::{Path, PathBuf};
7
8pub struct JsonlWriter {
19 path: PathBuf,
20 lock: FdLock<File>,
21}
22
23impl JsonlWriter {
24 pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
25 let path = path.as_ref().to_path_buf();
26 if let Some(parent) = path.parent() {
27 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
28 }
29 let file = OpenOptions::new()
36 .create(true)
37 .read(true)
38 .append(true)
39 .open(&path)
40 .with_context(|| format!("open {path:?} for append"))?;
41 Ok(Self {
42 path,
43 lock: FdLock::new(file),
44 })
45 }
46
47 pub fn append(&mut self, event: &Event) -> anyhow::Result<()> {
48 let line = serde_json::to_string(event).context("serialize event")?;
49 let mut guard = self.lock.write().context("acquire exclusive file lock")?;
50 guard
51 .write_all(line.as_bytes())
52 .context("write event line")?;
53 guard.write_all(b"\n").context("write newline")?;
54 Ok(())
55 }
56
57 pub fn flush_durable(&mut self) -> anyhow::Result<()> {
61 let guard = self.lock.write().context("acquire exclusive file lock")?;
62 guard.sync_all().context("fsync events file")?;
63 Ok(())
64 }
65
66 pub fn path(&self) -> &Path {
67 &self.path
68 }
69}
70
71#[cfg(test)]
72mod tests {
73 use super::*;
74 use crate::event::*;
75 use tempfile::TempDir;
76
77 fn sample_event(text: &str) -> Event {
78 Event::new(
79 "tj-1",
80 EventType::Open,
81 Author::User,
82 Source::Cli,
83 text.into(),
84 )
85 }
86
87 #[test]
88 fn append_three_events_yields_three_lines() {
89 let dir = TempDir::new().unwrap();
90 let path = dir.path().join("events.jsonl");
91
92 let mut w = JsonlWriter::open(&path).unwrap();
93 w.append(&sample_event("a")).unwrap();
94 w.append(&sample_event("b")).unwrap();
95 w.append(&sample_event("c")).unwrap();
96 w.flush_durable().unwrap();
97 drop(w);
98
99 let body = std::fs::read_to_string(&path).unwrap();
100 let lines: Vec<&str> = body.lines().collect();
101 assert_eq!(lines.len(), 3);
102 for line in &lines {
103 let _: Event = serde_json::from_str(line).unwrap();
104 }
105 }
106
107 #[test]
108 fn reopen_appends_not_truncates() {
109 let dir = TempDir::new().unwrap();
110 let path = dir.path().join("events.jsonl");
111
112 {
113 let mut w = JsonlWriter::open(&path).unwrap();
114 w.append(&sample_event("a")).unwrap();
115 w.flush_durable().unwrap();
116 }
117 {
118 let mut w = JsonlWriter::open(&path).unwrap();
119 w.append(&sample_event("b")).unwrap();
120 w.flush_durable().unwrap();
121 }
122
123 let body = std::fs::read_to_string(&path).unwrap();
124 assert_eq!(body.lines().count(), 2);
125 }
126
127 #[test]
128 fn concurrent_appends_do_not_interleave_bytes() {
129 use std::sync::Arc;
134
135 let dir = TempDir::new().unwrap();
136 let path = Arc::new(dir.path().join("events.jsonl"));
137
138 let mut handles = Vec::with_capacity(8);
139 for thread_idx in 0..8 {
140 let path = path.clone();
141 handles.push(std::thread::spawn(move || {
142 let mut w = JsonlWriter::open(&*path).unwrap();
143 for i in 0..100 {
144 let mut e = Event::new(
145 format!("tj-t{thread_idx}"),
146 EventType::Open,
147 Author::User,
148 Source::Cli,
149 format!("thread {thread_idx} event {i}"),
150 );
151 e.meta = serde_json::json!({"thread": thread_idx, "i": i});
152 w.append(&e).unwrap();
153 }
154 w.flush_durable().unwrap();
155 }));
156 }
157 for h in handles {
158 h.join().expect("writer thread panicked");
159 }
160
161 let body = std::fs::read_to_string(&*path).unwrap();
162 let lines: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();
163 assert_eq!(lines.len(), 800, "expected 800 lines, got {}", lines.len());
164 for (idx, line) in lines.iter().enumerate() {
165 serde_json::from_str::<Event>(line)
166 .unwrap_or_else(|e| panic!("line {idx} not a valid Event: {e}\n line: {line}"));
167 }
168 }
169}