use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use bytes::Bytes;
use io_uring::{IoUring, opcode, types};
use irontide_core::{Id20, Id32};
use irontide_storage::{FileMap, IoUringConfig, IoUringStorageState, TorrentStorage};
use parking_lot::{Mutex, RwLock};
use smallvec::SmallVec;
use tracing::warn;
use crate::disk::DiskConfig;
use crate::disk_backend::{DiskIoBackend, DiskIoStats, PosixDiskIo};
/// Per-torrent state needed to issue io_uring reads and writes for one
/// registered torrent.
pub(crate) struct IoUringTorrentState {
/// Open file descriptors for the torrent's files; the I/O paths look a
/// descriptor up by file index via `fds.fd(file_index)`.
fds: IoUringStorageState,
/// Layout information for the torrent: used to split a
/// `(piece, begin, length)` range into per-file segments
/// (`chunk_segments`) and to query a piece's size (`piece_size`).
file_map: FileMap,
}
/// Disk I/O backend that wraps a [`PosixDiskIo`] and routes direct block
/// writes, volatile chunk reads and whole-piece reads through an io_uring
/// instance when per-torrent io_uring state is available.
pub(crate) struct IoUringDiskIo {
/// Wrapped POSIX backend; every operation that is not served through
/// io_uring is delegated to it.
inner: PosixDiskIo,
/// The io_uring instance. The mutex is held for the whole
/// submit-and-complete cycle of one read or write batch.
ring: Mutex<IoUring>,
/// io_uring state per registered torrent, keyed by info hash. Entries are
/// inserted by `register` (when the storage exposes filesystem info and its
/// files can be opened) and removed by `unregister`.
pub(crate) uring_states: RwLock<HashMap<Id20, IoUringTorrentState>>,
/// io_uring settings: `sq_depth` sizes the ring at construction and
/// `enable_direct_io` adds `O_DIRECT` when files are opened in `register`.
config: IoUringConfig,
/// Bytes written successfully through io_uring; added to
/// `write_bytes` in `stats`.
uring_write_bytes: AtomicU64,
/// Bytes read successfully through io_uring; added to `read_bytes` in
/// `stats`.
uring_read_bytes: AtomicU64,
}
impl IoUringDiskIo {
pub fn new(disk_config: &DiskConfig, uring_config: IoUringConfig) -> std::io::Result<Self> {
let ring = IoUring::new(uring_config.sq_depth)?;
Ok(Self {
inner: PosixDiskIo::new(disk_config),
ring: Mutex::new(ring),
uring_states: RwLock::new(HashMap::new()),
config: uring_config,
uring_write_bytes: AtomicU64::new(0),
uring_read_bytes: AtomicU64::new(0),
})
}
fn uring_write(
&self,
state: &IoUringTorrentState,
piece: u32,
begin: u32,
s0: &[u8],
s1: &[u8],
) -> crate::Result<()> {
let total_len = s0.len().saturating_add(s1.len());
let total_u32 = u32::try_from(total_len).map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"block length exceeds u32::MAX",
)
})?;
let segments = state.file_map.chunk_segments(piece, begin, total_u32);
let num_segments = segments.len();
if num_segments == 0 {
return Ok(());
}
let mut all_iovecs: Vec<SmallVec<[libc::iovec; 2]>> = Vec::with_capacity(num_segments);
let mut pos: usize = 0;
for seg in &segments {
let seg_len = seg.len as usize;
let seg_end = pos.saturating_add(seg_len);
let mut iovecs: SmallVec<[libc::iovec; 2]> = SmallVec::new();
if seg_end <= s0.len() {
iovecs.push(libc::iovec {
iov_base: s0[pos..seg_end].as_ptr() as *mut libc::c_void,
iov_len: seg_len,
});
} else if pos >= s0.len() {
let s1_start = pos.saturating_sub(s0.len());
let s1_end = seg_end.saturating_sub(s0.len());
iovecs.push(libc::iovec {
iov_base: s1[s1_start..s1_end].as_ptr() as *mut libc::c_void,
iov_len: seg_len,
});
} else {
let from_s0 = &s0[pos..];
let s1_need = seg_len.saturating_sub(from_s0.len());
iovecs.push(libc::iovec {
iov_base: from_s0.as_ptr() as *mut libc::c_void,
iov_len: from_s0.len(),
});
iovecs.push(libc::iovec {
iov_base: s1[..s1_need].as_ptr() as *mut libc::c_void,
iov_len: s1_need,
});
}
pos = seg_end;
all_iovecs.push(iovecs);
}
let mut ring = self.ring.lock();
for (i, seg) in segments.iter().enumerate() {
let iovecs = &all_iovecs[i];
let fd = state.fds.fd(seg.file_index);
let iov_count = u32::try_from(iovecs.len()).map_err(|_| {
std::io::Error::new(std::io::ErrorKind::InvalidInput, "iovec count overflow")
})?;
let writev = opcode::Writev::new(types::Fd(fd), iovecs.as_ptr().cast(), iov_count)
.offset(seg.file_offset)
.build()
.user_data(i as u64);
loop {
match unsafe { ring.submission().push(&writev) } {
Ok(()) => break,
Err(_) => {
ring.submit().map_err(crate::Error::Io)?;
}
}
}
}
ring.submit_and_wait(num_segments)
.map_err(crate::Error::Io)?;
let mut first_error: Option<std::io::Error> = None;
let mut completed: usize = 0;
for cqe in ring.completion() {
completed = completed.saturating_add(1);
let result = cqe.result();
if result < 0 && first_error.is_none() {
first_error = Some(std::io::Error::from_raw_os_error(-result));
}
}
if completed < num_segments {
return Err(crate::Error::Io(std::io::Error::other(format!(
"io_uring: expected {num_segments} completions, got {completed}"
))));
}
if let Some(err) = first_error {
return Err(crate::Error::Io(err));
}
Ok(())
}
fn uring_read(
&self,
state: &IoUringTorrentState,
piece: u32,
begin: u32,
length: u32,
) -> crate::Result<Vec<u8>> {
let segments = state.file_map.chunk_segments(piece, begin, length);
let num_segments = segments.len();
let mut buf = vec![0u8; length as usize];
if num_segments == 0 {
return Ok(buf);
}
let mut all_iovecs: Vec<SmallVec<[libc::iovec; 1]>> = Vec::with_capacity(num_segments);
let mut pos: usize = 0;
for seg in &segments {
let seg_len = seg.len as usize;
let seg_end = pos.saturating_add(seg_len);
let iovecs: SmallVec<[libc::iovec; 1]> = smallvec::smallvec![libc::iovec {
iov_base: buf[pos..seg_end].as_mut_ptr().cast::<libc::c_void>(),
iov_len: seg_len,
}];
pos = seg_end;
all_iovecs.push(iovecs);
}
let mut ring = self.ring.lock();
for (i, seg) in segments.iter().enumerate() {
let iovecs = &all_iovecs[i];
let fd = state.fds.fd(seg.file_index);
let iov_count = u32::try_from(iovecs.len()).map_err(|_| {
std::io::Error::new(std::io::ErrorKind::InvalidInput, "iovec count overflow")
})?;
let readv = opcode::Readv::new(types::Fd(fd), iovecs.as_ptr().cast(), iov_count)
.offset(seg.file_offset)
.build()
.user_data(i as u64);
loop {
match unsafe { ring.submission().push(&readv) } {
Ok(()) => break,
Err(_) => {
ring.submit().map_err(crate::Error::Io)?;
}
}
}
}
ring.submit_and_wait(num_segments)
.map_err(crate::Error::Io)?;
let mut first_error: Option<std::io::Error> = None;
let mut completed: usize = 0;
for cqe in ring.completion() {
completed = completed.saturating_add(1);
let result = cqe.result();
if result < 0 && first_error.is_none() {
first_error = Some(std::io::Error::from_raw_os_error(-result));
}
}
if completed < num_segments {
return Err(crate::Error::Io(std::io::Error::other(format!(
"io_uring: expected {num_segments} completions, got {completed}"
))));
}
if let Some(err) = first_error {
return Err(crate::Error::Io(err));
}
Ok(buf)
}
}
impl DiskIoBackend for IoUringDiskIo {
fn name(&self) -> &str {
"io_uring"
}
fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>) {
self.inner.register(info_hash, Arc::clone(&storage));
if let Some((base_dir, file_paths, file_map)) = storage.filesystem_info() {
let mut flags = libc::O_RDWR | libc::O_CREAT;
if self.config.enable_direct_io {
flags |= libc::O_DIRECT;
}
match IoUringStorageState::open_files(base_dir, file_paths, flags) {
Ok(fds) => {
self.uring_states.write().insert(
info_hash,
IoUringTorrentState {
fds,
file_map: file_map.clone(),
},
);
}
Err(e) => {
warn!(
%info_hash,
error = %e,
"io_uring: failed to open fds, falling back to pwritev"
);
}
}
}
}
fn unregister(&self, info_hash: Id20) {
self.uring_states.write().remove(&info_hash);
self.inner.unregister(info_hash);
}
fn write_chunk(
&self,
info_hash: Id20,
piece: u32,
begin: u32,
data: Bytes,
flush: bool,
) -> crate::Result<()> {
self.inner.write_chunk(info_hash, piece, begin, data, flush)
}
fn read_chunk(
&self,
info_hash: Id20,
piece: u32,
begin: u32,
length: u32,
volatile: bool,
) -> crate::Result<Bytes> {
if !volatile {
return self
.inner
.read_chunk(info_hash, piece, begin, length, volatile);
}
let uring_states = self.uring_states.read();
if let Some(state) = uring_states.get(&info_hash) {
let data = self.uring_read(state, piece, begin, length)?;
drop(uring_states);
self.uring_read_bytes
.fetch_add(u64::from(length), Ordering::Relaxed);
return Ok(Bytes::from(data));
}
drop(uring_states);
self.inner
.read_chunk(info_hash, piece, begin, length, volatile)
}
fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>> {
let uring_states = self.uring_states.read();
if let Some(state) = uring_states.get(&info_hash) {
if self.inner.cached_pieces(info_hash).contains(&piece) {
self.inner.flush_piece(info_hash, piece)?;
}
let piece_size = state.file_map.piece_size(piece);
let data = self.uring_read(state, piece, 0, piece_size)?;
drop(uring_states);
self.uring_read_bytes
.fetch_add(u64::from(piece_size), Ordering::Relaxed);
return Ok(data);
}
drop(uring_states);
self.inner.read_piece(info_hash, piece)
}
fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool> {
self.inner.hash_piece(info_hash, piece, expected)
}
fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool> {
self.inner.hash_piece_v2(info_hash, piece, expected)
}
fn hash_block(
&self,
info_hash: Id20,
piece: u32,
begin: u32,
length: u32,
) -> crate::Result<Id32> {
self.inner.hash_block(info_hash, piece, begin, length)
}
fn clear_piece(&self, info_hash: Id20, piece: u32) {
self.inner.clear_piece(info_hash, piece)
}
fn flush_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<()> {
self.inner.flush_piece(info_hash, piece)
}
fn flush_all(&self) -> crate::Result<()> {
self.inner.flush_all()
}
fn cached_pieces(&self, info_hash: Id20) -> Vec<u32> {
self.inner.cached_pieces(info_hash)
}
fn stats(&self) -> DiskIoStats {
let mut s = self.inner.stats();
s.write_bytes = s
.write_bytes
.saturating_add(self.uring_write_bytes.load(Ordering::Relaxed));
s.read_bytes = s
.read_bytes
.saturating_add(self.uring_read_bytes.load(Ordering::Relaxed));
s
}
fn write_block_direct(
&self,
info_hash: Id20,
piece: u32,
begin: u32,
s0: &[u8],
s1: &[u8],
) -> crate::Result<()> {
let uring_states = self.uring_states.read();
if let Some(state) = uring_states.get(&info_hash) {
let result = self.uring_write(state, piece, begin, s0, s1);
drop(uring_states);
if result.is_ok() {
let total = (s0.len().saturating_add(s1.len())) as u64;
self.uring_write_bytes.fetch_add(total, Ordering::Relaxed);
return Ok(());
}
warn!(
%info_hash,
piece,
begin,
error = %result.as_ref().unwrap_err(),
"io_uring write failed, falling back to pwritev"
);
} else {
drop(uring_states);
}
self.inner
.write_block_direct(info_hash, piece, begin, s0, s1)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use irontide_core::Lengths;
    use irontide_storage::{FilesystemStorage, IoUringConfig, MemoryStorage, PreallocateMode};
    use std::path::PathBuf;
    use std::sync::Arc;

    /// Fixed info hash used by tests that need only one torrent.
    fn make_hash() -> Id20 {
        Id20([0xAB; 20])
    }

    /// Info hash whose first byte is `n` and whose remaining bytes are zero.
    fn make_hash_n(n: u8) -> Id20 {
        let mut bytes = [0u8; 20];
        bytes[0] = n;
        Id20(bytes)
    }

    /// Disk configuration shared by the tests.
    fn test_config() -> crate::disk::DiskConfig {
        crate::disk::DiskConfig {
            io_threads: 2,
            cache_size: 1024 * 1024,
            ..crate::disk::DiskConfig::default()
        }
    }

    /// Default io_uring configuration.
    fn test_uring_config() -> IoUringConfig {
        IoUringConfig::default()
    }

    /// Builds a backend from the shared test configuration.
    fn new_backend() -> IoUringDiskIo {
        IoUringDiskIo::new(&test_config(), test_uring_config()).expect("backend creation")
    }

    /// Single-file, single-piece filesystem storage of `total` bytes in `dir`.
    fn make_fs_storage(dir: &std::path::Path, total: u64) -> Arc<dyn TorrentStorage> {
        let chunk = total.min(16384) as u32;
        let lengths = Lengths::new(total, total, chunk);
        let storage = FilesystemStorage::new(
            dir,
            vec![PathBuf::from("test.bin")],
            vec![total],
            lengths,
            None,
            PreallocateMode::None,
            false,
        )
        .expect("FilesystemStorage::new should succeed");
        Arc::new(storage)
    }

    /// Multi-file filesystem storage with one file per entry of `file_sizes`.
    fn make_fs_storage_multi(
        dir: &std::path::Path,
        file_sizes: &[u64],
        piece_len: u64,
    ) -> Arc<dyn TorrentStorage> {
        let total: u64 = file_sizes.iter().sum();
        let chunk = piece_len.min(16384) as u32;
        let lengths = Lengths::new(total, piece_len, chunk);
        let paths: Vec<PathBuf> = (0..file_sizes.len())
            .map(|i| PathBuf::from(format!("file{i}.bin")))
            .collect();
        let storage = FilesystemStorage::new(
            dir,
            paths,
            file_sizes.to_vec(),
            lengths,
            None,
            PreallocateMode::None,
            false,
        )
        .expect("multi-file FilesystemStorage::new should succeed");
        Arc::new(storage)
    }

    #[test]
    fn uring_config_default() {
        let defaults = IoUringConfig::default();
        assert_eq!(defaults.sq_depth, 256);
        assert!(!defaults.enable_direct_io);
        assert_eq!(defaults.batch_threshold, 4);
    }

    #[test]
    fn uring_ring_creation() {
        let outcome = IoUringDiskIo::new(&test_config(), test_uring_config());
        assert!(
            outcome.is_ok(),
            "IoUringDiskIo::new() should succeed with default config"
        );
    }

    #[test]
    fn uring_ring_creation_invalid_depth() {
        let zero_depth = IoUringConfig {
            sq_depth: 0,
            ..IoUringConfig::default()
        };
        let outcome = IoUringDiskIo::new(&test_config(), zero_depth);
        assert!(outcome.is_err(), "sq_depth=0 should fail io_uring setup");
    }

    #[test]
    fn uring_register_filesystem_storage() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash();
        backend.register(hash, make_fs_storage(dir.path(), 16384));
        let registered = backend.uring_states.read();
        assert!(
            registered.contains_key(&hash),
            "uring_states should contain the registered info_hash"
        );
    }

    #[test]
    fn uring_register_memory_storage() {
        let backend = new_backend();
        let hash = make_hash();
        let lengths = Lengths::new(16384, 16384, 16384);
        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
        backend.register(hash, storage);
        let registered = backend.uring_states.read();
        assert!(
            !registered.contains_key(&hash),
            "uring_states should NOT contain memory storage (no filesystem_info)"
        );
    }

    #[test]
    fn uring_write_single_block() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(6);
        let storage = make_fs_storage(dir.path(), 16384);
        backend.register(hash, Arc::clone(&storage));
        let payload = vec![0xBEu8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("write_block_direct should succeed");
        let readback = storage
            .read_chunk(0, 0, 16384)
            .expect("read_chunk should succeed");
        assert_eq!(readback, payload);
    }

    #[test]
    fn uring_write_vectored_split() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(7);
        let total = 16384_u64;
        let storage = make_fs_storage(dir.path(), total);
        backend.register(hash, Arc::clone(&storage));
        let head: Vec<u8> = (0..10000_u32).map(|i| (i % 251) as u8).collect();
        let tail: Vec<u8> = (0..6384_u32).map(|i| ((i + 10000) % 251) as u8).collect();
        backend
            .write_block_direct(hash, 0, 0, &head, &tail)
            .expect("vectored write should succeed");
        let readback = storage
            .read_chunk(0, 0, 16384)
            .expect("read_chunk should succeed");
        let mut expected = head.clone();
        expected.extend_from_slice(&tail);
        assert_eq!(readback, expected);
    }

    #[test]
    fn uring_write_multi_file_boundary() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(8);
        let storage = make_fs_storage_multi(dir.path(), &[8192, 8192], 16384);
        backend.register(hash, Arc::clone(&storage));
        let payload = vec![0xCDu8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("multi-file write should succeed");
        let readback = storage
            .read_chunk(0, 0, 16384)
            .expect("read_chunk should succeed");
        assert_eq!(readback, payload);
    }

    #[test]
    fn uring_read_delegates_to_inner() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(9);
        let storage = make_fs_storage(dir.path(), 16384);
        backend.register(hash, Arc::clone(&storage));
        let payload = vec![0xAAu8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("write should succeed");
        let readback = backend
            .read_chunk(hash, 0, 0, 16384, true)
            .expect("read_chunk via backend should succeed");
        assert_eq!(&readback[..], &payload[..]);
    }

    #[test]
    fn uring_hash_piece_delegates() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(10);
        let storage = make_fs_storage(dir.path(), 16384);
        backend.register(hash, Arc::clone(&storage));
        let payload = vec![0x42u8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("write should succeed");
        backend.flush_piece(hash, 0).expect("flush should succeed");
        let expected_hash = irontide_core::sha1(&payload);
        let matches = backend
            .hash_piece(hash, 0, &expected_hash)
            .expect("hash_piece should succeed");
        assert!(matches, "hash should match the written data");
    }

    #[test]
    fn uring_unregister_closes_fds() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(11);
        let storage = make_fs_storage(dir.path(), 16384);
        backend.register(hash, Arc::clone(&storage));
        assert!(backend.uring_states.read().contains_key(&hash));
        backend.unregister(hash);
        assert!(
            !backend.uring_states.read().contains_key(&hash),
            "uring_states should be empty after unregister"
        );
    }

    #[test]
    fn uring_backend_name() {
        let backend = new_backend();
        assert_eq!(backend.name(), "io_uring");
    }

    #[test]
    fn uring_stats_track_writes() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(13);
        backend.register(hash, make_fs_storage(dir.path(), 16384));
        let payload = vec![0xFFu8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("write should succeed");
        let stats = backend.stats();
        assert!(
            stats.write_bytes >= 16384,
            "stats.write_bytes ({}) should include the 16384-byte uring write",
            stats.write_bytes
        );
    }

    #[test]
    fn uring_concurrent_torrents() {
        let first_dir = tempfile::tempdir().expect("tempdir 1");
        let second_dir = tempfile::tempdir().expect("tempdir 2");
        let backend = new_backend();
        let first_hash = make_hash_n(141);
        let second_hash = make_hash_n(142);
        let first_storage = make_fs_storage(first_dir.path(), 16384);
        let second_storage = make_fs_storage(second_dir.path(), 16384);
        backend.register(first_hash, Arc::clone(&first_storage));
        backend.register(second_hash, Arc::clone(&second_storage));
        let first_payload = vec![0x11u8; 16384];
        let second_payload = vec![0x22u8; 16384];
        backend
            .write_block_direct(first_hash, 0, 0, &first_payload, &[])
            .expect("write to torrent 1");
        backend
            .write_block_direct(second_hash, 0, 0, &second_payload, &[])
            .expect("write to torrent 2");
        let first_read = first_storage
            .read_chunk(0, 0, 16384)
            .expect("read torrent 1");
        let second_read = second_storage
            .read_chunk(0, 0, 16384)
            .expect("read torrent 2");
        assert_eq!(first_read, first_payload, "torrent 1 data should be 0x11");
        assert_eq!(second_read, second_payload, "torrent 2 data should be 0x22");
    }

    #[test]
    fn uring_factory_fallback() {
        use crate::disk_backend::create_backend_from_config;
        let config = crate::disk::DiskConfig {
            storage_mode: irontide_core::StorageMode::IoUring,
            io_uring_sq_depth: 0,
            ..crate::disk::DiskConfig::default()
        };
        let backend = create_backend_from_config(&config);
        assert_eq!(
            backend.name(),
            "posix",
            "should fall back to posix when io_uring init fails"
        );
    }

    #[test]
    fn uring_read_single_file() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(16);
        let storage = make_fs_storage(dir.path(), 16384);
        backend.register(hash, Arc::clone(&storage));
        let payload = vec![0xAAu8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("write_block_direct should succeed");
        let readback = backend
            .read_chunk(hash, 0, 0, 16384, true)
            .expect("volatile read_chunk should succeed");
        assert_eq!(&readback[..], &payload[..]);
    }

    #[test]
    fn uring_read_multi_file_boundary() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(17);
        let storage = make_fs_storage_multi(dir.path(), &[8192, 8192], 16384);
        backend.register(hash, Arc::clone(&storage));
        let payload = vec![0xBBu8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("multi-file write should succeed");
        let readback = backend
            .read_chunk(hash, 0, 0, 16384, true)
            .expect("volatile read_chunk across files should succeed");
        assert_eq!(&readback[..], &payload[..]);
    }

    #[test]
    fn uring_read_piece_single_file() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(18);
        let storage = make_fs_storage(dir.path(), 16384);
        backend.register(hash, Arc::clone(&storage));
        let payload = vec![0xCCu8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("write_block_direct should succeed");
        let piece_data = backend
            .read_piece(hash, 0)
            .expect("read_piece should succeed");
        assert_eq!(piece_data, payload);
    }

    #[test]
    fn uring_read_piece_multi_file() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(19);
        let storage = make_fs_storage_multi(dir.path(), &[8192, 8192], 16384);
        backend.register(hash, Arc::clone(&storage));
        let payload = vec![0xDDu8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("multi-file write should succeed");
        let piece_data = backend
            .read_piece(hash, 0)
            .expect("read_piece across files should succeed");
        assert_eq!(piece_data, payload);
    }

    #[test]
    fn uring_read_nonvolatile_uses_cache() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(20);
        let storage = make_fs_storage(dir.path(), 16384);
        backend.register(hash, Arc::clone(&storage));
        let payload = vec![0xEEu8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("write_block_direct should succeed");
        let readback = backend
            .read_chunk(hash, 0, 0, 16384, false)
            .expect("non-volatile read_chunk should succeed");
        assert_eq!(&readback[..], &payload[..]);
        assert_eq!(
            backend.uring_read_bytes.load(Ordering::Relaxed),
            0,
            "non-volatile read should not increment uring_read_bytes"
        );
    }

    #[test]
    fn uring_read_volatile_bypasses_cache() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(21);
        let storage = make_fs_storage(dir.path(), 16384);
        backend.register(hash, Arc::clone(&storage));
        let payload = vec![0xFFu8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("write_block_direct should succeed");
        let _ = backend
            .read_chunk(hash, 0, 0, 16384, true)
            .expect("volatile read_chunk should succeed");
        assert!(
            backend.uring_read_bytes.load(Ordering::Relaxed) > 0,
            "volatile read should increment uring_read_bytes"
        );
    }

    #[test]
    fn uring_read_stats_track_reads() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(22);
        let storage = make_fs_storage(dir.path(), 16384);
        backend.register(hash, Arc::clone(&storage));
        let payload = vec![0x42u8; 16384];
        backend
            .write_block_direct(hash, 0, 0, &payload, &[])
            .expect("write_block_direct should succeed");
        let _ = backend
            .read_chunk(hash, 0, 0, 16384, true)
            .expect("volatile read_chunk should succeed");
        let stats = backend.stats();
        assert!(
            stats.read_bytes >= 16384,
            "stats.read_bytes ({}) should include the 16384-byte uring read",
            stats.read_bytes
        );
    }

    #[test]
    fn uring_read_piece_after_buffered_write() {
        let dir = tempfile::tempdir().expect("tempdir");
        let backend = new_backend();
        let hash = make_hash_n(23);
        let storage = make_fs_storage(dir.path(), 16384);
        backend.register(hash, Arc::clone(&storage));
        let data = Bytes::from(vec![0xDDu8; 16384]);
        backend
            .write_chunk(hash, 0, 0, data.clone(), true)
            .expect("write_chunk should succeed");
        let piece_data = backend
            .read_piece(hash, 0)
            .expect("read_piece after inner write should succeed");
        assert_eq!(piece_data, &data[..]);
    }
}