mod common;
use std::path::Path;
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};
use aa_storage_sqlite_buffer::EventBuffer;
use common::{sample_entry, CollectingSink};
use tempfile::tempdir;
/// Environment variable the parent test sets on the spawned child to hand it the
/// database path. The child-side test returns immediately when it is not set.
const CHILD_DB_ENV: &str = "AA_SQLITE_BUFFER_KILL_CHILD_DB";
/// Number of events the child enqueues and the parent expects to find and replay.
const EVENT_COUNT: u64 = 5;
/// Child half of the kill test.
///
/// When `CHILD_DB_ENV` is unset (i.e. this test was not launched by the parent
/// test below) it returns immediately and therefore passes. Otherwise it opens
/// the buffer at the given path, enqueues `EVENT_COUNT` entries, drops a
/// `<db>.ready` marker file next to the database, and then sleeps forever so the
/// parent can kill it while it still holds the buffer open.
#[test]
fn child_enqueue_then_block() {
    let db_path = match std::env::var(CHILD_DB_ENV) {
        Ok(path) => path,
        // Not running as the spawned child: nothing to do.
        Err(_) => return,
    };

    let buffer = EventBuffer::new(&db_path, 100).expect("child: open buffer");
    for seq in 0..EVENT_COUNT {
        let label = format!("event-{seq}");
        let entry = sample_entry(seq, &label);
        buffer.enqueue(&entry).expect("child: enqueue");
    }

    // The marker is written only after every enqueue has returned; the parent
    // polls for its existence before killing this process.
    let ready_marker = format!("{db_path}.ready");
    std::fs::write(ready_marker, b"ready").expect("child: write ready file");

    // Never exit voluntarily: the parent is expected to kill this process.
    let nap = Duration::from_secs(3600);
    loop {
        std::thread::sleep(nap);
    }
}
/// Verifies that events enqueued by a process that is subsequently killed are
/// still present when the buffer is reopened, and that they drain in the order
/// they were enqueued.
///
/// The test re-executes its own test binary, selecting only
/// `child_enqueue_then_block` and passing the database path through
/// `CHILD_DB_ENV`. It waits (up to 30 s) for the child's `<db>.ready` marker,
/// kills the child, then reopens the buffer in this process and checks its
/// length and drained contents.
///
/// The child is held in a drop guard so that it is killed and reaped on every
/// exit path, including a panic from the readiness timeout; otherwise the child,
/// which sleeps forever, would be left running as an orphan.
#[tokio::test]
async fn survives_sigkill_and_replays_in_order() {
    /// Kills and reaps the wrapped child when dropped (including during unwinding).
    struct KillOnDrop(std::process::Child);

    impl Drop for KillOnDrop {
        fn drop(&mut self) {
            // Best effort: the child may already have been killed and reaped.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    let dir = tempdir().expect("create temp dir");
    let db = dir.path().join("buffer.db");
    // The child reads the path with `std::env::var`, which requires valid Unicode.
    let db_str = db
        .to_str()
        .expect("temp db path is valid UTF-8")
        .to_string();
    let ready = format!("{db_str}.ready");

    let exe = std::env::current_exe().expect("locate test binary");
    // Declared after `dir`, so the guard drops first: the child is gone before
    // the temporary directory is removed.
    let mut child = KillOnDrop(
        Command::new(exe)
            .args(["--exact", "child_enqueue_then_block"])
            .env(CHILD_DB_ENV, &db_str)
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .expect("spawn child"),
    );

    // Poll for the readiness marker, failing fast if the child dies or stalls.
    let start = Instant::now();
    while !Path::new(&ready).exists() {
        assert!(
            start.elapsed() < Duration::from_secs(30),
            "child never signalled readiness"
        );
        if let Ok(Some(status)) = child.0.try_wait() {
            panic!("child exited early with {status}");
        }
        // NOTE(review): blocking sleep inside an async test; no other task is
        // visible on this runtime at this point, so nothing is starved.
        std::thread::sleep(Duration::from_millis(20));
    }

    // Forcibly terminate the child (SIGKILL on Unix) without giving it a chance
    // to shut down cleanly, then reap it.
    child.0.kill().expect("SIGKILL child");
    let _ = child.0.wait();

    let buffer = EventBuffer::new(&db, 100).expect("reopen buffer after kill");
    assert_eq!(
        buffer.len().expect("query buffer length"),
        EVENT_COUNT as usize
    );

    let sink = CollectingSink::default();
    let flushed = buffer
        .drain_and_send(&sink)
        .await
        .expect("drain buffer into sink");
    assert_eq!(flushed, EVENT_COUNT as usize);

    let expected: Vec<_> = (0..EVENT_COUNT)
        .map(|i| sample_entry(i, &format!("event-{i}")))
        .collect();
    assert_eq!(sink.entries(), expected, "events replay in FIFO order after kill -9");
}