use ironwal::{SyncMode, Wal, WalOptions};
use proptest::prelude::*;
use std::collections::HashMap;
use tempfile::TempDir;
/// In-memory reference model of the WAL: maps a stream name to the payloads
/// appended to that stream, in append order.
type Model = HashMap<String, Vec<Vec<u8>>>;
/// One step of a randomly generated workload replayed against the WAL.
#[derive(Clone, Debug)]
enum Action {
    /// Append a single payload to the stream selected by `stream_id`.
    Append { stream_id: u8, data: Vec<u8> },
    /// Append every payload in `batch` to one stream with a single batch call.
    AppendBatch { stream_id: u8, batch: Vec<Vec<u8>> },
    /// Drop the current WAL handle and open a new one with the same options.
    Restart,
}
fn action_strategy() -> impl Strategy<Value = Action> {
prop_oneof![
4 => (0..3u8, prop::collection::vec(any::<u8>(), 0..128))
.prop_map(|(s, d)| Action::Append { stream_id: s, data: d }),
4 => (0..3u8, prop::collection::vec(prop::collection::vec(any::<u8>(), 0..64), 1..10))
.prop_map(|(s, b)| Action::AppendBatch { stream_id: s, batch: b }),
1 => Just(Action::Restart),
]
}
/// Builds the WAL stream name for a numeric stream id, e.g. `2` becomes `"stream_2"`.
fn stream_name(id: u8) -> String {
    let mut name = String::from("stream_");
    name.push_str(&id.to_string());
    name
}
proptest! {
    #![proptest_config(ProptestConfig::with_cases(50))]

    // Replays a random sequence of appends, batch appends and restarts against a WAL
    // rooted in a temporary directory, mirroring every write in an in-memory model.
    // After each step the first and last entry of every written stream are compared
    // with the model; once all steps have run, each stream is iterated from index 0
    // and compared entry by entry.
    #[test]
    fn fuzz_wal_consistency(actions in prop::collection::vec(action_strategy(), 1..100)) {
        let tmp = TempDir::new().unwrap();
        let wal_root = tmp.path().to_path_buf();

        let mut opts = WalOptions::new(&wal_root);
        opts.sync_mode = SyncMode::BatchOnly;
        // NOTE(review): the small per-segment entry limit presumably forces frequent
        // segment rollover while the size limit is effectively disabled — confirm
        // against the ironwal documentation.
        opts.max_entries_per_segment = 5;
        opts.max_segment_size = u64::MAX;

        let mut wal = Wal::new(opts.clone()).unwrap();
        let mut model = Model::new();

        for step in actions {
            match step {
                Action::Append { stream_id, data } => {
                    let stream = stream_name(stream_id);

                    // Record the write in the model first; the id the WAL hands back
                    // must equal the number of entries the stream held beforehand.
                    let recorded = model.entry(stream.clone()).or_default();
                    let want_id = recorded.len() as u64;
                    recorded.push(data.clone());

                    let got_id = wal.append(&stream, &data).unwrap();
                    assert_eq!(got_id, want_id, "Sequence ID mismatch for stream {}", stream);
                }
                Action::AppendBatch { stream_id, batch } => {
                    let stream = stream_name(stream_id);

                    let recorded = model.entry(stream.clone()).or_default();
                    let first_id = recorded.len() as u64;
                    let end_id = first_id + batch.len() as u64;
                    recorded.extend_from_slice(&batch);

                    let payloads: Vec<&[u8]> = batch.iter().map(Vec::as_slice).collect();
                    let got_range = wal.append_batch(&stream, &payloads).unwrap();
                    assert_eq!(got_range, first_id..end_id, "Batch range mismatch for {}", stream);
                }
                Action::Restart => {
                    // Drop the old handle before opening a new one on the same root.
                    drop(wal);
                    wal = Wal::new(opts.clone()).unwrap();
                }
            }

            // Spot-check after every step: the newest and the oldest entry of each
            // stream that has been written must match the model.
            for stream in (0..3u8).map(stream_name) {
                let recorded = match model.get(&stream) {
                    Some(entries) => entries,
                    None => continue,
                };
                if let (Some(oldest), Some(newest)) = (recorded.first(), recorded.last()) {
                    let tip_idx = recorded.len() as u64 - 1;
                    let tip = wal.get(&stream, tip_idx).unwrap();
                    assert_eq!(tip.as_ref(), Some(newest), "Data mismatch at tip for {}", stream);

                    let genesis = wal.get(&stream, 0).unwrap();
                    assert_eq!(genesis.as_ref(), Some(oldest), "Data mismatch at genesis for {}", stream);
                }
            }
        }

        // Full verification: iterate every stream from index 0 and compare each entry.
        for (stream, expected) in model {
            let mut replay = wal.iter(&stream, 0).unwrap();
            let mut matched = 0;
            for (idx, want) in expected.iter().enumerate() {
                let got = replay.next().expect("WAL ended prematurely").unwrap();
                assert_eq!(&got, want, "Mismatch at index {} in stream {}", idx, stream);
                matched += 1;
            }
            assert!(replay.next().is_none(), "WAL has extra items in stream {}", stream);
            assert_eq!(matched, expected.len(), "Total count mismatch for {}", stream);
        }
    }
}