#[cfg(test)]
mod tests {
use crate::wal::tests::helpers::*;
use crate::wal::{Wal, WalError};
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
use tempfile::TempDir;
/// Verifies that a damaged WAL header is rejected when the log is reopened.
///
/// A WAL is opened at a fresh path, the byte at file offset 2 is overwritten
/// with `0x99`, and a second `Wal::open` on the same path must fail with
/// `WalError::InvalidHeader` whose message contains "header checksum mismatch".
#[test]
fn corrupted_header_checksum() {
    // NOTE(review): offset 2 presumably falls inside the bytes covered by the
    // header checksum -- confirm against the on-disk header layout.
    const TAMPERED_OFFSET: u64 = 2;
    const TAMPERED_BYTE: [u8; 1] = [0x99];

    init_tracing();

    let dir = TempDir::new().unwrap();
    let log_path = dir.path().join("000000.log");

    // The first handle stays alive for the whole test; only the file on disk is modified.
    let _initial: Wal<MemTableRecord> = Wal::open(&log_path, None).unwrap();

    let mut options = OpenOptions::new();
    options.read(true).write(true);
    let mut file = options.open(&log_path).unwrap();
    file.seek(SeekFrom::Start(TAMPERED_OFFSET)).unwrap();
    file.write_all(&TAMPERED_BYTE).unwrap();
    file.sync_all().unwrap();

    let reopen_error = Wal::<MemTableRecord>::open(&log_path, None).unwrap_err();
    assert!(matches!(reopen_error, WalError::InvalidHeader(_)));
    assert!(reopen_error
        .to_string()
        .contains("header checksum mismatch"));
}
/// Verifies that an absurdly large record length is reported as `RecordTooLarge`.
///
/// One `MemTableRecord` is appended, then four `0xFF` bytes are written at
/// offset `WAL_HDR_SIZE + WAL_CRC32_SIZE`. Collecting the replay iterator must
/// then fail with `WalError::RecordTooLarge`.
#[test]
fn corrupted_record_length() {
    init_tracing();

    let dir = TempDir::new().unwrap();
    let log_path = dir.path().join("000000.log");
    let wal: Wal<MemTableRecord> = Wal::open(&log_path, None).unwrap();

    let entry = MemTableRecord {
        key: b"a".to_vec(),
        value: Some(b"v1".to_vec()),
        timestamp: 1,
        deleted: false,
    };
    wal.append(&entry).unwrap();

    // NOTE(review): this offset is presumably where the first record's length
    // field lives (per the test name) -- confirm against the record layout.
    let length_field_offset = (WAL_HDR_SIZE + WAL_CRC32_SIZE) as u64;
    let oversized_length = [0xFF_u8; 4];

    let mut options = OpenOptions::new();
    options.read(true).write(true);
    let mut file = options.open(&log_path).unwrap();
    file.seek(SeekFrom::Start(length_field_offset)).unwrap();
    file.write_all(&oversized_length).unwrap();
    file.sync_all().unwrap();

    let replay_error = collect_iter(&wal).unwrap_err();
    assert!(matches!(replay_error, WalError::RecordTooLarge(_)));
}
/// Verifies that damaging the tail of a record surfaces as `ChecksumMismatch`.
///
/// One `ManifestRecord` is appended, then the final three bytes of the log
/// file are overwritten with `0xAA 0xBB 0xCC`. Collecting the replay iterator
/// must fail with `WalError::ChecksumMismatch`.
#[test]
fn corrupted_record_data_checksum() {
    init_tracing();

    let dir = TempDir::new().unwrap();
    let log_path = dir.path().join("000000.log");
    let wal: Wal<ManifestRecord> = Wal::open(&log_path, None).unwrap();

    let entry = ManifestRecord {
        id: 999,
        path: String::from("/db/table-999"),
        creation_timestamp: 9999,
    };
    wal.append(&entry).unwrap();

    let garbage: [u8; 3] = [0xAA, 0xBB, 0xCC];

    let mut options = OpenOptions::new();
    options.read(true).write(true);
    let mut file = options.open(&log_path).unwrap();
    // Position three bytes before end-of-file so the write replaces the file's tail.
    file.seek(SeekFrom::End(-3)).unwrap();
    file.write_all(&garbage).unwrap();
    file.sync_all().unwrap();

    let replay_error = collect_iter(&wal).unwrap_err();
    assert!(matches!(replay_error, WalError::ChecksumMismatch));
}
/// Verifies that overwriting bytes inside the log body is detected on replay.
///
/// Two `MemTableRecord`s are appended (one with a value, one flagged deleted
/// with no value), then three bytes starting at
/// `WAL_HDR_SIZE + WAL_CRC32_SIZE + 5` are overwritten. Collecting the replay
/// iterator must fail with `WalError::ChecksumMismatch`.
#[test]
fn corrupted_record_data() {
    init_tracing();

    let dir = TempDir::new().unwrap();
    let log_path = dir.path().join("000000.log");
    let wal: Wal<MemTableRecord> = Wal::open(&log_path, None).unwrap();

    let inserted = [
        MemTableRecord {
            key: b"a".to_vec(),
            value: Some(b"v1".to_vec()),
            timestamp: 1,
            deleted: false,
        },
        MemTableRecord {
            key: b"b".to_vec(),
            value: None,
            timestamp: 2,
            deleted: true,
        },
    ];
    for entry in inserted.iter() {
        wal.append(entry).unwrap();
    }

    // NOTE(review): five bytes past this base presumably lands inside the first
    // record's payload -- confirm against the record layout.
    let base_offset = (WAL_HDR_SIZE + WAL_CRC32_SIZE) as u64;
    let garbage: [u8; 3] = [0xFF, 0x00, 0xEE];

    let mut options = OpenOptions::new();
    options.read(true).write(true);
    let mut file = options.open(&log_path).unwrap();
    file.seek(SeekFrom::Start(base_offset + 5)).unwrap();
    file.write_all(&garbage).unwrap();
    file.sync_all().unwrap();

    let replay_error = collect_iter(&wal).unwrap_err();
    assert!(matches!(replay_error, WalError::ChecksumMismatch));
}
/// Checks that replay yields every intact record before a corrupted final one.
///
/// Three `ManifestRecord`s are appended, then the last two bytes of the log
/// file are overwritten. Iterating `replay_iter` must return the first two
/// records and then report `WalError::ChecksumMismatch`; any other error
/// fails the test.
///
/// The test explicitly asserts that the checksum mismatch was observed, so it
/// cannot pass if the iterator merely stops after two records without
/// reporting the corruption.
#[test]
fn partial_replay_after_last_record_corrupted() {
    init_tracing();
    let tmp = TempDir::new().unwrap();
    let path = tmp.path().join("000000.log");
    let wal: Wal<ManifestRecord> = Wal::open(&path, None).unwrap();

    let records = [
        ManifestRecord {
            id: 100,
            path: "/db/table-100".to_string(),
            creation_timestamp: 1000,
        },
        ManifestRecord {
            id: 101,
            path: "/db/table-101".to_string(),
            creation_timestamp: 1001,
        },
        ManifestRecord {
            id: 102,
            path: "/db/table-102".to_string(),
            creation_timestamp: 1002,
        },
    ];
    for record in &records {
        wal.append(record).unwrap();
    }

    let mut f = OpenOptions::new()
        .read(true)
        .write(true)
        .open(&path)
        .unwrap();
    // Overwrite the final two bytes of the file; presumably these belong to the
    // third (last appended) record.
    f.seek(SeekFrom::End(-2)).unwrap();
    f.write_all(&[0x99, 0x77]).unwrap();
    f.sync_all().unwrap();

    let mut replayed = Vec::new();
    let mut saw_checksum_mismatch = false;
    for res in wal.replay_iter().unwrap() {
        match res {
            Ok(record) => replayed.push(record),
            Err(WalError::ChecksumMismatch) => {
                // Stop at the first corrupted record, but remember that it was reported.
                saw_checksum_mismatch = true;
                break;
            }
            Err(e) => panic!("Unexpected error: {:?}", e),
        }
    }

    assert!(
        saw_checksum_mismatch,
        "Replay should report a checksum mismatch for the corrupted last record"
    );
    assert_eq!(replayed.len(), 2, "Only first two records should be valid");
    assert_eq!(replayed[0].path, "/db/table-100");
    assert_eq!(replayed[1].path, "/db/table-101");
}
}