pub use imp::*;
#[cfg(feature = "metrics")]
mod imp {
use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard, OnceLock};
use std::time::Duration;
// Process-wide event counters. Each one is bumped by the matching `on_*`
// hook, copied out by `snapshot()`, and zeroed by `reset()`.
/// Number of `on_open` calls.
static OPENS: AtomicU64 = AtomicU64::new(0);
/// Number of `on_stat` calls.
static STATS: AtomicU64 = AtomicU64::new(0);
/// Number of `on_pread` calls.
static PREADS: AtomicU64 = AtomicU64::new(0);
/// Sum of the `bytes` arguments passed to `on_pread`.
static PREAD_BYTES: AtomicU64 = AtomicU64::new(0);
/// Number of `on_art_chunk` calls.
static ART_CHUNKS: AtomicU64 = AtomicU64::new(0);
/// Number of `on_binary_tag_chunk` calls.
static BINARY_TAG_CHUNKS: AtomicU64 = AtomicU64::new(0);
/// Number of `on_scan_open` calls.
static SCAN_OPENS: AtomicU64 = AtomicU64::new(0);
/// Number of `on_scan_read` calls.
static SCAN_PREADS: AtomicU64 = AtomicU64::new(0);
/// Sum of the `bytes` arguments passed to `on_scan_read`.
static SCAN_BYTES_READ: AtomicU64 = AtomicU64::new(0);
/// Delay applied by `on_pread`. Written at most once: either explicitly by
/// `set_fault_pread`, or lazily from `MUSEFS_FAULT_PREAD_US` on the first
/// `on_pread` call, whichever happens first.
static PREAD_FAULT: OnceLock<Option<Duration>> = OnceLock::new();
// Encodings stored in `BACKING_FAULT_KIND`. They are written by
// `set_backing_fault` / `BackingFaultGuard::drop` and decoded by
// `backing_read_exact_at`; keeping them named in one place stops the three
// sites from drifting apart.
/// No fault is armed: reads go straight to the file.
const FAULT_KIND_NONE: u8 = 0;
/// `BackingFault::Eio` is armed.
const FAULT_KIND_EIO: u8 = 1;
/// `BackingFault::ShortRead` is armed; its prefix length is held in
/// `BACKING_FAULT_PREFIX`.
const FAULT_KIND_SHORT_READ: u8 = 2;
/// Raw OS error number reported for an injected `BackingFault::Eio`.
const EIO_ERRNO: i32 = 5;

/// Which fault (if any) `backing_read_exact_at` currently injects.
static BACKING_FAULT_KIND: AtomicU8 = AtomicU8::new(FAULT_KIND_NONE);
/// Prefix length for an armed `BackingFault::ShortRead`; only meaningful while
/// `BACKING_FAULT_KIND` holds `FAULT_KIND_SHORT_READ`.
static BACKING_FAULT_PREFIX: AtomicUsize = AtomicUsize::new(0);
/// Held for the lifetime of a `BackingFaultGuard`, so only one fault can be
/// armed at a time.
static SEAM_LOCK: Mutex<()> = Mutex::new(());

/// A failure that `backing_read_exact_at` can be told to inject.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackingFault {
    /// Fail every read immediately with OS error 5, without touching the file.
    Eio,
    /// Read only the first `prefix` bytes of the request (capped at the
    /// buffer length), then fail with `ErrorKind::UnexpectedEof`.
    ShortRead { prefix: usize },
}

/// Keeps an injected fault armed; dropping it disarms the fault and releases
/// `SEAM_LOCK`.
#[must_use = "the fault is cleared when this guard drops; bind it to a name"]
pub struct BackingFaultGuard(
    // Never read: the lock guard is held purely so that it is released when
    // this struct is dropped.
    #[allow(dead_code)] MutexGuard<'static, ()>,
);

impl Drop for BackingFaultGuard {
    fn drop(&mut self) {
        // Runs before the field (the lock guard) is dropped, so the fault is
        // already disarmed by the time another caller can take `SEAM_LOCK`.
        BACKING_FAULT_KIND.store(FAULT_KIND_NONE, Ordering::SeqCst);
    }
}

/// Arms `fault` for every subsequent `backing_read_exact_at` call until the
/// returned guard is dropped.
///
/// Blocks while another `BackingFaultGuard` is alive. A poisoned `SEAM_LOCK`
/// is recovered rather than propagated.
pub fn set_backing_fault(fault: BackingFault) -> BackingFaultGuard {
    let lock = SEAM_LOCK
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    match fault {
        BackingFault::Eio => {
            BACKING_FAULT_KIND.store(FAULT_KIND_EIO, Ordering::SeqCst);
        }
        BackingFault::ShortRead { prefix } => {
            // Publish the prefix before the kind so a reader that observes
            // the short-read kind also observes the matching prefix.
            BACKING_FAULT_PREFIX.store(prefix, Ordering::SeqCst);
            BACKING_FAULT_KIND.store(FAULT_KIND_SHORT_READ, Ordering::SeqCst);
        }
    }
    BackingFaultGuard(lock)
}

/// Reads exactly `buf.len()` bytes from `f` at `offset`, unless a fault armed
/// by `set_backing_fault` intercepts the call.
///
/// # Errors
///
/// * With `BackingFault::Eio` armed: OS error 5, and the file is not read.
/// * With `BackingFault::ShortRead` armed: the first
///   `min(prefix, buf.len())` bytes are read into `buf`, then
///   `ErrorKind::UnexpectedEof` is returned. If that prefix read itself
///   fails, its error is returned instead.
/// * Otherwise: whatever `FileExt::read_exact_at` returns.
pub fn backing_read_exact_at(
    f: &std::fs::File,
    buf: &mut [u8],
    offset: u64,
) -> std::io::Result<()> {
    use std::os::unix::fs::FileExt;

    match BACKING_FAULT_KIND.load(Ordering::SeqCst) {
        FAULT_KIND_EIO => Err(std::io::Error::from_raw_os_error(EIO_ERRNO)),
        FAULT_KIND_SHORT_READ => {
            let filled = BACKING_FAULT_PREFIX.load(Ordering::SeqCst).min(buf.len());
            f.read_exact_at(&mut buf[..filled], offset)?;
            Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "injected short backing read",
            ))
        }
        // `FAULT_KIND_NONE` (and any value this module never stores).
        _ => f.read_exact_at(buf, offset),
    }
}
/// Sleeps for the delay configured through the environment variable `var`.
///
/// The variable is read only on the first call for a given `cell`; the parsed
/// result is cached there and reused afterwards. The value is interpreted as
/// a whole number of microseconds. An unset variable, a value that does not
/// parse as `u64`, or `0` all mean "no delay". If `cell` was already
/// initialised by someone else, the environment is not consulted at all.
fn fault(var: &'static str, cell: &OnceLock<Option<Duration>>) {
    let configured = cell.get_or_init(|| {
        let micros = std::env::var(var).ok()?.parse::<u64>().ok()?;
        (micros > 0).then(|| Duration::from_micros(micros))
    });
    if let Some(delay) = *configured {
        std::thread::sleep(delay);
    }
}
/// Hook for an open event: adds one to `OPENS`, then sleeps for the number of
/// microseconds given by `MUSEFS_FAULT_OPEN_US`, if that is set to a positive
/// integer.
pub fn on_open() {
    // Parsed from the environment on the first call and cached afterwards.
    static OPEN_DELAY: OnceLock<Option<Duration>> = OnceLock::new();

    OPENS.fetch_add(1, Ordering::Relaxed);
    fault("MUSEFS_FAULT_OPEN_US", &OPEN_DELAY);
}
/// Hook for a stat event: adds one to `STATS`, then sleeps for the number of
/// microseconds given by `MUSEFS_FAULT_STAT_US`, if that is set to a positive
/// integer.
pub fn on_stat() {
    // Parsed from the environment on the first call and cached afterwards.
    static STAT_DELAY: OnceLock<Option<Duration>> = OnceLock::new();

    STATS.fetch_add(1, Ordering::Relaxed);
    fault("MUSEFS_FAULT_STAT_US", &STAT_DELAY);
}
/// Hook for a pread event of `bytes` bytes: adds one to `PREADS` and `bytes`
/// to `PREAD_BYTES`, then applies the pread delay.
///
/// The delay lives in the module-level `PREAD_FAULT` cell (rather than a
/// function-local one) so that `set_fault_pread` can fill it in before the
/// first call; otherwise it is taken from `MUSEFS_FAULT_PREAD_US`.
pub fn on_pread(bytes: u64) {
    PREADS.fetch_add(1, Ordering::Relaxed);
    PREAD_BYTES.fetch_add(bytes, Ordering::Relaxed);

    fault("MUSEFS_FAULT_PREAD_US", &PREAD_FAULT);
}
/// Fixes the delay that `on_pread` applies, bypassing `MUSEFS_FAULT_PREAD_US`.
///
/// `PREAD_FAULT` can be written only once, so this has an effect only if it
/// runs before the first `on_pread` (and before any earlier call to this
/// function). A late call trips the assertion in debug builds and is silently
/// ignored where `debug_assert!` is compiled out.
pub fn set_fault_pread(d: Option<Duration>) {
    // The `set` must stay outside the assertion: `debug_assert!` does not
    // evaluate its condition when debug assertions are disabled.
    let outcome = PREAD_FAULT.set(d);
    debug_assert!(
        outcome.is_ok(),
        "set_fault_pread must run before the first on_pread"
    );
}
/// Hook for an art-chunk event: adds one to `ART_CHUNKS`.
pub fn on_art_chunk() {
    ART_CHUNKS.fetch_add(1, Ordering::Relaxed);
}
/// Hook for a binary-tag-chunk event: adds one to `BINARY_TAG_CHUNKS`.
pub fn on_binary_tag_chunk() {
    BINARY_TAG_CHUNKS.fetch_add(1, Ordering::Relaxed);
}
/// Hook for a scan-time open: adds one to `SCAN_OPENS`.
///
/// Unlike `on_open`, no delay is injected here.
pub fn on_scan_open() {
    SCAN_OPENS.fetch_add(1, Ordering::Relaxed);
}
/// Hook for a scan-time read of `bytes` bytes: adds one to `SCAN_PREADS` and
/// `bytes` to `SCAN_BYTES_READ`.
///
/// Unlike `on_pread`, no delay is injected here.
pub fn on_scan_read(bytes: u64) {
    SCAN_PREADS.fetch_add(1, Ordering::Relaxed);
    SCAN_BYTES_READ.fetch_add(bytes, Ordering::Relaxed);
}
/// Copy of every counter at one moment, as produced by `snapshot()`.
///
/// `Snapshot::default()` is the all-zero value, which is also what
/// `snapshot()` returns right after `reset()` when nothing else is counting.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct Snapshot {
    /// Calls to `on_open`.
    pub opens: u64,
    /// Calls to `on_stat`.
    pub stats: u64,
    /// Calls to `on_pread`.
    pub preads: u64,
    /// Total bytes reported to `on_pread`.
    pub pread_bytes: u64,
    /// Calls to `on_art_chunk`.
    pub art_chunks: u64,
    /// Calls to `on_binary_tag_chunk`.
    pub binary_tag_chunks: u64,
    /// Calls to `on_scan_open`.
    pub scan_opens: u64,
    /// Calls to `on_scan_read`.
    pub scan_preads: u64,
    /// Total bytes reported to `on_scan_read`.
    pub scan_bytes_read: u64,
}
pub fn snapshot() -> Snapshot {
Snapshot {
opens: OPENS.load(Ordering::Relaxed),
stats: STATS.load(Ordering::Relaxed),
preads: PREADS.load(Ordering::Relaxed),
pread_bytes: PREAD_BYTES.load(Ordering::Relaxed),
art_chunks: ART_CHUNKS.load(Ordering::Relaxed),
binary_tag_chunks: BINARY_TAG_CHUNKS.load(Ordering::Relaxed),
scan_opens: SCAN_OPENS.load(Ordering::Relaxed),
scan_preads: SCAN_PREADS.load(Ordering::Relaxed),
scan_bytes_read: SCAN_BYTES_READ.load(Ordering::Relaxed),
}
}
/// Sets every counter back to zero.
///
/// Only the counters are touched: the configured delays and any armed
/// backing fault are left as they are.
pub fn reset() {
    let counters = [
        &OPENS,
        &STATS,
        &PREADS,
        &PREAD_BYTES,
        &ART_CHUNKS,
        &BINARY_TAG_CHUNKS,
        &SCAN_OPENS,
        &SCAN_PREADS,
        &SCAN_BYTES_READ,
    ];
    for counter in counters {
        counter.store(0, Ordering::Relaxed);
    }
}
}
/// Stand-in used when the `metrics` feature is off: every hook is an empty
/// function, `snapshot()` is always all zeros, and `backing_read_exact_at`
/// is a plain positional read with no fault injection.
#[cfg(not(feature = "metrics"))]
mod imp {
    use std::fs::File;
    use std::io;
    use std::os::unix::fs::FileExt;
    use std::time::Duration;

    /// Same shape as the counter snapshot of the `metrics` build; in this
    /// build `snapshot()` only ever returns the all-zero value.
    #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
    pub struct Snapshot {
        pub opens: u64,
        pub stats: u64,
        pub preads: u64,
        pub pread_bytes: u64,
        pub art_chunks: u64,
        pub binary_tag_chunks: u64,
        pub scan_opens: u64,
        pub scan_preads: u64,
        pub scan_bytes_read: u64,
    }

    /// No-op.
    #[inline(always)]
    pub fn on_open() {}

    /// No-op.
    #[inline(always)]
    pub fn on_stat() {}

    /// No-op; `_bytes` is ignored.
    #[inline(always)]
    pub fn on_pread(_bytes: u64) {}

    /// No-op; `_d` is ignored.
    #[inline(always)]
    pub fn set_fault_pread(_d: Option<Duration>) {}

    /// Reads exactly `buf.len()` bytes from `f` at `offset`, forwarding
    /// directly to `FileExt::read_exact_at`.
    #[inline(always)]
    pub fn backing_read_exact_at(f: &File, buf: &mut [u8], offset: u64) -> io::Result<()> {
        f.read_exact_at(buf, offset)
    }

    /// No-op.
    #[inline(always)]
    pub fn on_art_chunk() {}

    /// No-op.
    #[inline(always)]
    pub fn on_binary_tag_chunk() {}

    /// No-op.
    #[inline(always)]
    pub fn on_scan_open() {}

    /// No-op; `_bytes` is ignored.
    #[inline(always)]
    pub fn on_scan_read(_bytes: u64) {}

    /// Always returns the all-zero `Snapshot`.
    #[inline(always)]
    pub fn snapshot() -> Snapshot {
        Default::default()
    }

    /// No-op: there is nothing to reset.
    #[inline(always)]
    pub fn reset() {}
}
#[cfg(all(test, feature = "metrics"))]
mod tests {
use std::sync::Mutex;
use super::*;
/// Serialises the tests in this module, all of which touch process-wide
/// counters or the backing-fault state.
static GLOBAL_STATE_LOCK: Mutex<()> = Mutex::new(());

/// Takes `GLOBAL_STATE_LOCK`. A poisoned lock (an earlier test panicked while
/// holding it) is recovered, so one failing test does not fail the others.
fn lock_global_state() -> std::sync::MutexGuard<'static, ()> {
    match GLOBAL_STATE_LOCK.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// The open/pread/chunk hooks each bump their own counter, and `reset()`
/// returns everything to zero.
#[test]
fn counters_accumulate_and_reset() {
    let _serial = lock_global_state();
    reset();

    for _ in 0..2 {
        on_open();
    }
    on_pread(100);
    on_art_chunk();
    on_binary_tag_chunk();

    let Snapshot {
        opens,
        preads,
        pread_bytes,
        art_chunks,
        binary_tag_chunks,
        ..
    } = snapshot();
    assert_eq!(opens, 2);
    assert_eq!(preads, 1);
    assert_eq!(pread_bytes, 100);
    assert_eq!(art_chunks, 1);
    assert_eq!(binary_tag_chunks, 1);

    reset();
    assert_eq!(snapshot(), Snapshot::default());
}
/// The scan hooks count calls and sum bytes, and `reset()` returns
/// everything to zero.
#[test]
fn scan_counters_accumulate_and_reset() {
    let _serial = lock_global_state();
    reset();

    on_scan_open();
    for bytes in [4096, 128] {
        on_scan_read(bytes);
    }

    let Snapshot {
        scan_opens,
        scan_preads,
        scan_bytes_read,
        ..
    } = snapshot();
    assert_eq!(scan_opens, 1);
    assert_eq!(scan_preads, 2);
    assert_eq!(scan_bytes_read, 4096 + 128);

    reset();
    assert_eq!(snapshot(), Snapshot::default());
}
/// An armed `BackingFault::Eio` makes the seam fail with error 5; once the
/// guard is dropped the seam reads normally again, and a direct read that
/// bypasses the seam also succeeds.
#[test]
fn backing_fault_injects_eio_then_clears_on_drop() {
    use std::io::Write;
    use std::os::unix::fs::FileExt;

    let _serial = lock_global_state();

    let mut backing = tempfile::NamedTempFile::new().unwrap();
    backing.write_all(b"hello world").unwrap();
    let file = std::fs::File::open(backing.path()).unwrap();

    // Five-byte read through the seam at the given offset.
    let read_five = |offset: u64| -> std::io::Result<[u8; 5]> {
        let mut chunk = [0u8; 5];
        backing_read_exact_at(&file, &mut chunk, offset).map(|()| chunk)
    };

    // Before any fault is armed.
    assert_eq!(&read_five(0).unwrap(), b"hello");

    // While the fault is armed.
    {
        let _armed = set_backing_fault(BackingFault::Eio);
        let err = read_five(0).unwrap_err();
        assert_eq!(err.raw_os_error(), Some(5), "EIO == 5");
    }

    // After the guard has dropped.
    assert_eq!(&read_five(6).unwrap(), b"world");

    // Reading without going through the seam.
    let mut direct = [0u8; 5];
    file.read_exact_at(&mut direct, 0).unwrap();
    assert_eq!(&direct, b"hello");
}
/// An armed `BackingFault::ShortRead` fills the first `prefix` bytes of the
/// buffer and then reports `UnexpectedEof`.
#[test]
fn backing_fault_short_read_fills_prefix_then_errors() {
    use std::io::Write;

    let _serial = lock_global_state();

    let mut backing = tempfile::NamedTempFile::new().unwrap();
    backing.write_all(b"abcdefgh").unwrap();
    let file = std::fs::File::open(backing.path()).unwrap();

    let mut dest = [0u8; 8];
    let _armed = set_backing_fault(BackingFault::ShortRead { prefix: 3 });
    let outcome = backing_read_exact_at(&file, &mut dest, 0);

    assert_eq!(
        outcome.unwrap_err().kind(),
        std::io::ErrorKind::UnexpectedEof
    );
    assert_eq!(
        &dest[..3],
        b"abc",
        "prefix bytes were filled before the fault"
    );
}
}