1use crate::event::Event;
2use anyhow::Context;
3use std::fs::{File, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6
/// Buffered writer that appends events to a file, one JSON document per line.
///
/// Output is staged in an in-memory buffer; it is only guaranteed to have been
/// handed to the operating system and synced once the buffer has been flushed
/// and the file synced explicitly.
#[derive(Debug)]
pub struct JsonlWriter {
    /// Location of the file this writer was opened on.
    path: PathBuf,
    /// Buffered handle to the file, opened in append mode.
    inner: BufWriter<File>,
}
11
12impl JsonlWriter {
13 pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
14 let path = path.as_ref().to_path_buf();
15 if let Some(parent) = path.parent() {
16 std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
17 }
18 let file = OpenOptions::new()
19 .create(true)
20 .append(true)
21 .open(&path)
22 .with_context(|| format!("open {path:?} for append"))?;
23 Ok(Self {
24 path,
25 inner: BufWriter::new(file),
26 })
27 }
28
29 pub fn append(&mut self, event: &Event) -> anyhow::Result<()> {
30 let line = serde_json::to_string(event).context("serialize event")?;
31 self.inner
32 .write_all(line.as_bytes())
33 .context("write event line")?;
34 self.inner.write_all(b"\n").context("write newline")?;
35 Ok(())
36 }
37
38 pub fn flush_durable(&mut self) -> anyhow::Result<()> {
41 self.inner.flush().context("flush BufWriter")?;
42 self.inner
43 .get_ref()
44 .sync_all()
45 .context("fsync events file")?;
46 Ok(())
47 }
48
49 pub fn path(&self) -> &Path {
50 &self.path
51 }
52}
53
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::*;
    use tempfile::TempDir;

    /// Builds an `Event` with fixed metadata, passing `text` as the final
    /// constructor argument.
    fn sample_event(text: &str) -> Event {
        Event::new("tj-1", EventType::Open, Author::User, Source::Cli, text.into())
    }

    /// Three appended events must end up as three lines that each parse back
    /// into an `Event`.
    #[test]
    fn append_three_events_yields_three_lines() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("events.jsonl");

        // The writer lives only inside this scope, so it is dropped before the read below.
        {
            let mut writer = JsonlWriter::open(&path).unwrap();
            for text in ["a", "b", "c"] {
                writer.append(&sample_event(text)).unwrap();
            }
            writer.flush_durable().unwrap();
        }

        let body = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 3);
        lines.iter().for_each(|line| {
            let _: Event = serde_json::from_str(line).unwrap();
        });
    }

    /// Opening an existing file a second time must add to it rather than
    /// replace its content.
    #[test]
    fn reopen_appends_not_truncates() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("events.jsonl");

        // Each iteration opens a fresh writer that is dropped before the next one starts.
        for text in ["a", "b"] {
            let mut writer = JsonlWriter::open(&path).unwrap();
            writer.append(&sample_event(text)).unwrap();
            writer.flush_durable().unwrap();
        }

        let line_count = std::fs::read_to_string(&path).unwrap().lines().count();
        assert_eq!(line_count, 2);
    }
}