use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use actionqueue_core::ids::{RunId, TaskId};
use actionqueue_core::run::state::RunState;
use actionqueue_core::task::constraints::TaskConstraints;
use actionqueue_core::task::metadata::TaskMetadata;
use actionqueue_core::task::run_policy::RunPolicy;
use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
use actionqueue_storage::recovery::reducer::ReplayReducer;
use actionqueue_storage::recovery::replay::ReplayDriver;
use actionqueue_storage::wal::codec;
use actionqueue_storage::wal::event::{WalEvent, WalEventType};
use actionqueue_storage::wal::fs_reader::WalFsReader;
use actionqueue_storage::wal::fs_writer::{WalFsWriter, WalFsWriterInitError};
use actionqueue_storage::wal::reader::{WalReader, WalReaderError};
use actionqueue_storage::wal::tail_validation::{WalCorruption, WalCorruptionReasonCode};
use actionqueue_storage::wal::writer::WalWriter;
static TEST_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
fn temp_wal_path() -> PathBuf {
let dir = std::env::temp_dir();
let count = TEST_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let path = dir.join(format!("actionqueue_wal_strict_tail_test_{count}.tmp"));
let _ = fs::remove_file(&path);
path
}
fn test_task_spec(id: u128) -> TaskSpec {
TaskSpec::new(
TaskId::from_uuid(uuid::Uuid::from_u128(id)),
TaskPayload::with_content_type(vec![1, 2, 3], "application/octet-stream"),
RunPolicy::Once,
TaskConstraints::default(),
TaskMetadata::default(),
)
.expect("test task spec should be valid")
}
fn task_created(seq: u64, task_id: u128) -> WalEvent {
WalEvent::new(
seq,
WalEventType::TaskCreated { task_spec: test_task_spec(task_id), timestamp: 0 },
)
}
fn run_created(seq: u64, task_id: u128, run_id: u128) -> WalEvent {
let run = actionqueue_core::run::run_instance::RunInstance::new_scheduled_with_id(
RunId::from_uuid(uuid::Uuid::from_u128(run_id)),
TaskId::from_uuid(uuid::Uuid::from_u128(task_id)),
1000,
1000,
)
.expect("test run should be valid");
WalEvent::new(seq, WalEventType::RunCreated { run_instance: run })
}
fn run_state_changed(seq: u64, run_id: u128, previous: RunState, new: RunState) -> WalEvent {
WalEvent::new(
seq,
WalEventType::RunStateChanged {
run_id: RunId::from_uuid(uuid::Uuid::from_u128(run_id)),
previous_state: previous,
new_state: new,
timestamp: 2000,
},
)
}
fn append_bytes(path: &Path, bytes: &[u8]) {
let mut file =
fs::OpenOptions::new().append(true).create(true).open(path).expect("open append");
file.write_all(bytes).expect("append bytes");
file.sync_all().expect("sync bytes");
}
fn assert_writer_corruption(
path: &Path,
expected_offset: u64,
expected_reason: WalCorruptionReasonCode,
) {
let result = WalFsWriter::new(path.to_path_buf());
assert!(matches!(
result,
Err(WalFsWriterInitError::Corruption(WalCorruption { offset, reason }))
if offset == expected_offset && reason == expected_reason
));
}
fn assert_reader_corruption(
path: &Path,
expected_offset: u64,
expected_reason: WalCorruptionReasonCode,
) {
let mut reader = WalFsReader::new(path.to_path_buf()).expect("reader open");
loop {
match reader.read_next() {
Ok(Some(_)) => continue,
Ok(None) => panic!("expected corruption but reached clean EOF"),
Err(error) => {
assert!(matches!(
error,
WalReaderError::Corruption(WalCorruption { offset, reason })
if offset == expected_offset && reason == expected_reason
));
break;
}
}
}
}
fn assert_replay_corruption(
path: &Path,
expected_offset: u64,
expected_reason: WalCorruptionReasonCode,
) {
let reader = WalFsReader::new(path.to_path_buf()).expect("reader open for replay");
let mut driver = ReplayDriver::new(reader, ReplayReducer::new());
let result = driver.run();
assert!(matches!(
result,
Err(WalReaderError::Corruption(WalCorruption { offset, reason }))
if offset == expected_offset && reason == expected_reason
));
}
#[test]
fn d02_t_p1_clean_wal_bootstrap_succeeds_and_sequence_continues() {
let path = temp_wal_path();
{
let mut writer = WalFsWriter::new(path.clone()).expect("writer open");
writer.append(&task_created(1, 1000)).expect("append seq 1");
writer.append(&task_created(2, 2000)).expect("append seq 2");
writer.flush().expect("flush");
}
let mut reopened = WalFsWriter::new(path.clone()).expect("writer reopen");
reopened.append(&task_created(3, 3000)).expect("append seq 3 after bootstrap");
let _ = fs::remove_file(path);
}
#[test]
fn d02_t_p2_clean_reader_and_replay_flows_stay_green() {
let path = temp_wal_path();
{
let mut writer = WalFsWriter::new(path.clone()).expect("writer open");
writer.append(&task_created(1, 1000)).expect("append task");
writer.append(&run_created(2, 1000, 2000)).expect("append run");
writer
.append(&run_state_changed(3, 2000, RunState::Scheduled, RunState::Ready))
.expect("append transition");
}
let mut reader = WalFsReader::new(path.clone()).expect("reader open");
assert!(reader.read_next().expect("read 1").is_some());
assert!(reader.read_next().expect("read 2").is_some());
assert!(reader.read_next().expect("read 3").is_some());
assert!(reader.read_next().expect("read eof").is_none());
let replay_reader = WalFsReader::new(path.clone()).expect("reader replay open");
let mut driver = ReplayDriver::new(replay_reader, ReplayReducer::new());
assert!(driver.run().is_ok(), "clean replay should succeed");
let _ = fs::remove_file(path);
}
#[test]
fn d02_t_n1_partial_trailing_header_fails_writer_reader_and_replay() {
let path = temp_wal_path();
let first = task_created(1, 1000);
let first_len = codec::encode(&first).expect("encode should succeed").len() as u64;
{
let mut writer = WalFsWriter::new(path.clone()).expect("writer open");
writer.append(&first).expect("append first");
}
append_bytes(&path, &codec::VERSION.to_le_bytes()[..2]);
assert_writer_corruption(&path, first_len, WalCorruptionReasonCode::IncompleteHeader);
assert_reader_corruption(&path, first_len, WalCorruptionReasonCode::IncompleteHeader);
assert_replay_corruption(&path, first_len, WalCorruptionReasonCode::IncompleteHeader);
let mut reader = WalFsReader::new(path.clone()).expect("seek reader open");
let seek = reader.seek_to_sequence(1);
assert!(matches!(
seek,
Err(WalReaderError::Corruption(WalCorruption {
offset,
reason: WalCorruptionReasonCode::IncompleteHeader
})) if offset == first_len
));
let _ = fs::remove_file(path);
}
#[test]
fn d02_t_n2_partial_trailing_payload_fails_writer_reader_and_replay() {
let path = temp_wal_path();
let first = task_created(1, 1000);
let first_len = codec::encode(&first).expect("encode should succeed").len() as u64;
{
let mut writer = WalFsWriter::new(path.clone()).expect("writer open");
writer.append(&first).expect("append first");
}
let mut partial = Vec::new();
partial.extend_from_slice(&codec::VERSION.to_le_bytes());
partial.extend_from_slice(&10u32.to_le_bytes());
partial.extend_from_slice(&0u32.to_le_bytes()); partial.extend_from_slice(&[0u8; 3]); append_bytes(&path, &partial);
assert_writer_corruption(&path, first_len, WalCorruptionReasonCode::IncompletePayload);
assert_reader_corruption(&path, first_len, WalCorruptionReasonCode::IncompletePayload);
assert_replay_corruption(&path, first_len, WalCorruptionReasonCode::IncompletePayload);
let _ = fs::remove_file(path);
}
#[test]
fn d02_t_n3_decode_invalid_tail_fails_writer_reader_and_replay() {
let path = temp_wal_path();
let first = task_created(1, 1000);
let first_len = codec::encode(&first).expect("encode should succeed").len() as u64;
{
let mut writer = WalFsWriter::new(path.clone()).expect("writer open");
writer.append(&first).expect("append first");
}
let payload = b"nope";
let mut invalid = Vec::new();
invalid.extend_from_slice(&codec::VERSION.to_le_bytes());
invalid.extend_from_slice(&(payload.len() as u32).to_le_bytes());
invalid.extend_from_slice(&0xDEADBEEFu32.to_le_bytes()); invalid.extend_from_slice(payload);
append_bytes(&path, &invalid);
assert_writer_corruption(&path, first_len, WalCorruptionReasonCode::CrcMismatch);
assert_reader_corruption(&path, first_len, WalCorruptionReasonCode::CrcMismatch);
assert_replay_corruption(&path, first_len, WalCorruptionReasonCode::CrcMismatch);
let _ = fs::remove_file(path);
}
#[test]
fn d02_t_n4_corruption_offset_is_record_start_boundary() {
let path = temp_wal_path();
let first = task_created(1, 1000);
let second = task_created(2, 2000);
let expected_offset = (codec::encode(&first).expect("encode should succeed").len()
+ codec::encode(&second).expect("encode should succeed").len())
as u64;
{
let mut writer = WalFsWriter::new(path.clone()).expect("writer open");
writer.append(&first).expect("append first");
writer.append(&second).expect("append second");
}
let mut partial = Vec::new();
partial.extend_from_slice(&codec::VERSION.to_le_bytes());
partial.extend_from_slice(&9u32.to_le_bytes());
partial.extend_from_slice(&0u32.to_le_bytes()); partial.extend_from_slice(&[1u8, 2u8]); append_bytes(&path, &partial);
assert_writer_corruption(&path, expected_offset, WalCorruptionReasonCode::IncompletePayload);
assert_reader_corruption(&path, expected_offset, WalCorruptionReasonCode::IncompletePayload);
assert_replay_corruption(&path, expected_offset, WalCorruptionReasonCode::IncompletePayload);
let _ = fs::remove_file(path);
}
#[test]
fn d02_t_n5_reducer_errors_remain_distinct_from_wal_corruption_errors() {
let reducer_path = temp_wal_path();
{
let mut writer = WalFsWriter::new(reducer_path.clone()).expect("writer open reducer case");
writer.append(&task_created(1, 1000)).expect("append task");
writer.append(&run_created(2, 1000, 2000)).expect("append run");
writer
.append(&run_state_changed(3, 2000, RunState::Ready, RunState::Scheduled))
.expect("append semantically invalid transition event");
}
let reducer_reader = WalFsReader::new(reducer_path.clone()).expect("reader open reducer case");
let mut reducer_driver = ReplayDriver::new(reducer_reader, ReplayReducer::new());
assert!(matches!(reducer_driver.run(), Err(WalReaderError::ReducerError(_))));
let corruption_path = temp_wal_path();
let first = task_created(1, 3000);
let first_len = codec::encode(&first).expect("encode should succeed").len() as u64;
{
let mut writer =
WalFsWriter::new(corruption_path.clone()).expect("writer open corruption case");
writer.append(&first).expect("append first");
}
append_bytes(&corruption_path, &codec::VERSION.to_le_bytes()[..1]);
let corruption_reader =
WalFsReader::new(corruption_path.clone()).expect("reader open corruption case");
let mut corruption_driver = ReplayDriver::new(corruption_reader, ReplayReducer::new());
assert!(matches!(
corruption_driver.run(),
Err(WalReaderError::Corruption(WalCorruption {
offset,
reason: WalCorruptionReasonCode::IncompleteHeader
})) if offset == first_len
));
let _ = fs::remove_file(reducer_path);
let _ = fs::remove_file(corruption_path);
}