use crate::store::fault::InjectionPoint;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
/// Outcome of a single fault-injection decision.
///
/// The payload of each variant is already bounded by the length of the operation it was
/// decided for, so callers can apply it without re-checking against the buffer size.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Fault {
    /// No fault fires; the operation proceeds in full.
    None,
    /// Only the first `prefix_len` bytes of a write land.
    TornWrite { prefix_len: usize },
    /// A read hands back at most `returned` bytes.
    ShortRead { returned: usize },
    /// An fsync is dropped: nothing becomes durable.
    FsyncDrop,
}
/// In-memory stand-in for a single file tracked by the simulator.
#[derive(Clone, Default)]
struct SimFile {
    /// Every byte appended so far, whether or not it has been synced.
    bytes: Vec<u8>,
    /// Length of `bytes` recorded at the last successful fsync; a simulated crash
    /// truncates `bytes` back to this length.
    durable_len: usize,
}
/// In-memory filesystem simulator that injects faults chosen by a seeded PRNG.
///
/// Both fields sit behind their own `Mutex`, so every method takes `&self`.
pub(crate) struct InMemFaultFs {
/// PRNG seeded in `new`; `decide_fault` draws exactly one value from it per call.
rng: Mutex<fastrand::Rng>,
/// Simulated file contents and durability marks, keyed by path.
files: Mutex<BTreeMap<PathBuf, SimFile>>,
}
impl InMemFaultFs {
    /// Creates an empty simulated filesystem whose fault decisions are driven by a PRNG
    /// seeded with `seed`. Identical seeds yield identical decision sequences.
    pub(crate) fn new(seed: u64) -> Self {
        Self {
            rng: Mutex::new(fastrand::Rng::with_seed(seed)),
            files: Mutex::new(BTreeMap::new()),
        }
    }

    /// Locks `mutex`, recovering the guard if a previous holder panicked.
    ///
    /// The simulator keeps working on poisoned state rather than cascading the panic.
    fn lock_recovering<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
        mutex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Draws one PRNG value and decides which fault, if any, fires at `point`.
    ///
    /// A fault fires only when the draw is a multiple of 8. The kind of fault is fixed by
    /// `point`; the byte count carried by `TornWrite` / `ShortRead` is `roll % op_len`,
    /// or 0 when `op_len` is 0.
    ///
    /// NOTE: the same draw decides both whether a fault fires and how large it is. A
    /// firing draw is always a multiple of 8, so for an `op_len` of 1, 2, 4 or 8 the
    /// byte count is always 0.
    pub(crate) fn decide_fault(&self, point: &InjectionPoint, op_len: usize) -> Fault {
        let roll = Self::lock_recovering(&self.rng).u32(..);
        if !roll.is_multiple_of(8) {
            return Fault::None;
        }
        // `checked_rem` is `None` only for a zero-length op, which bounds the fault to 0
        // instead of dividing by zero.
        let bounded = (roll as usize).checked_rem(op_len).unwrap_or(0);
        match point {
            InjectionPoint::BatchFsync { .. }
            | InjectionPoint::SingleAppendWritten { .. }
            | InjectionPoint::ActiveSegmentSync { .. }
            | InjectionPoint::BatchCommitWritten { .. } => Fault::FsyncDrop,
            InjectionPoint::ReadAt { .. } | InjectionPoint::ColdStartScanFrame { .. } => {
                Fault::ShortRead { returned: bounded }
            }
            InjectionPoint::BatchStart { .. }
            | InjectionPoint::BatchBeginWritten { .. }
            | InjectionPoint::BatchItemWritten { .. }
            | InjectionPoint::BatchItemsComplete { .. }
            | InjectionPoint::BatchPrePublish { .. }
            | InjectionPoint::SingleAppendStart { .. }
            | InjectionPoint::SingleAppendPublished { .. }
            | InjectionPoint::SegmentRotationCreate { .. }
            | InjectionPoint::SegmentRotation { .. }
            | InjectionPoint::MmapIndexLoad
            | InjectionPoint::IndexFooterDecode { .. }
            | InjectionPoint::CheckpointDecode
            | InjectionPoint::HiddenRangesLoad => Fault::TornWrite {
                prefix_len: bounded,
            },
        }
    }

    /// Appends `data` to the file at `path`, creating the file if needed, and returns how
    /// many bytes actually landed.
    ///
    /// A `TornWrite` decision lands only its prefix; every other decision lands all of
    /// `data`. Landed bytes are not durable until a successful `fsync`.
    pub(crate) fn write_bytes(&self, path: &Path, point: &InjectionPoint, data: &[u8]) -> usize {
        let landed = match self.decide_fault(point, data.len()) {
            // `prefix_len` is `roll % data.len()` (or 0), so it never exceeds `data.len()`.
            Fault::TornWrite { prefix_len } => prefix_len,
            Fault::None | Fault::ShortRead { .. } | Fault::FsyncDrop => data.len(),
        };
        Self::lock_recovering(&self.files)
            .entry(path.to_path_buf())
            .or_default()
            .bytes
            .extend_from_slice(&data[..landed]);
        landed
    }

    /// Reads up to `len` bytes starting at `offset` from the file at `path`.
    ///
    /// A missing file yields an empty vector without drawing from the PRNG. The request is
    /// clamped to the bytes available past `offset`, and a `ShortRead` decision may shrink
    /// it further. An `offset` beyond the end of the file yields an empty vector.
    pub(crate) fn read_bytes(
        &self,
        path: &Path,
        point: &InjectionPoint,
        offset: usize,
        len: usize,
    ) -> Vec<u8> {
        let files = Self::lock_recovering(&self.files);
        let Some(file) = files.get(path) else {
            return Vec::new();
        };
        let want = len.min(file.bytes.len().saturating_sub(offset));
        let deliver = match self.decide_fault(point, want) {
            Fault::ShortRead { returned } => returned.min(want),
            Fault::None | Fault::TornWrite { .. } | Fault::FsyncDrop => want,
        };
        // Checked slicing: when `offset` is past the end, `deliver` is 0 but the range
        // start is still out of bounds, which would panic with plain indexing.
        // `offset + deliver` cannot overflow: it is at most `bytes.len()` when the offset
        // is in range, and `offset + 0` otherwise.
        file.bytes
            .get(offset..offset + deliver)
            .map_or_else(Vec::new, <[u8]>::to_vec)
    }

    /// Attempts to make everything written to `path` durable.
    ///
    /// Returns `false`, leaving the durable length untouched, when an `FsyncDrop` fires at
    /// `point`. Otherwise returns `true`, including when `path` has never been written.
    pub(crate) fn fsync(&self, path: &Path, point: &InjectionPoint) -> bool {
        if matches!(self.decide_fault(point, 0), Fault::FsyncDrop) {
            return false;
        }
        if let Some(file) = Self::lock_recovering(&self.files).get_mut(path) {
            file.durable_len = file.bytes.len();
        }
        true
    }

    /// Simulates a crash: every file loses the bytes written since its last successful
    /// fsync.
    pub(crate) fn crash(&self) {
        for file in Self::lock_recovering(&self.files).values_mut() {
            file.bytes.truncate(file.durable_len);
        }
    }

    /// Returns the length of `path` as of its last successful fsync, or 0 for an unknown
    /// path.
    pub(crate) fn durable_len(&self, path: &Path) -> usize {
        Self::lock_recovering(&self.files)
            .get(path)
            .map_or(0, |file| file.durable_len)
    }

    /// Returns the current length of `path`, including unsynced bytes, or 0 for an unknown
    /// path.
    pub(crate) fn len(&self, path: &Path) -> usize {
        Self::lock_recovering(&self.files)
            .get(path)
            .map_or(0, |file| file.bytes.len())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Injection point shared by the tests; `decide_fault` maps it to a torn write.
    fn point() -> InjectionPoint {
        InjectionPoint::SingleAppendStart {
            entity: String::from("e"),
        }
    }

    /// Two simulators built from the same seed must make the same 32 decisions.
    #[test]
    fn same_seed_same_fault_sequence() {
        let draw = |fs: &InMemFaultFs| -> Vec<Fault> {
            (0..32).map(|_| fs.decide_fault(&point(), 100)).collect()
        };
        let first = draw(&InMemFaultFs::new(99));
        let second = draw(&InMemFaultFs::new(99));
        assert_eq!(
            first, second,
            "PROPERTY: identical seeds produce identical fault sequences"
        );
    }

    /// After a crash, the visible length equals the durable length.
    #[test]
    fn crash_truncates_to_durable_length() {
        let fs = InMemFaultFs::new(1);
        let path = Path::new("seg.fbat");
        let payload = b"hello world";
        let landed = fs.write_bytes(path, &point(), payload);
        // Retry the sync up to 16 times; `any` stops at the first acknowledged one.
        let _acknowledged = (0..16).any(|_| fs.fsync(path, &point()));
        let durable = fs.durable_len(path);
        fs.crash();
        assert_eq!(
            fs.len(path),
            durable,
            "PROPERTY: a crash truncates each file to its last durable length"
        );
        assert!(
            landed <= payload.len(),
            "torn writes never exceed the requested length"
        );
    }

    /// A firing roll that is not a multiple of 100 must surface as exactly `roll % 100`.
    #[test]
    fn fired_fault_is_bounded_by_the_exact_roll_modulo_op_len() {
        // Replay each seed's first draw to find one that fires with a nonzero remainder.
        let (seed, roll) = (0..10_000u64)
            .map(|seed| (seed, fastrand::Rng::with_seed(seed).u32(..)))
            .find(|&(_, roll)| roll.is_multiple_of(8) && !roll.is_multiple_of(100))
            .expect("a firing, off-grid first roll exists among the first 10_000 seeds");
        let expected = (roll as usize) % 100;
        assert_ne!(
            expected, 0,
            "scenario shape: the bounded prefix must be nonzero"
        );
        let fault = InMemFaultFs::new(seed).decide_fault(&point(), 100);
        assert_eq!(
            fault,
            Fault::TornWrite {
                prefix_len: expected
            },
            "a fired torn write is bounded to roll % op_len — the exact PRNG draw, never 0"
        );
    }

    /// A firing roll on a zero-length op must bound to 0 rather than divide by zero.
    #[test]
    fn fired_fault_on_a_zero_length_op_bounds_to_zero_without_dividing() {
        let seed = (0..10_000u64)
            .find(|&seed| fastrand::Rng::with_seed(seed).u32(..).is_multiple_of(8))
            .expect("a firing first roll exists among the first 10_000 seeds");
        let zero_len_read = InjectionPoint::ReadAt { offset: 0, len: 0 };
        let fault = InMemFaultFs::new(seed).decide_fault(&zero_len_read, 0);
        assert_eq!(
            fault,
            Fault::ShortRead { returned: 0 },
            "a zero-length op bounds its fault to 0 via the guard, not via a % 0 panic"
        );
    }
}