use std::sync::Arc;
use std::thread;
use wal_db::{SegmentedStore, Wal};
/// Round-trips 500 records through a segmented WAL.
///
/// The records are appended and synced, the handle is dropped, and then the
/// test checks that (a) more than five directory entries end in `.wal` and
/// (b) a freshly reopened log yields exactly the appended records, in order.
#[test]
fn append_sync_reopen_across_many_segments() {
    let dir = tempfile::tempdir().unwrap();

    let mut records: Vec<Vec<u8>> = Vec::with_capacity(500);
    for i in 0..500u32 {
        records.push(format!("record {i} with some content").into_bytes());
    }

    // The writer lives in its own scope so it is dropped before the
    // directory is inspected and the log is reopened.
    {
        let wal = Wal::open_segmented(dir.path(), 32).unwrap();
        records.iter().for_each(|record| {
            let _ = wal.append(record).unwrap();
        });
        wal.sync().unwrap();
    }

    // Count directory entries whose file name ends in ".wal".
    let mut segment_count = 0usize;
    for entry in std::fs::read_dir(dir.path()).unwrap() {
        let file_name = entry.unwrap().file_name();
        if file_name.to_str().unwrap().ends_with(".wal") {
            segment_count += 1;
        }
    }
    assert!(
        segment_count > 5,
        "expected many segments, got {segment_count}"
    );

    let wal = Wal::open_segmented(dir.path(), 32).unwrap();
    let mut read_back: Vec<Vec<u8>> = Vec::new();
    for entry in wal.iter().unwrap() {
        read_back.push(entry.unwrap().into_data());
    }
    assert_eq!(read_back, records);
}
/// Appends a 1000-byte record followed by a short one to a WAL opened with a
/// segment parameter of 64, then reopens the log and checks that both records
/// come back intact and in order.
///
/// NOTE(review): the test name implies the large record spans several
/// segments; only the round-trip of the data is asserted here.
#[test]
fn record_larger_than_a_segment_spans_several() {
    let dir = tempfile::tempdir().unwrap();
    let big = vec![0x7Cu8; 1000];
    let trailer: &[u8] = b"after the big one";

    // Write phase: scoped so the handle is dropped before reopening.
    {
        let wal = Wal::open_segmented(dir.path(), 64).unwrap();
        let _ = wal.append(&big).unwrap();
        let _ = wal.append(trailer).unwrap();
        wal.sync().unwrap();
    }

    // Read phase: a fresh handle must see both records.
    let wal = Wal::open_segmented(dir.path(), 64).unwrap();
    let mut records: Vec<Vec<u8>> = Vec::new();
    for entry in wal.iter().unwrap() {
        records.push(entry.unwrap().into_data());
    }

    let expected = vec![big, trailer.to_vec()];
    assert_eq!(records, expected);
}
/// Has eight threads append 300 records each to one shared WAL, then checks
/// that the record count equals `THREADS * PER_THREAD` both on the live
/// handle (after a sync) and on a handle reopened from the same directory.
///
/// Only the number of records is verified, not their contents or order.
#[test]
fn concurrent_writers_across_segment_boundaries() {
    const THREADS: usize = 8;
    const PER_THREAD: usize = 300;

    let dir = tempfile::tempdir().unwrap();
    let wal = Arc::new(Wal::open_segmented(dir.path(), 48).unwrap());

    // `collect` spawns every writer up front, before any of them is joined.
    let handles: Vec<_> = (0..THREADS)
        .map(|t| {
            let wal = Arc::clone(&wal);
            thread::spawn(move || {
                for i in 0..PER_THREAD {
                    let _ = wal.append(format!("t{t}-record-{i}").as_bytes()).unwrap();
                }
            })
        })
        .collect();
    handles.into_iter().for_each(|h| h.join().unwrap());

    let expected = THREADS * PER_THREAD;

    wal.sync().unwrap();
    let count = wal.iter().unwrap().count();
    assert_eq!(count, expected);

    // Release the last reference to the writer before reopening the directory.
    drop(wal);
    let reopened = Wal::open_segmented(dir.path(), 48).unwrap();
    assert_eq!(reopened.iter().unwrap().count(), expected);
}
/// Writes 50 records, records the log length after a sync, then writes stray
/// bytes at that length through a `SegmentedStore` opened on the same
/// directory. Reopening the WAL must still yield exactly 50 records and
/// report the original length.
#[test]
fn recovery_truncates_a_torn_tail_in_the_last_segment() {
    let dir = tempfile::tempdir().unwrap();

    // Populate the log and capture its length once everything is synced;
    // the writer is dropped at the end of this block.
    let clean_len = {
        let wal = Wal::open_segmented(dir.path(), 32).unwrap();
        (0..50).for_each(|i| {
            let _ = wal.append(format!("record {i}").as_bytes()).unwrap();
        });
        wal.sync().unwrap();
        wal.len()
    };

    // Write garbage at the clean length via the store, bypassing the WAL.
    let store = SegmentedStore::open(dir.path(), 32).unwrap();
    store_write_garbage(&store, clean_len);
    drop(store);

    let wal = Wal::open_segmented(dir.path(), 32).unwrap();
    let recovered = wal.iter().unwrap().count();
    assert_eq!(recovered, 50);
    assert_eq!(wal.len(), clean_len);
}
/// Writes six `0xFF` bytes to `store` at position `at` and then syncs the
/// store. Panics if either operation returns an error.
///
/// `at` is presumably a byte offset into the log — confirm against
/// `WalStore::write_at`.
fn store_write_garbage(store: &SegmentedStore, at: u64) {
    // Presumably the trait that supplies `write_at` and `sync` on the store.
    use wal_db::WalStore;

    let garbage = [0xFFu8; 6];
    store.write_at(at, &garbage).unwrap();
    store.sync().unwrap();
}