Skip to main content

tj_core/
storage.rs

1use crate::event::Event;
2use anyhow::Context;
3use std::fs::{File, OpenOptions};
4use std::io::{BufWriter, Write};
5use std::path::{Path, PathBuf};
6
/// Append-only writer for a JSON Lines (`.jsonl`) event file.
///
/// Created with `JsonlWriter::open`. Writes go through an in-memory
/// [`BufWriter`], so appended events are not guaranteed to be on disk until
/// `flush_durable` has been called.
#[derive(Debug)]
pub struct JsonlWriter {
    /// Path the file was opened with; exposed through `path()`.
    path: PathBuf,
    /// Buffered handle to the underlying file, opened in append mode.
    inner: BufWriter<File>,
}
11
12impl JsonlWriter {
13    pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
14        let path = path.as_ref().to_path_buf();
15        if let Some(parent) = path.parent() {
16            std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
17        }
18        let file = OpenOptions::new()
19            .create(true)
20            .append(true)
21            .open(&path)
22            .with_context(|| format!("open {path:?} for append"))?;
23        Ok(Self {
24            path,
25            inner: BufWriter::new(file),
26        })
27    }
28
29    pub fn append(&mut self, event: &Event) -> anyhow::Result<()> {
30        let line = serde_json::to_string(event).context("serialize event")?;
31        self.inner
32            .write_all(line.as_bytes())
33            .context("write event line")?;
34        self.inner.write_all(b"\n").context("write newline")?;
35        Ok(())
36    }
37
38    /// Flush user buffers to OS, then fsync the underlying file so the bytes
39    /// survive a crash. Call after every batch of appends that must be durable.
40    pub fn flush_durable(&mut self) -> anyhow::Result<()> {
41        self.inner.flush().context("flush BufWriter")?;
42        self.inner
43            .get_ref()
44            .sync_all()
45            .context("fsync events file")?;
46        Ok(())
47    }
48
49    pub fn path(&self) -> &Path {
50        &self.path
51    }
52}
53
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::*;
    use tempfile::TempDir;

    /// Builds an `Event` through `Event::new` with fixed arguments, varying
    /// only the text payload.
    fn sample_event(text: &str) -> Event {
        Event::new(
            "tj-1",
            EventType::Open,
            Author::User,
            Source::Cli,
            text.into(),
        )
    }

    /// Three appends followed by a durable flush must produce exactly three
    /// lines, each of which deserializes back into an `Event`.
    #[test]
    fn append_three_events_yields_three_lines() {
        let tmp = TempDir::new().unwrap();
        let log_path = tmp.path().join("events.jsonl");

        let mut writer = JsonlWriter::open(&log_path).unwrap();
        for text in ["a", "b", "c"] {
            writer.append(&sample_event(text)).unwrap();
        }
        writer.flush_durable().unwrap();
        drop(writer);

        let contents = std::fs::read_to_string(&log_path).unwrap();
        assert_eq!(contents.lines().count(), 3);
        for line in contents.lines() {
            let _: Event = serde_json::from_str(line).unwrap();
        }
    }

    /// Opening an existing file a second time must add to it, not replace it.
    #[test]
    fn reopen_appends_not_truncates() {
        let tmp = TempDir::new().unwrap();
        let log_path = tmp.path().join("events.jsonl");

        // Each iteration opens a fresh writer, which is dropped at the end of
        // the loop body before the next one reopens the same file.
        for text in ["a", "b"] {
            let mut writer = JsonlWriter::open(&log_path).unwrap();
            writer.append(&sample_event(text)).unwrap();
            writer.flush_durable().unwrap();
        }

        let contents = std::fs::read_to_string(&log_path).unwrap();
        assert_eq!(contents.lines().count(), 2);
    }
}