use durability::storage::{Directory, FsDirectory};
use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
#[derive(Default)]
pub struct FaultConfig {
pub fail_wal_append_file: bool,
pub fail_wal_file_path: bool,
pub fail_wal_delete: bool,
pub delete_calls: usize,
}
pub struct FaultyDirectory {
inner: FsDirectory,
cfg: Arc<Mutex<FaultConfig>>,
}
impl FaultyDirectory {
pub fn new(inner: FsDirectory) -> Self {
Self {
inner,
cfg: Arc::new(Mutex::new(FaultConfig::default())),
}
}
pub fn cfg(&self) -> Arc<Mutex<FaultConfig>> {
self.cfg.clone()
}
fn is_wal_path(path: &str) -> bool {
(path.starts_with("wal/") || path == "wal") && !path.ends_with(".lock")
}
}
impl Directory for FaultyDirectory {
fn create_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Write + Send>> {
self.inner.create_file(path)
}
fn open_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Read + Send>> {
self.inner.open_file(path)
}
fn exists(&self, path: &str) -> bool {
self.inner.exists(path)
}
fn delete(&self, path: &str) -> durability::PersistenceResult<()> {
let mut cfg = self.cfg.lock().unwrap();
if Self::is_wal_path(path) {
cfg.delete_calls += 1;
if cfg.fail_wal_delete {
return Err(io::Error::other("injected delete failure").into());
}
}
drop(cfg);
self.inner.delete(path)
}
fn atomic_rename(&self, from: &str, to: &str) -> durability::PersistenceResult<()> {
self.inner.atomic_rename(from, to)
}
fn create_dir_all(&self, path: &str) -> durability::PersistenceResult<()> {
self.inner.create_dir_all(path)
}
fn list_dir(&self, path: &str) -> durability::PersistenceResult<Vec<String>> {
self.inner.list_dir(path)
}
fn append_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Write + Send>> {
let cfg = self.cfg.lock().unwrap();
if cfg.fail_wal_append_file && Self::is_wal_path(path) {
return Err(io::Error::other("injected append failure").into());
}
drop(cfg);
self.inner.append_file(path)
}
fn atomic_write(&self, path: &str, data: &[u8]) -> durability::PersistenceResult<()> {
self.inner.atomic_write(path, data)
}
fn file_path(&self, path: &str) -> Option<std::path::PathBuf> {
let cfg = self.cfg.lock().unwrap();
if cfg.fail_wal_file_path && Self::is_wal_path(path) {
return None;
}
drop(cfg);
self.inner.file_path(path)
}
}
pub struct CountdownDirectory {
inner: Arc<dyn Directory>,
remaining: AtomicU64,
}
impl CountdownDirectory {
pub fn new(inner: impl Into<Arc<dyn Directory>>) -> Self {
Self {
inner: inner.into(),
remaining: AtomicU64::new(u64::MAX),
}
}
pub fn arm(&self, n: u64) {
self.remaining.store(n, Ordering::SeqCst);
}
pub fn disarm(&self) {
self.remaining.store(u64::MAX, Ordering::SeqCst);
}
pub fn is_triggered(&self) -> bool {
self.remaining.load(Ordering::SeqCst) == 0
}
pub fn remaining(&self) -> u64 {
self.remaining.load(Ordering::SeqCst)
}
fn check_write(&self) -> durability::PersistenceResult<()> {
loop {
let current = self.remaining.load(Ordering::SeqCst);
if current == 0 {
return Err(io::Error::other("countdown: simulated write failure").into());
}
if current == u64::MAX {
return Ok(());
}
if self
.remaining
.compare_exchange(current, current - 1, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
return Ok(());
}
}
}
}
impl Directory for CountdownDirectory {
fn create_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Write + Send>> {
self.check_write()?;
self.inner.create_file(path)
}
fn open_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Read + Send>> {
self.inner.open_file(path)
}
fn exists(&self, path: &str) -> bool {
self.inner.exists(path)
}
fn delete(&self, path: &str) -> durability::PersistenceResult<()> {
self.inner.delete(path)
}
fn atomic_rename(&self, from: &str, to: &str) -> durability::PersistenceResult<()> {
self.check_write()?;
self.inner.atomic_rename(from, to)
}
fn create_dir_all(&self, path: &str) -> durability::PersistenceResult<()> {
self.inner.create_dir_all(path)
}
fn list_dir(&self, path: &str) -> durability::PersistenceResult<Vec<String>> {
self.inner.list_dir(path)
}
fn append_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Write + Send>> {
self.check_write()?;
self.inner.append_file(path)
}
fn atomic_write(&self, path: &str, data: &[u8]) -> durability::PersistenceResult<()> {
self.check_write()?;
self.inner.atomic_write(path, data)
}
fn file_path(&self, path: &str) -> Option<std::path::PathBuf> {
self.inner.file_path(path)
}
}