/// Alias for `u64`.
// NOTE(review): presumably the WAL's "log sequence number" type — confirm against `writer`/`reader`.
pub type Lsn = u64;
pub use config::WalConfig;
mod config;
/// Computes a 32-bit CRC over `data`.
///
/// The register starts as all ones, each input byte is XORed into the low
/// bits and then shifted out one bit at a time (least-significant bit first)
/// using the reflected polynomial `0xEDB88320`, and the final register is
/// bitwise-inverted before being returned. An empty slice therefore yields 0.
pub(super) fn crc32(data: &[u8]) -> u32 {
    const REFLECTED_POLY: u32 = 0xEDB88320;
    let register = data.iter().fold(0xFFFFFFFF_u32, |acc, &octet| {
        // Mix the byte in, then run eight single-bit reduction steps.
        (0..8).fold(acc ^ u32::from(octet), |reg, _| {
            let shifted = reg >> 1;
            if reg & 1 == 0 {
                shifted
            } else {
                shifted ^ REFLECTED_POLY
            }
        })
    });
    !register
}
pub use codec::{WalRecord, WalRecordType};
mod codec;
pub use error::WalError;
mod error;
pub use header::{RankRegime, WalHeader};
mod header;
pub use writer::WalWriter;
mod writer;
pub use reader::{WalReader, WalRecordIterator};
mod reader;
pub use async_config::AsyncWalConfig;
mod async_config;
pub use pending_segment::PendingSegment;
mod pending_segment;
pub use async_error::AsyncWalError;
mod async_error;
pub use sync_handle::SyncHandle;
mod sync_handle;
#[cfg(feature = "io-uring-backend")]
pub use sync_backend::IoUringFsync;
pub use sync_backend::{StdFsync, WalSyncBackend};
mod sync_backend;
pub use async_writer::{collect_all_segments, AsyncWalWriter, SegmentSyncManager};
mod async_writer;
#[cfg(test)]
mod tests {
use super::*;
use std::io;
use std::path::PathBuf;
use std::time::Duration;
use tempfile::tempdir;
#[test]
/// Known-answer check: the checksum of the ASCII bytes `hello world`.
fn test_crc32() {
    let checksum = crc32(b"hello world");
    let expected: u32 = 0x0D4A1185;
    assert_eq!(checksum, expected);
}
#[test]
/// An `Insert` record with a value must survive a payload round trip unchanged.
fn test_wal_record_serialize_deserialize() {
    let original = WalRecord::Insert {
        term: b"hello".to_vec(),
        value: Some(b"world".to_vec()),
    };
    let encoded = original.serialize_payload();
    let decoded = WalRecord::deserialize(WalRecordType::Insert, &encoded);
    let round_tripped = decoded.expect("deserialize failed");
    assert_eq!(original, round_tripped);
}
#[test]
/// A `Remove` record must survive a payload round trip unchanged.
fn test_wal_record_remove() {
    let original = WalRecord::Remove {
        term: b"goodbye".to_vec(),
    };
    let encoded = original.serialize_payload();
    let decoded = WalRecord::deserialize(WalRecordType::Remove, &encoded);
    let round_tripped = decoded.expect("deserialize failed");
    assert_eq!(original, round_tripped);
}
#[test]
/// Creates a fresh WAL in a temp directory, appends two `Insert` records and
/// expects the returned LSNs to be 1 and 2, then syncs.
fn test_wal_create_and_append() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let wal = WalWriter::create(&wal_path).expect("create WAL");
// First record: term only, no value.
let lsn1 = wal
.append(WalRecord::Insert {
term: b"hello".to_vec(),
value: None,
})
.expect("append");
// Second record: term with a value.
let lsn2 = wal
.append(WalRecord::Insert {
term: b"world".to_vec(),
value: Some(b"value".to_vec()),
})
.expect("append");
assert_eq!(lsn1, 1);
assert_eq!(lsn2, 2);
wal.sync().expect("sync");
}
#[test]
/// Writes an `Insert` and a `Remove`, syncs, lets the writer go out of scope,
/// then reads the file back with `WalReader` and checks both records and
/// their LSNs (1 and 2).
fn test_wal_read_records() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
// Inner scope: the writer is dropped before the reader opens the file.
{
let wal = WalWriter::create(&wal_path).expect("create WAL");
wal.append(WalRecord::Insert {
term: b"hello".to_vec(),
value: None,
})
.expect("append");
wal.append(WalRecord::Remove {
term: b"world".to_vec(),
})
.expect("append");
wal.sync().expect("sync");
}
let reader = WalReader::new(&wal_path).expect("open WAL");
let records: Vec<_> = reader.iter().collect();
assert_eq!(records.len(), 2);
// Each item is a result wrapping an (lsn, record) pair; borrow it to inspect.
let (lsn1, rec1) = records[0].as_ref().expect("record 1");
assert_eq!(*lsn1, 1);
assert!(matches!(rec1, WalRecord::Insert { term, .. } if term == b"hello"));
let (lsn2, rec2) = records[1].as_ref().expect("record 2");
assert_eq!(*lsn2, 2);
assert!(matches!(rec2, WalRecord::Remove { term } if term == b"world"));
}
#[test]
/// Appends one record, calls `checkpoint(1)`, and then expects the header
/// read back from disk to report `checkpoint_lsn == 1`.
fn test_wal_checkpoint() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
// Inner scope: the writer is dropped before the header is read back.
{
let wal = WalWriter::create(&wal_path).expect("create WAL");
wal.append(WalRecord::Insert {
term: b"test".to_vec(),
value: None,
})
.expect("append");
wal.checkpoint(1).expect("checkpoint");
}
let header = WalReader::read_header(&wal_path).expect("read header");
assert_eq!(header.checkpoint_lsn, 1);
}
#[test]
/// Writes one record, drops the writer, reopens the same file and expects
/// `current_lsn()` to be 2; a second append must then leave two readable
/// records in the file.
fn test_wal_reopen() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
// First session: create the file and persist one record.
{
let wal = WalWriter::create(&wal_path).expect("create WAL");
wal.append(WalRecord::Insert {
term: b"first".to_vec(),
value: None,
})
.expect("append");
wal.sync().expect("sync");
}
// Second session: reopen the existing file and append another record.
{
let wal = WalWriter::open(&wal_path).expect("open WAL");
assert_eq!(wal.current_lsn(), 2); wal.append(WalRecord::Insert {
term: b"second".to_vec(),
value: None,
})
.expect("append");
wal.sync().expect("sync");
}
let reader = WalReader::new(&wal_path).expect("open WAL");
let records: Vec<_> = reader.iter().collect();
assert_eq!(records.len(), 2);
}
#[test]
/// Exercises `truncate()`: after two inserts and a checkpoint the test expects
/// `current_lsn()` to be 4; after truncation it expects the LSN counters to be
/// reset (current 1, synced 0, checkpoint 0), the file to contain no records,
/// and a subsequent append on a reopened writer to receive LSN 1 again.
fn test_wal_truncate() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
// Phase 1: fill, checkpoint, sync, then truncate.
{
let wal = WalWriter::create(&wal_path).expect("create WAL");
wal.append(WalRecord::Insert {
term: b"first".to_vec(),
value: None,
})
.expect("append");
wal.append(WalRecord::Insert {
term: b"second".to_vec(),
value: None,
})
.expect("append");
wal.checkpoint(2).expect("checkpoint");
wal.sync().expect("sync");
assert_eq!(wal.current_lsn(), 4);
wal.truncate().expect("truncate");
assert_eq!(wal.current_lsn(), 1);
assert_eq!(wal.synced_lsn(), 0);
assert_eq!(wal.checkpoint_lsn(), 0);
}
// Phase 2: the truncated file must read back as empty.
let reader = WalReader::new(&wal_path).expect("open WAL");
let records: Vec<_> = reader.iter().collect();
assert_eq!(records.len(), 0, "WAL should be empty after truncate");
// Phase 3: reopen and append; numbering restarts at 1.
{
let wal = WalWriter::open(&wal_path).expect("open WAL");
assert_eq!(wal.current_lsn(), 1);
let lsn = wal
.append(WalRecord::Insert {
term: b"new_record".to_vec(),
value: None,
})
.expect("append after truncate");
assert_eq!(lsn, 1);
wal.sync().expect("sync");
}
// Phase 4: only the post-truncate record is visible.
let reader = WalReader::new(&wal_path).expect("open WAL");
let records: Vec<_> = reader.iter().collect();
assert_eq!(records.len(), 1);
let (lsn, rec) = records[0].as_ref().expect("record");
assert_eq!(*lsn, 1);
assert!(matches!(rec, WalRecord::Insert { term, .. } if term == b"new_record"));
}
#[test]
/// Rotates a WAL holding two inserts and a checkpoint into the archive
/// directory and checks that the returned archive path exists with a
/// `.segment` extension, that the active WAL still exists but reads back
/// empty, and that the archive reads back three records.
fn test_wal_archive_rotation() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let archive_dir = dir.path().join("wal_archive");
// `10 << 30` is 10 * 2^30.
let config = WalConfig {
archive_enabled: true,
archive_dir: archive_dir.clone(),
max_segments: 10,
max_archive_bytes: 10 << 30, };
let wal = WalWriter::create(&wal_path).expect("create WAL");
wal.append(WalRecord::Insert {
term: b"record1".to_vec(),
value: Some(b"value1".to_vec()),
})
.expect("append");
wal.append(WalRecord::Insert {
term: b"record2".to_vec(),
value: None,
})
.expect("append");
wal.checkpoint(2).expect("checkpoint");
wal.sync().expect("sync");
let archive_path = wal.rotate_to_archive(&config).expect("rotate");
assert!(archive_path.exists(), "Archive segment should exist");
assert!(
archive_path
.extension()
.map_or(false, |ext| ext == "segment"),
"Archive should have .segment extension"
);
assert!(wal_path.exists(), "Active WAL should exist");
// The active file is expected to contain no records after rotation.
let reader = WalReader::new(&wal_path).expect("open active WAL");
let records: Vec<_> = reader.iter().collect();
assert_eq!(
records.len(),
0,
"Active WAL should be empty after rotation"
);
// The archived segment is expected to hold everything written before rotation.
let reader = WalReader::new(&archive_path).expect("open archive");
let records: Vec<_> = reader.iter().collect();
assert_eq!(
records.len(),
3,
"Archive should have 3 records (2 inserts + 1 checkpoint)"
);
}
#[test]
/// Checks `collect_wal_segments`: it is expected to return nothing for a
/// freshly created WAL, and after three rotations plus one more append it is
/// expected to return the three `.segment` files first and the active WAL last.
fn test_wal_collect_segments() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let archive_dir = dir.path().join("wal_archive");
let config = WalConfig {
archive_enabled: true,
archive_dir: archive_dir.clone(),
max_segments: 10,
max_archive_bytes: 10 << 30,
};
let wal = WalWriter::create(&wal_path).expect("create WAL");
let segments = wal.collect_wal_segments(&config).expect("collect");
assert_eq!(segments.len(), 0, "No segments when WAL is empty");
// Produce three archived segments, one record each.
for i in 0..3 {
wal.append(WalRecord::Insert {
term: format!("term{}", i).into_bytes(),
value: None,
})
.expect("append");
wal.checkpoint(i as u64 + 1).expect("checkpoint");
wal.sync().expect("sync");
wal.rotate_to_archive(&config).expect("rotate");
// NOTE(review): presumably spaces rotations so archive names/timestamps differ — confirm.
std::thread::sleep(std::time::Duration::from_millis(2));
}
// One more record so the active WAL is non-empty.
wal.append(WalRecord::Insert {
term: b"active_term".to_vec(),
value: None,
})
.expect("append");
wal.sync().expect("sync");
let segments = wal.collect_wal_segments(&config).expect("collect");
assert_eq!(segments.len(), 4, "Should have 3 archived + 1 active");
for i in 0..3 {
let ext = segments[i].extension().unwrap_or_default();
assert_eq!(ext, "segment", "Archived segments should come first");
}
assert_eq!(segments[3], wal_path, "Active WAL should be last");
}
#[test]
/// Rotates six times with `max_segments` set to 3 and an effectively
/// unlimited byte budget, then counts the `.segment` files left in the
/// archive directory and expects at most `max_segments` of them.
fn test_wal_archive_pruning_by_count() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let archive_dir = dir.path().join("wal_archive");
let config = WalConfig {
archive_enabled: true,
archive_dir: archive_dir.clone(),
max_segments: 3, max_archive_bytes: u64::MAX,
};
let wal = WalWriter::create(&wal_path).expect("create WAL");
for i in 0..6 {
wal.append(WalRecord::Insert {
term: format!("term{}", i).into_bytes(),
value: None,
})
.expect("append");
wal.sync().expect("sync");
// Checkpoint at the last LSN handed out so far (current_lsn - 1, saturating at 0).
let frontier = wal.current_lsn().saturating_sub(1);
wal.checkpoint(frontier).expect("checkpoint");
wal.rotate_to_archive(&config).expect("rotate");
// NOTE(review): presumably spaces rotations so archive names/timestamps differ — confirm.
std::thread::sleep(std::time::Duration::from_millis(2));
}
// Count only directory entries whose extension is "segment"; unreadable entries are skipped.
let segments: Vec<_> = std::fs::read_dir(&archive_dir)
.expect("read archive dir")
.filter_map(|e| e.ok())
.filter(|e| e.path().extension().map_or(false, |ext| ext == "segment"))
.collect();
assert!(
segments.len() <= config.max_segments,
"Should have at most {} subsumed segments, found {}",
config.max_segments,
segments.len()
);
}
#[test]
/// With `archive_enabled: false`, `collect_wal_segments` is expected to return
/// only the active WAL path, and the archive directory must not have been
/// created.
fn test_wal_archive_disabled() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let archive_dir = dir.path().join("wal_archive");
let config = WalConfig {
archive_enabled: false, archive_dir: archive_dir.clone(),
max_segments: 10,
max_archive_bytes: 10 << 30,
};
let wal = WalWriter::create(&wal_path).expect("create WAL");
wal.append(WalRecord::Insert {
term: b"test".to_vec(),
value: None,
})
.expect("append");
wal.sync().expect("sync");
let segments = wal.collect_wal_segments(&config).expect("collect");
assert_eq!(segments.len(), 1);
assert_eq!(segments[0], wal_path);
assert!(
!archive_dir.exists(),
"Archive dir should not be created when disabled"
);
}
#[test]
fn test_wal_config_default() {
let config = WalConfig::default();
assert!(config.archive_enabled);
assert_eq!(config.max_segments, 10);
assert_eq!(config.max_archive_bytes, 10 << 30); }
#[test]
/// Round-trips `BatchInsert` payloads: first an empty batch, then a batch of
/// three entries mixing present and absent values, checking every field.
fn test_batch_insert_serialize_deserialize() {
    type Entry = (Vec<u8>, Option<Vec<u8>>);

    // Serializes a batch and hands back the entries decoded from its payload.
    fn round_trip(batch: Vec<Entry>) -> Vec<Entry> {
        let payload = WalRecord::BatchInsert { entries: batch }.serialize_payload();
        let decoded =
            WalRecord::deserialize(WalRecordType::BatchInsert, &payload).expect("deserialize");
        if let WalRecord::BatchInsert { entries } = decoded {
            entries
        } else {
            panic!("Expected BatchInsert")
        }
    }

    let empty = round_trip(Vec::new());
    assert_eq!(empty.len(), 0);

    let decoded = round_trip(vec![
        (b"hello".to_vec(), Some(b"world".to_vec())),
        (b"foo".to_vec(), None),
        (b"bar".to_vec(), Some(b"baz".to_vec())),
    ]);
    assert_eq!(decoded.len(), 3);

    let (first_term, first_value) = &decoded[0];
    assert_eq!(*first_term, b"hello");
    assert_eq!(first_value.as_deref(), Some(b"world".as_slice()));

    let (second_term, second_value) = &decoded[1];
    assert_eq!(*second_term, b"foo");
    assert!(second_value.is_none());

    let (third_term, third_value) = &decoded[2];
    assert_eq!(*third_term, b"bar");
    assert_eq!(third_value.as_deref(), Some(b"baz".as_slice()));
}
#[test]
/// Appends a three-entry batch via `append_batch`, expects it to receive
/// LSN 1, and then reads the file back expecting a single `BatchInsert`
/// record whose entries keep their terms in order.
fn test_wal_append_batch() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
// Inner scope: the writer is dropped before the reader opens the file.
{
let wal = WalWriter::create(&wal_path).expect("create WAL");
let entries = vec![
(b"term1".to_vec(), Some(b"value1".to_vec())),
(b"term2".to_vec(), None),
(b"term3".to_vec(), Some(b"value3".to_vec())),
];
let lsn = wal.append_batch(&entries).expect("append_batch");
assert_eq!(lsn, 1);
wal.sync().expect("sync");
}
let reader = WalReader::new(&wal_path).expect("open WAL");
let records: Vec<_> = reader.iter().collect();
// The whole batch is expected to appear as one record.
assert_eq!(records.len(), 1);
let (lsn, record) = records[0].as_ref().expect("record");
assert_eq!(*lsn, 1);
match record {
WalRecord::BatchInsert { entries } => {
assert_eq!(entries.len(), 3);
assert_eq!(entries[0].0, b"term1");
assert_eq!(entries[1].0, b"term2");
assert_eq!(entries[2].0, b"term3");
}
_ => panic!("Expected BatchInsert"),
}
}
#[test]
/// Appending an empty batch is expected to succeed with LSN 1 and to be read
/// back as one `BatchInsert` record with zero entries.
fn test_wal_append_batch_empty() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("test.wal");
let wal = WalWriter::create(&wal_path).expect("create WAL");
let lsn = wal.append_batch(&[]).expect("append_batch empty");
assert_eq!(lsn, 1);
wal.sync().expect("sync");
// The reader is opened while the writer is still alive in this test.
let reader = WalReader::new(&wal_path).expect("open WAL");
let records: Vec<_> = reader.iter().collect();
assert_eq!(records.len(), 1);
let (_, record) = records[0].as_ref().expect("record");
match record {
WalRecord::BatchInsert { entries } => {
assert_eq!(entries.len(), 0);
}
_ => panic!("Expected BatchInsert"),
}
}
#[test]
/// A `BatchInsert` record must report `WalRecordType::BatchInsert` as its type.
fn test_batch_insert_record_type() {
    let single_entry = vec![(b"test".to_vec(), None)];
    let batch = WalRecord::BatchInsert {
        entries: single_entry,
    };
    let kind = batch.record_type();
    assert_eq!(kind, WalRecordType::BatchInsert);
}
#[test]
/// Releases ten threads from a shared barrier so they all call
/// `WalWriter::open_or_create` on the same path at once. At least one call
/// must succeed, none may fail with `NotFound` or `AlreadyExists`, and the
/// file must exist afterwards.
fn test_open_or_create_toctou_safety() {
use std::sync::{Arc, Barrier};
use std::thread;
let temp_dir = tempdir().expect("create temp dir");
let wal_path = temp_dir.path().join("concurrent.wal");
let num_threads = 10;
// The barrier holds every thread until all ten have arrived.
let barrier = Arc::new(Barrier::new(num_threads));
let path = Arc::new(wal_path.clone());
let handles: Vec<_> = (0..num_threads)
.map(|_| {
let barrier = Arc::clone(&barrier);
let path = Arc::clone(&path);
thread::spawn(move || {
barrier.wait();
WalWriter::open_or_create(path.as_ref())
})
})
.collect();
// A panicking worker surfaces here through `unwrap` on the join result.
let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
let successes = results.iter().filter(|r| r.is_ok()).count();
assert!(successes >= 1, "At least one thread should succeed");
// Other error variants are tolerated; only these two are counted as failures.
let toctou_failures = results
.iter()
.filter(|r| matches!(r, Err(WalError::NotFound) | Err(WalError::AlreadyExists)))
.count();
assert_eq!(
toctou_failures, 0,
"No threads should fail with TOCTOU-related errors (NotFound/AlreadyExists)"
);
assert!(
wal_path.exists(),
"WAL file should exist after concurrent access"
);
}
#[test]
/// Releases ten threads from a shared barrier so they all call
/// `WalWriter::create` on the same path at once. Exactly one call must
/// succeed and the other nine must fail with `WalError::AlreadyExists`.
fn test_create_exclusive_concurrent() {
use std::sync::{Arc, Barrier};
use std::thread;
let temp_dir = tempdir().expect("create temp dir");
let wal_path = temp_dir.path().join("exclusive.wal");
let num_threads = 10;
// The barrier holds every thread until all ten have arrived.
let barrier = Arc::new(Barrier::new(num_threads));
let path = Arc::new(wal_path);
let handles: Vec<_> = (0..num_threads)
.map(|_| {
let barrier = Arc::clone(&barrier);
let path = Arc::clone(&path);
thread::spawn(move || {
barrier.wait();
WalWriter::create(path.as_ref())
})
})
.collect();
// A panicking worker surfaces here through `unwrap` on the join result.
let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
let successes = results.iter().filter(|r| r.is_ok()).count();
let already_exists = results
.iter()
.filter(|r| matches!(r, Err(WalError::AlreadyExists)))
.count();
assert_eq!(successes, 1, "Exactly one thread should create the file");
assert_eq!(
already_exists,
num_threads - 1,
"All other threads should get AlreadyExists"
);
}
#[test]
/// Races `WalWriter::open` against `std::fs::remove_file` on the same path.
/// Either outcome of the race is accepted: the open may succeed or fail with
/// one of a fixed set of error variants, and the delete may succeed or report
/// `NotFound`. Any other open error fails the test.
fn test_open_handles_concurrent_delete() {
use std::sync::{Arc, Barrier};
use std::thread;
let temp_dir = tempdir().expect("create temp dir");
let wal_path = temp_dir.path().join("delete_race.wal");
// Create and close a valid WAL so there is something to open and delete.
let wal = WalWriter::create(&wal_path).expect("create WAL");
wal.sync().expect("sync");
drop(wal);
// Two-party barrier: the opener and the deleter start together.
let barrier = Arc::new(Barrier::new(2));
let path = Arc::new(wal_path.clone());
let open_barrier = Arc::clone(&barrier);
let open_path = Arc::clone(&path);
let open_handle = thread::spawn(move || {
open_barrier.wait();
WalWriter::open(open_path.as_ref())
});
let delete_barrier = Arc::clone(&barrier);
let delete_path = Arc::clone(&path);
let delete_handle = thread::spawn(move || {
delete_barrier.wait();
std::fs::remove_file(delete_path.as_ref())
});
let open_result = open_handle.join().unwrap();
let delete_result = delete_handle.join().unwrap();
// Accepted open outcomes: success, NotFound, Io, CorruptedRecord, UnexpectedEof.
let open_valid = match &open_result {
Ok(_) => true,
Err(WalError::NotFound) => true,
Err(WalError::Io(_)) => true, Err(WalError::CorruptedRecord(_)) => true, Err(WalError::UnexpectedEof) => true, _ => false,
};
let delete_ok = delete_result.is_ok();
let delete_not_found = delete_result
.as_ref()
.err()
.map_or(false, |e| e.kind() == std::io::ErrorKind::NotFound);
assert!(
open_valid,
"Open should succeed or fail with expected error (NotFound, Io, etc.)"
);
assert!(
delete_ok || delete_not_found,
"Delete should succeed or fail with NotFound"
);
}
#[test]
/// `open_or_create` on a path that does not exist yet must create the file,
/// and the first append on the returned writer is expected to get LSN 1.
fn test_open_or_create_creates_new() {
let temp_dir = tempdir().expect("create temp dir");
let wal_path = temp_dir.path().join("new.wal");
// Precondition: nothing at the path yet.
assert!(!wal_path.exists());
let wal = WalWriter::open_or_create(&wal_path).expect("open_or_create");
assert!(wal_path.exists());
let lsn = wal
.append(WalRecord::Insert {
term: b"test".to_vec(),
value: None,
})
.expect("append");
assert_eq!(lsn, 1);
}
#[test]
/// `open_or_create` on an existing WAL holding one record is expected to
/// resume numbering: `current_lsn()` is 2 and the next append returns 2.
fn test_open_or_create_opens_existing() {
let temp_dir = tempdir().expect("create temp dir");
let wal_path = temp_dir.path().join("existing.wal");
// Seed the file with one synced record, then drop the writer.
{
let wal = WalWriter::create(&wal_path).expect("create");
wal.append(WalRecord::Insert {
term: b"first".to_vec(),
value: None,
})
.expect("append");
wal.sync().expect("sync");
}
let wal = WalWriter::open_or_create(&wal_path).expect("open_or_create");
assert_eq!(wal.current_lsn(), 2);
let lsn = wal
.append(WalRecord::Insert {
term: b"second".to_vec(),
value: None,
})
.expect("append");
assert_eq!(lsn, 2);
}
#[test]
/// A second `WalWriter::create` on the same path, while the first writer is
/// still alive, must fail with `WalError::AlreadyExists`.
fn test_create_already_exists() {
    let scratch = tempdir().expect("create temp dir");
    let target = scratch.path().join("already_exists.wal");
    // Keep the first writer bound so it stays alive for the second attempt.
    let _first = WalWriter::create(&target).expect("create");
    let second_attempt = WalWriter::create(&target);
    let rejected = matches!(second_attempt, Err(WalError::AlreadyExists));
    assert!(rejected, "Expected AlreadyExists error");
}
#[test]
/// `WalWriter::open` on a path that was never created must fail with
/// `WalError::NotFound`.
fn test_open_not_found() {
    let scratch = tempdir().expect("create temp dir");
    let missing = scratch.path().join("nonexistent.wal");
    let attempt = WalWriter::open(&missing);
    let not_found = matches!(attempt, Err(WalError::NotFound));
    assert!(not_found, "Expected NotFound error");
}
#[test]
/// `WalWriter::create` on a path whose parent directories do not exist is
/// expected to create them along with the file; the writer must then accept
/// an append and return LSN 1.
fn test_create_creates_parent_dirs() {
let temp_dir = tempdir().expect("create temp dir");
let wal_path = temp_dir.path().join("nested/dirs/test.wal");
// Precondition: the nested parent directory is absent.
assert!(!wal_path.parent().unwrap().exists());
let wal = WalWriter::create(&wal_path).expect("create with nested dirs");
assert!(wal_path.exists());
assert!(wal_path.parent().unwrap().exists());
let lsn = wal
.append(WalRecord::Insert {
term: b"test".to_vec(),
value: None,
})
.expect("append");
assert_eq!(lsn, 1);
}
#[test]
/// Creates an `AsyncWalWriter` with pending and archive directories under a
/// temp dir, appends two inserts, and expects LSNs 1 and 2 with
/// `current_lsn()` reporting 3 afterwards.
fn test_async_wal_create_and_append() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("async_test.wal");
// Only the directories are overridden; every other setting keeps its default.
let config = AsyncWalConfig {
pending_dir: dir.path().join("wal_pending"),
..Default::default()
};
let archive_config = WalConfig {
archive_enabled: true,
archive_dir: dir.path().join("wal_archive"),
..Default::default()
};
let wal =
AsyncWalWriter::create(&wal_path, config, archive_config).expect("create async WAL");
let lsn1 = wal
.append(WalRecord::Insert {
term: b"hello".to_vec(),
value: None,
})
.expect("append");
assert_eq!(lsn1, 1);
let lsn2 = wal
.append(WalRecord::Insert {
term: b"world".to_vec(),
value: Some(b"value".to_vec()),
})
.expect("append");
assert_eq!(lsn2, 2);
assert_eq!(wal.current_lsn(), 3);
}
#[test]
/// After two appends, `sync()` on the async writer is expected to return 2
/// and `synced_lsn()` to report 2.
fn test_async_wal_sync_blocking() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("async_sync_test.wal");
// Only the directories are overridden; every other setting keeps its default.
let config = AsyncWalConfig {
pending_dir: dir.path().join("wal_pending"),
..Default::default()
};
let archive_config = WalConfig {
archive_enabled: true,
archive_dir: dir.path().join("wal_archive"),
..Default::default()
};
let wal =
AsyncWalWriter::create(&wal_path, config, archive_config).expect("create async WAL");
wal.append(WalRecord::Insert {
term: b"term1".to_vec(),
value: None,
})
.expect("append");
wal.append(WalRecord::Insert {
term: b"term2".to_vec(),
value: None,
})
.expect("append");
let synced = wal.sync().expect("sync");
assert_eq!(synced, 2);
assert_eq!(wal.synced_lsn(), 2);
}
#[test]
/// After five appends, `sync_async()` is expected to return a handle whose
/// `target_lsn()` is 5; once `wait()` returns, the handle must report
/// `is_synced()` and the writer's `synced_lsn()` must be 5.
fn test_async_wal_sync_async_handle() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("async_handle_test.wal");
// Only the directories are overridden; every other setting keeps its default.
let config = AsyncWalConfig {
pending_dir: dir.path().join("wal_pending"),
..Default::default()
};
let archive_config = WalConfig {
archive_enabled: true,
archive_dir: dir.path().join("wal_archive"),
..Default::default()
};
let wal =
AsyncWalWriter::create(&wal_path, config, archive_config).expect("create async WAL");
for i in 0..5 {
wal.append(WalRecord::Insert {
term: format!("term{}", i).into_bytes(),
value: None,
})
.expect("append");
}
let handle = wal.sync_async().expect("sync_async");
assert_eq!(handle.target_lsn(), 5);
handle.wait().expect("wait");
assert!(handle.is_synced());
assert_eq!(wal.synced_lsn(), 5);
}
#[test]
/// Starts an asynchronous sync after ten appends and, before waiting on it,
/// appends five more records, expecting them to receive LSNs 11 through 15.
/// After the wait, `synced_lsn()` must be at least 10, and a final blocking
/// `sync()` must return at least 15.
fn test_async_wal_concurrent_append_during_sync() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("concurrent_test.wal");
// Only the directories are overridden; every other setting keeps its default.
let config = AsyncWalConfig {
pending_dir: dir.path().join("wal_pending"),
..Default::default()
};
let archive_config = WalConfig {
archive_enabled: true,
archive_dir: dir.path().join("wal_archive"),
..Default::default()
};
let wal =
AsyncWalWriter::create(&wal_path, config, archive_config).expect("create async WAL");
// First batch: LSNs 1..=10.
for i in 0..10 {
wal.append(WalRecord::Insert {
term: format!("batch1_term{}", i).into_bytes(),
value: None,
})
.expect("append");
}
let handle = wal.sync_async().expect("sync_async");
assert_eq!(handle.target_lsn(), 10);
// Second batch is appended while the handle has not been waited on yet.
for i in 0..5 {
let lsn = wal
.append(WalRecord::Insert {
term: format!("batch2_term{}", i).into_bytes(),
value: None,
})
.expect("append during sync");
assert_eq!(lsn, 11 + i as u64);
}
handle.wait().expect("wait");
// Lower bounds only: the test allows the synced LSN to be further ahead.
assert!(wal.synced_lsn() >= 10);
let synced = wal.sync().expect("sync second batch");
assert!(synced >= 15);
}
#[test]
/// Issues three `sync_async()` calls, each after a batch of three appends,
/// without waiting in between. Every handle is then waited on and its
/// `target_lsn()` must be at least the number of records appended up to that
/// batch; finally `synced_lsn()` must be at least 9.
fn test_async_wal_multiple_concurrent_syncs() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("multi_sync_test.wal");
// `max_pending_segments` is raised to 8 for this test.
let config = AsyncWalConfig {
pending_dir: dir.path().join("wal_pending"),
max_pending_segments: 8, ..Default::default()
};
let archive_config = WalConfig {
archive_enabled: true,
archive_dir: dir.path().join("wal_archive"),
..Default::default()
};
let wal =
AsyncWalWriter::create(&wal_path, config, archive_config).expect("create async WAL");
let mut handles = Vec::new();
for batch in 0..3 {
for i in 0..3 {
wal.append(WalRecord::Insert {
term: format!("batch{}_term{}", batch, i).into_bytes(),
value: None,
})
.expect("append");
}
let handle = wal.sync_async().expect("sync_async");
handles.push(handle);
}
// Handle i was issued after (i + 1) * 3 appends.
for (i, handle) in handles.into_iter().enumerate() {
handle.wait().expect("wait");
assert!(handle.target_lsn() >= ((i + 1) * 3) as u64);
}
assert!(wal.synced_lsn() >= 9);
}
#[test]
/// After one append, `wait_timeout` with a ten-second limit on the sync
/// handle is expected to return `true`.
fn test_async_wal_sync_timeout() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("timeout_test.wal");
// Only the directories are overridden; every other setting keeps its default.
let config = AsyncWalConfig {
pending_dir: dir.path().join("wal_pending"),
..Default::default()
};
let archive_config = WalConfig {
archive_enabled: true,
archive_dir: dir.path().join("wal_archive"),
..Default::default()
};
let wal =
AsyncWalWriter::create(&wal_path, config, archive_config).expect("create async WAL");
wal.append(WalRecord::Insert {
term: b"test".to_vec(),
value: None,
})
.expect("append");
let handle = wal.sync_async().expect("sync_async");
let completed = handle
.wait_timeout(Duration::from_secs(10))
.expect("wait_timeout");
assert!(completed, "Sync should complete within timeout");
}
#[test]
/// Calling `sync_async()` on a writer with nothing appended is expected to
/// return a handle that already reports `is_synced()` without any wait.
fn test_async_wal_empty_sync() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("empty_sync_test.wal");
// Only the directories are overridden; every other setting keeps its default.
let config = AsyncWalConfig {
pending_dir: dir.path().join("wal_pending"),
..Default::default()
};
let archive_config = WalConfig {
archive_enabled: true,
archive_dir: dir.path().join("wal_archive"),
..Default::default()
};
let wal =
AsyncWalWriter::create(&wal_path, config, archive_config).expect("create async WAL");
let handle = wal.sync_async().expect("sync_async empty");
assert!(handle.is_synced()); }
#[test]
/// Writes ten records through an async writer, syncs, drops the writer, and
/// then uses `collect_all_segments` to locate every segment file. The records
/// that decode successfully across all segments must total exactly ten.
fn test_async_wal_recovery_with_pending_segments() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("recovery_test.wal");
let pending_dir = dir.path().join("wal_pending");
let archive_dir = dir.path().join("wal_archive");
let config = AsyncWalConfig {
pending_dir: pending_dir.clone(),
..Default::default()
};
let archive_config = WalConfig {
archive_enabled: true,
archive_dir: archive_dir.clone(),
..Default::default()
};
// Inner scope: the writer is dropped before segments are collected.
{
let wal = AsyncWalWriter::create(&wal_path, config.clone(), archive_config.clone())
.expect("create async WAL");
for i in 0..10 {
wal.append(WalRecord::Insert {
term: format!("term{}", i).into_bytes(),
value: Some(format!("value{}", i).into_bytes()),
})
.expect("append");
}
wal.sync().expect("sync");
}
let segments =
collect_all_segments(&wal_path, &archive_config, &config).expect("collect segments");
assert!(!segments.is_empty(), "Should have at least one segment");
// Segments that fail to open and records that fail to decode are skipped
// silently; only the final count of ten is enforced.
let mut total_records = 0;
for segment in &segments {
if let Ok(reader) = WalReader::new(segment) {
for result in reader.iter() {
if result.is_ok() {
total_records += 1;
}
}
}
}
assert_eq!(total_records, 10, "Should recover all 10 records");
}
#[test]
/// Converts an async writer into a synchronous one via `into_sync()` after
/// one synced append, then checks that the resulting writer accepts another
/// append and returns an LSN of at least 1.
fn test_async_wal_into_sync() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("into_sync_test.wal");
// Only the directories are overridden; every other setting keeps its default.
let config = AsyncWalConfig {
pending_dir: dir.path().join("wal_pending"),
..Default::default()
};
let archive_config = WalConfig {
archive_enabled: true,
archive_dir: dir.path().join("wal_archive"),
..Default::default()
};
let wal =
AsyncWalWriter::create(&wal_path, config, archive_config).expect("create async WAL");
wal.append(WalRecord::Insert {
term: b"test".to_vec(),
value: None,
})
.expect("append");
wal.sync().expect("sync");
let sync_writer = wal.into_sync().expect("into_sync");
let lsn = sync_writer
.append(WalRecord::Insert {
term: b"after_convert".to_vec(),
value: None,
})
.expect("append after convert");
// Only a lower bound is asserted; the exact LSN after conversion is not pinned here.
assert!(lsn >= 1, "LSN should be at least 1");
}
#[test]
/// Runs five rounds of ten appends (each with a 100-byte value) followed by
/// `sync_async()` + `wait()`, with `max_pending_segments` set to 2 and
/// `max_pending_bytes` set to 1024 * 1024. Afterwards `synced_lsn()` must be
/// at least 50.
// NOTE(review): each handle is waited on before the next round starts, so it is
// unclear from this test alone whether the pending limits are ever reached.
fn test_async_wal_backpressure() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("backpressure_test.wal");
let config = AsyncWalConfig {
pending_dir: dir.path().join("wal_pending"),
max_pending_segments: 2, max_pending_bytes: 1024 * 1024, ..Default::default()
};
let archive_config = WalConfig {
archive_enabled: true,
archive_dir: dir.path().join("wal_archive"),
..Default::default()
};
let wal =
AsyncWalWriter::create(&wal_path, config, archive_config).expect("create async WAL");
for batch in 0..5 {
for i in 0..10 {
wal.append(WalRecord::Insert {
term: format!("batch{}_term{}", batch, i).into_bytes(),
value: Some(vec![0u8; 100]), })
.expect("append");
}
let handle = wal.sync_async().expect("sync_async");
handle.wait().expect("wait");
}
assert!(wal.synced_lsn() >= 50);
}
#[test]
/// The `Debug` rendering of a sync handle must contain both the text
/// `SyncHandle` and the text `target_lsn`.
fn test_sync_handle_debug() {
let dir = tempdir().expect("create temp dir");
let wal_path = dir.path().join("debug_test.wal");
let config = AsyncWalConfig {
pending_dir: dir.path().join("wal_pending"),
..Default::default()
};
// Unlike the neighbouring async tests, the archive config is left entirely at its defaults.
let archive_config = WalConfig::default();
let wal =
AsyncWalWriter::create(&wal_path, config, archive_config).expect("create async WAL");
wal.append(WalRecord::Insert {
term: b"test".to_vec(),
value: None,
})
.expect("append");
let handle = wal.sync_async().expect("sync_async");
let debug_str = format!("{:?}", handle);
assert!(debug_str.contains("SyncHandle"));
assert!(debug_str.contains("target_lsn"));
}
#[test]
fn test_async_wal_config_defaults() {
let config = AsyncWalConfig::default();
assert_eq!(config.max_pending_segments, 4);
assert_eq!(config.max_pending_bytes, 256 * 1024 * 1024);
assert_eq!(config.idle_check_interval_ms, 10);
}
#[test]
/// Checks that the `Display` text of each `AsyncWalError` variant built here
/// mentions the detail a reader would look for.
fn test_async_wal_error_display() {
    // Wrapped WAL error.
    let rendered = AsyncWalError::Wal(WalError::NotFound).to_string();
    assert!(rendered.contains("WAL error"));

    // Segment sync failure: the attempt count must appear.
    let rendered = AsyncWalError::SegmentSyncFailed {
        path: PathBuf::from("/test/path"),
        attempts: 5,
        last_error: io::Error::new(io::ErrorKind::Other, "test error"),
    }
    .to_string();
    assert!(rendered.contains("5 attempts"));

    // Rotation failure: the reason must appear.
    let rendered = AsyncWalError::RotationFailed {
        reason: "test reason".to_string(),
        source: None,
    }
    .to_string();
    assert!(rendered.contains("test reason"));

    // Sync timeout: both the target and the currently synced LSN must appear.
    let rendered = AsyncWalError::SyncTimeout {
        target_lsn: 100,
        current_synced: 50,
        timeout_ms: 1000,
    }
    .to_string();
    assert!(rendered.contains("100"));
    assert!(rendered.contains("50"));
}
#[test]
/// All-zero `Insert` payloads of three and then four bytes must both be
/// rejected as `CorruptedRecord` with a "payload too short" message.
fn test_deserialize_insert_payload_too_short() {
    for len in 3..=4 {
        let zeros = vec![0u8; len];
        let outcome = WalRecord::deserialize(WalRecordType::Insert, &zeros);
        assert!(
            matches!(outcome, Err(WalError::CorruptedRecord(msg)) if msg.contains("payload too short"))
        );
    }
}
#[test]
/// An `Insert` payload that starts with a little-endian `u32` of 10 but is
/// followed by only four bytes must be rejected with a "term truncated" message.
fn test_deserialize_insert_term_truncated() {
    let mut bytes = 10u32.to_le_bytes().to_vec();
    bytes.extend_from_slice(b"abcd");
    let outcome = WalRecord::deserialize(WalRecordType::Insert, &bytes);
    assert!(
        matches!(outcome, Err(WalError::CorruptedRecord(msg)) if msg.contains("term truncated"))
    );
}
#[test]
/// An `Insert` payload holding a complete 5-byte term and a flag byte of 1
/// must be rejected with "value length truncated" when nothing follows the
/// flag, and again when only two further bytes follow it.
fn test_deserialize_insert_value_length_truncated() {
    let mut bytes = 5u32.to_le_bytes().to_vec();
    bytes.extend_from_slice(b"hello");
    bytes.push(1);

    // Nothing after the flag byte.
    let bare_flag = WalRecord::deserialize(WalRecordType::Insert, &bytes);
    assert!(
        matches!(bare_flag, Err(WalError::CorruptedRecord(msg)) if msg.contains("value length truncated"))
    );

    // Two bytes after the flag byte.
    bytes.extend_from_slice(&[0, 0]);
    let partial_length = WalRecord::deserialize(WalRecordType::Insert, &bytes);
    assert!(
        matches!(partial_length, Err(WalError::CorruptedRecord(msg)) if msg.contains("value length truncated"))
    );
}
#[test]
/// An `Insert` payload with a complete 5-byte term, a flag byte of 1 and a
/// little-endian `u32` of 10, followed by only five bytes, must be rejected
/// with a "value truncated" message.
fn test_deserialize_insert_value_truncated() {
    let mut bytes = 5u32.to_le_bytes().to_vec();
    bytes.extend_from_slice(b"hello");
    bytes.push(1);
    bytes.extend_from_slice(&10u32.to_le_bytes());
    bytes.extend_from_slice(&[1, 2, 3, 4, 5]);
    let outcome = WalRecord::deserialize(WalRecordType::Insert, &bytes);
    assert!(
        matches!(outcome, Err(WalError::CorruptedRecord(msg)) if msg.contains("value truncated"))
    );
}
#[test]
/// An `Insert` payload with a complete 5-byte term and a flag byte of 0 must
/// decode to an `Insert` whose term is `hello` and whose value is `None`.
fn test_deserialize_insert_no_value_success() {
    let mut bytes = 5u32.to_le_bytes().to_vec();
    bytes.extend_from_slice(b"hello");
    bytes.push(0);
    let outcome = WalRecord::deserialize(WalRecordType::Insert, &bytes);
    assert!(outcome.is_ok());
    if let WalRecord::Insert { term, value } = outcome.unwrap() {
        assert_eq!(term, b"hello");
        assert!(value.is_none());
    } else {
        panic!("Expected Insert");
    }
}
#[test]
/// A two-byte all-zero `Remove` payload must be rejected with a
/// "payload too short" message.
fn test_deserialize_remove_payload_too_short() {
    let zeros = vec![0u8; 2];
    let outcome = WalRecord::deserialize(WalRecordType::Remove, &zeros);
    let rejected =
        matches!(outcome, Err(WalError::CorruptedRecord(msg)) if msg.contains("payload too short"));
    assert!(rejected);
}
#[test]
/// A `Remove` payload that starts with a little-endian `u32` of 10 but is
/// followed by only three bytes must be rejected with a "term truncated" message.
fn test_deserialize_remove_term_truncated() {
    let mut bytes = 10u32.to_le_bytes().to_vec();
    bytes.extend_from_slice(b"abc");
    let outcome = WalRecord::deserialize(WalRecordType::Remove, &bytes);
    assert!(
        matches!(outcome, Err(WalError::CorruptedRecord(msg)) if msg.contains("term truncated"))
    );
}
#[test]
/// All-zero `Checkpoint` payloads of 10 and then 15 bytes must both be
/// rejected with a "payload too short" message.
fn test_deserialize_checkpoint_payload_too_short() {
    for &len in [10usize, 15].iter() {
        let zeros = vec![0u8; len];
        let outcome = WalRecord::deserialize(WalRecordType::Checkpoint, &zeros);
        assert!(
            matches!(outcome, Err(WalError::CorruptedRecord(msg)) if msg.contains("payload too short"))
        );
    }
}
/// A five-byte BeginTx payload must be rejected with a "payload too short"
/// corruption error.
#[test]
fn test_deserialize_begin_tx_payload_too_short() {
    let bytes = vec![0u8; 5];

    let outcome = WalRecord::deserialize(WalRecordType::BeginTx, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("payload too short"),
        _ => false,
    };
    assert!(rejected);
}
/// A seven-byte CommitTx payload must be rejected with a "payload too short"
/// corruption error.
#[test]
fn test_deserialize_commit_tx_payload_too_short() {
    let bytes = vec![0u8; 7];

    let outcome = WalRecord::deserialize(WalRecordType::CommitTx, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("payload too short"),
        _ => false,
    };
    assert!(rejected);
}
/// A three-byte AbortTx payload must be rejected with a "payload too short"
/// corruption error.
#[test]
fn test_deserialize_abort_tx_payload_too_short() {
    let bytes = vec![0u8; 3];

    let outcome = WalRecord::deserialize(WalRecordType::AbortTx, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("payload too short"),
        _ => false,
    };
    assert!(rejected);
}
/// A two-byte Increment payload must be rejected with a "payload too short"
/// corruption error.
#[test]
fn test_deserialize_increment_payload_too_short() {
    let bytes = vec![0u8; 2];

    let outcome = WalRecord::deserialize(WalRecordType::Increment, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("payload too short"),
        _ => false,
    };
    assert!(rejected);
}
/// An Increment payload with a complete 5-byte term followed by only 10
/// trailing bytes must be rejected with a corruption error mentioning
/// "truncated".
#[test]
fn test_deserialize_increment_payload_truncated() {
    let parts: [&[u8]; 3] = [
        &5u32.to_le_bytes(), // length prefix for the term
        b"hello",            // term bytes
        &[0; 10],            // ten zero bytes after the term
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::Increment, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A three-byte Upsert payload must be rejected with a "payload too short"
/// corruption error.
#[test]
fn test_deserialize_upsert_payload_too_short() {
    let bytes = vec![0u8; 3];

    let outcome = WalRecord::deserialize(WalRecordType::Upsert, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("payload too short"),
        _ => false,
    };
    assert!(rejected);
}
/// An Upsert payload that ends right after a 5-byte term must be rejected
/// with a "term truncated" corruption error.
#[test]
fn test_deserialize_upsert_term_truncated() {
    let parts: [&[u8]; 2] = [
        &5u32.to_le_bytes(), // length prefix for the term
        b"hello",            // term bytes; nothing follows
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::Upsert, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("term truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// An Upsert payload that declares a 10-byte value but carries only 3 bytes
/// must be rejected with a "value truncated" corruption error.
#[test]
fn test_deserialize_upsert_value_truncated() {
    let parts: [&[u8]; 4] = [
        &5u32.to_le_bytes(),  // length prefix for the term
        b"hello",             // term bytes
        &10u32.to_le_bytes(), // declared value length
        &[1, 2, 3],           // fewer bytes than declared
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::Upsert, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("value truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A two-byte CompareAndSwap payload must be rejected with a
/// "payload too short" corruption error.
#[test]
fn test_deserialize_cas_payload_too_short() {
    let bytes = vec![0u8; 2];

    let outcome = WalRecord::deserialize(WalRecordType::CompareAndSwap, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("payload too short"),
        _ => false,
    };
    assert!(rejected);
}
/// A CompareAndSwap payload that ends right after a 5-byte term must be
/// rejected with a "term truncated" corruption error.
#[test]
fn test_deserialize_cas_term_truncated() {
    let parts: [&[u8]; 2] = [
        &5u32.to_le_bytes(), // length prefix for the term
        b"hello",            // term bytes; nothing follows
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::CompareAndSwap, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("term truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A CompareAndSwap payload that ends right after a flag byte of 1 must be
/// rejected with an "expected length truncated" corruption error.
#[test]
fn test_deserialize_cas_expected_length_truncated() {
    let parts: [&[u8]; 3] = [
        &5u32.to_le_bytes(), // length prefix for the term
        b"hello",            // term bytes
        &[1],                // flag byte (presumably "expected value present")
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::CompareAndSwap, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("expected length truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A CompareAndSwap payload that declares a 10-byte expected value but
/// carries only 3 bytes must be rejected with an "expected truncated"
/// corruption error.
#[test]
fn test_deserialize_cas_expected_truncated() {
    let parts: [&[u8]; 5] = [
        &5u32.to_le_bytes(),  // length prefix for the term
        b"hello",             // term bytes
        &[1],                 // flag byte (presumably "expected value present")
        &10u32.to_le_bytes(), // declared expected-value length
        &[1, 2, 3],           // fewer bytes than declared
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::CompareAndSwap, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("expected truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A CompareAndSwap payload that ends right after a flag byte of 0 must be
/// rejected with a "new_value length truncated" corruption error.
#[test]
fn test_deserialize_cas_new_value_length_truncated() {
    let parts: [&[u8]; 3] = [
        &5u32.to_le_bytes(), // length prefix for the term
        b"hello",            // term bytes
        &[0],                // flag byte (presumably "no expected value")
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::CompareAndSwap, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("new_value length truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A CompareAndSwap payload that declares a 10-byte new value but carries
/// only 5 bytes must be rejected with a "new_value truncated" corruption
/// error.
#[test]
fn test_deserialize_cas_new_value_truncated() {
    let parts: [&[u8]; 5] = [
        &5u32.to_le_bytes(),  // length prefix for the term
        b"hello",             // term bytes
        &[0],                 // flag byte (presumably "no expected value")
        &10u32.to_le_bytes(), // declared new-value length
        &[1, 2, 3, 4, 5],     // fewer bytes than declared
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::CompareAndSwap, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("new_value truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A two-byte BatchInsert payload must be rejected with a
/// "payload too short" corruption error.
#[test]
fn test_deserialize_batch_insert_payload_too_short() {
    let bytes = vec![0u8; 2];

    let outcome = WalRecord::deserialize(WalRecordType::BatchInsert, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("payload too short"),
        _ => false,
    };
    assert!(rejected);
}
/// A BatchInsert payload with a leading u32 of 2 followed by just two bytes
/// must be rejected with an "entry 0 term_len truncated" corruption error.
#[test]
fn test_deserialize_batch_insert_entry_term_len_truncated() {
    let parts: [&[u8]; 2] = [
        &2u32.to_le_bytes(), // leading u32 (presumably the entry count)
        &[0, 0],             // too few bytes for a u32 length prefix
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::BatchInsert, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("entry 0 term_len truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A BatchInsert payload whose first entry declares a 10-byte term but
/// carries only 3 bytes must be rejected with an "entry 0 term truncated"
/// corruption error.
#[test]
fn test_deserialize_batch_insert_entry_term_truncated() {
    let parts: [&[u8]; 3] = [
        &1u32.to_le_bytes(),  // leading u32 (presumably the entry count)
        &10u32.to_le_bytes(), // declared term length
        b"abc",               // only three term bytes
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::BatchInsert, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("entry 0 term truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A BatchInsert payload whose first entry ends right after a flag byte of 1
/// must be rejected with an "entry 0 value_len truncated" corruption error.
#[test]
fn test_deserialize_batch_insert_entry_value_len_truncated() {
    let parts: [&[u8]; 4] = [
        &1u32.to_le_bytes(), // leading u32 (presumably the entry count)
        &5u32.to_le_bytes(), // length prefix for the term
        b"hello",            // term bytes
        &[1],                // flag byte (presumably "value present")
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::BatchInsert, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("entry 0 value_len truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A BatchInsert payload whose first entry declares a 10-byte value but
/// carries only 3 bytes must be rejected with an "entry 0 value truncated"
/// corruption error.
#[test]
fn test_deserialize_batch_insert_entry_value_truncated() {
    let parts: [&[u8]; 6] = [
        &1u32.to_le_bytes(),  // leading u32 (presumably the entry count)
        &5u32.to_le_bytes(),  // length prefix for the term
        b"hello",             // term bytes
        &[1],                 // flag byte (presumably "value present")
        &10u32.to_le_bytes(), // declared value length
        &[1, 2, 3],           // fewer bytes than declared
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::BatchInsert, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("entry 0 value truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A BatchInsert payload with a complete first entry and a second entry
/// whose term is cut short must be rejected with an
/// "entry 1 term truncated" corruption error.
#[test]
fn test_deserialize_batch_insert_second_entry_truncated() {
    let parts: [&[u8]; 6] = [
        &2u32.to_le_bytes(), // leading u32 (presumably the entry count)
        // First entry: 3-byte term "foo" followed by a flag byte of 0.
        &3u32.to_le_bytes(),
        b"foo",
        &[0],
        // Second entry: declares a 10-byte term but carries only 2 bytes.
        &10u32.to_le_bytes(),
        b"ab",
    ];
    let bytes = parts.concat();

    let outcome = WalRecord::deserialize(WalRecordType::BatchInsert, &bytes);
    let rejected = match outcome {
        Err(WalError::CorruptedRecord(detail)) => detail.contains("entry 1 term truncated"),
        _ => false,
    };
    assert!(rejected);
}
/// A well-formed Increment payload (u32 term length, term bytes, two
/// little-endian i64 values) decodes to term `count`, delta 42, result 100.
#[test]
fn test_deserialize_valid_increment() {
    let parts: [&[u8]; 4] = [
        &5u32.to_le_bytes(),   // term length
        b"count",              // term bytes
        &42i64.to_le_bytes(),  // delta
        &100i64.to_le_bytes(), // result
    ];
    let bytes = parts.concat();

    let parsed = WalRecord::deserialize(WalRecordType::Increment, &bytes);
    assert!(parsed.is_ok());

    if let WalRecord::Increment {
        term,
        delta,
        result: final_value,
    } = parsed.unwrap()
    {
        assert_eq!(term, b"count");
        assert_eq!(delta, 42);
        assert_eq!(final_value, 100);
    } else {
        panic!("Expected Increment");
    }
}
/// A well-formed CompareAndSwap payload carrying an expected value decodes
/// to term `key`, expected `old`, new value `new`, and `success == true`.
#[test]
fn test_deserialize_valid_cas_with_expected() {
    let parts: [&[u8]; 8] = [
        &3u32.to_le_bytes(), // term length
        b"key",              // term bytes
        &[1],                // flag byte: expected value present
        &3u32.to_le_bytes(), // expected-value length
        b"old",              // expected-value bytes
        &3u32.to_le_bytes(), // new-value length
        b"new",              // new-value bytes
        &[1],                // success flag
    ];
    let bytes = parts.concat();

    let parsed = WalRecord::deserialize(WalRecordType::CompareAndSwap, &bytes);
    assert!(parsed.is_ok());

    if let WalRecord::CompareAndSwap {
        term,
        expected,
        new_value,
        success,
    } = parsed.unwrap()
    {
        assert_eq!(term, b"key");
        assert_eq!(expected, Some(b"old".to_vec()));
        assert_eq!(new_value, b"new");
        assert!(success);
    } else {
        panic!("Expected CompareAndSwap");
    }
}
/// A well-formed CompareAndSwap payload without an expected value decodes to
/// term `key`, `expected == None`, new value `value`, and `success == false`.
#[test]
fn test_deserialize_valid_cas_without_expected() {
    let parts: [&[u8]; 6] = [
        &3u32.to_le_bytes(), // term length
        b"key",              // term bytes
        &[0],                // flag byte: no expected value
        &5u32.to_le_bytes(), // new-value length
        b"value",            // new-value bytes
        &[0],                // success flag
    ];
    let bytes = parts.concat();

    let parsed = WalRecord::deserialize(WalRecordType::CompareAndSwap, &bytes);
    assert!(parsed.is_ok());

    if let WalRecord::CompareAndSwap {
        term,
        expected,
        new_value,
        success,
    } = parsed.unwrap()
    {
        assert_eq!(term, b"key");
        assert!(expected.is_none());
        assert_eq!(new_value, b"value");
        assert!(!success);
    } else {
        panic!("Expected CompareAndSwap");
    }
}
/// The same 8-byte little-endian id (12345) decodes as BeginTx, CommitTx and
/// AbortTx records, each carrying `tx_id == 12345`.
#[test]
fn test_deserialize_valid_transaction_records() {
    let bytes = 12345u64.to_le_bytes().to_vec();

    let begin = WalRecord::deserialize(WalRecordType::BeginTx, &bytes);
    assert!(begin.is_ok());
    if let WalRecord::BeginTx { tx_id } = begin.unwrap() {
        assert_eq!(tx_id, 12345);
    } else {
        panic!("Expected BeginTx");
    }

    let commit = WalRecord::deserialize(WalRecordType::CommitTx, &bytes);
    assert!(commit.is_ok());
    if let WalRecord::CommitTx { tx_id } = commit.unwrap() {
        assert_eq!(tx_id, 12345);
    } else {
        panic!("Expected CommitTx");
    }

    let abort = WalRecord::deserialize(WalRecordType::AbortTx, &bytes);
    assert!(abort.is_ok());
    if let WalRecord::AbortTx { tx_id } = abort.unwrap() {
        assert_eq!(tx_id, 12345);
    } else {
        panic!("Expected AbortTx");
    }
}
/// A 16-byte Checkpoint payload made of two little-endian u64 values decodes
/// to `checkpoint_lsn == 100` and `timestamp == 1234567890`.
#[test]
fn test_deserialize_valid_checkpoint() {
    let parts: [&[u8]; 2] = [
        &100u64.to_le_bytes(),        // checkpoint LSN
        &1234567890u64.to_le_bytes(), // timestamp
    ];
    let bytes = parts.concat();

    let parsed = WalRecord::deserialize(WalRecordType::Checkpoint, &bytes);
    assert!(parsed.is_ok());

    if let WalRecord::Checkpoint {
        checkpoint_lsn,
        timestamp,
    } = parsed.unwrap()
    {
        assert_eq!(checkpoint_lsn, 100);
        assert_eq!(timestamp, 1234567890);
    } else {
        panic!("Expected Checkpoint");
    }
}
/// Bytes 0, 16 and 255 are rejected by `WalRecordType::try_from` with
/// `InvalidRecordType` carrying the offending byte, while 1, 10, 12, 14 and
/// 15 are accepted.
#[test]
fn test_invalid_record_type() {
    assert!(matches!(
        WalRecordType::try_from(0u8),
        Err(WalError::InvalidRecordType(0))
    ));
    assert!(matches!(
        WalRecordType::try_from(16u8),
        Err(WalError::InvalidRecordType(16))
    ));
    assert!(matches!(
        WalRecordType::try_from(255u8),
        Err(WalError::InvalidRecordType(255))
    ));

    for &code in &[1u8, 10, 12, 14, 15] {
        assert!(WalRecordType::try_from(code).is_ok());
    }
}
/// Checks the `Display` text of each `WalError` variant built here for an
/// expected fragment, then checks that `source()` is present for the I/O
/// variant and absent for `CorruptedRecord`.
#[test]
fn test_wal_error_display_and_source() {
    use std::error::Error;

    // Each error is paired with a fragment its rendered message must contain.
    let display_cases: Vec<(WalError, &str)> = vec![
        (
            WalError::Io(io::Error::new(io::ErrorKind::Other, "test io error")),
            "WAL I/O error",
        ),
        (WalError::InvalidRecordType(99), "99"),
        (
            WalError::CorruptedRecord("test corruption".into()),
            "test corruption",
        ),
        (WalError::UnexpectedEof, "Unexpected end"),
        (WalError::AlreadyExists, "already exists"),
        (WalError::NotFound, "not found"),
        (
            WalError::ParentNotFound(PathBuf::from("/test/path")),
            "/test/path",
        ),
    ];
    for (err, fragment) in display_cases {
        let rendered = err.to_string();
        assert!(rendered.contains(fragment));
    }

    let wrapped_io = WalError::Io(io::Error::new(io::ErrorKind::Other, "test"));
    assert!(wrapped_io.source().is_some());

    let no_source = WalError::CorruptedRecord("test".into());
    assert!(no_source.source().is_none());
}
/// A `VersionUpdate` record reports the matching record type, serializes to
/// 32 bytes, and decodes back to the same four field values.
#[test]
fn test_version_update_roundtrip() {
    let original = WalRecord::VersionUpdate {
        version_id: 42,
        root_ptr: 0x1234_5678_9ABC_DEF0,
        node_count: 1000,
        timestamp: 1699999999,
    };
    assert_eq!(original.record_type(), WalRecordType::VersionUpdate);

    let bytes = original.serialize_payload();
    assert_eq!(bytes.len(), 32);

    let decoded =
        WalRecord::deserialize(WalRecordType::VersionUpdate, &bytes).expect("deserialize");
    if let WalRecord::VersionUpdate {
        version_id,
        root_ptr,
        node_count,
        timestamp,
    } = decoded
    {
        assert_eq!(version_id, 42);
        assert_eq!(root_ptr, 0x1234_5678_9ABC_DEF0);
        assert_eq!(node_count, 1000);
        assert_eq!(timestamp, 1699999999);
    } else {
        panic!("Expected VersionUpdate");
    }
}
/// A `VersionDurable` record reports the matching record type, serializes to
/// 12 bytes, and decodes back to the same version id and checksum.
#[test]
fn test_version_durable_roundtrip() {
    let original = WalRecord::VersionDurable {
        version_id: 99,
        checksum: 0xDEAD_BEEF,
    };
    assert_eq!(original.record_type(), WalRecordType::VersionDurable);

    let bytes = original.serialize_payload();
    assert_eq!(bytes.len(), 12);

    let decoded =
        WalRecord::deserialize(WalRecordType::VersionDurable, &bytes).expect("deserialize");
    if let WalRecord::VersionDurable {
        version_id,
        checksum,
    } = decoded
    {
        assert_eq!(version_id, 99);
        assert_eq!(checksum, 0xDEAD_BEEF);
    } else {
        panic!("Expected VersionDurable");
    }
}
/// A `VersionGc` record with five ids reports the matching record type,
/// serializes to `4 + 5 * 8` bytes, and decodes back to the same id list.
#[test]
fn test_version_gc_roundtrip() {
    let original = WalRecord::VersionGc {
        version_ids: vec![1, 5, 10, 42, 100],
    };
    assert_eq!(original.record_type(), WalRecordType::VersionGc);

    let bytes = original.serialize_payload();
    assert_eq!(bytes.len(), 4 + 5 * 8);

    let decoded = WalRecord::deserialize(WalRecordType::VersionGc, &bytes).expect("deserialize");
    if let WalRecord::VersionGc { version_ids } = decoded {
        assert_eq!(version_ids, vec![1, 5, 10, 42, 100]);
    } else {
        panic!("Expected VersionGc");
    }
}
/// A `VersionGc` record with no ids serializes to 4 bytes and decodes back
/// to an empty id list.
#[test]
fn test_version_gc_empty() {
    let original = WalRecord::VersionGc {
        version_ids: vec![],
    };

    let bytes = original.serialize_payload();
    assert_eq!(bytes.len(), 4);

    let decoded = WalRecord::deserialize(WalRecordType::VersionGc, &bytes).expect("deserialize");
    if let WalRecord::VersionGc { version_ids } = decoded {
        assert!(version_ids.is_empty());
    } else {
        panic!("Expected VersionGc");
    }
}
/// A 31-byte VersionUpdate payload (one byte short of the 32 produced by
/// serialization) must fail to deserialize.
#[test]
fn test_version_update_too_short() {
    let short = [0u8; 31];
    let outcome = WalRecord::deserialize(WalRecordType::VersionUpdate, &short);
    assert!(outcome.is_err());
}
/// An 11-byte VersionDurable payload (one byte short of the 12 produced by
/// serialization) must fail to deserialize.
#[test]
fn test_version_durable_too_short() {
    let short = [0u8; 11];
    let outcome = WalRecord::deserialize(WalRecordType::VersionDurable, &short);
    assert!(outcome.is_err());
}
/// A VersionGc payload whose leading u32 is 5 but which carries only three
/// u64 ids must fail to deserialize.
#[test]
fn test_version_gc_too_short() {
    // Leading u32 of 5 (presumably the id count), then only ids 1, 2 and 3.
    let mut bytes = 5u32.to_le_bytes().to_vec();
    for id in 1u64..=3 {
        bytes.extend_from_slice(&id.to_le_bytes());
    }

    let outcome = WalRecord::deserialize(WalRecordType::VersionGc, &bytes);
    assert!(outcome.is_err());
}
}