use std::path::PathBuf;
use anyhow::{Context, Result};
use stackpatrol_core::event::EventEnvelope;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
const MAX_BUFFER_BYTES: u64 = 1_048_576;
pub struct Buffer {
path: PathBuf,
}
impl Buffer {
pub fn new(path: PathBuf) -> Self {
Self { path }
}
pub fn path(&self) -> &PathBuf {
&self.path
}
pub async fn append(&self, envelope: &EventEnvelope) -> Result<()> {
if let Ok(meta) = tokio::fs::metadata(&self.path).await {
if meta.len() >= MAX_BUFFER_BYTES {
eprintln!(
"buffer: cap reached ({} bytes) — dropping event {:?}",
meta.len(),
envelope.event,
);
return Ok(());
}
}
if let Some(parent) = self.path.parent() {
tokio::fs::create_dir_all(parent)
.await
.with_context(|| format!("creating {}", parent.display()))?;
}
let line = serde_json::to_string(envelope).context("serializing envelope")?;
let mut f = OpenOptions::new()
.append(true)
.create(true)
.open(&self.path)
.await
.with_context(|| format!("opening {}", self.path.display()))?;
f.write_all(line.as_bytes()).await?;
f.write_all(b"\n").await?;
f.flush().await?;
Ok(())
}
pub async fn read_all(&self) -> Result<Vec<EventEnvelope>> {
let f = match File::open(&self.path).await {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(e) => {
return Err(e)
.with_context(|| format!("opening {}", self.path.display()));
}
};
let mut lines = BufReader::new(f).lines();
let mut envelopes = Vec::new();
while let Some(line) = lines.next_line().await? {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<EventEnvelope>(&line) {
Ok(e) => envelopes.push(e),
Err(err) => {
eprintln!("buffer: skipping unparseable line: {err}");
}
}
}
Ok(envelopes)
}
pub async fn rewrite(&self, remaining: &[EventEnvelope]) -> Result<()> {
if remaining.is_empty() {
match tokio::fs::remove_file(&self.path).await {
Ok(()) => return Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
Err(e) => {
return Err(e)
.with_context(|| format!("removing {}", self.path.display()));
}
}
}
let tmp = self.path.with_extension("jsonl.tmp");
if let Some(parent) = self.path.parent() {
tokio::fs::create_dir_all(parent)
.await
.with_context(|| format!("creating {}", parent.display()))?;
}
let mut f = File::create(&tmp)
.await
.with_context(|| format!("creating {}", tmp.display()))?;
for env in remaining {
let line = serde_json::to_string(env).context("serializing envelope")?;
f.write_all(line.as_bytes()).await?;
f.write_all(b"\n").await?;
}
f.flush().await?;
drop(f);
tokio::fs::rename(&tmp, &self.path)
.await
.with_context(|| format!("renaming {} to {}", tmp.display(), self.path.display()))?;
Ok(())
}
}
pub fn default_path() -> Result<PathBuf> {
let dir = dirs::data_dir().context("could not resolve data dir")?;
Ok(dir.join("stackpatrol").join("buffer.jsonl"))
}
#[cfg(test)]
mod tests {
use super::*;
use stackpatrol_core::event::Event;
use tempfile::tempdir;
fn env(name: &str, ts: i64) -> EventEnvelope {
EventEnvelope {
server_name: "test".into(),
timestamp: ts,
event: Event::ServiceDown { name: name.into() },
}
}
#[tokio::test]
async fn append_and_read_roundtrip() {
let dir = tempdir().unwrap();
let buf = Buffer::new(dir.path().join("buffer.jsonl"));
buf.append(&env("web", 1)).await.unwrap();
buf.append(&env("db", 2)).await.unwrap();
let read = buf.read_all().await.unwrap();
assert_eq!(read.len(), 2);
assert_eq!(read[0].timestamp, 1);
assert_eq!(read[1].timestamp, 2);
}
#[tokio::test]
async fn read_all_returns_empty_when_missing() {
let dir = tempdir().unwrap();
let buf = Buffer::new(dir.path().join("nope.jsonl"));
assert!(buf.read_all().await.unwrap().is_empty());
}
#[tokio::test]
async fn rewrite_with_empty_removes_file() {
let dir = tempdir().unwrap();
let buf = Buffer::new(dir.path().join("buffer.jsonl"));
buf.append(&env("web", 1)).await.unwrap();
assert!(buf.path().exists());
buf.rewrite(&[]).await.unwrap();
assert!(!buf.path().exists());
}
#[tokio::test]
async fn rewrite_keeps_only_remaining() {
let dir = tempdir().unwrap();
let buf = Buffer::new(dir.path().join("buffer.jsonl"));
buf.append(&env("a", 1)).await.unwrap();
buf.append(&env("b", 2)).await.unwrap();
buf.append(&env("c", 3)).await.unwrap();
let all = buf.read_all().await.unwrap();
let keep: Vec<_> = all.into_iter().filter(|e| e.timestamp != 2).collect();
buf.rewrite(&keep).await.unwrap();
let read = buf.read_all().await.unwrap();
assert_eq!(read.len(), 2);
assert_eq!(read[0].timestamp, 1);
assert_eq!(read[1].timestamp, 3);
}
#[tokio::test]
async fn unparseable_line_is_skipped() {
let dir = tempdir().unwrap();
let path = dir.path().join("buffer.jsonl");
tokio::fs::write(&path, b"{not json}\n{\"server_name\":\"x\",\"timestamp\":5,\"event\":{\"kind\":\"heartbeat\",\"uptime_secs\":1}}\n")
.await
.unwrap();
let buf = Buffer::new(path);
let read = buf.read_all().await.unwrap();
assert_eq!(read.len(), 1);
assert_eq!(read[0].timestamp, 5);
}
}