durability 0.6.4

Crash-consistent persistence primitives: directory abstraction, generic WAL, checkpoints, and recovery.
Documentation
//! Filesystem-backed `Directory` wrapper with targeted fault injection.
//!
//! Important: this file lives under `tests/support/` so it is **not** compiled as a standalone
//! integration test target.

use durability::storage::{Directory, FsDirectory};
use std::io;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Fault-injection configuration for WAL-related operations.
#[derive(Default)]
pub struct FaultConfig {
    /// Fail when opening an append handle for WAL files.
    pub fail_wal_append_file: bool,
    /// Fail by hiding `file_path()` for WAL paths (simulates "cannot prove durability").
    pub fail_wal_file_path: bool,
    /// Fail when deleting WAL paths (during truncation).
    pub fail_wal_delete: bool,
    /// Count of delete calls attempted against WAL paths.
    pub delete_calls: usize,
}

/// A filesystem-backed `Directory` wrapper with targeted fault injection.
pub struct FaultyDirectory {
    inner: FsDirectory,
    cfg: Arc<Mutex<FaultConfig>>,
}

impl FaultyDirectory {
    /// Wrap an existing `FsDirectory`.
    pub fn new(inner: FsDirectory) -> Self {
        Self {
            inner,
            cfg: Arc::new(Mutex::new(FaultConfig::default())),
        }
    }

    /// Access the shared fault config (for toggling failpoints and reading counters).
    pub fn cfg(&self) -> Arc<Mutex<FaultConfig>> {
        self.cfg.clone()
    }

    fn is_wal_path(path: &str) -> bool {
        (path.starts_with("wal/") || path == "wal") && !path.ends_with(".lock")
    }
}

impl Directory for FaultyDirectory {
    fn create_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Write + Send>> {
        self.inner.create_file(path)
    }

    fn open_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Read + Send>> {
        self.inner.open_file(path)
    }

    fn exists(&self, path: &str) -> bool {
        self.inner.exists(path)
    }

    fn delete(&self, path: &str) -> durability::PersistenceResult<()> {
        let mut cfg = self.cfg.lock().unwrap();
        if Self::is_wal_path(path) {
            cfg.delete_calls += 1;
            if cfg.fail_wal_delete {
                return Err(io::Error::other("injected delete failure").into());
            }
        }
        drop(cfg);
        self.inner.delete(path)
    }

    fn atomic_rename(&self, from: &str, to: &str) -> durability::PersistenceResult<()> {
        self.inner.atomic_rename(from, to)
    }

    fn create_dir_all(&self, path: &str) -> durability::PersistenceResult<()> {
        self.inner.create_dir_all(path)
    }

    fn list_dir(&self, path: &str) -> durability::PersistenceResult<Vec<String>> {
        self.inner.list_dir(path)
    }

    fn append_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Write + Send>> {
        let cfg = self.cfg.lock().unwrap();
        if cfg.fail_wal_append_file && Self::is_wal_path(path) {
            return Err(io::Error::other("injected append failure").into());
        }
        drop(cfg);
        self.inner.append_file(path)
    }

    fn atomic_write(&self, path: &str, data: &[u8]) -> durability::PersistenceResult<()> {
        self.inner.atomic_write(path, data)
    }

    fn file_path(&self, path: &str) -> Option<std::path::PathBuf> {
        let cfg = self.cfg.lock().unwrap();
        if cfg.fail_wal_file_path && Self::is_wal_path(path) {
            return None;
        }
        drop(cfg);
        self.inner.file_path(path)
    }
}

// ---------------------------------------------------------------------------
// I/O countdown directory (inspired by redb's FuzzerBackend)
// ---------------------------------------------------------------------------

/// A `Directory` wrapper that counts write operations and fails after a threshold.
///
/// Useful for crash-simulation testing: set the countdown to N, then drive the WAL
/// through operations. When the countdown hits zero, all writes fail with IO errors.
/// Verify that recovery produces a valid prefix of the written entries.
pub struct CountdownDirectory {
    inner: Arc<dyn Directory>,
    /// Number of write operations remaining before failure. 0 = already failed.
    /// u64::MAX = never fail (default).
    remaining: AtomicU64,
}

impl CountdownDirectory {
    /// Wrap a directory with an initially-disabled countdown (never fails).
    pub fn new(inner: impl Into<Arc<dyn Directory>>) -> Self {
        Self {
            inner: inner.into(),
            remaining: AtomicU64::new(u64::MAX),
        }
    }

    /// Arm the countdown: fail after `n` more write operations.
    pub fn arm(&self, n: u64) {
        self.remaining.store(n, Ordering::SeqCst);
    }

    /// Disarm the countdown (stop injecting failures).
    pub fn disarm(&self) {
        self.remaining.store(u64::MAX, Ordering::SeqCst);
    }

    /// Check if the countdown has triggered.
    pub fn is_triggered(&self) -> bool {
        self.remaining.load(Ordering::SeqCst) == 0
    }

    /// Return the current remaining count.
    pub fn remaining(&self) -> u64 {
        self.remaining.load(Ordering::SeqCst)
    }

    /// Decrement the countdown. Returns Err if we've hit zero.
    fn check_write(&self) -> durability::PersistenceResult<()> {
        loop {
            let current = self.remaining.load(Ordering::SeqCst);
            if current == 0 {
                return Err(io::Error::other("countdown: simulated write failure").into());
            }
            if current == u64::MAX {
                return Ok(());
            }
            if self
                .remaining
                .compare_exchange(current, current - 1, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
            {
                return Ok(());
            }
        }
    }
}

impl Directory for CountdownDirectory {
    fn create_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Write + Send>> {
        self.check_write()?;
        self.inner.create_file(path)
    }

    fn open_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Read + Send>> {
        self.inner.open_file(path)
    }

    fn exists(&self, path: &str) -> bool {
        self.inner.exists(path)
    }

    fn delete(&self, path: &str) -> durability::PersistenceResult<()> {
        self.inner.delete(path)
    }

    fn atomic_rename(&self, from: &str, to: &str) -> durability::PersistenceResult<()> {
        self.check_write()?;
        self.inner.atomic_rename(from, to)
    }

    fn create_dir_all(&self, path: &str) -> durability::PersistenceResult<()> {
        self.inner.create_dir_all(path)
    }

    fn list_dir(&self, path: &str) -> durability::PersistenceResult<Vec<String>> {
        self.inner.list_dir(path)
    }

    fn append_file(&self, path: &str) -> durability::PersistenceResult<Box<dyn io::Write + Send>> {
        self.check_write()?;
        self.inner.append_file(path)
    }

    fn atomic_write(&self, path: &str, data: &[u8]) -> durability::PersistenceResult<()> {
        self.check_write()?;
        self.inner.atomic_write(path, data)
    }

    fn file_path(&self, path: &str) -> Option<std::path::PathBuf> {
        self.inner.file_path(path)
    }
}