use durability::publish::CheckpointPublisher;
use durability::recover::{CheckpointSegment, CheckpointState};
use durability::recover::{RecoveredState, RecoveryManager};
use durability::storage::{Directory, FsDirectory};
use durability::walog::{WalEntry, WalMaintenance, WalWriter};
use proptest::prelude::*;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
#[derive(Debug, Clone)]
enum Op {
AddSeg { seg: u64, doc_count: u32 },
Del { seg: u64, doc: u32 },
}
fn apply_ops(ops: &[Op]) -> HashMap<u64, (u32, HashSet<u32>)> {
let mut m: HashMap<u64, (u32, HashSet<u32>)> = HashMap::new();
for op in ops {
match *op {
Op::AddSeg { seg, doc_count } => {
m.insert(seg, (doc_count, HashSet::new()));
}
Op::Del { seg, doc } => {
if let Some((_dc, dels)) = m.get_mut(&seg) {
dels.insert(doc);
}
}
}
}
m
}
fn recovered_to_model(state: &RecoveredState) -> HashMap<u64, (u32, HashSet<u32>)> {
let mut m: HashMap<u64, (u32, HashSet<u32>)> = HashMap::new();
for s in &state.segments {
m.insert(s.segment_id, (s.doc_count, s.deleted_docs.clone()));
}
m
}
fn arb_ops() -> impl Strategy<Value = Vec<Op>> {
prop::collection::vec(
prop_oneof![
(1u64..40, 0u32..500).prop_map(|(seg, dc)| Op::AddSeg { seg, doc_count: dc }),
(1u64..40, 0u32..500).prop_map(|(seg, doc)| Op::Del { seg, doc }),
],
1..250,
)
}
proptest! {
#![proptest_config(ProptestConfig {
failure_persistence: None,
cases: 96,
.. ProptestConfig::default()
})]
#[test]
fn publish_then_truncate_preserves_recoverability(
ops in arb_ops(),
split in 0usize..250
) {
let split = split.min(ops.len());
let tmp = tempfile::tempdir().unwrap();
let dir: Arc<dyn Directory> = Arc::new(FsDirectory::new(tmp.path()).unwrap());
let mut wal = WalWriter::<WalEntry>::new(dir.clone());
let mut last_id: u64 = 0;
for op in &ops {
match *op {
Op::AddSeg { seg, doc_count } => {
last_id = wal.append(&WalEntry::AddSegment { segment_id: seg, doc_count }).unwrap();
}
Op::Del { seg, doc } => {
last_id = wal.append(&WalEntry::DeleteDocuments { deletes: vec![(seg, doc)] }).unwrap();
}
}
}
wal.flush_and_sync().unwrap();
let prefix_model = apply_ops(&ops[..split]);
let ckpt_state = CheckpointState {
segments: prefix_model
.iter()
.map(|(&seg, &(dc, ref dels))| CheckpointSegment {
segment_id: seg,
doc_count: dc,
deleted_docs: dels.iter().copied().collect(),
})
.collect(),
};
let ckpt_last_entry_id = split as u64; let ckpt_path = "checkpoints/prop.chk";
let pubr = CheckpointPublisher::new(dir.clone())
.publish_checkpoint(&mut wal, &ckpt_state, ckpt_last_entry_id, ckpt_path)
.unwrap();
prop_assert!(pubr.wal_checkpoint_entry_id > last_id);
let mgr = RecoveryManager::new(dir.clone());
let from_ckpt = mgr.recover(Some(ckpt_path)).unwrap();
let from_scratch = mgr.recover(None).unwrap();
prop_assert_eq!(recovered_to_model(&from_ckpt), recovered_to_model(&from_scratch));
let want = apply_ops(&ops);
prop_assert_eq!(recovered_to_model(&from_scratch), want);
let ranges = WalMaintenance::new(dir.clone()).segment_ranges_strict().unwrap();
prop_assert!(!ranges.is_empty());
}
}