use crate::store::platform::fs::{PositionedReadError, StoreFs};
use crate::store::{StoreError, SyncMode};
use std::collections::BTreeMap;
use std::fs::{File, ReadDir};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
/// Durability bookkeeping that `SimFs` keeps for one tracked path.
#[derive(Clone, Copy, Default)]
struct DurableState {
    /// Length in bytes that `SimFs::crash` truncates the file at this path to.
    durable_len: u64,
}
/// Simulated filesystem: delegates real I/O to `crate::store::platform` while
/// tracking a per-path "durable" length and injecting scheduled faults.
pub(crate) struct SimFs {
/// Seeded random source rolled on every simulated fsync to decide whether it is dropped.
rng: Mutex<fastrand::Rng>,
/// Durable length recorded for each tracked path; `crash` truncates files back to it.
durable: Mutex<BTreeMap<PathBuf, DurableState>>,
/// A simulated fsync is dropped when the roll is a multiple of this value; `0` disables drops.
fsync_drop_one_in: u32,
/// Schedule for the injected `StorageFull` error, shared by `copy` and `cow_copy_file`.
enospc_on_copy: Mutex<EnospcSchedule>,
/// Schedule for the injected fault on rename / remove-file / persist-temp.
op_fault: Mutex<OpFaultSchedule>,
/// Test-only schedule for the injected fault on `read_exact_at`.
#[cfg(test)]
read_fault: Mutex<ReadFaultSchedule>,
}
/// Schedule for injecting one `StorageFull` error into the copy operations.
#[derive(Default)]
struct EnospcSchedule {
/// 1-based index of the copy call that fails; `None` means nothing is armed.
fail_at: Option<u32>,
/// Copy calls counted so far while a `fail_at` is armed (reset to 0 on arming).
seen: u32,
}
/// Filesystem operations that a scheduled `SimFs` operation fault can target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CrashOp {
    /// Targets `rename`.
    Rename,
    /// Targets `remove_file`.
    RemoveFile,
    /// Targets `persist_temp_with_parent_sync`.
    PersistTemp,
}
/// Schedule for injecting one fault into a single kind of filesystem operation.
#[derive(Default)]
struct OpFaultSchedule {
/// The armed operation and the 1-based call index at which it fails; `None` when unarmed.
target: Option<(CrashOp, u32)>,
/// Calls of the armed operation counted so far (other operations are not counted).
seen: u32,
}
#[cfg(test)]
/// Shape of the error that an injected positioned-read fault produces.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ReadFaultKind {
    /// Reported as `PositionedReadError::Io`.
    Io,
    /// Reported as `PositionedReadError::ShortRead` carrying this `bytes_read`.
    ShortRead { bytes_read: usize },
}
/// Test-only schedule for injecting one fault into positioned reads.
#[cfg(test)]
#[derive(Default)]
struct ReadFaultSchedule {
/// The 1-based read index that fails and the error shape to produce; `None` when unarmed.
target: Option<(u32, ReadFaultKind)>,
/// Positioned reads counted so far while a target is armed.
seen: u32,
}
impl SimFs {
pub(crate) fn new(seed: u64, fsync_drop_one_in: u32) -> Self {
Self {
rng: Mutex::new(fastrand::Rng::with_seed(seed)),
durable: Mutex::new(BTreeMap::new()),
fsync_drop_one_in,
enospc_on_copy: Mutex::new(EnospcSchedule::default()),
op_fault: Mutex::new(OpFaultSchedule::default()),
#[cfg(test)]
read_fault: Mutex::new(ReadFaultSchedule::default()),
}
}
#[cfg(test)]
pub(crate) fn with_fault_on(self, op: CrashOp, fail_at: u32) -> Self {
self.arm_fault_on(op, fail_at);
self
}
#[cfg(test)]
pub(crate) fn arm_fault_on(&self, op: CrashOp, fail_at: u32) {
let mut sched = self
.op_fault
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
sched.target = Some((op, fail_at));
sched.seen = 0;
}
fn op_fault_strikes(&self, op: CrashOp) -> bool {
let mut sched = self
.op_fault
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let Some((target, fail_at)) = sched.target else {
return false;
};
if target != op {
return false;
}
sched.seen = sched.seen.saturating_add(1);
sched.seen == fail_at
}
fn injected_op_fault(op: CrashOp) -> io::Error {
io::Error::other(format!("SimFs: injected fault on {op:?}"))
}
#[cfg(test)]
pub(crate) fn with_read_fault_on(self, fail_at: u32, kind: ReadFaultKind) -> Self {
self.arm_read_fault_on(fail_at, kind);
self
}
#[cfg(test)]
pub(crate) fn arm_read_fault_on(&self, fail_at: u32, kind: ReadFaultKind) {
let mut sched = self
.read_fault
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
sched.target = Some((fail_at, kind));
sched.seen = 0;
}
#[cfg(test)]
fn read_fault_strikes(&self) -> Option<ReadFaultKind> {
let mut sched = self
.read_fault
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (fail_at, kind) = sched.target?;
sched.seen = sched.seen.saturating_add(1);
(sched.seen == fail_at).then_some(kind)
}
pub(crate) fn with_enospc_on_copy(self, fail_at: u32) -> Self {
{
let mut sched = self
.enospc_on_copy
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
sched.fail_at = Some(fail_at);
sched.seen = 0;
}
self
}
fn enospc_strikes_now(&self) -> bool {
let mut sched = self
.enospc_on_copy
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let Some(fail_at) = sched.fail_at else {
return false;
};
sched.seen = sched.seen.saturating_add(1);
sched.seen == fail_at
}
fn fsync_dropped(&self) -> bool {
let mut rng = self
.rng
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let roll = rng.u32(..);
self.fsync_drop_one_in != 0 && roll.is_multiple_of(self.fsync_drop_one_in)
}
fn record_durable(&self, file: &File, path: &Path) {
let Ok(metadata) = file.metadata() else {
return;
};
let len = metadata.len();
let mut durable = self
.durable
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
durable.entry(path.to_path_buf()).or_default().durable_len = len;
}
#[cfg(test)]
pub(crate) fn durable_len(&self, path: &Path) -> u64 {
self.durable
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.get(path)
.map_or(0, |state| state.durable_len)
}
pub(crate) fn crash(&self) {
let durable = self
.durable
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
for (path, state) in durable.iter() {
let _truncated = crate::store::platform::fs::truncate_file_to(path, state.durable_len);
}
}
fn track_materialized_file(&self, path: &Path) {
let Ok(meta) = crate::store::platform::fs::metadata(path) else {
return;
};
let mut durable = self
.durable
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
durable.entry(path.to_path_buf()).or_default().durable_len = meta.len();
}
}
/// `StoreFs` backed by the real filesystem, with simulated fsync durability
/// and scheduled fault injection layered on top.
impl StoreFs for SimFs {
    /// Lists `path` on the real filesystem; no fault is injected.
    fn read_dir(&self, path: &Path) -> io::Result<ReadDir> {
        crate::store::platform::fs::read_dir(path)
    }

    /// Creates `path` and its parents on the real filesystem; no fault is injected.
    fn create_dir_all(&self, path: &Path) -> io::Result<()> {
        crate::store::platform::fs::create_dir_all(path)
    }

    /// Creates the file on the real filesystem and starts tracking it with a
    /// durable length of zero.
    ///
    /// # Errors
    /// Propagates the error from the platform `create_new_file`; the durable
    /// map is left untouched in that case.
    fn create_new_file(&self, path: &Path) -> Result<File, StoreError> {
        let file = crate::store::platform::fs::create_new_file(path)?;
        // Overwrite rather than `or_default()`: `remove_file` and `rename` do
        // not purge the durable map, so an earlier file at this path may have
        // left an entry behind. Keeping its length would make `crash` truncate
        // the new file to the old file's durable length instead of zero.
        self.durable
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .insert(path.to_path_buf(), DurableState::default());
        Ok(file)
    }

    /// Simulated fsync; `_mode` is ignored.
    ///
    /// Always returns `Ok`. When the sync is dropped, the durable length of
    /// `path` is not advanced; otherwise it becomes the file's current length.
    fn sync_file_with_mode(
        &self,
        file: &File,
        path: &Path,
        _mode: &SyncMode,
    ) -> Result<(), StoreError> {
        if self.fsync_dropped() {
            return Ok(());
        }
        self.record_durable(file, path);
        Ok(())
    }

    /// Simulated fsync with the same drop/record behavior as
    /// `sync_file_with_mode`.
    fn sync_file_all(&self, file: &File, path: &Path) -> io::Result<()> {
        if self.fsync_dropped() {
            return Ok(());
        }
        self.record_durable(file, path);
        Ok(())
    }

    /// No-op: directory durability is not modeled, so this always succeeds.
    fn sync_parent_dir(&self, _path: &Path) -> Result<(), StoreError> {
        Ok(())
    }

    /// Delegates to the platform implementation; no fault is injected.
    fn reject_symlink_leaf(&self, path: &Path, purpose: &str) -> Result<(), StoreError> {
        crate::store::platform::fs::reject_symlink_leaf(path, purpose)
    }

    /// Delegates to the platform implementation; no fault is injected.
    fn canonicalize(&self, path: &Path) -> io::Result<PathBuf> {
        crate::store::platform::fs::canonicalize(path)
    }

    /// Delegates to the platform implementation; no fault is injected.
    fn symlink_metadata(&self, path: &Path) -> io::Result<std::fs::Metadata> {
        crate::store::platform::fs::symlink_metadata(path)
    }

    /// Copies `from` to `to` via the platform copy-on-write helper.
    ///
    /// Consults the ENOSPC schedule first (its call count is shared with
    /// `copy`). On success the destination is tracked as durable up to its
    /// current length.
    ///
    /// # Errors
    /// Returns `StorageFull` when the scheduled ENOSPC fires, otherwise
    /// whatever the platform copy returns.
    fn cow_copy_file(
        &self,
        from: &Path,
        to: &Path,
        preference: crate::store::CopyPreference,
    ) -> io::Result<crate::store::platform::fs::CowStrategyUsed> {
        if self.enospc_strikes_now() {
            return Err(io::Error::new(
                io::ErrorKind::StorageFull,
                "SimFs: injected ENOSPC mid-fork on cow_copy_file",
            ));
        }
        let used = crate::store::platform::fs::cow_copy_file(from, to, preference)?;
        self.track_materialized_file(to);
        Ok(used)
    }

    /// Copies `from` to `to` via the platform copy.
    ///
    /// Consults the ENOSPC schedule first (its call count is shared with
    /// `cow_copy_file`). On success the destination is tracked as durable up
    /// to its current length.
    ///
    /// # Errors
    /// Returns `StorageFull` when the scheduled ENOSPC fires, otherwise
    /// whatever the platform copy returns.
    fn copy(&self, from: &Path, to: &Path) -> io::Result<u64> {
        if self.enospc_strikes_now() {
            return Err(io::Error::new(
                io::ErrorKind::StorageFull,
                "SimFs: injected ENOSPC mid-fork on copy",
            ));
        }
        let bytes = crate::store::platform::fs::copy(from, to)?;
        self.track_materialized_file(to);
        Ok(bytes)
    }

    /// Delegates to the platform implementation; no fault is injected.
    fn metadata(&self, path: &Path) -> io::Result<std::fs::Metadata> {
        crate::store::platform::fs::metadata(path)
    }

    /// Renames on the real filesystem unless the scheduled `Rename` fault
    /// fires, in which case nothing is renamed and the injected error is returned.
    // NOTE(review): the durable map is not updated here, so the entry (if any)
    // stays keyed by `from` and `to` keeps whatever entry it had. Confirm this
    // is the intended crash model before relying on `crash` after a rename.
    fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        if self.op_fault_strikes(CrashOp::Rename) {
            return Err(Self::injected_op_fault(CrashOp::Rename));
        }
        crate::store::platform::fs::rename(from, to)
    }

    /// Removes the file on the real filesystem unless the scheduled
    /// `RemoveFile` fault fires, in which case nothing is removed and the
    /// injected error is returned.
    // NOTE(review): the durable-map entry for `path` is not removed here.
    fn remove_file(&self, path: &Path) -> io::Result<()> {
        if self.op_fault_strikes(CrashOp::RemoveFile) {
            return Err(Self::injected_op_fault(CrashOp::RemoveFile));
        }
        crate::store::platform::fs::remove_file(path)
    }

    /// Delegates to the platform implementation; no fault is injected.
    fn named_temp_in(&self, dir: &Path) -> io::Result<tempfile::NamedTempFile> {
        crate::store::platform::fs::named_temp_in(dir)
    }

    /// Persists `named_temp` to `final_path` via the platform helper unless
    /// the scheduled `PersistTemp` fault fires.
    ///
    /// When the fault fires the helper is never called and `named_temp` is
    /// dropped on return.
    fn persist_temp_with_parent_sync(
        &self,
        named_temp: tempfile::NamedTempFile,
        final_path: &Path,
        admission: crate::store::platform::sync::ParentDirSyncAdmission,
    ) -> io::Result<()> {
        if self.op_fault_strikes(CrashOp::PersistTemp) {
            return Err(Self::injected_op_fault(CrashOp::PersistTemp));
        }
        crate::store::platform::sync::persist_temp_with_parent_sync(
            named_temp, final_path, admission,
        )
    }

    /// Positioned read delegated to the platform implementation.
    ///
    /// In test builds the read-fault schedule is consulted first; when it
    /// fires, the scheduled error is returned and no real read is attempted.
    fn read_exact_at(
        &self,
        file: &mut File,
        offset: u64,
        buf: &mut [u8],
    ) -> Result<(), PositionedReadError> {
        #[cfg(test)]
        if let Some(kind) = self.read_fault_strikes() {
            return Err(match kind {
                ReadFaultKind::Io => PositionedReadError::Io(io::Error::other(
                    "SimFs: injected positioned-read fault",
                )),
                ReadFaultKind::ShortRead { bytes_read } => {
                    PositionedReadError::ShortRead { bytes_read }
                }
            });
        }
        crate::store::platform::fs::read_exact_at(file, offset, buf)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Two simulators built from the same seed make the same drop decisions.
    #[test]
    fn same_seed_same_fsync_drop_schedule() {
        let drop_schedule =
            |sim: &SimFs| -> Vec<bool> { (0..64).map(|_| sim.fsync_dropped()).collect() };
        let first = SimFs::new(99, 4);
        let second = SimFs::new(99, 4);
        let first_schedule = drop_schedule(&first);
        let second_schedule = drop_schedule(&second);
        assert_eq!(
            first_schedule, second_schedule,
            "PROPERTY: identical seeds produce identical fsync-drop schedules"
        );
    }

    /// Bytes written after the last honored fsync are lost on crash.
    #[test]
    fn crash_truncates_to_durable_length() {
        const SYNCED_PREFIX: &[u8] = b"durable";
        const UNSYNCED_TAIL: &[u8] = b"-and-lost-tail";

        let tmp = tempfile::tempdir().expect("tmpdir");
        // `fsync_drop_one_in == 0` means every simulated fsync is honored.
        let sim = SimFs::new(1, 0);
        let segment = tmp.path().join("seg.fbat");
        let mut handle = sim.create_new_file(&segment).expect("create");

        handle.write_all(SYNCED_PREFIX).expect("write durable");
        sim.sync_file_all(&handle, &segment).expect("honored fsync");
        let durable_before_crash = sim.durable_len(&segment);

        handle.write_all(UNSYNCED_TAIL).expect("write tail");
        crate::store::platform::sync::sync_file_all_io(&handle).expect("flush real bytes to disk");

        sim.crash();

        let recovered = crate::store::platform::fs::metadata(&segment)
            .expect("stat")
            .len();
        assert_eq!(
            recovered, durable_before_crash,
            "PROPERTY: a crash truncates the real file to its last durable (fsynced) length"
        );
        assert_eq!(
            recovered,
            SYNCED_PREFIX.len() as u64,
            "PROPERTY: only the fsynced prefix survives the crash"
        );
    }

    /// A dropped fsync reports success yet leaves the durable length unchanged.
    #[test]
    fn dropped_fsync_does_not_advance_durable_length() {
        let tmp = tempfile::tempdir().expect("tmpdir");
        // Every roll is a multiple of 1, so every simulated fsync is dropped.
        let sim = SimFs::new(7, 1);
        let segment = tmp.path().join("seg.fbat");
        let mut handle = sim.create_new_file(&segment).expect("create");

        handle.write_all(b"unsynced").expect("write");
        crate::store::platform::sync::sync_file_all_io(&handle).expect("flush real bytes");
        sim.sync_file_all(&handle, &segment)
            .expect("dropped fsync still returns Ok to the store");

        let durable_after_sync = sim.durable_len(&segment);
        assert_eq!(
            durable_after_sync, 0,
            "PROPERTY: a dropped fsync returns Ok but never advances the durable length"
        );

        sim.crash();

        let recovered = crate::store::platform::fs::metadata(&segment)
            .expect("stat")
            .len();
        assert_eq!(
            recovered, 0,
            "PROPERTY: an all-dropped-fsync file loses its entire tail on crash"
        );
    }
}