Skip to main content

tj_core/
storage.rs

1use crate::event::Event;
2use anyhow::Context;
3use fd_lock::RwLock as FdLock;
4use std::fs::{File, OpenOptions};
5use std::io::Write;
6use std::path::{Path, PathBuf};
7
/// Append-only writer for the events JSONL log.
///
/// Every [`JsonlWriter::append`] and every [`JsonlWriter::flush_durable`]
/// runs while holding an advisory cross-platform file lock, so that
/// concurrent producers (auto-capture hook + manual `task-journal
/// event` + MCP server) cannot interleave bytes — `O_APPEND` alone
/// is not atomic on Windows.
///
/// Note that `append` only writes; it does not fsync. Durability is
/// requested separately through `flush_durable`.
///
/// The trade-off: every append takes one syscall to acquire the
/// lock and one more to release it. For a journal — which sees a
/// handful of events per minute — this overhead is negligible and
/// far cheaper than recovery from a corrupt JSONL line.
pub struct JsonlWriter {
    /// Path the log was opened at; handed back by `path()`.
    path: PathBuf,
    /// The append-mode file handle, wrapped in the advisory file lock
    /// that `append` and `flush_durable` acquire exclusively.
    lock: FdLock<File>,
}
22
23impl JsonlWriter {
24    pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
25        let path = path.as_ref().to_path_buf();
26        if let Some(parent) = path.parent() {
27            std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
28        }
29        let file = OpenOptions::new()
30            .create(true)
31            .append(true)
32            .open(&path)
33            .with_context(|| format!("open {path:?} for append"))?;
34        Ok(Self {
35            path,
36            lock: FdLock::new(file),
37        })
38    }
39
40    pub fn append(&mut self, event: &Event) -> anyhow::Result<()> {
41        let line = serde_json::to_string(event).context("serialize event")?;
42        let mut guard = self.lock.write().context("acquire exclusive file lock")?;
43        guard
44            .write_all(line.as_bytes())
45            .context("write event line")?;
46        guard.write_all(b"\n").context("write newline")?;
47        Ok(())
48    }
49
50    /// Force the file's bytes through to durable storage. Holds the
51    /// exclusive lock so no concurrent writer can sneak an append
52    /// between us and the fsync.
53    pub fn flush_durable(&mut self) -> anyhow::Result<()> {
54        let guard = self.lock.write().context("acquire exclusive file lock")?;
55        guard.sync_all().context("fsync events file")?;
56        Ok(())
57    }
58
59    pub fn path(&self) -> &Path {
60        &self.path
61    }
62}
63
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::*;
    use tempfile::TempDir;

    /// Builds an `EventType::Open` event with fixed `"tj-1"`, `Author::User`
    /// and `Source::Cli` arguments, passing `text` as the final argument.
    fn sample_event(text: &str) -> Event {
        let (author, source) = (Author::User, Source::Cli);
        Event::new("tj-1", EventType::Open, author, source, text.into())
    }

    /// Three appends through one writer must produce exactly three lines,
    /// each of which parses back into an `Event`.
    #[test]
    fn append_three_events_yields_three_lines() {
        let tmp = TempDir::new().unwrap();
        let log_path = tmp.path().join("events.jsonl");

        {
            let mut writer = JsonlWriter::open(&log_path).unwrap();
            for text in ["a", "b", "c"] {
                writer.append(&sample_event(text)).unwrap();
            }
            writer.flush_durable().unwrap();
            // `writer` is dropped at the end of this scope, before reading back.
        }

        let contents = std::fs::read_to_string(&log_path).unwrap();
        let records: Vec<&str> = contents.lines().collect();
        assert_eq!(records.len(), 3);
        for record in &records {
            let _: Event = serde_json::from_str(record).unwrap();
        }
    }

    /// Opening an existing log a second time must keep the earlier content
    /// and add to it.
    #[test]
    fn reopen_appends_not_truncates() {
        let tmp = TempDir::new().unwrap();
        let log_path = tmp.path().join("events.jsonl");

        // One fresh writer per event; each is dropped before the next opens.
        for text in ["a", "b"] {
            let mut writer = JsonlWriter::open(&log_path).unwrap();
            writer.append(&sample_event(text)).unwrap();
            writer.flush_durable().unwrap();
        }

        let contents = std::fs::read_to_string(&log_path).unwrap();
        assert_eq!(contents.lines().count(), 2);
    }

    /// Eight threads, each with its own `JsonlWriter` on the same path,
    /// write 100 events apiece. Afterwards the file must hold 800 non-empty
    /// lines and every one of them must parse as an `Event`.
    #[test]
    fn concurrent_appends_do_not_interleave_bytes() {
        use std::sync::Arc;

        let tmp = TempDir::new().unwrap();
        let path = Arc::new(tmp.path().join("events.jsonl"));

        // `collect` drives the iterator eagerly, so all eight threads are
        // spawned before any of them is joined.
        let workers: Vec<_> = (0..8)
            .map(|thread_idx| {
                let path = Arc::clone(&path);
                std::thread::spawn(move || {
                    let mut writer = JsonlWriter::open(&*path).unwrap();
                    for i in 0..100 {
                        let mut event = Event::new(
                            format!("tj-t{thread_idx}"),
                            EventType::Open,
                            Author::User,
                            Source::Cli,
                            format!("thread {thread_idx} event {i}"),
                        );
                        event.meta = serde_json::json!({"thread": thread_idx, "i": i});
                        writer.append(&event).unwrap();
                    }
                    writer.flush_durable().unwrap();
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("writer thread panicked");
        }

        let contents = std::fs::read_to_string(&*path).unwrap();
        let lines: Vec<&str> = contents.lines().filter(|l| !l.is_empty()).collect();
        assert_eq!(lines.len(), 800, "expected 800 lines, got {}", lines.len());

        for (idx, line) in lines.iter().enumerate() {
            if let Err(e) = serde_json::from_str::<Event>(line) {
                panic!("line {idx} not a valid Event: {e}\n  line: {line}");
            }
        }
    }
}