1use crate::event::Event;
2use anyhow::Context;
3use fd_lock::RwLock as FdLock;
4use std::fs::{File, OpenOptions};
5use std::io::Write;
6use std::path::{Path, PathBuf};
7
8pub struct JsonlWriter {
19 path: PathBuf,
20 lock: FdLock<File>,
21}
22
23impl JsonlWriter {
24 pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
25 let path = path.as_ref().to_path_buf();
26 if let Some(parent) = path.parent() {
27 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
28 }
29 let file = OpenOptions::new()
30 .create(true)
31 .append(true)
32 .open(&path)
33 .with_context(|| format!("open {path:?} for append"))?;
34 Ok(Self {
35 path,
36 lock: FdLock::new(file),
37 })
38 }
39
40 pub fn append(&mut self, event: &Event) -> anyhow::Result<()> {
41 let line = serde_json::to_string(event).context("serialize event")?;
42 let mut guard = self.lock.write().context("acquire exclusive file lock")?;
43 guard
44 .write_all(line.as_bytes())
45 .context("write event line")?;
46 guard.write_all(b"\n").context("write newline")?;
47 Ok(())
48 }
49
50 pub fn flush_durable(&mut self) -> anyhow::Result<()> {
54 let guard = self.lock.write().context("acquire exclusive file lock")?;
55 guard.sync_all().context("fsync events file")?;
56 Ok(())
57 }
58
59 pub fn path(&self) -> &Path {
60 &self.path
61 }
62}
63
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::*;
    use tempfile::TempDir;

    /// Builds an `Open` event for task `tj-1` carrying `text` as its payload.
    fn sample_event(text: &str) -> Event {
        let body = text.to_owned();
        Event::new("tj-1", EventType::Open, Author::User, Source::Cli, body)
    }

    /// Three appends through one writer must produce exactly three parseable lines.
    #[test]
    fn append_three_events_yields_three_lines() {
        let tmp = TempDir::new().unwrap();
        let file_path = tmp.path().join("events.jsonl");

        // Scope the writer so it is dropped before the file is read back.
        {
            let mut writer = JsonlWriter::open(&file_path).unwrap();
            for text in ["a", "b", "c"] {
                writer.append(&sample_event(text)).unwrap();
            }
            writer.flush_durable().unwrap();
        }

        let contents = std::fs::read_to_string(&file_path).unwrap();
        let records: Vec<&str> = contents.lines().collect();
        assert_eq!(records.len(), 3);
        records.iter().for_each(|raw| {
            let _: Event = serde_json::from_str(raw).unwrap();
        });
    }

    /// Opening an existing file a second time must keep the earlier content.
    #[test]
    fn reopen_appends_not_truncates() {
        let tmp = TempDir::new().unwrap();
        let file_path = tmp.path().join("events.jsonl");

        // Each iteration opens a fresh writer, which is dropped at the end of the body.
        for text in ["a", "b"] {
            let mut writer = JsonlWriter::open(&file_path).unwrap();
            writer.append(&sample_event(text)).unwrap();
            writer.flush_durable().unwrap();
        }

        let contents = std::fs::read_to_string(&file_path).unwrap();
        assert_eq!(contents.lines().count(), 2);
    }

    /// Eight threads, each with its own writer on the same path, append 100
    /// events apiece; every resulting line must still parse as an `Event`.
    #[test]
    fn concurrent_appends_do_not_interleave_bytes() {
        use std::sync::Arc;

        let tmp = TempDir::new().unwrap();
        let shared_path = Arc::new(tmp.path().join("events.jsonl"));

        // `collect` spawns all workers up front; they are joined afterwards.
        let workers: Vec<_> = (0..8)
            .map(|thread_idx| {
                let target = Arc::clone(&shared_path);
                std::thread::spawn(move || {
                    let mut writer = JsonlWriter::open(&*target).unwrap();
                    for i in 0..100 {
                        let mut event = Event::new(
                            format!("tj-t{thread_idx}"),
                            EventType::Open,
                            Author::User,
                            Source::Cli,
                            format!("thread {thread_idx} event {i}"),
                        );
                        event.meta = serde_json::json!({"thread": thread_idx, "i": i});
                        writer.append(&event).unwrap();
                    }
                    writer.flush_durable().unwrap();
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("writer thread panicked");
        }

        let contents = std::fs::read_to_string(&*shared_path).unwrap();
        let records: Vec<&str> = contents.lines().filter(|l| !l.is_empty()).collect();
        assert_eq!(records.len(), 800, "expected 800 lines, got {}", records.len());
        for (idx, line) in records.iter().enumerate() {
            if let Err(e) = serde_json::from_str::<Event>(line) {
                panic!("line {idx} not a valid Event: {e}\n line: {line}");
            }
        }
    }
}