use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Write};
use std::path::{Path, PathBuf};
use super::change_feed::ChangeEvent;
pub const ROTATE_THRESHOLD_BYTES: u64 = 8 * 1024 * 1024;
const FILE_PREFIX: &str = "events-";
const FILE_SUFFIX: &str = ".jsonl";
const SEQ_PAD_WIDTH: usize = 20;
fn file_name_for(start_seq: u64) -> String {
format!("{FILE_PREFIX}{start_seq:0SEQ_PAD_WIDTH$}{FILE_SUFFIX}")
}
fn start_seq_from_name(name: &str) -> Option<u64> {
name.strip_prefix(FILE_PREFIX)?
.strip_suffix(FILE_SUFFIX)?
.parse::<u64>()
.ok()
}
fn list_journal_files(dir: &Path) -> io::Result<Vec<(u64, PathBuf)>> {
let mut files: Vec<(u64, PathBuf)> = Vec::new();
let read_dir = match std::fs::read_dir(dir) {
Ok(rd) => rd,
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(files),
Err(e) => return Err(e),
};
for entry in read_dir {
let entry = entry?;
let name = entry.file_name();
let name = name.to_string_lossy();
if let Some(start) = start_seq_from_name(&name) {
files.push((start, entry.path()));
}
}
files.sort_by_key(|(start, _)| *start);
Ok(files)
}
fn recover_max_seq(dir: &Path) -> io::Result<u64> {
let files = list_journal_files(dir)?;
let Some((_, path)) = files.last() else {
return Ok(0);
};
let contents = std::fs::read_to_string(path)?;
let mut max_seq = 0u64;
for line in contents.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
if let Ok(ce) = serde_json::from_str::<ChangeEvent>(line) {
max_seq = max_seq.max(ce.seq);
}
}
Ok(max_seq)
}
pub struct EventJournal {
dir: PathBuf,
current: Option<BufWriter<File>>,
bytes_written: u64,
rotate_threshold: u64,
}
impl EventJournal {
pub fn open(dir: PathBuf) -> io::Result<(Self, u64)> {
Self::open_with_threshold(dir, ROTATE_THRESHOLD_BYTES)
}
pub fn open_with_threshold(dir: PathBuf, rotate_threshold: u64) -> io::Result<(Self, u64)> {
std::fs::create_dir_all(&dir)?;
let max_seq = recover_max_seq(&dir)?;
Ok((
Self {
dir,
current: None,
bytes_written: 0,
rotate_threshold,
},
max_seq,
))
}
pub fn append(&mut self, ce: &ChangeEvent) -> io::Result<()> {
if self.current.is_none() {
let path = self.dir.join(file_name_for(ce.seq));
let file = OpenOptions::new().create(true).append(true).open(&path)?;
self.current = Some(BufWriter::new(file));
self.bytes_written = 0;
}
let mut line = serde_json::to_string(ce).map_err(io::Error::other)?;
line.push('\n');
let writer = self
.current
.as_mut()
.expect("current writer set above when None");
writer.write_all(line.as_bytes())?;
writer.flush()?;
self.bytes_written += line.len() as u64;
if self.bytes_written >= self.rotate_threshold {
self.current = None;
}
Ok(())
}
}
pub fn read_since(dir: &Path, since: u64) -> io::Result<Vec<ChangeEvent>> {
let files = list_journal_files(dir)?;
let mut out: Vec<ChangeEvent> = Vec::new();
for (idx, (_start, path)) in files.iter().enumerate() {
if let Some((next_start, _)) = files.get(idx + 1) {
if next_start.saturating_sub(1) <= since {
continue;
}
}
let contents = std::fs::read_to_string(path)?;
for line in contents.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
if let Ok(ce) = serde_json::from_str::<ChangeEvent>(line) {
if ce.seq > since {
out.push(ce);
}
}
}
}
Ok(out)
}
pub fn oldest_seq(dir: &Path) -> io::Result<Option<u64>> {
let files = list_journal_files(dir)?;
Ok(files.first().map(|(start, _)| *start))
}
pub fn prune(dir: &Path, max_files: usize) -> io::Result<usize> {
let files = list_journal_files(dir)?;
if files.len() <= max_files {
return Ok(0);
}
let to_delete = files.len() - max_files;
let mut deleted = 0;
for (_, path) in files.iter().take(to_delete) {
match std::fs::remove_file(path) {
Ok(()) => deleted += 1,
Err(e) => tracing::warn!("failed to prune journal file {}: {e}", path.display()),
}
}
Ok(deleted)
}
#[cfg(test)]
mod tests {
use super::*;
use bamboo_agent_core::AgentEvent;
use chrono::Utc;
fn ev(seq: u64) -> ChangeEvent {
ChangeEvent {
seq,
ts: Utc::now(),
session_id: Some(format!("s{seq}")),
event: AgentEvent::SessionDeleted {
session_id: format!("s{seq}"),
},
}
}
#[test]
fn round_trips_and_reads_since() {
let dir = tempfile::tempdir().unwrap();
let (mut j, max) = EventJournal::open(dir.path().to_path_buf()).unwrap();
assert_eq!(max, 0);
for seq in 1..=5 {
j.append(&ev(seq)).unwrap();
}
let got = read_since(dir.path(), 0).unwrap();
assert_eq!(got.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![1, 2, 3, 4, 5]);
let tail = read_since(dir.path(), 3).unwrap();
assert_eq!(tail.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![4, 5]);
let none = read_since(dir.path(), 5).unwrap();
assert!(none.is_empty());
}
#[test]
fn recovers_max_seq_across_reopen() {
let dir = tempfile::tempdir().unwrap();
{
let (mut j, _) = EventJournal::open(dir.path().to_path_buf()).unwrap();
for seq in 1..=3 {
j.append(&ev(seq)).unwrap();
}
}
let (_j, max) = EventJournal::open(dir.path().to_path_buf()).unwrap();
assert_eq!(max, 3);
}
#[test]
fn rotates_by_size_and_reads_across_files() {
let dir = tempfile::tempdir().unwrap();
let (mut j, _) = EventJournal::open_with_threshold(dir.path().to_path_buf(), 1).unwrap();
for seq in 1..=4 {
j.append(&ev(seq)).unwrap();
}
let files = list_journal_files(dir.path()).unwrap();
assert!(files.len() >= 2, "expected rotation into multiple files");
let got = read_since(dir.path(), 0).unwrap();
assert_eq!(got.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![1, 2, 3, 4]);
assert_eq!(oldest_seq(dir.path()).unwrap(), Some(1));
}
#[test]
fn prune_keeps_newest_files_and_advances_oldest() {
let dir = tempfile::tempdir().unwrap();
let (mut j, _) = EventJournal::open_with_threshold(dir.path().to_path_buf(), 1).unwrap();
for seq in 1..=6 {
j.append(&ev(seq)).unwrap();
}
assert_eq!(list_journal_files(dir.path()).unwrap().len(), 6);
let deleted = prune(dir.path(), 2).unwrap();
assert_eq!(deleted, 4);
assert_eq!(oldest_seq(dir.path()).unwrap(), Some(5));
let remaining = read_since(dir.path(), 0).unwrap();
assert_eq!(remaining.iter().map(|e| e.seq).collect::<Vec<_>>(), vec![5, 6]);
assert_eq!(prune(dir.path(), 10).unwrap(), 0);
}
#[test]
fn tolerates_torn_final_line_on_recovery() {
let dir = tempfile::tempdir().unwrap();
{
let (mut j, _) = EventJournal::open(dir.path().to_path_buf()).unwrap();
for seq in 1..=3 {
j.append(&ev(seq)).unwrap();
}
}
let files = list_journal_files(dir.path()).unwrap();
let (_, path) = files.last().unwrap();
let mut f = OpenOptions::new().append(true).open(path).unwrap();
f.write_all(b"{\"seq\":4,\"ts\":\"broke").unwrap();
drop(f);
let (_j, max) = EventJournal::open(dir.path().to_path_buf()).unwrap();
assert_eq!(max, 3, "torn line must be ignored");
}
}