use super::log_payload::{LogPayloadStorage, SNAPSHOT_MAGIC, SNAPSHOT_VERSION};
use super::snapshot::crc32_hash;
use super::traits::PayloadStorage;
use serde_json::json;
use std::fs;
use tempfile::TempDir;
fn write_store_entry(buf: &mut Vec<u8>, id: u64, payload: &serde_json::Value) {
let payload_bytes = serde_json::to_vec(payload).expect("serialize");
buf.push(1u8); buf.extend_from_slice(&id.to_le_bytes());
#[allow(clippy::cast_possible_truncation)]
let len = payload_bytes.len() as u32;
buf.extend_from_slice(&len.to_le_bytes());
buf.extend_from_slice(&payload_bytes);
}
fn write_delete_entry(buf: &mut Vec<u8>, id: u64) {
buf.push(2u8); buf.extend_from_slice(&id.to_le_bytes());
}
fn open_storage_with_wal(wal_bytes: &[u8]) -> std::io::Result<(TempDir, LogPayloadStorage)> {
let temp = TempDir::new()?;
let log_path = temp.path().join("payloads.log");
fs::write(&log_path, wal_bytes)?;
let storage = LogPayloadStorage::new(temp.path())?;
Ok((temp, storage))
}
fn build_valid_wal(count: u64) -> Vec<u8> {
let mut buf = Vec::new();
for i in 1..=count {
write_store_entry(&mut buf, i, &json!({"id": i}));
}
buf
}
#[test]
fn test_wal_recovery_truncated_header_single_byte() {
let wal = vec![1u8]; let result = open_storage_with_wal(&wal);
match result {
Ok((_dir, storage)) => assert_eq!(storage.ids().len(), 0),
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
}
}
#[test]
fn test_wal_recovery_truncated_id_bytes() {
let mut wal = vec![1u8]; wal.extend_from_slice(&42u64.to_le_bytes()[..4]);
let result = open_storage_with_wal(&wal);
match result {
Ok((_dir, storage)) => assert_eq!(storage.ids().len(), 0),
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
}
}
#[test]
fn test_wal_recovery_truncated_payload_length() {
let mut wal = vec![1u8];
wal.extend_from_slice(&1u64.to_le_bytes());
wal.extend_from_slice(&[0x10, 0x00]);
let result = open_storage_with_wal(&wal);
match result {
Ok((_dir, storage)) => assert_eq!(storage.ids().len(), 0),
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
}
}
#[test]
fn test_wal_recovery_truncated_payload_data() {
let mut wal = vec![1u8];
wal.extend_from_slice(&1u64.to_le_bytes());
let claimed_len: u32 = 100;
wal.extend_from_slice(&claimed_len.to_le_bytes());
wal.extend_from_slice(&[0xABu8; 10]);
let result = open_storage_with_wal(&wal);
match result {
Ok((_dir, storage)) => {
let _ = storage.ids();
}
Err(_) => { }
}
}
#[test]
fn test_wal_recovery_zero_length_payload() {
let mut wal = Vec::new();
wal.push(1u8);
wal.extend_from_slice(&1u64.to_le_bytes());
wal.extend_from_slice(&0u32.to_le_bytes());
let (_dir, storage) = open_storage_with_wal(&wal).expect("should open");
assert_eq!(storage.ids().len(), 1);
assert!(storage.ids().contains(&1));
}
#[test]
fn test_wal_recovery_multiple_valid_then_truncated() {
let mut wal = build_valid_wal(5);
wal.push(1u8); wal.extend_from_slice(&6u64.to_le_bytes()[..3]);
let result = open_storage_with_wal(&wal);
match result {
Ok((_dir, storage)) => {
let ids = storage.ids();
assert!(
ids.len() >= 5,
"Expected at least 5 recovered entries, got {}",
ids.len()
);
for i in 1..=5 {
assert!(ids.contains(&i), "Missing entry {i}");
}
}
Err(_) => {
}
}
}
#[test]
fn test_wal_recovery_write_interrupted_mid_vector() {
let payload = json!({"data": "this is a longer payload for testing truncation"});
let payload_bytes = serde_json::to_vec(&payload).expect("serialize");
let half_len = payload_bytes.len() / 2;
let mut wal = Vec::new();
wal.push(1u8);
wal.extend_from_slice(&1u64.to_le_bytes());
#[allow(clippy::cast_possible_truncation)]
let full_len = payload_bytes.len() as u32;
wal.extend_from_slice(&full_len.to_le_bytes());
wal.extend_from_slice(&payload_bytes[..half_len]);
let result = open_storage_with_wal(&wal);
match result {
Ok((_dir, storage)) => {
let _ = storage.ids();
}
Err(_) => { }
}
}
#[test]
fn test_wal_recovery_invalid_marker_byte() {
let mut wal = Vec::new();
wal.push(0xFF); wal.extend_from_slice(&1u64.to_le_bytes());
let result = open_storage_with_wal(&wal);
match result {
Ok((_dir, storage)) => assert_eq!(storage.ids().len(), 0),
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidData),
}
}
#[test]
fn test_wal_recovery_flipped_marker_in_second_entry() {
let mut wal = Vec::new();
write_store_entry(&mut wal, 1, &json!({"ok": true}));
wal.push(0xFE); wal.extend_from_slice(&2u64.to_le_bytes());
let result = open_storage_with_wal(&wal);
match result {
Ok((_dir, storage)) => {
let ids = storage.ids();
assert!(ids.contains(&1), "First valid entry should be recovered");
}
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
}
}
}
#[test]
fn test_wal_recovery_flipped_bits_in_payload() {
let mut wal = Vec::new();
let payload = json!({"key": "value"});
let mut payload_bytes = serde_json::to_vec(&payload).expect("serialize");
for byte in &mut payload_bytes {
*byte ^= 0xFF;
}
wal.push(1u8);
wal.extend_from_slice(&1u64.to_le_bytes());
#[allow(clippy::cast_possible_truncation)]
let len = payload_bytes.len() as u32;
wal.extend_from_slice(&len.to_le_bytes());
wal.extend_from_slice(&payload_bytes);
let (_dir, storage) = open_storage_with_wal(&wal).expect("should open — structure is valid");
assert_eq!(storage.ids().len(), 1);
let result = storage.retrieve(1);
assert!(
result.is_err() || result.unwrap().is_none(),
"Corrupted payload should fail to deserialize"
);
}
#[test]
fn test_wal_recovery_oversized_payload_length() {
let mut wal = Vec::new();
wal.push(1u8);
wal.extend_from_slice(&1u64.to_le_bytes());
wal.extend_from_slice(&u32::MAX.to_le_bytes()); wal.extend_from_slice(b"tiny");
let result = open_storage_with_wal(&wal);
match result {
Ok((_dir, storage)) => {
let _ = storage.ids();
}
Err(_) => { }
}
}
#[test]
fn test_wal_recovery_all_zero_bytes() {
let wal = vec![0u8; 64];
let result = open_storage_with_wal(&wal);
match result {
Ok((_dir, storage)) => assert_eq!(storage.ids().len(), 0),
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidData),
}
}
#[test]
fn test_wal_recovery_valid_entries_then_garbage() {
let mut wal = build_valid_wal(3);
wal.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE]);
let result = open_storage_with_wal(&wal);
match result {
Ok((_dir, storage)) => {
let ids = storage.ids();
assert!(!ids.is_empty(), "Should recover at least some entries");
}
Err(_) => {
}
}
}
fn build_snapshot(entries: &[(u64, u64)], wal_pos: u64) -> Vec<u8> {
let entry_count = entries.len() as u64;
let mut buf = Vec::new();
buf.extend_from_slice(SNAPSHOT_MAGIC);
buf.push(SNAPSHOT_VERSION);
buf.extend_from_slice(&wal_pos.to_le_bytes());
buf.extend_from_slice(&entry_count.to_le_bytes());
for &(id, offset) in entries {
buf.extend_from_slice(&id.to_le_bytes());
buf.extend_from_slice(&offset.to_le_bytes());
}
let crc = crc32_hash(&buf);
buf.extend_from_slice(&crc.to_le_bytes());
buf
}
#[test]
fn test_snapshot_invalid_magic() {
let temp = TempDir::new().expect("temp dir");
fs::write(temp.path().join("payloads.log"), b"").expect("write wal");
let mut snapshot = build_snapshot(&[], 0);
snapshot[0] = b'X'; fs::write(temp.path().join("payloads.snapshot"), &snapshot).expect("write snap");
let storage = LogPayloadStorage::new(temp.path()).expect("should open via fallback");
assert_eq!(storage.ids().len(), 0);
}
#[test]
fn test_snapshot_invalid_version() {
let temp = TempDir::new().expect("temp dir");
fs::write(temp.path().join("payloads.log"), b"").expect("write wal");
let mut snapshot = build_snapshot(&[], 0);
snapshot[4] = 99; fs::write(temp.path().join("payloads.snapshot"), &snapshot).expect("write snap");
let storage = LogPayloadStorage::new(temp.path()).expect("fallback to WAL");
assert_eq!(storage.ids().len(), 0);
}
#[test]
fn test_snapshot_crc_mismatch() {
let temp = TempDir::new().expect("temp dir");
fs::write(temp.path().join("payloads.log"), b"").expect("write wal");
let mut snapshot = build_snapshot(&[(1, 9), (2, 30)], 100);
let mid = snapshot.len() / 2;
snapshot[mid] ^= 0xFF;
fs::write(temp.path().join("payloads.snapshot"), &snapshot).expect("write snap");
let storage = LogPayloadStorage::new(temp.path()).expect("fallback to WAL");
assert_eq!(storage.ids().len(), 0);
}
#[test]
fn test_snapshot_truncated() {
let temp = TempDir::new().expect("temp dir");
fs::write(temp.path().join("payloads.log"), b"").expect("write wal");
fs::write(temp.path().join("payloads.snapshot"), b"VSNP").expect("write snap");
let storage = LogPayloadStorage::new(temp.path()).expect("fallback to WAL");
assert_eq!(storage.ids().len(), 0);
}
#[test]
fn test_crash_recovery_clean_shutdown() {
let temp = TempDir::new().expect("temp dir");
{
let mut storage = LogPayloadStorage::new(temp.path()).expect("create");
for i in 1..=10 {
storage.store(i, &json!({"id": i})).expect("store");
}
storage.flush().expect("flush");
}
let storage = LogPayloadStorage::new(temp.path()).expect("reopen");
assert_eq!(storage.ids().len(), 10);
for i in 1..=10 {
let val = storage.retrieve(i).expect("retrieve").expect("exists");
assert_eq!(val["id"], i);
}
}
#[test]
fn test_crash_recovery_unclean_shutdown_no_flush() {
let temp = TempDir::new().expect("temp dir");
{
let mut storage = LogPayloadStorage::new(temp.path()).expect("create");
for i in 1..=5 {
storage.store(i, &json!({"id": i})).expect("store");
}
}
let storage = LogPayloadStorage::new(temp.path()).expect("reopen");
let ids = storage.ids();
assert_eq!(ids.len(), 5, "All entries should survive (store flushes)");
}
#[test]
fn test_crash_recovery_stale_snapshot_with_wal_delta() {
let temp = TempDir::new().expect("temp dir");
{
let mut storage = LogPayloadStorage::new(temp.path()).expect("create");
for i in 1..=50 {
storage
.store(i, &json!({"id": i, "phase": 1}))
.expect("store");
}
storage.create_snapshot().expect("snapshot");
for i in 51..=100 {
storage
.store(i, &json!({"id": i, "phase": 2}))
.expect("store");
}
storage.flush().expect("flush");
}
let storage = LogPayloadStorage::new(temp.path()).expect("reopen");
assert_eq!(storage.ids().len(), 100);
let v1 = storage.retrieve(25).expect("retrieve").expect("exists");
assert_eq!(v1["phase"], 1);
let v2 = storage.retrieve(75).expect("retrieve").expect("exists");
assert_eq!(v2["phase"], 2);
}
#[test]
fn test_crash_recovery_double_recovery_idempotent() {
let temp = TempDir::new().expect("temp dir");
{
let mut storage = LogPayloadStorage::new(temp.path()).expect("create");
for i in 1..=20 {
storage.store(i, &json!({"id": i})).expect("store");
}
storage.flush().expect("flush");
}
let storage1 = LogPayloadStorage::new(temp.path()).expect("recovery 1");
let ids1: Vec<u64> = {
let mut ids = storage1.ids();
ids.sort_unstable();
ids
};
drop(storage1);
let storage2 = LogPayloadStorage::new(temp.path()).expect("recovery 2");
let ids2: Vec<u64> = {
let mut ids = storage2.ids();
ids.sort_unstable();
ids
};
assert_eq!(ids1, ids2, "Double recovery must produce identical state");
}
#[test]
fn test_crash_recovery_empty_wal_with_snapshot() {
let temp = TempDir::new().expect("temp dir");
{
let mut storage = LogPayloadStorage::new(temp.path()).expect("create");
for i in 1..=10 {
storage.store(i, &json!({"id": i})).expect("store");
}
storage.create_snapshot().expect("snapshot");
}
let wal_path = temp.path().join("payloads.log");
fs::write(&wal_path, b"").expect("truncate wal");
let result = LogPayloadStorage::new(temp.path());
match result {
Ok(storage) => {
let ids = storage.ids();
assert_eq!(ids.len(), 10, "Snapshot data should be loaded");
}
Err(_) => {
}
}
}
#[test]
fn test_crash_recovery_no_snapshot_no_wal() {
let temp = TempDir::new().expect("temp dir");
let storage = LogPayloadStorage::new(temp.path()).expect("fresh open");
assert_eq!(storage.ids().len(), 0);
}
#[test]
fn test_crash_recovery_snapshot_with_deletes_in_delta() {
let temp = TempDir::new().expect("temp dir");
{
let mut storage = LogPayloadStorage::new(temp.path()).expect("create");
for i in 1..=20 {
storage.store(i, &json!({"id": i})).expect("store");
}
storage.create_snapshot().expect("snapshot");
for i in 1..=10 {
storage.delete(i).expect("delete");
}
storage.flush().expect("flush");
}
let storage = LogPayloadStorage::new(temp.path()).expect("reopen");
let ids = storage.ids();
assert_eq!(ids.len(), 10, "Only 10 entries should remain after deletes");
for i in 1..=10 {
assert!(!ids.contains(&i), "Entry {i} should be deleted");
}
for i in 11..=20 {
assert!(ids.contains(&i), "Entry {i} should still exist");
}
}
#[test]
fn test_crash_recovery_wal_with_store_then_delete_same_id() {
let mut wal = Vec::new();
write_store_entry(&mut wal, 42, &json!({"data": "created"}));
write_delete_entry(&mut wal, 42);
let (_dir, storage) = open_storage_with_wal(&wal).expect("should open");
assert_eq!(storage.ids().len(), 0, "Deleted entry should not appear");
assert!(
storage.retrieve(42).expect("retrieve").is_none(),
"Entry 42 should not exist after delete"
);
}
#[test]
fn test_crash_recovery_wal_store_delete_re_store() {
let mut wal = Vec::new();
write_store_entry(&mut wal, 1, &json!({"version": 1}));
write_delete_entry(&mut wal, 1);
write_store_entry(&mut wal, 1, &json!({"version": 2}));
let (_dir, storage) = open_storage_with_wal(&wal).expect("should open");
assert_eq!(storage.ids().len(), 1);
let val = storage.retrieve(1).expect("retrieve").expect("exists");
assert_eq!(val["version"], 2, "Should have version 2 after re-store");
}
fn write_crc_store_entry(buf: &mut Vec<u8>, id: u64, payload: &serde_json::Value) {
let payload_bytes = serde_json::to_vec(payload).expect("serialize");
#[allow(clippy::cast_possible_truncation)]
let len = payload_bytes.len() as u32;
let mut crc_input = Vec::with_capacity(1 + 8 + 4 + payload_bytes.len());
crc_input.push(0xC3);
crc_input.extend_from_slice(&id.to_le_bytes());
crc_input.extend_from_slice(&len.to_le_bytes());
crc_input.extend_from_slice(&payload_bytes);
let crc = crc32_hash(&crc_input);
buf.extend_from_slice(&crc_input);
buf.extend_from_slice(&crc.to_le_bytes());
}
fn write_crc_delete_entry(buf: &mut Vec<u8>, id: u64) {
let mut crc_input = [0u8; 1 + 8];
crc_input[0] = 0xC4;
crc_input[1..9].copy_from_slice(&id.to_le_bytes());
let crc = crc32_hash(&crc_input);
buf.extend_from_slice(&crc_input);
buf.extend_from_slice(&crc.to_le_bytes());
}
#[test]
fn test_crc_store_and_retrieve_round_trip() {
let temp = TempDir::new().expect("temp dir");
{
let mut storage = LogPayloadStorage::new(temp.path()).expect("create");
storage
.store(1, &json!({"crc": "protected"}))
.expect("store");
storage.store(2, &json!({"also": "safe"})).expect("store");
storage.flush().expect("flush");
}
let storage = LogPayloadStorage::new(temp.path()).expect("reopen");
assert_eq!(storage.ids().len(), 2);
let v1 = storage.retrieve(1).expect("retrieve").expect("exists");
assert_eq!(v1["crc"], "protected");
let v2 = storage.retrieve(2).expect("retrieve").expect("exists");
assert_eq!(v2["also"], "safe");
}
#[test]
fn test_crc_delete_round_trip() {
let temp = TempDir::new().expect("temp dir");
{
let mut storage = LogPayloadStorage::new(temp.path()).expect("create");
storage.store(1, &json!({"keep": true})).expect("store");
storage.store(2, &json!({"remove": true})).expect("store");
storage.delete(2).expect("delete");
storage.flush().expect("flush");
}
let storage = LogPayloadStorage::new(temp.path()).expect("reopen");
assert_eq!(storage.ids().len(), 1);
assert!(storage.retrieve(1).expect("retrieve").is_some());
assert!(storage.retrieve(2).expect("retrieve").is_none());
}
#[test]
fn test_crc_store_corrupted_payload_skipped() {
let mut wal = Vec::new();
write_crc_store_entry(&mut wal, 1, &json!({"good": true}));
let corrupt_start = wal.len();
write_crc_store_entry(&mut wal, 2, &json!({"will_corrupt": true}));
wal[corrupt_start + 13] ^= 0xFF;
write_crc_store_entry(&mut wal, 3, &json!({"after_corrupt": true}));
let (_dir, storage) = open_storage_with_wal(&wal).expect("should open");
let ids = storage.ids();
assert!(ids.contains(&1), "Entry 1 should survive (valid CRC)");
assert!(!ids.contains(&2), "Entry 2 should be skipped (corrupt CRC)");
assert!(ids.contains(&3), "Entry 3 should survive (valid CRC)");
}
#[test]
fn test_crc_store_corrupted_crc_field_skipped() {
let mut wal = Vec::new();
write_crc_store_entry(&mut wal, 1, &json!({"test": "data"}));
let last_idx = wal.len() - 1;
wal[last_idx] ^= 0xFF;
let (_dir, storage) = open_storage_with_wal(&wal).expect("should open");
assert_eq!(
storage.ids().len(),
0,
"Corrupted CRC entry should be skipped"
);
}
#[test]
fn test_crc_delete_corrupted_skipped() {
let mut wal = Vec::new();
write_store_entry(&mut wal, 1, &json!({"keep_me": true}));
let delete_start = wal.len();
write_crc_delete_entry(&mut wal, 1);
wal[delete_start + 9] ^= 0xFF;
let (_dir, storage) = open_storage_with_wal(&wal).expect("should open");
assert!(
storage.ids().contains(&1),
"Entry 1 should survive — corrupted delete was skipped"
);
}
#[test]
fn test_mixed_legacy_and_crc_entries() {
let mut wal = Vec::new();
write_store_entry(&mut wal, 1, &json!({"format": "legacy"}));
write_store_entry(&mut wal, 2, &json!({"format": "legacy"}));
write_crc_store_entry(&mut wal, 3, &json!({"format": "crc"}));
write_crc_store_entry(&mut wal, 4, &json!({"format": "crc"}));
let (_dir, storage) = open_storage_with_wal(&wal).expect("should open");
assert_eq!(storage.ids().len(), 4);
for i in 1..=4 {
assert!(
storage.retrieve(i).expect("retrieve").is_some(),
"Entry {i} should exist"
);
}
}
#[test]
fn test_mixed_legacy_store_crc_delete() {
let mut wal = Vec::new();
write_store_entry(&mut wal, 1, &json!({"legacy": true}));
write_crc_delete_entry(&mut wal, 1);
let (_dir, storage) = open_storage_with_wal(&wal).expect("should open");
assert_eq!(
storage.ids().len(),
0,
"CRC delete should remove legacy entry"
);
}
#[test]
fn test_crc_store_legacy_delete() {
let mut wal = Vec::new();
write_crc_store_entry(&mut wal, 1, &json!({"crc": true}));
write_delete_entry(&mut wal, 1);
let (_dir, storage) = open_storage_with_wal(&wal).expect("should open");
assert_eq!(
storage.ids().len(),
0,
"Legacy delete should remove CRC-stored entry"
);
}
#[test]
fn test_crc_entries_survive_snapshot_cycle() {
let temp = TempDir::new().expect("temp dir");
{
let mut storage = LogPayloadStorage::new(temp.path()).expect("create");
for i in 1..=20 {
storage
.store(i, &json!({"id": i, "crc": true}))
.expect("store");
}
storage.create_snapshot().expect("snapshot");
}
let storage = LogPayloadStorage::new(temp.path()).expect("reopen");
assert_eq!(storage.ids().len(), 20);
let val = storage.retrieve(10).expect("retrieve").expect("exists");
assert_eq!(val["id"], 10);
}