Skip to main content

tj_core/
storage.rs

1use crate::event::Event;
2use anyhow::Context;
3use fd_lock::RwLock as FdLock;
4use std::fs::{File, OpenOptions};
5use std::io::Write;
6use std::path::{Path, PathBuf};
7
8/// Append-only writer for the events JSONL log. Holds an advisory
9/// cross-platform file lock around each append + fsync, so that
10/// concurrent producers (auto-capture hook + manual `task-journal
11/// event` + MCP server) cannot interleave bytes — `O_APPEND` alone
12/// is not atomic on Windows.
13///
14/// The trade-off: every append takes one syscall to acquire the
15/// lock and one more to release it. For a journal — which sees a
16/// handful of events per minute — this overhead is negligible and
17/// far cheaper than recovery from a corrupt JSONL line.
18pub struct JsonlWriter {
19    path: PathBuf,
20    lock: FdLock<File>,
21}
22
23impl JsonlWriter {
24    pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
25        let path = path.as_ref().to_path_buf();
26        if let Some(parent) = path.parent() {
27            std::fs::create_dir_all(parent).with_context(|| format!("create dir {parent:?}"))?;
28        }
29        // `read(true)` is required on Windows: `fd_lock` calls
30        // `LockFileEx` on the underlying handle, which fails with
31        // `os error 5 (Access is denied)` if the file was opened
32        // append-only — the API needs GENERIC_READ access on the
33        // handle. Linux's flock() doesn't care, so the omission was
34        // silent on POSIX. See windows-rs / fd_lock notes.
35        let file = OpenOptions::new()
36            .create(true)
37            .read(true)
38            .append(true)
39            .open(&path)
40            .with_context(|| format!("open {path:?} for append"))?;
41        Ok(Self {
42            path,
43            lock: FdLock::new(file),
44        })
45    }
46
47    pub fn append(&mut self, event: &Event) -> anyhow::Result<()> {
48        let line = serde_json::to_string(event).context("serialize event")?;
49        let mut guard = self.lock.write().context("acquire exclusive file lock")?;
50        guard
51            .write_all(line.as_bytes())
52            .context("write event line")?;
53        guard.write_all(b"\n").context("write newline")?;
54        Ok(())
55    }
56
57    /// Force the file's bytes through to durable storage. Holds the
58    /// exclusive lock so no concurrent writer can sneak an append
59    /// between us and the fsync.
60    pub fn flush_durable(&mut self) -> anyhow::Result<()> {
61        let guard = self.lock.write().context("acquire exclusive file lock")?;
62        guard.sync_all().context("fsync events file")?;
63        Ok(())
64    }
65
66    pub fn path(&self) -> &Path {
67        &self.path
68    }
69}
70
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::*;
    use tempfile::TempDir;

    /// Builds an `Open` event for task `tj-1`, authored by the user
    /// via the CLI, carrying `text` as its payload.
    fn sample_event(text: &str) -> Event {
        Event::new("tj-1", EventType::Open, Author::User, Source::Cli, text.into())
    }

    /// Three appends through one writer must produce exactly three
    /// lines, each of which parses back into an `Event`.
    #[test]
    fn append_three_events_yields_three_lines() {
        let tmp = TempDir::new().unwrap();
        let log_path = tmp.path().join("events.jsonl");

        let mut writer = JsonlWriter::open(&log_path).unwrap();
        for text in ["a", "b", "c"] {
            writer.append(&sample_event(text)).unwrap();
        }
        writer.flush_durable().unwrap();
        drop(writer);

        let contents = std::fs::read_to_string(&log_path).unwrap();
        let records: Vec<&str> = contents.lines().collect();
        assert_eq!(records.len(), 3);
        for record in &records {
            let _: Event = serde_json::from_str(record).unwrap();
        }
    }

    /// Opening an existing log a second time must keep what is already
    /// there and add to the end.
    #[test]
    fn reopen_appends_not_truncates() {
        let tmp = TempDir::new().unwrap();
        let log_path = tmp.path().join("events.jsonl");

        // Each iteration opens a fresh writer and drops it at the end
        // of the loop body, so the second pass is a genuine reopen.
        for text in ["a", "b"] {
            let mut writer = JsonlWriter::open(&log_path).unwrap();
            writer.append(&sample_event(text)).unwrap();
            writer.flush_durable().unwrap();
        }

        let contents = std::fs::read_to_string(&log_path).unwrap();
        assert_eq!(contents.lines().count(), 2);
    }

    /// Eight threads, each with its own `JsonlWriter` (and therefore
    /// its own `File` handle and `fd_lock::RwLock`) on one shared path,
    /// race to write 100 events apiece. The exclusive advisory lock
    /// must serialize them: every resulting line has to be a parseable
    /// `Event`, with no torn writes.
    #[test]
    fn concurrent_appends_do_not_interleave_bytes() {
        use std::sync::Arc;

        let tmp = TempDir::new().unwrap();
        let shared_path = Arc::new(tmp.path().join("events.jsonl"));

        // Spawn every worker before joining any of them so they
        // actually run concurrently.
        let workers: Vec<_> = (0..8)
            .map(|thread_idx| {
                let shared_path = Arc::clone(&shared_path);
                std::thread::spawn(move || {
                    let mut writer = JsonlWriter::open(shared_path.as_path()).unwrap();
                    for i in 0..100 {
                        let mut event = Event::new(
                            format!("tj-t{thread_idx}"),
                            EventType::Open,
                            Author::User,
                            Source::Cli,
                            format!("thread {thread_idx} event {i}"),
                        );
                        event.meta = serde_json::json!({"thread": thread_idx, "i": i});
                        writer.append(&event).unwrap();
                    }
                    writer.flush_durable().unwrap();
                })
            })
            .collect();
        for worker in workers {
            worker.join().expect("writer thread panicked");
        }

        let contents = std::fs::read_to_string(shared_path.as_path()).unwrap();
        let lines: Vec<&str> = contents.lines().filter(|l| !l.is_empty()).collect();
        assert_eq!(lines.len(), 800, "expected 800 lines, got {}", lines.len());
        for (idx, line) in lines.iter().enumerate() {
            if let Err(e) = serde_json::from_str::<Event>(line) {
                panic!("line {idx} not a valid Event: {e}\n  line: {line}");
            }
        }
    }
}