use std::fs;
use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use actionqueue_core::ids::TaskId;
use actionqueue_core::task::constraints::TaskConstraints;
use actionqueue_core::task::metadata::TaskMetadata;
use actionqueue_core::task::run_policy::RunPolicy;
use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
use actionqueue_storage::wal::codec;
use actionqueue_storage::wal::event::{WalEvent, WalEventType};
use actionqueue_storage::wal::fs_reader::WalFsReader;
use actionqueue_storage::wal::reader::WalReader;
static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
fn temp_wal_path() -> std::path::PathBuf {
let dir = std::env::temp_dir();
let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
let path =
dir.join(format!("actionqueue_wal_streaming_test_{}_{}.tmp", std::process::id(), count));
let _ = fs::remove_file(&path);
path
}
fn create_test_event(seq: u64) -> WalEvent {
WalEvent::new(
seq,
WalEventType::TaskCreated {
task_spec: TaskSpec::new(
TaskId::new(),
TaskPayload::with_content_type(vec![1, 2, 3], "application/octet-stream"),
RunPolicy::Once,
TaskConstraints::default(),
TaskMetadata::default(),
)
.expect("test task spec should be valid"),
timestamp: seq * 1000,
},
)
}
#[test]
fn reads_many_events_correctly() {
let path = temp_wal_path();
let event_count = 1000;
{
let mut file = fs::File::create(&path).expect("create file");
for seq in 1..=event_count {
let event = create_test_event(seq);
let bytes = codec::encode(&event).expect("encode should succeed");
file.write_all(&bytes).expect("write should succeed");
}
file.flush().expect("flush should succeed");
}
let mut reader = WalFsReader::new(path.clone()).expect("reader should open");
let mut read_count = 0u64;
while let Some(event) = reader.read_next().expect("read should succeed") {
read_count += 1;
assert_eq!(event.sequence(), read_count, "event sequence mismatch at event {read_count}");
}
assert_eq!(read_count, event_count, "should read exactly {event_count} events");
assert!(reader.is_end());
let _ = fs::remove_file(path);
}
#[test]
fn handles_truncated_header_at_end() {
let path = temp_wal_path();
{
let event = create_test_event(1);
let bytes = codec::encode(&event).expect("encode should succeed");
let mut file = fs::File::create(&path).expect("create file");
file.write_all(&bytes).expect("write event");
file.write_all(&[0u8; 5]).expect("write partial header");
file.flush().expect("flush");
}
let mut reader = WalFsReader::new(path.clone()).expect("reader should open");
let event = reader.read_next().expect("first read should succeed");
assert!(event.is_some());
assert_eq!(event.unwrap().sequence(), 1);
let result = reader.read_next();
assert!(result.is_err(), "partial header should be corruption");
assert!(reader.is_end());
let _ = fs::remove_file(path);
}
#[test]
fn handles_truncated_payload_at_end() {
let path = temp_wal_path();
{
let event = create_test_event(1);
let bytes = codec::encode(&event).expect("encode should succeed");
let mut file = fs::File::create(&path).expect("create file");
file.write_all(&bytes).expect("write event");
file.write_all(&codec::VERSION.to_le_bytes()).expect("write version");
file.write_all(&50u32.to_le_bytes()).expect("write length");
file.write_all(&0u32.to_le_bytes()).expect("write CRC");
file.write_all(&[0xABu8; 10]).expect("write partial payload");
file.flush().expect("flush");
}
let mut reader = WalFsReader::new(path.clone()).expect("reader should open");
let event = reader.read_next().expect("first read should succeed");
assert!(event.is_some());
assert_eq!(event.unwrap().sequence(), 1);
let result = reader.read_next();
assert!(result.is_err(), "truncated payload should be corruption");
assert!(reader.is_end());
let _ = fs::remove_file(path);
}
#[test]
fn reports_correct_corruption_offset_for_partial_record() {
let path = temp_wal_path();
let event1 = create_test_event(1);
let event2 = create_test_event(2);
let bytes1 = codec::encode(&event1).expect("encode 1");
let bytes2 = codec::encode(&event2).expect("encode 2");
let expected_corruption_offset = (bytes1.len() + bytes2.len()) as u64;
{
let mut file = fs::File::create(&path).expect("create file");
file.write_all(&bytes1).expect("write 1");
file.write_all(&bytes2).expect("write 2");
file.write_all(&[0xFFu8; 3]).expect("write garbage");
file.flush().expect("flush");
}
let mut reader = WalFsReader::new(path.clone()).expect("reader should open");
let e1 = reader.read_next().expect("read 1").expect("should have event 1");
assert_eq!(e1.sequence(), 1);
let e2 = reader.read_next().expect("read 2").expect("should have event 2");
assert_eq!(e2.sequence(), 2);
match reader.read_next() {
Err(actionqueue_storage::wal::reader::WalReaderError::Corruption(details)) => {
assert_eq!(
details.offset, expected_corruption_offset,
"corruption offset should point to start of trailing garbage"
);
}
other => panic!("expected corruption error, got: {other:?}"),
}
let _ = fs::remove_file(path);
}
#[test]
fn empty_wal_returns_none_immediately() {
let path = temp_wal_path();
fs::write(&path, []).expect("create empty file");
let mut reader = WalFsReader::new(path.clone()).expect("reader should open");
let result = reader.read_next().expect("read should succeed");
assert!(result.is_none(), "empty WAL should return None");
assert!(reader.is_end());
let _ = fs::remove_file(path);
}