bamboo_engine/events/
journal.rs1use std::fs::{File, OpenOptions};
26use std::io::{self, BufWriter, Write};
27use std::path::{Path, PathBuf};
28
29use super::change_feed::ChangeEvent;
30
/// Default rotation threshold in bytes (8 MiB), used by [`EventJournal::open`].
///
/// Once at least this many bytes have been appended to the current journal file, the writer is
/// dropped so that the next append starts a new file.
pub const ROTATE_THRESHOLD_BYTES: u64 = 8 * 1024 * 1024;

/// Leading part of every journal file name.
const FILE_PREFIX: &str = "events-";
/// Trailing part (extension) of every journal file name.
const FILE_SUFFIX: &str = ".jsonl";
/// Number of digits the starting sequence number is zero-padded to in a file name.
/// 20 is the decimal length of `u64::MAX`, so every `u64` fits without exceeding the padding.
const SEQ_PAD_WIDTH: usize = 20;
37
38fn file_name_for(start_seq: u64) -> String {
39 format!("{FILE_PREFIX}{start_seq:0SEQ_PAD_WIDTH$}{FILE_SUFFIX}")
40}
41
42fn start_seq_from_name(name: &str) -> Option<u64> {
44 name.strip_prefix(FILE_PREFIX)?
45 .strip_suffix(FILE_SUFFIX)?
46 .parse::<u64>()
47 .ok()
48}
49
50fn list_journal_files(dir: &Path) -> io::Result<Vec<(u64, PathBuf)>> {
52 let mut files: Vec<(u64, PathBuf)> = Vec::new();
53 let read_dir = match std::fs::read_dir(dir) {
54 Ok(rd) => rd,
55 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(files),
56 Err(e) => return Err(e),
57 };
58 for entry in read_dir {
59 let entry = entry?;
60 let name = entry.file_name();
61 let name = name.to_string_lossy();
62 if let Some(start) = start_seq_from_name(&name) {
63 files.push((start, entry.path()));
64 }
65 }
66 files.sort_by_key(|(start, _)| *start);
67 Ok(files)
68}
69
70fn recover_max_seq(dir: &Path) -> io::Result<u64> {
75 let files = list_journal_files(dir)?;
76 let Some((_, path)) = files.last() else {
77 return Ok(0);
78 };
79 let contents = std::fs::read_to_string(path)?;
80 let mut max_seq = 0u64;
81 for line in contents.lines() {
82 let line = line.trim();
83 if line.is_empty() {
84 continue;
85 }
86 if let Ok(ce) = serde_json::from_str::<ChangeEvent>(line) {
89 max_seq = max_seq.max(ce.seq);
90 }
91 }
92 Ok(max_seq)
93}
94
/// Append-only writer that stores change events as JSON lines in size-rotated files under `dir`.
pub struct EventJournal {
    /// Directory that holds the journal files.
    dir: PathBuf,
    /// Writer for the file currently being appended to. `None` until the first append, and
    /// again after the rotation threshold is reached, so the next append opens a new file.
    current: Option<BufWriter<File>>,
    /// Bytes appended through `current` since it was opened.
    bytes_written: u64,
    /// Once `bytes_written` reaches this value, `current` is dropped.
    rotate_threshold: u64,
}
102
103impl EventJournal {
104 pub fn open(dir: PathBuf) -> io::Result<(Self, u64)> {
109 Self::open_with_threshold(dir, ROTATE_THRESHOLD_BYTES)
110 }
111
112 pub fn open_with_threshold(dir: PathBuf, rotate_threshold: u64) -> io::Result<(Self, u64)> {
116 std::fs::create_dir_all(&dir)?;
117 let max_seq = recover_max_seq(&dir)?;
118 Ok((
119 Self {
120 dir,
121 current: None,
122 bytes_written: 0,
123 rotate_threshold,
124 },
125 max_seq,
126 ))
127 }
128
129 pub fn append(&mut self, ce: &ChangeEvent) -> io::Result<()> {
135 if self.current.is_none() {
136 let path = self.dir.join(file_name_for(ce.seq));
137 let file = OpenOptions::new().create(true).append(true).open(&path)?;
138 self.current = Some(BufWriter::new(file));
139 self.bytes_written = 0;
140 }
141
142 let mut line = serde_json::to_string(ce).map_err(io::Error::other)?;
143 line.push('\n');
144
145 let writer = self
146 .current
147 .as_mut()
148 .expect("current writer set above when None");
149 writer.write_all(line.as_bytes())?;
150 writer.flush()?;
151 self.bytes_written += line.len() as u64;
152
153 if self.bytes_written >= self.rotate_threshold {
154 self.current = None;
157 }
158 Ok(())
159 }
160}
161
162pub fn read_since(dir: &Path, since: u64) -> io::Result<Vec<ChangeEvent>> {
167 let files = list_journal_files(dir)?;
168 let mut out: Vec<ChangeEvent> = Vec::new();
169 for (idx, (_start, path)) in files.iter().enumerate() {
170 if let Some((next_start, _)) = files.get(idx + 1) {
173 if next_start.saturating_sub(1) <= since {
174 continue;
175 }
176 }
177 let contents = std::fs::read_to_string(path)?;
178 for line in contents.lines() {
179 let line = line.trim();
180 if line.is_empty() {
181 continue;
182 }
183 if let Ok(ce) = serde_json::from_str::<ChangeEvent>(line) {
184 if ce.seq > since {
185 out.push(ce);
186 }
187 }
188 }
189 }
190 Ok(out)
191}
192
193pub fn oldest_seq(dir: &Path) -> io::Result<Option<u64>> {
199 let files = list_journal_files(dir)?;
200 Ok(files.first().map(|(start, _)| *start))
201}
202
203pub fn prune(dir: &Path, max_files: usize) -> io::Result<usize> {
212 let files = list_journal_files(dir)?;
213 if files.len() <= max_files {
214 return Ok(0);
215 }
216 let to_delete = files.len() - max_files;
217 let mut deleted = 0;
218 for (_, path) in files.iter().take(to_delete) {
219 match std::fs::remove_file(path) {
220 Ok(()) => deleted += 1,
221 Err(e) => tracing::warn!("failed to prune journal file {}: {e}", path.display()),
222 }
223 }
224 Ok(deleted)
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use bamboo_agent_core::AgentEvent;
    use chrono::Utc;

    /// Builds a `SessionDeleted` change event with sequence number `seq` and session id `s{seq}`.
    fn ev(seq: u64) -> ChangeEvent {
        let session = format!("s{seq}");
        ChangeEvent {
            seq,
            ts: Utc::now(),
            session_id: Some(session.clone()),
            event: AgentEvent::SessionDeleted {
                session_id: session,
            },
        }
    }

    /// Appends one event per sequence number in `1..=last` to `journal`.
    fn append_up_to(journal: &mut EventJournal, last: u64) {
        for seq in 1..=last {
            journal.append(&ev(seq)).unwrap();
        }
    }

    /// Projects a list of events onto their sequence numbers.
    fn seqs(events: &[ChangeEvent]) -> Vec<u64> {
        events.iter().map(|e| e.seq).collect()
    }

    /// Appended events come back in order, and `since` is an exclusive lower bound.
    #[test]
    fn round_trips_and_reads_since() {
        let tmp = tempfile::tempdir().unwrap();
        let (mut journal, recovered) = EventJournal::open(tmp.path().to_path_buf()).unwrap();
        assert_eq!(recovered, 0);
        append_up_to(&mut journal, 5);

        let all = read_since(tmp.path(), 0).unwrap();
        assert_eq!(seqs(&all), vec![1, 2, 3, 4, 5]);

        let tail = read_since(tmp.path(), 3).unwrap();
        assert_eq!(seqs(&tail), vec![4, 5]);

        let none = read_since(tmp.path(), 5).unwrap();
        assert!(none.is_empty());
    }

    /// Reopening the directory reports the highest sequence number written before.
    #[test]
    fn recovers_max_seq_across_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        {
            // Scoped so the first journal is dropped before the directory is reopened.
            let (mut journal, _) = EventJournal::open(tmp.path().to_path_buf()).unwrap();
            append_up_to(&mut journal, 3);
        }
        let (_journal, recovered) = EventJournal::open(tmp.path().to_path_buf()).unwrap();
        assert_eq!(recovered, 3);
    }

    /// A 1-byte threshold rotates after every append; reads still span all files.
    #[test]
    fn rotates_by_size_and_reads_across_files() {
        let tmp = tempfile::tempdir().unwrap();
        let (mut journal, _) =
            EventJournal::open_with_threshold(tmp.path().to_path_buf(), 1).unwrap();
        append_up_to(&mut journal, 4);

        let files = list_journal_files(tmp.path()).unwrap();
        assert!(files.len() >= 2, "expected rotation into multiple files");

        let all = read_since(tmp.path(), 0).unwrap();
        assert_eq!(seqs(&all), vec![1, 2, 3, 4]);
        assert_eq!(oldest_seq(tmp.path()).unwrap(), Some(1));
    }

    /// Pruning removes the oldest files first and is a no-op when within the limit.
    #[test]
    fn prune_keeps_newest_files_and_advances_oldest() {
        let tmp = tempfile::tempdir().unwrap();
        let (mut journal, _) =
            EventJournal::open_with_threshold(tmp.path().to_path_buf(), 1).unwrap();
        append_up_to(&mut journal, 6);
        assert_eq!(list_journal_files(tmp.path()).unwrap().len(), 6);

        let deleted = prune(tmp.path(), 2).unwrap();
        assert_eq!(deleted, 4);
        assert_eq!(oldest_seq(tmp.path()).unwrap(), Some(5));

        let remaining = read_since(tmp.path(), 0).unwrap();
        assert_eq!(seqs(&remaining), vec![5, 6]);

        assert_eq!(prune(tmp.path(), 10).unwrap(), 0);
    }

    /// An incomplete trailing line (simulated crash mid-write) is skipped during recovery.
    #[test]
    fn tolerates_torn_final_line_on_recovery() {
        let tmp = tempfile::tempdir().unwrap();
        {
            let (mut journal, _) = EventJournal::open(tmp.path().to_path_buf()).unwrap();
            append_up_to(&mut journal, 3);
        }

        // Append a truncated JSON record straight to the newest file.
        let files = list_journal_files(tmp.path()).unwrap();
        let (_, newest) = files.last().unwrap();
        let mut raw = OpenOptions::new().append(true).open(newest).unwrap();
        raw.write_all(b"{\"seq\":4,\"ts\":\"broke").unwrap();
        drop(raw);

        let (_journal, recovered) = EventJournal::open(tmp.path().to_path_buf()).unwrap();
        assert_eq!(recovered, 3, "torn line must be ignored");
    }
}