// Single source of truth for the metrics counters.
//
// Invokes the callback macro `$cb` exactly once, passing the complete
// `field => STATIC` list. In this file the callbacks are `decl_snapshot`
// (turns the lowercase names into `Snapshot` fields) and `decl_counters`
// (turns the uppercase names into `AtomicU64` statics and wires up
// `snapshot()` / `reset()`).
//
// Adding a line here therefore adds both a `Snapshot` field and its backing
// static; the hook that increments the new counter still has to be written
// by hand.
macro_rules! for_each_counter {
($cb:ident) => {
$cb! {
opens => OPENS,
stats => STATS,
preads => PREADS,
pread_bytes => PREAD_BYTES,
art_chunks => ART_CHUNKS,
binary_tag_chunks => BINARY_TAG_CHUNKS,
scan_opens => SCAN_OPENS,
scan_preads => SCAN_PREADS,
scan_bytes_read => SCAN_BYTES_READ,
readahead_hits => READAHEAD_HITS,
readahead_misses => READAHEAD_MISSES,
}
};
}
// Callback for `for_each_counter!`: declares the public `Snapshot` struct
// with one `u64` field per `member => COUNTER` pair.
//
// Only the left-hand (member) name is used; the right-hand name is matched
// so that the same list can also be fed to other callbacks, and is otherwise
// ignored here. A trailing comma after the last pair is accepted.
macro_rules! decl_snapshot {
    ($($member:ident => $counter:ident),* $(,)?) => {
        /// Plain-data view of the counters: one public `u64` per counter.
        ///
        /// `Default` yields a value with every field set to zero.
        #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
        pub struct Snapshot {
            $(
                pub $member: u64,
            )*
        }
    };
}
// Expand the shared counter list into the public `Snapshot` struct. This is
// done outside the `imp` modules, so the type exists whether or not the
// `metrics` feature is enabled.
for_each_counter!(decl_snapshot);
// Re-export the contents of whichever `imp` module is compiled in (selected
// by the `metrics` feature below).
pub use imp::*;
#[cfg(feature = "metrics")]
mod imp {
use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard, OnceLock};
use std::time::Duration;
// Callback for `for_each_counter!`: for every `member => COUNTER` pair it
// declares a zero-initialised `AtomicU64` static named `COUNTER`, and then
// generates `snapshot()` and `reset()` over the full set.
macro_rules! decl_counters {
    ($($member:ident => $counter:ident),* $(,)?) => {
        $(
            static $counter: AtomicU64 = AtomicU64::new(0);
        )*

        /// Copies the current value of every counter into a `Snapshot`.
        ///
        /// Each counter is read with its own `Relaxed` load, so the result
        /// is not a single atomic cut across all counters: increments that
        /// race with this call may be reflected in some fields and not in
        /// others.
        pub fn snapshot() -> super::Snapshot {
            super::Snapshot {
                $(
                    $member: $counter.load(Ordering::Relaxed),
                )*
            }
        }

        /// Stores zero into every counter (one `Relaxed` store per counter).
        pub fn reset() {
            $(
                $counter.store(0, Ordering::Relaxed);
            )*
        }
    };
}
for_each_counter!(decl_counters);
/// Delay applied by `on_pread`. Filled either explicitly by
/// `set_fault_pread`, or on the first `on_pread` call from the
/// `MUSEFS_FAULT_PREAD_US` environment variable; `None` means "no delay".
static PREAD_FAULT: OnceLock<Option<Duration>> = OnceLock::new();
/// Backing-read fault currently injected: `0` = none, `1` =
/// `BackingFault::Eio`, `2` = `BackingFault::ShortRead`.
static BACKING_FAULT_KIND: AtomicU8 = AtomicU8::new(0);
/// Prefix length for an injected `ShortRead`. Only read by
/// `backing_read_exact_at` while `BACKING_FAULT_KIND` is `2`.
static BACKING_FAULT_PREFIX: AtomicUsize = AtomicUsize::new(0);
/// Taken by `set_backing_fault` and held inside the returned
/// `BackingFaultGuard`, so at most one injected fault exists at a time.
static SEAM_LOCK: Mutex<()> = Mutex::new(());
/// Fault that `set_backing_fault` can inject into `backing_read_exact_at`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackingFault {
/// Fail the read with raw OS error 5 (`EIO`) without touching the file.
Eio,
/// Really read the first `prefix` bytes (capped at the buffer length), then
/// fail with `ErrorKind::UnexpectedEof`.
ShortRead { prefix: usize },
}
/// Keeps an injected backing fault active; dropping it clears the fault.
///
/// The guard also owns the `SEAM_LOCK` guard taken by `set_backing_fault`,
/// so a second `set_backing_fault` call cannot proceed until this value is
/// dropped.
#[must_use = "the fault is cleared when this guard drops; bind it to a name"]
pub struct BackingFaultGuard(
// Never read: the lock guard is stored only so the lock stays held until
// this struct is dropped, hence the `dead_code` allowance.
#[allow(dead_code)] MutexGuard<'static, ()>,
);
impl Drop for BackingFaultGuard {
fn drop(&mut self) {
// `Drop::drop` runs before the fields are dropped, so the fault kind is
// reset to "none" while the seam lock is still held. The stored prefix is
// left as-is; it is ignored unless the kind is `2`.
BACKING_FAULT_KIND.store(0, Ordering::SeqCst);
}
}
/// Installs `fault` so that `backing_read_exact_at` fails accordingly until
/// the returned guard is dropped.
///
/// Takes `SEAM_LOCK` first and keeps it inside the guard, so this call blocks
/// while another `BackingFaultGuard` is alive. A poisoned lock is recovered
/// rather than treated as an error. Calling this again on a thread that still
/// holds a guard never returns normally (`std::sync::Mutex` deadlocks or
/// panics on re-entrant locking).
pub fn set_backing_fault(fault: BackingFault) -> BackingFaultGuard {
    let seam = match SEAM_LOCK.lock() {
        Ok(held) => held,
        Err(poisoned) => poisoned.into_inner(),
    };

    // Kind encoding: 1 = Eio, 2 = ShortRead. For a short read the prefix is
    // published before the kind, so a reader that observes kind 2 also
    // observes the matching prefix.
    let kind: u8 = match fault {
        BackingFault::Eio => 1,
        BackingFault::ShortRead { prefix } => {
            BACKING_FAULT_PREFIX.store(prefix, Ordering::SeqCst);
            2
        }
    };
    BACKING_FAULT_KIND.store(kind, Ordering::SeqCst);

    BackingFaultGuard(seam)
}
/// Fills `buf` from `f` starting at `offset`, honouring any injected fault.
///
/// * No fault installed: behaves exactly like `FileExt::read_exact_at`.
/// * `BackingFault::Eio`: returns raw OS error 5 without reading.
/// * `BackingFault::ShortRead { prefix }`: reads the first
///   `min(prefix, buf.len())` bytes for real, then returns an
///   `UnexpectedEof` error. If that partial read itself fails, its error is
///   returned instead.
///
/// # Errors
///
/// Any error from the underlying read, or the injected error described above.
pub fn backing_read_exact_at(
    f: &std::fs::File,
    buf: &mut [u8],
    offset: u64,
) -> std::io::Result<()> {
    use std::io::{Error, ErrorKind};
    use std::os::unix::fs::FileExt;

    let injected = BACKING_FAULT_KIND.load(Ordering::SeqCst);
    if injected == 1 {
        return Err(Error::from_raw_os_error(5));
    }
    if injected != 2 {
        // Any other value means "no fault": plain pass-through.
        return f.read_exact_at(buf, offset);
    }

    // Short read: the configured prefix may exceed the buffer, so cap it.
    let wanted = BACKING_FAULT_PREFIX.load(Ordering::SeqCst);
    let filled = std::cmp::min(wanted, buf.len());
    f.read_exact_at(&mut buf[..filled], offset)?;
    Err(Error::new(
        ErrorKind::UnexpectedEof,
        "injected short backing read",
    ))
}
/// Sleeps for the delay cached in `cell`, resolving it first if necessary.
///
/// On the first call for a given `cell` the delay is taken from environment
/// variable `var`, interpreted as a whole number of microseconds. An unset
/// variable, a value that does not parse as `u64`, or `0` all resolve to
/// "no delay". The result is cached in `cell`, so the environment is consulted
/// at most once per cell; a cell that was already filled is used as-is.
fn fault(var: &'static str, cell: &OnceLock<Option<Duration>>) {
    let delay = cell.get_or_init(|| {
        let micros = std::env::var(var).ok()?.parse::<u64>().ok()?;
        (micros > 0).then(|| Duration::from_micros(micros))
    });

    if let Some(pause) = *delay {
        std::thread::sleep(pause);
    }
}
/// Counts one open, then sleeps for the delay configured through the
/// `MUSEFS_FAULT_OPEN_US` environment variable (microseconds), if any.
///
/// The variable is read on the first call only; the result is cached in a
/// function-local cell.
pub fn on_open() {
    static OPEN_DELAY: OnceLock<Option<Duration>> = OnceLock::new();

    OPENS.fetch_add(1, Ordering::Relaxed);
    fault("MUSEFS_FAULT_OPEN_US", &OPEN_DELAY);
}
/// Counts one stat, then sleeps for the delay configured through the
/// `MUSEFS_FAULT_STAT_US` environment variable (microseconds), if any.
///
/// The variable is read on the first call only; the result is cached in a
/// function-local cell.
pub fn on_stat() {
    static STAT_DELAY: OnceLock<Option<Duration>> = OnceLock::new();

    STATS.fetch_add(1, Ordering::Relaxed);
    fault("MUSEFS_FAULT_STAT_US", &STAT_DELAY);
}
/// Counts one pread of `bytes` bytes, then sleeps for the pread delay, if any.
///
/// `bytes` is added to the `pread_bytes` total. The delay is whatever
/// `PREAD_FAULT` holds: a value installed earlier by `set_fault_pread`, or
/// otherwise the `MUSEFS_FAULT_PREAD_US` environment variable (microseconds),
/// read on the first call.
pub fn on_pread(bytes: u64) {
PREADS.fetch_add(1, Ordering::Relaxed);
PREAD_BYTES.fetch_add(bytes, Ordering::Relaxed);
// Uses the module-level cell rather than a function-local one so that
// `set_fault_pread` can pre-fill it.
fault("MUSEFS_FAULT_PREAD_US", &PREAD_FAULT);
}
/// Installs the pread delay programmatically (`None` = no delay), taking
/// precedence over the `MUSEFS_FAULT_PREAD_US` environment variable.
///
/// The delay can be set only once, and only before the first `on_pread` call
/// resolves it. A late or repeated call leaves the existing value in place.
///
/// # Panics
///
/// In builds with debug assertions, panics if the delay was already resolved
/// or set. Without debug assertions such a call is silently ignored.
pub fn set_fault_pread(d: Option<Duration>) {
    let outcome = PREAD_FAULT.set(d);
    debug_assert!(
        outcome.is_ok(),
        "set_fault_pread must run before the first on_pread"
    );
}
/// Adds one to the `art_chunks` counter.
pub fn on_art_chunk() {
ART_CHUNKS.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to the `binary_tag_chunks` counter.
pub fn on_binary_tag_chunk() {
BINARY_TAG_CHUNKS.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to the `scan_opens` counter. Unlike `on_open`, no delay is
/// applied.
pub fn on_scan_open() {
SCAN_OPENS.fetch_add(1, Ordering::Relaxed);
}
/// Records one scan read: adds one to `scan_preads` and `bytes` to
/// `scan_bytes_read`. Unlike `on_pread`, no delay is applied.
pub fn on_scan_read(bytes: u64) {
SCAN_PREADS.fetch_add(1, Ordering::Relaxed);
SCAN_BYTES_READ.fetch_add(bytes, Ordering::Relaxed);
}
/// Adds one to the `readahead_hits` counter.
pub fn on_readahead_hit() {
READAHEAD_HITS.fetch_add(1, Ordering::Relaxed);
}
/// Adds one to the `readahead_misses` counter.
pub fn on_readahead_miss() {
READAHEAD_MISSES.fetch_add(1, Ordering::Relaxed);
}
}
#[cfg(not(feature = "metrics"))]
mod imp {
/// No-op: with the `metrics` feature disabled, opens are neither counted nor
/// delayed.
#[inline(always)]
pub fn on_open() {}
/// No-op: with the `metrics` feature disabled, stats are neither counted nor
/// delayed.
#[inline(always)]
pub fn on_stat() {}
/// No-op: with the `metrics` feature disabled, preads are neither counted nor
/// delayed.
#[inline(always)]
pub fn on_pread(_bytes: u64) {}
/// No-op: with the `metrics` feature disabled, the requested delay is
/// discarded.
#[inline(always)]
pub fn set_fault_pread(_d: Option<std::time::Duration>) {}
/// Fills `buf` from `f` starting at `offset`.
///
/// With the `metrics` feature disabled there is no fault injection: this is a
/// direct pass-through to `FileExt::read_exact_at`.
///
/// # Errors
///
/// Whatever the underlying read returns, including `UnexpectedEof` when the
/// file ends before `buf` is full.
#[inline(always)]
pub fn backing_read_exact_at(
    f: &std::fs::File,
    buf: &mut [u8],
    offset: u64,
) -> std::io::Result<()> {
    std::os::unix::fs::FileExt::read_exact_at(f, buf, offset)
}
/// No-op: the `metrics` feature is disabled.
#[inline(always)]
pub fn on_art_chunk() {}
/// No-op: the `metrics` feature is disabled.
#[inline(always)]
pub fn on_binary_tag_chunk() {}
/// No-op: the `metrics` feature is disabled.
#[inline(always)]
pub fn on_scan_open() {}
/// No-op: the `metrics` feature is disabled, so `_bytes` is ignored.
#[inline(always)]
pub fn on_scan_read(_bytes: u64) {}
/// No-op: the `metrics` feature is disabled.
#[inline(always)]
pub fn on_readahead_hit() {}
/// No-op: the `metrics` feature is disabled.
#[inline(always)]
pub fn on_readahead_miss() {}
/// Returns `Snapshot::default()`, i.e. every field zero: with the `metrics`
/// feature disabled nothing is counted.
#[inline(always)]
pub fn snapshot() -> super::Snapshot {
super::Snapshot::default()
}
/// No-op: with the `metrics` feature disabled there are no counters to clear.
#[inline(always)]
pub fn reset() {}
}
#[cfg(all(test, feature = "metrics"))]
mod tests {
use std::sync::Mutex;
use super::*;
// The tests in this module call `lock_global_state` first and hold the
// returned guard while they touch the process-wide counters and fault state,
// so they do not interleave when the test harness runs them on several
// threads.
static GLOBAL_STATE_LOCK: Mutex<()> = Mutex::new(());

/// Acquires `GLOBAL_STATE_LOCK`, blocking until it is free.
///
/// Poisoning is ignored: if an earlier holder panicked (for example a failed
/// assertion), the lock is still handed out so later tests can run.
fn lock_global_state() -> std::sync::MutexGuard<'static, ()> {
    match GLOBAL_STATE_LOCK.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Each hook bumps exactly its own counter(s), and `reset` zeroes them all.
#[test]
fn counters_accumulate_and_reset() {
    let _serialized = lock_global_state();
    reset();

    for _ in 0..2 {
        on_open();
    }
    on_pread(100);
    on_art_chunk();
    on_binary_tag_chunk();
    on_readahead_miss();

    let Snapshot {
        opens,
        preads,
        pread_bytes,
        art_chunks,
        binary_tag_chunks,
        readahead_misses,
        readahead_hits,
        ..
    } = snapshot();
    assert_eq!(opens, 2);
    assert_eq!(preads, 1);
    assert_eq!(pread_bytes, 100);
    assert_eq!(art_chunks, 1);
    assert_eq!(binary_tag_chunks, 1);
    assert_eq!(readahead_misses, 1);
    // No hit was recorded, so this one must still be zero.
    assert_eq!(readahead_hits, 0);

    reset();
    assert_eq!(snapshot(), Snapshot::default());
}
/// The scan hooks count opens, reads and bytes read, and `reset` zeroes them.
#[test]
fn scan_counters_accumulate_and_reset() {
    let _serialized = lock_global_state();
    reset();

    on_scan_open();
    for chunk in [4096, 128] {
        on_scan_read(chunk);
    }

    let Snapshot {
        scan_opens,
        scan_preads,
        scan_bytes_read,
        ..
    } = snapshot();
    assert_eq!(scan_opens, 1);
    assert_eq!(scan_preads, 2);
    assert_eq!(scan_bytes_read, 4096 + 128);

    reset();
    assert_eq!(snapshot(), Snapshot::default());
}
/// An injected `Eio` fault makes `backing_read_exact_at` fail with OS error 5
/// only while the guard is alive; reads work before it is set and after it
/// is dropped.
#[test]
fn backing_fault_injects_eio_then_clears_on_drop() {
use std::io::Write;
use std::os::unix::fs::FileExt;
let _lock = lock_global_state();
let mut tmp = tempfile::NamedTempFile::new().unwrap();
tmp.write_all(b"hello world").unwrap();
let f = std::fs::File::open(tmp.path()).unwrap();
// Baseline: with no fault installed the seam is a plain read.
let mut buf = [0u8; 5];
backing_read_exact_at(&f, &mut buf, 0).unwrap();
assert_eq!(&buf, b"hello");
// The inner scope bounds the guard's lifetime: the fault exists only
// inside these braces.
{
let _guard = set_backing_fault(BackingFault::Eio);
let err = backing_read_exact_at(&f, &mut buf, 0).unwrap_err();
assert_eq!(err.raw_os_error(), Some(5), "EIO == 5");
}
// Guard dropped: the same call succeeds again.
let mut buf2 = [0u8; 5];
backing_read_exact_at(&f, &mut buf2, 6).unwrap();
assert_eq!(&buf2, b"world");
// A read that bypasses the seam entirely also works on the same file.
let mut direct = [0u8; 5];
f.read_exact_at(&mut direct, 0).unwrap();
assert_eq!(&direct, b"hello");
}
/// An injected `ShortRead { prefix: 3 }` fills the first three bytes of the
/// buffer from the file and then fails with `UnexpectedEof`.
#[test]
fn backing_fault_short_read_fills_prefix_then_errors() {
use std::io::Write;
let _lock = lock_global_state();
let mut tmp = tempfile::NamedTempFile::new().unwrap();
tmp.write_all(b"abcdefgh").unwrap();
let f = std::fs::File::open(tmp.path()).unwrap();
let mut buf = [0u8; 8];
// `_guard` lives until the end of the function. Locals drop in reverse
// declaration order, so the fault is cleared before `_lock` is released.
let _guard = set_backing_fault(BackingFault::ShortRead { prefix: 3 });
let err = backing_read_exact_at(&f, &mut buf, 0).unwrap_err();
assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
// Only the prefix is checked; the rest of `buf` is not asserted on.
assert_eq!(
&buf[..3],
b"abc",
"prefix bytes were filled before the fault"
);
}
}