use durability::error::{PersistenceError, PersistenceResult};
use durability::recover::{recover_with_wal, RecoveryOptions};
use durability::storage::FlushPolicy;
use durability::storage::{Directory, MemoryDirectory};
use durability::walog::{WalEntry, WalReader, WalWriter};
use std::io::{Read, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
#[test]
fn corrupt_length_field_under_16_errors_in_strict_mode() {
let dir = MemoryDirectory::arc();
let mut w = WalWriter::<WalEntry>::new(dir.clone());
w.append(&WalEntry::AddSegment {
segment_id: 1,
doc_count: 10,
})
.unwrap();
w.flush().unwrap();
drop(w);
let files = dir.list_dir("wal").unwrap();
let wal_file = files.iter().find(|f| f.ends_with(".log")).unwrap();
let wal_path = format!("wal/{wal_file}");
let mut data = Vec::new();
dir.open_file(&wal_path)
.unwrap()
.read_to_end(&mut data)
.unwrap();
data.extend_from_slice(&8u32.to_le_bytes());
data.extend_from_slice(&[0xAA; 16]);
dir.atomic_write(&wal_path, &data).unwrap();
let reader = WalReader::<WalEntry>::new(dir);
let err = reader.replay().unwrap_err();
match err {
PersistenceError::Format(msg) => {
assert!(
msg.contains("length < header"),
"expected length < header error, got: {msg}"
);
}
other => panic!("expected Format error, got: {other}"),
}
}
#[test]
fn corrupt_length_field_under_16_errors_in_best_effort() {
let dir = MemoryDirectory::arc();
let mut w = WalWriter::<WalEntry>::new(dir.clone());
w.append(&WalEntry::AddSegment {
segment_id: 1,
doc_count: 10,
})
.unwrap();
w.flush().unwrap();
drop(w);
let files = dir.list_dir("wal").unwrap();
let wal_file = files.iter().find(|f| f.ends_with(".log")).unwrap();
let wal_path = format!("wal/{wal_file}");
let mut data = Vec::new();
dir.open_file(&wal_path)
.unwrap()
.read_to_end(&mut data)
.unwrap();
data.extend_from_slice(&4u32.to_le_bytes());
dir.atomic_write(&wal_path, &data).unwrap();
let reader = WalReader::<WalEntry>::new(dir);
let err = reader.replay_best_effort().unwrap_err();
match err {
PersistenceError::Format(msg) => {
assert!(msg.contains("length < header"));
}
other => panic!("expected Format error, got: {other}"),
}
}
#[test]
fn zero_length_is_eof_in_best_effort_but_error_in_strict() {
let dir = MemoryDirectory::arc();
let mut w = WalWriter::<WalEntry>::new(dir.clone());
w.append(&WalEntry::AddSegment {
segment_id: 1,
doc_count: 10,
})
.unwrap();
w.flush().unwrap();
drop(w);
let files = dir.list_dir("wal").unwrap();
let wal_file = files.iter().find(|f| f.ends_with(".log")).unwrap();
let wal_path = format!("wal/{wal_file}");
let mut data = Vec::new();
dir.open_file(&wal_path)
.unwrap()
.read_to_end(&mut data)
.unwrap();
data.extend_from_slice(&[0u8; 16]);
dir.atomic_write(&wal_path, &data).unwrap();
let reader = WalReader::<WalEntry>::new(dir.clone());
let entries = reader.replay_best_effort().unwrap();
assert_eq!(entries.len(), 1);
let reader2 = WalReader::<WalEntry>::new(dir);
assert!(reader2.replay().is_err());
}
#[test]
fn resume_propagates_corrupt_length_in_last_segment() {
let dir = MemoryDirectory::arc();
let mut w = WalWriter::<WalEntry>::new(dir.clone());
w.append(&WalEntry::AddSegment {
segment_id: 1,
doc_count: 10,
})
.unwrap();
w.flush().unwrap();
drop(w);
let files = dir.list_dir("wal").unwrap();
let wal_file = files.iter().find(|f| f.ends_with(".log")).unwrap();
let wal_path = format!("wal/{wal_file}");
let mut data = Vec::new();
dir.open_file(&wal_path)
.unwrap()
.read_to_end(&mut data)
.unwrap();
data.extend_from_slice(&12u32.to_le_bytes());
data.extend_from_slice(&[0xFF; 20]);
dir.atomic_write(&wal_path, &data).unwrap();
let err = WalWriter::<WalEntry>::resume(dir);
assert!(
err.is_err(),
"resume should error on corrupt non-zero length in tail"
);
}
struct FailingCreateDir {
inner: Arc<dyn Directory>,
fail_create: AtomicBool,
}
impl Directory for FailingCreateDir {
fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
if self.fail_create.load(Ordering::SeqCst) && path.contains("wal_") {
return Err(PersistenceError::Io(std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
"injected",
)));
}
self.inner.create_file(path)
}
fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
self.inner.append_file(path)
}
fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>> {
self.inner.open_file(path)
}
fn exists(&self, path: &str) -> bool {
self.inner.exists(path)
}
fn delete(&self, path: &str) -> PersistenceResult<()> {
self.inner.delete(path)
}
fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()> {
self.inner.atomic_rename(from, to)
}
fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>> {
self.inner.list_dir(path)
}
fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
self.inner.atomic_write(path, data)
}
fn create_dir_all(&self, path: &str) -> PersistenceResult<()> {
self.inner.create_dir_all(path)
}
fn file_path(&self, _path: &str) -> Option<PathBuf> {
None
}
}
#[test]
fn writer_poisoned_after_rotation_failure() {
let inner = MemoryDirectory::arc();
let failing_dir = Arc::new(FailingCreateDir {
inner: inner.clone(),
fail_create: AtomicBool::new(false),
});
let dir: Arc<dyn Directory> = failing_dir.clone();
let mut w = WalWriter::<WalEntry>::with_options(dir, FlushPolicy::PerAppend, 0);
w.set_segment_size_limit_bytes(40);
w.append(&WalEntry::AddSegment {
segment_id: 1,
doc_count: 100,
})
.unwrap();
failing_dir.fail_create.store(true, Ordering::SeqCst);
let err = w.append(&WalEntry::AddSegment {
segment_id: 99,
doc_count: 1,
});
assert!(
err.is_err(),
"append should fail when segment creation fails"
);
failing_dir.fail_create.store(false, Ordering::SeqCst);
let err2 = w.append(&WalEntry::AddSegment {
segment_id: 100,
doc_count: 1,
});
match err2 {
Err(PersistenceError::InvalidState(msg)) => {
assert!(
msg.contains("poisoned"),
"expected poisoned error, got: {msg}"
);
}
other => panic!("expected poisoned error, got: {other:?}"),
}
}
#[derive(serde::Serialize, serde::Deserialize)]
enum CountOp {
Set(u64),
Inc,
}
#[test]
fn point_in_time_recovery_stops_at_ceiling() {
let dir = MemoryDirectory::arc();
let mut w = WalWriter::<CountOp>::new(dir.clone());
for i in 1..=10u64 {
w.append(&CountOp::Set(i)).unwrap();
}
w.flush().unwrap();
drop(w);
let result = recover_with_wal::<(), CountOp, _>(
&dir,
None,
RecoveryOptions::up_to(5),
|_| 0u64,
|state, _id, entry| {
if let CountOp::Set(v) = entry {
*state = v;
}
},
)
.unwrap();
assert_eq!(result.state, 5, "should have applied entries 1..=5");
assert_eq!(result.last_entry_id, 5);
}
#[test]
fn point_in_time_recovery_with_no_ceiling_applies_all() {
let dir = MemoryDirectory::arc();
let mut w = WalWriter::<CountOp>::new(dir.clone());
for _ in 0..7 {
w.append(&CountOp::Inc).unwrap();
}
w.flush().unwrap();
drop(w);
let result = recover_with_wal::<(), CountOp, _>(
&dir,
None,
RecoveryOptions::strict(),
|_| 0u64,
|state, _id, entry| {
if let CountOp::Inc = entry {
*state += 1;
}
},
)
.unwrap();
assert_eq!(result.state, 7);
assert_eq!(result.last_entry_id, 7);
}
#[test]
fn point_in_time_recovery_ceiling_zero_applies_nothing() {
let dir = MemoryDirectory::arc();
let mut w = WalWriter::<CountOp>::new(dir.clone());
for _ in 0..5 {
w.append(&CountOp::Inc).unwrap();
}
w.flush().unwrap();
drop(w);
let result = recover_with_wal::<(), CountOp, _>(
&dir,
None,
RecoveryOptions::up_to(0),
|_| 0u64,
|state, _id, entry| {
if let CountOp::Inc = entry {
*state += 1;
}
},
)
.unwrap();
assert_eq!(result.state, 0, "ceiling 0 should apply no entries");
assert_eq!(result.last_entry_id, 0);
}