Skip to main content

bamboo_engine/events/
journal.rs

//! Durable JSONL journal for the account change feed.
//!
//! Each [`ChangeEvent`] is appended as one JSON line to a rotating set of files
//! under `<bamboo_dir>/events/`, named `events-<20-digit-start-seq>.jsonl`. The
//! zero-padded start-seq in the filename makes lexical file order match numeric
//! seq order, so seeking and finding the oldest/newest file are cheap.
//!
//! ## Ownership model
//!
//! Writes are owned by a single writer task (see [`super::account_sink`]); this
//! type's `&mut self` append methods are never shared. Reads ([`read_since`],
//! [`oldest_seq`]) are stateless free functions that open files read-only, so a
//! `/stream` replay never contends with the writer.
//!
//! ## Rotation & recovery
//!
//! A new file is opened lazily on the first append and named after that event's
//! seq. Once the current file reaches [`ROTATE_THRESHOLD_BYTES`] it is closed, and
//! the next append opens a fresh file. Files are therefore not normally reopened
//! for append, which avoids extending a torn line. On boot,
//! [`EventJournal::open`] recovers the max seq by parsing the complete lines of
//! the newest file and taking the largest seq. A partial trailing line left by a
//! crash mid-write fails to parse and is ignored.
24
25use std::fs::{File, OpenOptions};
26use std::io::{self, BufWriter, Write};
27use std::path::{Path, PathBuf};
28
29use super::change_feed::ChangeEvent;
30
/// Close the current journal file once it reaches this size; the next append opens a new one.
pub const ROTATE_THRESHOLD_BYTES: u64 = 8 * 1024 * 1024;

/// Journal files are named `events-<zero-padded start-seq>.jsonl`.
const FILE_PREFIX: &str = "events-";
const FILE_SUFFIX: &str = ".jsonl";
/// Number of decimal digits in `u64::MAX`. Every seq is padded to this width, so lexical
/// filename order matches numeric seq order.
const SEQ_PAD_WIDTH: usize = 20;

/// Build the filename for a journal file whose first event has `start_seq`.
fn file_name_for(start_seq: u64) -> String {
    format!("{FILE_PREFIX}{start_seq:0width$}{FILE_SUFFIX}", width = SEQ_PAD_WIDTH)
}

/// Parse the start-seq encoded in a journal filename, if it matches the scheme.
///
/// The part between prefix and suffix must consist solely of ASCII digits. `u64::from_str`
/// on its own would also accept a leading `+` (e.g. `events-+5.jsonl`), a name this module
/// never writes. Empty digit strings and values that overflow `u64` are rejected.
fn start_seq_from_name(name: &str) -> Option<u64> {
    let digits = name.strip_prefix(FILE_PREFIX)?.strip_suffix(FILE_SUFFIX)?;
    if !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // An empty `digits` passes the check above but fails to parse, yielding `None`.
    digits.parse().ok()
}
49
50/// List journal files in ascending seq order (by encoded start-seq).
51fn list_journal_files(dir: &Path) -> io::Result<Vec<(u64, PathBuf)>> {
52    let mut files: Vec<(u64, PathBuf)> = Vec::new();
53    let read_dir = match std::fs::read_dir(dir) {
54        Ok(rd) => rd,
55        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(files),
56        Err(e) => return Err(e),
57    };
58    for entry in read_dir {
59        let entry = entry?;
60        let name = entry.file_name();
61        let name = name.to_string_lossy();
62        if let Some(start) = start_seq_from_name(&name) {
63            files.push((start, entry.path()));
64        }
65    }
66    files.sort_by_key(|(start, _)| *start);
67    Ok(files)
68}
69
70/// The greatest `seq` present across all journal files, or 0 if empty.
71///
72/// Reads only the last file and scans it for the last complete line, tolerating
73/// a torn trailing line from a crash mid-write.
74fn recover_max_seq(dir: &Path) -> io::Result<u64> {
75    let files = list_journal_files(dir)?;
76    let Some((_, path)) = files.last() else {
77        return Ok(0);
78    };
79    let contents = std::fs::read_to_string(path)?;
80    let mut max_seq = 0u64;
81    for line in contents.lines() {
82        let line = line.trim();
83        if line.is_empty() {
84            continue;
85        }
86        // A torn final line will fail to parse; skip it (only complete lines
87        // were ever fully flushed).
88        if let Ok(ce) = serde_json::from_str::<ChangeEvent>(line) {
89            max_seq = max_seq.max(ce.seq);
90        }
91    }
92    Ok(max_seq)
93}
94
95/// Append-only writer for the change-feed journal.
96pub struct EventJournal {
97    dir: PathBuf,
98    current: Option<BufWriter<File>>,
99    bytes_written: u64,
100    rotate_threshold: u64,
101}
102
103impl EventJournal {
104    /// Open (or create) the journal directory and recover the max seq seen.
105    ///
106    /// Returns the writer plus the recovered max seq (0 for a fresh journal),
107    /// which the caller uses to seed the sequence counter.
108    pub fn open(dir: PathBuf) -> io::Result<(Self, u64)> {
109        Self::open_with_threshold(dir, ROTATE_THRESHOLD_BYTES)
110    }
111
112    /// Like [`Self::open`] but with a custom rotation threshold (bytes). A small
113    /// threshold forces one file per event, which retention/resume tests use to
114    /// build a multi-file journal deterministically.
115    pub fn open_with_threshold(dir: PathBuf, rotate_threshold: u64) -> io::Result<(Self, u64)> {
116        std::fs::create_dir_all(&dir)?;
117        let max_seq = recover_max_seq(&dir)?;
118        Ok((
119            Self {
120                dir,
121                current: None,
122                bytes_written: 0,
123                rotate_threshold,
124            },
125            max_seq,
126        ))
127    }
128
129    /// Append one change event as a JSON line, rotating first if needed.
130    ///
131    /// The line is flushed immediately: change events are low-volume, and
132    /// flushing keeps concurrent `/stream` replays able to read the latest
133    /// line without waiting on a timer.
134    pub fn append(&mut self, ce: &ChangeEvent) -> io::Result<()> {
135        if self.current.is_none() {
136            let path = self.dir.join(file_name_for(ce.seq));
137            let file = OpenOptions::new().create(true).append(true).open(&path)?;
138            self.current = Some(BufWriter::new(file));
139            self.bytes_written = 0;
140        }
141
142        let mut line = serde_json::to_string(ce).map_err(io::Error::other)?;
143        line.push('\n');
144
145        let writer = self
146            .current
147            .as_mut()
148            .expect("current writer set above when None");
149        writer.write_all(line.as_bytes())?;
150        writer.flush()?;
151        self.bytes_written += line.len() as u64;
152
153        if self.bytes_written >= self.rotate_threshold {
154            // Drop the current writer; the next append opens a fresh file named
155            // by that event's seq.
156            self.current = None;
157        }
158        Ok(())
159    }
160}
161
162/// Read all change events with `seq > since`, in ascending seq order.
163///
164/// Scans the journal files whose range may contain matching events. Used by the
165/// `/stream` resume path and by delta `/history?since`.
166pub fn read_since(dir: &Path, since: u64) -> io::Result<Vec<ChangeEvent>> {
167    let files = list_journal_files(dir)?;
168    let mut out: Vec<ChangeEvent> = Vec::new();
169    for (idx, (_start, path)) in files.iter().enumerate() {
170        // Skip a file only when the *next* file's start-seq proves this whole
171        // file precedes `since` (i.e. every event here is <= since).
172        if let Some((next_start, _)) = files.get(idx + 1) {
173            if next_start.saturating_sub(1) <= since {
174                continue;
175            }
176        }
177        let contents = std::fs::read_to_string(path)?;
178        for line in contents.lines() {
179            let line = line.trim();
180            if line.is_empty() {
181                continue;
182            }
183            if let Ok(ce) = serde_json::from_str::<ChangeEvent>(line) {
184                if ce.seq > since {
185                    out.push(ce);
186                }
187            }
188        }
189    }
190    Ok(out)
191}
192
193/// The smallest `seq` still retained in the journal, or `None` if empty.
194///
195/// Derived from the lexically-smallest filename's encoded start-seq. Used to
196/// decide whether a resuming client's cursor predates the retained window
197/// (triggering a `feed_reset`).
198pub fn oldest_seq(dir: &Path) -> io::Result<Option<u64>> {
199    let files = list_journal_files(dir)?;
200    Ok(files.first().map(|(start, _)| *start))
201}
202
203/// Delete the oldest journal files, keeping at most `max_files` newest.
204///
205/// Called at boot to bound disk usage. Change events are low-volume, so a
206/// file-count cap (each file ~[`ROTATE_THRESHOLD_BYTES`]) is a predictable
207/// proxy for a size budget. A client whose cursor falls below the new oldest
208/// retained seq is told to full-resync via a `feed_reset` on `/stream`.
209///
210/// Returns the number of files deleted.
211pub fn prune(dir: &Path, max_files: usize) -> io::Result<usize> {
212    let files = list_journal_files(dir)?;
213    if files.len() <= max_files {
214        return Ok(0);
215    }
216    let to_delete = files.len() - max_files;
217    let mut deleted = 0;
218    for (_, path) in files.iter().take(to_delete) {
219        match std::fs::remove_file(path) {
220            Ok(()) => deleted += 1,
221            Err(e) => tracing::warn!("failed to prune journal file {}: {e}", path.display()),
222        }
223    }
224    Ok(deleted)
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use bamboo_agent_core::AgentEvent;
    use chrono::Utc;

    /// A `SessionDeleted` change event for session `s<seq>`.
    fn ev(seq: u64) -> ChangeEvent {
        let session_id = format!("s{seq}");
        ChangeEvent {
            seq,
            ts: Utc::now(),
            session_id: Some(session_id.clone()),
            event: AgentEvent::SessionDeleted { session_id },
        }
    }

    /// The seqs of `events`, in order.
    fn seqs(events: &[ChangeEvent]) -> Vec<u64> {
        events.iter().map(|e| e.seq).collect()
    }

    /// Open a journal in `dir` with `threshold` and append one event per seq in `range`.
    fn fill(dir: &Path, threshold: u64, range: std::ops::RangeInclusive<u64>) {
        let (mut journal, _) =
            EventJournal::open_with_threshold(dir.to_path_buf(), threshold).unwrap();
        for seq in range {
            journal.append(&ev(seq)).unwrap();
        }
    }

    #[test]
    fn round_trips_and_reads_since() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let (mut journal, recovered) = EventJournal::open(root.to_path_buf()).unwrap();
        assert_eq!(recovered, 0);
        (1..=5).for_each(|seq| journal.append(&ev(seq)).unwrap());

        assert_eq!(seqs(&read_since(root, 0).unwrap()), vec![1, 2, 3, 4, 5]);
        assert_eq!(seqs(&read_since(root, 3).unwrap()), vec![4, 5]);
        assert!(read_since(root, 5).unwrap().is_empty());
    }

    #[test]
    fn recovers_max_seq_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        fill(dir.path(), ROTATE_THRESHOLD_BYTES, 1..=3);

        let (_journal, recovered) = EventJournal::open(dir.path().to_path_buf()).unwrap();
        assert_eq!(recovered, 3);
    }

    #[test]
    fn rotates_by_size_and_reads_across_files() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        // A 1-byte threshold rotates after every append.
        fill(root, 1, 1..=4);

        let file_count = list_journal_files(root).unwrap().len();
        assert!(file_count >= 2, "expected rotation into multiple files");
        assert_eq!(seqs(&read_since(root, 0).unwrap()), vec![1, 2, 3, 4]);
        assert_eq!(oldest_seq(root).unwrap(), Some(1));
    }

    #[test]
    fn prune_keeps_newest_files_and_advances_oldest() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        // One file per event.
        fill(root, 1, 1..=6);
        assert_eq!(list_journal_files(root).unwrap().len(), 6);

        assert_eq!(prune(root, 2).unwrap(), 4);
        // Only the two newest files (seq 5, 6) remain.
        assert_eq!(oldest_seq(root).unwrap(), Some(5));
        assert_eq!(seqs(&read_since(root, 0).unwrap()), vec![5, 6]);

        // Keeping more files than exist deletes nothing.
        assert_eq!(prune(root, 10).unwrap(), 0);
    }

    #[test]
    fn tolerates_torn_final_line_on_recovery() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        fill(root, ROTATE_THRESHOLD_BYTES, 1..=3);

        // Simulate a crash mid-write: a partial line at the end of the newest file.
        let files = list_journal_files(root).unwrap();
        let (_, newest) = files.last().unwrap();
        OpenOptions::new()
            .append(true)
            .open(newest)
            .unwrap()
            .write_all(b"{\"seq\":4,\"ts\":\"broke")
            .unwrap();

        let (_journal, recovered) = EventJournal::open(root.to_path_buf()).unwrap();
        assert_eq!(recovered, 3, "torn line must be ignored");
    }
}