#![allow(
clippy::cast_possible_truncation,
clippy::cast_precision_loss,
clippy::cast_possible_wrap,
clippy::cast_sign_loss,
reason = "M175: disk I/O sizes bounded by piece_length (u32 by construction in Lengths::new); offset arithmetic stays within file lengths"
)]
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use bytes::Bytes;
use irontide_core::{Id20, Id32};
use irontide_storage::TorrentStorage;
use crate::buffer_pool::BufferPool;
use crate::disk::DiskConfig;
/// Snapshot of disk I/O counters and buffer-pool gauges returned by
/// `DiskIoBackend::stats`.
///
/// `DisabledDiskIo` always reports the all-zero default. `PosixDiskIo` and
/// `MmapDiskIo` bump the byte counters themselves; `PosixDiskIo::stats`
/// additionally overwrites the pool-related fields with values taken from its
/// buffer pool on every call.
#[derive(Debug, Clone, Default)]
pub struct DiskIoStats {
/// Total length of data returned by storage reads.
pub read_bytes: u64,
/// Total length of data accepted by write calls (buffered or written through).
pub write_bytes: u64,
/// Number of `read_chunk` calls answered from the buffer pool.
pub cache_hits: u64,
/// Number of `read_chunk` calls that were not answered from the buffer pool
/// (in `PosixDiskIo` this includes volatile reads, which skip the pool).
pub cache_misses: u64,
/// Pool gauge: copied from the pool's `write_buffer_bytes`.
pub write_buffer_bytes: usize,
/// Pool gauge: copied from the pool's `read_cache_bytes`.
pub read_cache_bytes: usize,
/// Pool gauge: copied from the pool's `total_entries`.
pub pool_entries: usize,
/// Pool counter: copied from the pool's `prefetch_count`.
pub prefetch_count: u64,
/// Pool counter: copied from the pool's `eviction_count`.
pub eviction_count: u64,
/// Pool counter: copied from the pool's `skeleton_count`.
pub skeleton_count: u64,
}
/// Strategy interface for moving torrent data between the session and storage.
///
/// Implementations in this file: `DisabledDiskIo` (does nothing), `PosixDiskIo`
/// (buffer pool in front of the registered storage) and `MmapDiskIo` (passes
/// every call straight to the registered storage).
pub trait DiskIoBackend: Send + Sync {
/// Short identifier of the backend, e.g. `"posix"`, `"mmap"` or `"disabled"`.
fn name(&self) -> &str;
/// Associates `storage` with `info_hash`; later calls look the storage up by
/// this hash.
fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>);
/// Removes the association for `info_hash`. `PosixDiskIo` and `MmapDiskIo`
/// fail subsequent storage-backed calls for that hash with a
/// "torrent not registered" error.
fn unregister(&self, info_hash: Id20);
/// Writes `data` at byte offset `begin` of `piece`.
///
/// With `flush == true` `PosixDiskIo` writes through to storage; with
/// `flush == false` it only buffers the chunk in its pool. `MmapDiskIo`
/// ignores the flag and always writes to storage.
fn write_chunk(
&self,
info_hash: Id20,
piece: u32,
begin: u32,
data: Bytes,
flush: bool,
) -> crate::Result<()>;
/// Reads `length` bytes starting at offset `begin` of `piece`.
///
/// With `volatile == true` `PosixDiskIo` neither consults nor populates its
/// pool. `MmapDiskIo` ignores the flag.
fn read_chunk(
&self,
info_hash: Id20,
piece: u32,
begin: u32,
length: u32,
volatile: bool,
) -> crate::Result<Bytes>;
/// Reads a whole piece. `PosixDiskIo` flushes buffered blocks of the piece to
/// storage first.
fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>>;
/// Checks `piece` against the 20-byte `expected` hash (SHA-1 in `PosixDiskIo`'s
/// buffered path); `Ok(true)` means the hash matched.
fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool>;
/// Checks `piece` against the 32-byte `expected` hash (SHA-256 in
/// `PosixDiskIo`'s buffered path); `Ok(true)` means the hash matched.
fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool>;
/// Returns the `Id32` hash of the `length` bytes at offset `begin` of `piece`,
/// as computed by the storage's `hash_block`.
// NOTE(review): presumably SHA-256 (v2 block hash) — confirm against the storage crate.
fn hash_block(
&self,
info_hash: Id20,
piece: u32,
begin: u32,
length: u32,
) -> crate::Result<Id32>;
/// Drops backend-held state for `piece`. `PosixDiskIo` clears the piece from
/// its pool; the other backends in this file do nothing.
fn clear_piece(&self, info_hash: Id20, piece: u32);
/// Writes any blocks buffered for `piece` to storage. No-op for backends
/// without a buffer.
fn flush_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<()>;
/// Writes every buffered block of every torrent to storage. No-op for
/// backends without a buffer.
fn flush_all(&self) -> crate::Result<()>;
/// Piece indices of `info_hash` the backend currently reports as cached.
/// `PosixDiskIo` returns its pool's hot pieces; the others return an empty list.
fn cached_pieces(&self, info_hash: Id20) -> Vec<u32>;
/// Returns a snapshot of the backend's statistics.
fn stats(&self) -> DiskIoStats;
/// Writes a block supplied as two slices (`s0` followed by `s1`) at offset
/// `begin` of `piece` directly to storage via `write_chunk_vectored`,
/// bypassing any buffering.
fn write_block_direct(
&self,
info_hash: Id20,
piece: u32,
begin: u32,
s0: &[u8],
s1: &[u8],
) -> crate::Result<()>;
}
/// Backend that performs no disk I/O at all.
///
/// Writes are accepted and discarded, reads hand back zeroed or empty data and
/// every hash check reports success.
pub struct DisabledDiskIo;

impl DiskIoBackend for DisabledDiskIo {
    /// Always `"disabled"`.
    fn name(&self) -> &'static str {
        "disabled"
    }

    /// Ignores the registration; the storage handle is simply dropped.
    fn register(&self, _info_hash: Id20, _storage: Arc<dyn TorrentStorage>) {}

    /// Nothing was registered, so there is nothing to remove.
    fn unregister(&self, _info_hash: Id20) {}

    /// Discards the chunk and reports success.
    fn write_chunk(
        &self,
        _info_hash: Id20,
        _piece: u32,
        _begin: u32,
        _data: Bytes,
        _flush: bool,
    ) -> crate::Result<()> {
        Ok(())
    }

    /// Returns `length` zero bytes.
    fn read_chunk(
        &self,
        _info_hash: Id20,
        _piece: u32,
        _begin: u32,
        length: u32,
        _volatile: bool,
    ) -> crate::Result<Bytes> {
        let zeroes = vec![0_u8; length as usize];
        Ok(Bytes::from(zeroes))
    }

    /// Returns an empty buffer regardless of the piece requested.
    fn read_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<Vec<u8>> {
        Ok(vec![])
    }

    /// Every v1 hash check passes.
    fn hash_piece(&self, _info_hash: Id20, _piece: u32, _expected: &Id20) -> crate::Result<bool> {
        Ok(true)
    }

    /// Every v2 hash check passes.
    fn hash_piece_v2(
        &self,
        _info_hash: Id20,
        _piece: u32,
        _expected: &Id32,
    ) -> crate::Result<bool> {
        Ok(true)
    }

    /// Returns an all-zero block hash.
    fn hash_block(
        &self,
        _info_hash: Id20,
        _piece: u32,
        _begin: u32,
        _length: u32,
    ) -> crate::Result<Id32> {
        let zero_hash = Id32([0; 32]);
        Ok(zero_hash)
    }

    /// Nothing is buffered, so nothing is cleared.
    fn clear_piece(&self, _info_hash: Id20, _piece: u32) {}

    /// Nothing is buffered, so flushing a piece trivially succeeds.
    fn flush_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<()> {
        Ok(())
    }

    /// Nothing is buffered, so flushing everything trivially succeeds.
    fn flush_all(&self) -> crate::Result<()> {
        Ok(())
    }

    /// No piece is ever cached.
    fn cached_pieces(&self, _info_hash: Id20) -> Vec<u32> {
        Vec::new()
    }

    /// Statistics stay at their default (all-zero) values.
    fn stats(&self) -> DiskIoStats {
        Default::default()
    }

    /// Discards both slices and reports success.
    fn write_block_direct(
        &self,
        _info_hash: Id20,
        _piece: u32,
        _begin: u32,
        _s0: &[u8],
        _s1: &[u8],
    ) -> crate::Result<()> {
        Ok(())
    }
}
pub struct PosixDiskIo {
storages: RwLock<HashMap<Id20, Arc<dyn TorrentStorage>>>,
pool: Mutex<BufferPool>,
stats: Mutex<DiskIoStats>,
}
impl PosixDiskIo {
#[must_use]
pub fn new(config: &DiskConfig) -> Self {
let mut pool = BufferPool::with_capacity(config.buffer_pool_capacity);
pool.set_mlock(config.enable_mlock);
Self {
storages: RwLock::new(HashMap::new()),
pool: Mutex::new(pool),
stats: Mutex::new(DiskIoStats::default()),
}
}
fn get_storage(&self, info_hash: Id20) -> crate::Result<Arc<dyn TorrentStorage>> {
self.storages
.read()
.get(&info_hash)
.cloned()
.ok_or_else(|| {
crate::Error::Storage(irontide_storage::Error::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
"torrent not registered",
)))
})
}
}
impl DiskIoBackend for PosixDiskIo {
    /// Returns the backend identifier, `"posix"`.
    fn name(&self) -> &'static str {
        "posix"
    }

    /// Registers `storage` for `info_hash`, replacing any storage already
    /// registered under the same hash.
    fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>) {
        self.storages.write().insert(info_hash, storage);
    }

    /// Forgets the storage for `info_hash` and asks the pool to clear the
    /// torrent's entries.
    ///
    /// Nothing is flushed here; callers that need buffered blocks on disk must
    /// flush before unregistering.
    fn unregister(&self, info_hash: Id20) {
        self.storages.write().remove(&info_hash);
        self.pool.lock().clear_torrent(info_hash);
    }

    /// Writes `data` at offset `begin` of `piece`.
    ///
    /// With `flush` set the chunk goes straight to storage; otherwise it is only
    /// placed in the buffer pool and reaches storage on a later flush or a
    /// successful hash check. Either way its length is added to `write_bytes`.
    ///
    /// # Errors
    ///
    /// Fails when `flush` is set and the torrent is not registered or the
    /// storage write fails. A buffered write cannot fail.
    fn write_chunk(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        data: Bytes,
        flush: bool,
    ) -> crate::Result<()> {
        let len = data.len() as u64;
        if flush {
            let storage = self.get_storage(info_hash)?;
            storage.write_chunk(piece, begin, &data)?;
        } else {
            self.pool.lock().write_block((info_hash, piece), begin, data, 0);
        }
        // Accounting is identical for both paths, so it is done once here.
        self.stats.lock().write_bytes += len;
        Ok(())
    }

    /// Reads `length` bytes at offset `begin` of `piece`.
    ///
    /// Non-volatile reads try the buffer pool first (a hit bumps `cache_hits`).
    /// On a miss the whole piece is read from storage and handed to the pool as
    /// a prefetch, and the requested range is sliced out of it. Volatile reads
    /// skip the pool entirely. Every read not answered by the pool bumps
    /// `cache_misses`.
    ///
    /// # Errors
    ///
    /// Fails when the torrent is not registered or the final chunk read from
    /// storage fails.
    fn read_chunk(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        length: u32,
        volatile: bool,
    ) -> crate::Result<Bytes> {
        let key = (info_hash, piece);
        if !volatile {
            let cached = self.pool.lock().read_block(key, begin, length as usize);
            if let Some(data) = cached {
                self.stats.lock().cache_hits += 1;
                return Ok(data);
            }
        }
        self.stats.lock().cache_misses += 1;
        let storage = self.get_storage(info_hash)?;
        if !volatile {
            // Best effort: a failed whole-piece read is deliberately ignored and
            // the plain chunk read below reports any real error.
            if let Ok(piece_data) = storage.read_piece(piece) {
                let piece_bytes = Bytes::from(piece_data);
                self.stats.lock().read_bytes += piece_bytes.len() as u64;
                self.pool.lock().prefetch_piece(key, piece_bytes.clone());
                let start = begin as usize;
                let end = start.saturating_add(length as usize);
                if end <= piece_bytes.len() {
                    return Ok(piece_bytes.slice(start..end));
                }
                // Requested range exceeds the piece: fall through to the chunk
                // read so storage decides how to answer.
            }
        }
        let bytes = Bytes::from(storage.read_chunk(piece, begin, length)?);
        self.stats.lock().read_bytes += bytes.len() as u64;
        Ok(bytes)
    }

    /// Flushes any buffered blocks of `piece`, then reads the whole piece from
    /// storage and adds its length to `read_bytes`.
    ///
    /// # Errors
    ///
    /// Fails when the flush fails, the torrent is not registered, or the storage
    /// read fails.
    fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>> {
        self.flush_piece(info_hash, piece)?;
        let storage = self.get_storage(info_hash)?;
        let data = storage.read_piece(piece)?;
        self.stats.lock().read_bytes += data.len() as u64;
        Ok(data)
    }

    /// Verifies `piece` against the SHA-1 hash `expected`.
    ///
    /// If the pool yields the piece's buffered data, that data is hashed in
    /// memory: on a match it is written to storage at offset 0 and promoted to
    /// the pool's cache; on a mismatch it is dropped and nothing is written.
    /// Otherwise any buffered blocks are flushed and storage verifies the piece.
    ///
    /// # Errors
    ///
    /// Fails when the torrent is not registered or a storage operation fails.
    fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool> {
        let key = (info_hash, piece);
        let buffered = self.pool.lock().take_all_blocks(key);
        let Some(data) = buffered else {
            self.flush_piece(info_hash, piece)?;
            let storage = self.get_storage(info_hash)?;
            return Ok(storage.verify_piece(piece, expected)?);
        };
        if irontide_core::sha1(&data) != *expected {
            return Ok(false);
        }
        let storage = self.get_storage(info_hash)?;
        storage.write_chunk(piece, 0, &data)?;
        // NOTE(review): buffered chunks were already counted in `write_bytes` by
        // `write_chunk`, so a verified piece is counted a second time here —
        // confirm this is intended.
        self.stats.lock().write_bytes += data.len() as u64;
        self.pool.lock().promote_to_cached(key, Bytes::from(data));
        Ok(true)
    }

    /// Verifies `piece` against the SHA-256 hash `expected`.
    ///
    /// Follows the same buffered-data-first procedure as `hash_piece`, using
    /// SHA-256 and the storage's `verify_piece_v2`.
    ///
    /// # Errors
    ///
    /// Fails when the torrent is not registered or a storage operation fails.
    fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool> {
        let key = (info_hash, piece);
        let buffered = self.pool.lock().take_all_blocks(key);
        let Some(data) = buffered else {
            self.flush_piece(info_hash, piece)?;
            let storage = self.get_storage(info_hash)?;
            return Ok(storage.verify_piece_v2(piece, expected)?);
        };
        if irontide_core::sha256(&data) != *expected {
            return Ok(false);
        }
        let storage = self.get_storage(info_hash)?;
        storage.write_chunk(piece, 0, &data)?;
        // NOTE(review): same double counting of `write_bytes` as in `hash_piece`.
        self.stats.lock().write_bytes += data.len() as u64;
        self.pool.lock().promote_to_cached(key, Bytes::from(data));
        Ok(true)
    }

    /// Flushes buffered blocks of `piece` and returns the storage's hash of the
    /// `length` bytes at offset `begin`.
    ///
    /// # Errors
    ///
    /// Fails when the flush fails, the torrent is not registered, or the storage
    /// hash fails.
    fn hash_block(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        length: u32,
    ) -> crate::Result<Id32> {
        self.flush_piece(info_hash, piece)?;
        let storage = self.get_storage(info_hash)?;
        Ok(storage.hash_block(piece, begin, length)?)
    }

    /// Asks the pool to clear everything it holds for `piece`.
    fn clear_piece(&self, info_hash: Id20, piece: u32) {
        self.pool.lock().clear_piece((info_hash, piece));
    }

    /// Writes the blocks the pool hands out for `piece` to storage.
    ///
    /// Returns `Ok(())` without touching storage when the pool has nothing to
    /// flush for the piece.
    ///
    /// # Errors
    ///
    /// Fails when the torrent is not registered or a storage write fails; blocks
    /// after the failing one are not written.
    fn flush_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<()> {
        // Take the blocks while holding the pool lock, write them without it.
        let drained = self.pool.lock().flush_piece((info_hash, piece));
        let Some(blocks) = drained else {
            return Ok(());
        };
        let storage = self.get_storage(info_hash)?;
        for (begin, data) in blocks {
            storage.write_chunk(piece, begin, &data)?;
        }
        Ok(())
    }

    /// Writes every block the pool hands out, for all torrents, to storage.
    ///
    /// A failure for one piece (unregistered torrent or failed write) no longer
    /// aborts the whole flush: the blocks have already been taken out of the
    /// pool, so stopping early would drop the remaining pieces' data unwritten.
    /// The remaining pieces are still flushed and the first error is returned.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered, after all pieces were attempted.
    fn flush_all(&self) -> crate::Result<()> {
        let drained = self.pool.lock().flush_all();
        let mut first_error: Option<crate::Error> = None;
        for ((info_hash, piece), blocks) in drained {
            let storage = match self.get_storage(info_hash) {
                Ok(storage) => storage,
                Err(err) => {
                    if first_error.is_none() {
                        first_error = Some(err);
                    }
                    continue;
                }
            };
            for (begin, data) in blocks {
                if let Err(err) = storage.write_chunk(piece, begin, &data) {
                    if first_error.is_none() {
                        first_error = Some(crate::Error::from(err));
                    }
                    // Skip the rest of this piece but keep going with the others.
                    break;
                }
            }
        }
        match first_error {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }

    /// Returns the pieces of `info_hash` the pool reports as hot.
    fn cached_pieces(&self, info_hash: Id20) -> Vec<u32> {
        self.pool.lock().hot_pieces(info_hash)
    }

    /// Returns this backend's counters merged with the pool's current gauges.
    fn stats(&self) -> DiskIoStats {
        // The stats guard is a temporary, released before the pool is locked.
        let mut snapshot = self.stats.lock().clone();
        let pool = self.pool.lock();
        let pool_stats = pool.stats();
        snapshot.write_buffer_bytes = pool_stats.write_buffer_bytes;
        snapshot.read_cache_bytes = pool_stats.read_cache_bytes;
        snapshot.pool_entries = pool_stats.total_entries;
        snapshot.prefetch_count = pool_stats.prefetch_count;
        snapshot.eviction_count = pool_stats.eviction_count;
        snapshot.skeleton_count = pool_stats.skeleton_count;
        snapshot
    }

    /// Writes `s0` followed by `s1` at offset `begin` of `piece` directly to
    /// storage, bypassing the pool, and adds both lengths to `write_bytes`.
    ///
    /// # Errors
    ///
    /// Fails when the torrent is not registered or the vectored write fails.
    fn write_block_direct(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        s0: &[u8],
        s1: &[u8],
    ) -> crate::Result<()> {
        let storage = self.get_storage(info_hash)?;
        storage.write_chunk_vectored(piece, begin, s0, s1)?;
        let total = (s0.len() + s1.len()) as u64;
        self.stats.lock().write_bytes += total;
        Ok(())
    }
}
pub struct MmapDiskIo {
storages: RwLock<HashMap<Id20, Arc<dyn TorrentStorage>>>,
stats: Mutex<DiskIoStats>,
}
impl MmapDiskIo {
#[must_use]
pub fn new() -> Self {
Self {
storages: RwLock::new(HashMap::new()),
stats: Mutex::new(DiskIoStats::default()),
}
}
fn get_storage(&self, info_hash: Id20) -> crate::Result<Arc<dyn TorrentStorage>> {
self.storages
.read()
.get(&info_hash)
.cloned()
.ok_or_else(|| {
crate::Error::Storage(irontide_storage::Error::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
"torrent not registered",
)))
})
}
}
impl Default for MmapDiskIo {
fn default() -> Self {
Self::new()
}
}
impl DiskIoBackend for MmapDiskIo {
    /// Returns the backend identifier, `"mmap"`.
    fn name(&self) -> &'static str {
        "mmap"
    }

    /// Registers `storage` for `info_hash`, replacing any previous entry.
    fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>) {
        self.storages.write().insert(info_hash, storage);
    }

    /// Forgets the storage registered for `info_hash`.
    fn unregister(&self, info_hash: Id20) {
        self.storages.write().remove(&info_hash);
    }

    /// Writes `data` to storage immediately; the flush flag is irrelevant
    /// because this backend never buffers.
    fn write_chunk(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        data: Bytes,
        _flush: bool,
    ) -> crate::Result<()> {
        let written = data.len() as u64;
        let storage = self.get_storage(info_hash)?;
        storage.write_chunk(piece, begin, &data)?;
        self.stats.lock().write_bytes += written;
        Ok(())
    }

    /// Reads the requested range from storage; the volatile flag is irrelevant
    /// because this backend has no cache to bypass.
    fn read_chunk(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        length: u32,
        _volatile: bool,
    ) -> crate::Result<Bytes> {
        let storage = self.get_storage(info_hash)?;
        let chunk = Bytes::from(storage.read_chunk(piece, begin, length)?);
        self.stats.lock().read_bytes += chunk.len() as u64;
        Ok(chunk)
    }

    /// Reads the whole piece from storage and adds its length to `read_bytes`.
    fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>> {
        let storage = self.get_storage(info_hash)?;
        let piece_data = storage.read_piece(piece)?;
        self.stats.lock().read_bytes += piece_data.len() as u64;
        Ok(piece_data)
    }

    /// Delegates v1 piece verification to the storage.
    fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool> {
        let storage = self.get_storage(info_hash)?;
        let verified = storage.verify_piece(piece, expected)?;
        Ok(verified)
    }

    /// Delegates v2 piece verification to the storage.
    fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool> {
        let storage = self.get_storage(info_hash)?;
        let verified = storage.verify_piece_v2(piece, expected)?;
        Ok(verified)
    }

    /// Delegates block hashing to the storage.
    fn hash_block(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        length: u32,
    ) -> crate::Result<Id32> {
        let storage = self.get_storage(info_hash)?;
        let digest = storage.hash_block(piece, begin, length)?;
        Ok(digest)
    }

    /// No buffered state exists, so there is nothing to clear.
    fn clear_piece(&self, _info_hash: Id20, _piece: u32) {}

    /// No buffered state exists, so flushing a piece trivially succeeds.
    fn flush_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<()> {
        Ok(())
    }

    /// No buffered state exists, so flushing everything trivially succeeds.
    fn flush_all(&self) -> crate::Result<()> {
        Ok(())
    }

    /// This backend caches nothing, so the list is always empty.
    fn cached_pieces(&self, _info_hash: Id20) -> Vec<u32> {
        Vec::new()
    }

    /// Returns a copy of the byte counters; pool-related fields stay at zero.
    fn stats(&self) -> DiskIoStats {
        let counters = self.stats.lock();
        counters.clone()
    }

    /// Writes `s0` followed by `s1` at offset `begin` of `piece` with a single
    /// vectored storage call and adds both lengths to `write_bytes`.
    fn write_block_direct(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        s0: &[u8],
        s1: &[u8],
    ) -> crate::Result<()> {
        let storage = self.get_storage(info_hash)?;
        storage.write_chunk_vectored(piece, begin, s0, s1)?;
        let written = s0.len() + s1.len();
        self.stats.lock().write_bytes += written as u64;
        Ok(())
    }
}
/// Builds the disk I/O backend selected by `config.storage_mode`.
///
/// * `IoUring` (Linux with the `io-uring` feature) and `Iocp` (Windows with the
///   `iocp` feature) are tried first; if their initialisation fails a warning is
///   logged and selection continues below.
/// * `Mmap` yields a [`MmapDiskIo`].
/// * Every other mode yields a [`PosixDiskIo`].
#[must_use]
pub fn create_backend_from_config(config: &DiskConfig) -> Arc<dyn DiskIoBackend> {
    #[cfg(all(target_os = "linux", feature = "io-uring"))]
    if config.storage_mode == irontide_core::StorageMode::IoUring {
        let uring_config = irontide_storage::IoUringConfig {
            sq_depth: config.io_uring_sq_depth,
            enable_direct_io: config.io_uring_direct_io,
            batch_threshold: config.io_uring_batch_threshold,
        };
        let attempt = crate::io_uring_backend::IoUringDiskIo::new(config, uring_config);
        match attempt {
            Err(e) => {
                tracing::warn!("io_uring init failed, falling back to posix: {e}");
            }
            Ok(backend) => return Arc::new(backend),
        }
    }

    #[cfg(all(target_os = "windows", feature = "iocp"))]
    if config.storage_mode == irontide_core::StorageMode::Iocp {
        let iocp_config = irontide_storage::IocpConfig {
            concurrent_threads: config.iocp_concurrent_threads,
            enable_direct_io: config.iocp_direct_io,
        };
        let attempt = crate::iocp_backend::IocpDiskIo::new(config, iocp_config);
        match attempt {
            Err(e) => {
                tracing::warn!("IOCP init failed, falling back to posix: {e}");
            }
            Ok(backend) => return Arc::new(backend),
        }
    }

    if config.storage_mode == irontide_core::StorageMode::Mmap {
        return Arc::new(MmapDiskIo::new());
    }
    Arc::new(PosixDiskIo::new(config))
}
// Unit tests for the three backends in this file and the backend factory.
// Storage is always an in-memory `MemoryStorage`, so no real disk is touched.
#[cfg(test)]
mod tests {
use super::*;
use irontide_core::Lengths;
use irontide_storage::MemoryStorage;
// Fixed info hash (all bytes 0xAB) used by the `DisabledDiskIo` tests.
fn make_hash() -> Id20 {
Id20([0xAB; 20])
}
// Info hash whose first byte is `n` and whose remaining bytes are zero.
fn make_hash_n(n: u8) -> Id20 {
let mut b = [0u8; 20];
b[0] = n;
Id20(b)
}
// Default config with `io_threads` set to 2 and `cache_size` set to 1 MiB.
fn test_config() -> DiskConfig {
DiskConfig {
io_threads: 2,
cache_size: 1024 * 1024, ..DiskConfig::default()
}
}
// In-memory storage whose total length and piece length are both `piece_size`;
// the chunk size is `piece_size` capped at 16384.
fn make_storage(piece_size: u64) -> Arc<dyn TorrentStorage> {
let chunk = piece_size.min(16384) as u32;
let lengths = Lengths::new(piece_size, piece_size, chunk);
Arc::new(MemoryStorage::new(lengths))
}
// In-memory storage with explicit total length, piece length and chunk size.
fn make_storage_full(total: u64, piece_len: u64, chunk_size: u32) -> Arc<dyn TorrentStorage> {
let lengths = Lengths::new(total, piece_len, chunk_size);
Arc::new(MemoryStorage::new(lengths))
}
// The disabled backend identifies itself as "disabled".
#[test]
fn disabled_backend_name() {
let backend = DisabledDiskIo;
assert_eq!(backend.name(), "disabled");
}
// Writes to the disabled backend succeed even though nothing is registered.
#[test]
fn disabled_backend_write_succeeds() {
let backend = DisabledDiskIo;
let result =
backend.write_chunk(make_hash(), 0, 0, Bytes::from_static(&[1, 2, 3, 4]), false);
assert!(result.is_ok());
}
// Reads from the disabled backend return exactly `length` zero bytes.
#[test]
fn disabled_backend_read_returns_zeroed() {
let backend = DisabledDiskIo;
let length = 16384u32;
let data = backend
.read_chunk(make_hash(), 0, 0, length, false)
.unwrap();
assert_eq!(data.len(), length as usize);
assert!(data.iter().all(|&b| b == 0));
}
// v1 hash checks on the disabled backend pass for any expected hash.
#[test]
fn disabled_backend_hash_always_passes() {
let backend = DisabledDiskIo;
let expected = Id20([0xFF; 20]);
let result = backend.hash_piece(make_hash(), 42, &expected).unwrap();
assert!(result);
}
// v2 hash checks on the disabled backend pass for any expected hash.
#[test]
fn disabled_backend_hash_v2_always_passes() {
let backend = DisabledDiskIo;
let expected = Id32([0xFF; 32]);
let result = backend.hash_piece_v2(make_hash(), 42, &expected).unwrap();
assert!(result);
}
// The disabled backend reports zeroed statistics.
#[test]
fn disabled_backend_stats_default() {
let backend = DisabledDiskIo;
let stats = backend.stats();
assert_eq!(stats.read_bytes, 0);
assert_eq!(stats.write_bytes, 0);
assert_eq!(stats.cache_hits, 0);
assert_eq!(stats.cache_misses, 0);
assert_eq!(stats.write_buffer_bytes, 0);
}
// The disabled backend never reports cached pieces.
#[test]
fn disabled_backend_cached_pieces_empty() {
let backend = DisabledDiskIo;
let pieces = backend.cached_pieces(make_hash());
assert!(pieces.is_empty());
}
// The posix backend identifies itself as "posix".
#[test]
fn posix_backend_name() {
let backend = PosixDiskIo::new(&test_config());
assert_eq!(backend.name(), "posix");
}
// Reads succeed while a torrent is registered and fail after it is unregistered.
#[test]
fn posix_register_unregister() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(1);
let storage = make_storage(100);
backend.register(ih, storage);
assert!(backend.read_chunk(ih, 0, 0, 10, false).is_ok());
backend.unregister(ih);
assert!(backend.read_chunk(ih, 0, 0, 10, false).is_err());
}
// A write-through (flush = true) chunk can be read back unchanged.
#[test]
fn posix_write_and_read_flush() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(2);
let storage = make_storage(100);
backend.register(ih, storage);
let data = vec![42u8; 50];
backend
.write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
.unwrap();
let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
assert_eq!(&read[..], &data[..]);
}
// A buffered (flush = false) chunk is readable and the read counts as a cache hit.
#[test]
fn posix_write_buffered_then_read() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(3);
let storage = make_storage(100);
backend.register(ih, storage);
let data = vec![99u8; 50];
backend
.write_chunk(ih, 0, 0, Bytes::from(data.clone()), false)
.unwrap();
let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
assert_eq!(&read[..], &data[..]);
let stats = backend.stats();
assert!(stats.cache_hits >= 1);
}
// After a write-through, the first read is a miss and the second read is a hit.
#[test]
fn posix_read_cache_hit() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(4);
let storage = make_storage(100);
backend.register(ih, storage);
let data = vec![7u8; 50];
backend
.write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
.unwrap();
let r1 = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
assert_eq!(&r1[..], &data[..]);
let s1 = backend.stats();
assert_eq!(s1.cache_misses, 1);
let r2 = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
assert_eq!(&r2[..], &data[..]);
let s2 = backend.stats();
assert_eq!(s2.cache_hits, 1);
}
// A piece written through to storage verifies against its SHA-1 hash.
#[test]
fn posix_hash_piece_correct() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(5);
let storage = make_storage(50);
backend.register(ih, storage);
let data = vec![9u8; 50];
backend
.write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
.unwrap();
let expected = irontide_core::sha1(&data);
assert!(backend.hash_piece(ih, 0, &expected).unwrap());
}
// A piece written through to storage fails verification against a wrong hash.
#[test]
fn posix_hash_piece_wrong() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(6);
let storage = make_storage(50);
backend.register(ih, storage);
let data = vec![9u8; 50];
backend
.write_chunk(ih, 0, 0, Bytes::from(data), true)
.unwrap();
let wrong = Id20([0xFF; 20]);
assert!(!backend.hash_piece(ih, 0, &wrong).unwrap());
}
// A 16 KiB piece written through to storage verifies against its SHA-256 hash.
#[test]
fn posix_hash_piece_v2() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(7);
let storage = make_storage(16384);
backend.register(ih, storage);
let data = vec![0xABu8; 16384];
backend
.write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
.unwrap();
let expected = irontide_core::sha256(&data);
assert!(backend.hash_piece_v2(ih, 0, &expected).unwrap());
}
// Clearing a piece brings the write buffer gauge back to zero.
#[test]
fn posix_clear_piece_drops_buffer() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(8);
let storage = make_storage(100);
backend.register(ih, storage);
let data = vec![55u8; 50];
backend
.write_chunk(ih, 0, 0, Bytes::from(data), false)
.unwrap();
assert!(backend.stats().write_buffer_bytes > 0);
backend.clear_piece(ih, 0);
assert_eq!(backend.stats().write_buffer_bytes, 0);
}
// Piece 0 is read twice and is reported as cached; piece 1 is never read and is not.
#[test]
fn posix_cached_pieces() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(9);
let storage = make_storage_full(100, 50, 25);
backend.register(ih, storage);
let data = vec![1u8; 25];
backend
.write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
.unwrap();
backend
.write_chunk(ih, 1, 0, Bytes::from(data), true)
.unwrap();
backend.read_chunk(ih, 0, 0, 25, false).unwrap();
backend.read_chunk(ih, 0, 0, 25, false).unwrap();
let cached = backend.cached_pieces(ih);
assert!(cached.contains(&0));
assert!(!cached.contains(&1));
}
// The mmap backend identifies itself as "mmap".
#[test]
fn mmap_backend_name() {
let backend = MmapDiskIo::new();
assert_eq!(backend.name(), "mmap");
}
// A chunk written through the mmap backend can be read back unchanged.
#[test]
fn mmap_write_and_read() {
let backend = MmapDiskIo::new();
let ih = make_hash_n(10);
let storage = make_storage(100);
backend.register(ih, storage);
let data = vec![42u8; 50];
backend
.write_chunk(ih, 0, 0, Bytes::from(data.clone()), false)
.unwrap();
let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
assert_eq!(&read[..], &data[..]);
}
// The mmap backend reports no cached pieces even after a write and a read.
#[test]
fn mmap_cached_pieces_always_empty() {
let backend = MmapDiskIo::new();
let ih = make_hash_n(11);
let storage = make_storage(100);
backend.register(ih, storage);
backend
.write_chunk(ih, 0, 0, Bytes::from(vec![1u8; 50]), false)
.unwrap();
backend.read_chunk(ih, 0, 0, 50, false).unwrap();
assert!(backend.cached_pieces(ih).is_empty());
}
// The mmap backend counts written and read bytes exactly.
#[test]
fn mmap_stats_track_io() {
let backend = MmapDiskIo::new();
let ih = make_hash_n(12);
let storage = make_storage(100);
backend.register(ih, storage);
backend
.write_chunk(ih, 0, 0, Bytes::from(vec![1u8; 50]), false)
.unwrap();
let stats = backend.stats();
assert_eq!(stats.write_bytes, 50);
backend.read_chunk(ih, 0, 0, 50, false).unwrap();
let stats = backend.stats();
assert_eq!(stats.read_bytes, 50);
}
// Two buffered chunks are flushed by `read_piece`, which then returns the full piece.
#[test]
fn read_piece_returns_full_piece() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(20);
let chunk_size = 16384u32;
let piece_size = u64::from(chunk_size) * 2; let storage = make_storage_full(piece_size, piece_size, chunk_size);
backend.register(ih, storage);
let chunk0 = vec![0xAAu8; chunk_size as usize];
let chunk1 = vec![0xBBu8; chunk_size as usize];
backend
.write_chunk(ih, 0, 0, Bytes::from(chunk0.clone()), false)
.expect("write chunk 0");
backend
.write_chunk(ih, 0, chunk_size, Bytes::from(chunk1.clone()), false)
.expect("write chunk 1");
let piece_data = backend.read_piece(ih, 0).expect("read_piece");
assert_eq!(piece_data.len(), piece_size as usize);
assert_eq!(&piece_data[..chunk_size as usize], &chunk0[..]);
assert_eq!(&piece_data[chunk_size as usize..], &chunk1[..]);
let stats = backend.stats();
assert!(stats.read_bytes >= piece_size);
}
// Hashing a fully buffered piece with the right hash passes; the piece is then
// readable from storage and, after one more chunk read, reported as cached.
#[test]
fn posix_hash_from_cache_pass() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(30);
let storage = make_storage(100);
backend.register(ih, storage);
let d1 = vec![0xAA_u8; 50];
let d2 = vec![0xBB_u8; 50];
backend
.write_chunk(ih, 0, 0, Bytes::from(d1.clone()), false)
.unwrap();
backend
.write_chunk(ih, 0, 50, Bytes::from(d2.clone()), false)
.unwrap();
let mut full = d1;
full.extend_from_slice(&d2);
let expected = irontide_core::sha1(&full);
assert!(backend.hash_piece(ih, 0, &expected).unwrap());
let piece_data = backend.read_piece(ih, 0).unwrap();
assert_eq!(piece_data.len(), 100);
backend.read_chunk(ih, 0, 0, 50, false).unwrap();
let cached = backend.cached_pieces(ih);
assert!(cached.contains(&0));
}
// Hashing a buffered piece with a wrong hash fails and leaves the write buffer empty.
#[test]
fn posix_hash_from_cache_fail() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(31);
let storage = make_storage(50);
backend.register(ih, storage);
backend
.write_chunk(ih, 0, 0, Bytes::from(vec![0xCC_u8; 50]), false)
.unwrap();
let wrong = Id20([0xFF; 20]);
assert!(!backend.hash_piece(ih, 0, &wrong).unwrap());
assert_eq!(backend.stats().write_buffer_bytes, 0);
}
// The factory returns the mmap backend for `StorageMode::Mmap`.
#[test]
fn factory_creates_mmap_for_mmap_mode() {
let config = DiskConfig {
storage_mode: irontide_core::StorageMode::Mmap,
..DiskConfig::default()
};
let backend = create_backend_from_config(&config);
assert_eq!(backend.name(), "mmap");
}
// The factory returns the posix backend for `StorageMode::Auto`.
#[test]
fn factory_creates_posix_for_auto_mode() {
let config = DiskConfig {
storage_mode: irontide_core::StorageMode::Auto,
..DiskConfig::default()
};
let backend = create_backend_from_config(&config);
assert_eq!(backend.name(), "posix");
}
// Buffered blocks -> successful hash -> piece readable from storage -> piece
// reported as hot after a further chunk read.
#[test]
fn piece_completion_flows_to_hash_pool() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(40);
let storage = make_storage(100);
backend.register(ih, storage);
let d1 = vec![0xAA; 50];
let d2 = vec![0xBB; 50];
backend
.write_chunk(ih, 0, 0, Bytes::from(d1.clone()), false)
.expect("write block 0");
backend
.write_chunk(ih, 0, 50, Bytes::from(d2.clone()), false)
.expect("write block 1");
let mut full = d1;
full.extend_from_slice(&d2);
let expected_hash = irontide_core::sha1(&full);
assert!(
backend
.hash_piece(ih, 0, &expected_hash)
.expect("hash_piece"),
"hash should match"
);
let piece = backend.read_piece(ih, 0).expect("read_piece after hash");
assert_eq!(piece, full);
let _ = backend
.read_chunk(ih, 0, 0, 100, false)
.expect("second read");
let cached = backend.cached_pieces(ih);
assert!(
cached.contains(&0),
"piece 0 should be in hot pieces after two accesses"
);
}
// With a 64 KiB pool: eight 16 KiB pieces are buffered and flushed one by one,
// read back intact, then six pieces are overwritten in the buffer; the pool must
// report at least one skeleton or eviction.
#[test]
fn back_pressure_flush_reaches_disk() {
let config = DiskConfig {
buffer_pool_capacity: 64 * 1024,
..test_config()
};
let backend = PosixDiskIo::new(&config);
let ih = make_hash_n(41);
let piece_size = 16384_u64;
let storage = make_storage_full(piece_size * 10, piece_size, 16384);
backend.register(ih, storage);
for p in 0..8_u32 {
let data = vec![p as u8; piece_size as usize];
backend
.write_chunk(ih, p, 0, Bytes::from(data), false)
.unwrap_or_else(|e| panic!("write piece {p}: {e}"));
backend
.flush_piece(ih, p)
.unwrap_or_else(|e| panic!("flush piece {p}: {e}"));
}
for p in 0..8_u32 {
let piece = backend
.read_piece(ih, p)
.unwrap_or_else(|e| panic!("read piece {p}: {e}"));
assert_eq!(piece.len(), piece_size as usize);
assert!(
piece.iter().all(|&b| b == p as u8),
"piece {p} data mismatch"
);
}
for p in 0..6_u32 {
let data = vec![(p + 10) as u8; piece_size as usize];
backend
.write_chunk(ih, p, 0, Bytes::from(data), false)
.unwrap_or_else(|e| panic!("overwrite piece {p}: {e}"));
}
let stats = backend.stats();
assert!(
stats.skeleton_count > 0 || stats.eviction_count > 0,
"should see evictions when exceeding budget: skeleton={}, eviction={}",
stats.skeleton_count,
stats.eviction_count
);
}
// After two reads of a written-through piece it is reported as hot, and a third
// read increases the cache hit counter.
#[test]
fn prefetch_then_suggest_then_serve() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(42);
let storage = make_storage(100);
backend.register(ih, storage);
let data = vec![0xCC; 100];
backend
.write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
.expect("write flushed");
let r1 = backend
.read_chunk(ih, 0, 0, 100, false)
.expect("first read");
assert_eq!(&r1[..], &data[..]);
let r2 = backend
.read_chunk(ih, 0, 0, 100, false)
.expect("second read");
assert_eq!(&r2[..], &data[..]);
let cached = backend.cached_pieces(ih);
assert!(cached.contains(&0), "piece 0 should be hot after two reads");
let s1 = backend.stats();
let _ = backend
.read_chunk(ih, 0, 0, 100, false)
.expect("third read");
let s2 = backend.stats();
assert!(
s2.cache_hits > s1.cache_hits,
"third read should be a cache hit: before={}, after={}",
s1.cache_hits,
s2.cache_hits
);
}
// Three pieces of four 16 KiB blocks each are buffered, hash-verified and then
// read back block by block with their distinct fill bytes.
#[test]
fn full_download_verify_cycle() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(43);
let chunk = 16384_u32;
let piece_len = u64::from(chunk) * 4; let total = piece_len * 3; let storage = make_storage_full(total, piece_len, chunk);
backend.register(ih, storage);
for p in 0..3_u32 {
let mut full_piece = Vec::new();
for b in 0..4_u32 {
let block = vec![(p * 4 + b) as u8; chunk as usize];
full_piece.extend_from_slice(&block);
backend
.write_chunk(ih, p, b * chunk, Bytes::from(block), false)
.unwrap_or_else(|e| panic!("write piece {p} block {b}: {e}"));
}
let expected = irontide_core::sha1(&full_piece);
assert!(
backend
.hash_piece(ih, p, &expected)
.unwrap_or_else(|e| panic!("hash piece {p}: {e}")),
"piece {p} hash should pass"
);
}
for p in 0..3_u32 {
let data = backend
.read_piece(ih, p)
.unwrap_or_else(|e| panic!("read piece {p}: {e}"));
assert_eq!(data.len(), piece_len as usize);
for b in 0..4_u32 {
let expected_byte = (p * 4 + b) as u8;
let start = (b * chunk) as usize;
let end = start + chunk as usize;
assert!(
data[start..end].iter().all(|&x| x == expected_byte),
"piece {p} block {b}: expected 0x{expected_byte:02X}"
);
}
}
}
// With a 32 KiB pool: four 16 KiB pieces are buffered, then each is flushed and
// read back at full length; the pool must report at least one skeleton or eviction.
#[test]
fn skeleton_eviction_then_completion() {
let config = DiskConfig {
buffer_pool_capacity: 32 * 1024,
..test_config()
};
let backend = PosixDiskIo::new(&config);
let ih = make_hash_n(44);
let storage = make_storage_full(16384 * 4, 16384, 16384);
backend.register(ih, storage);
for p in 0..4_u32 {
let data = vec![p as u8; 16384];
backend
.write_chunk(ih, p, 0, Bytes::from(data), false)
.unwrap_or_else(|e| panic!("write piece {p}: {e}"));
}
for p in 0..4_u32 {
backend
.flush_piece(ih, p)
.unwrap_or_else(|e| panic!("flush piece {p}: {e}"));
let piece = backend
.read_piece(ih, p)
.unwrap_or_else(|e| panic!("read piece {p}: {e}"));
assert_eq!(piece.len(), 16384);
}
let stats = backend.stats();
assert!(
stats.skeleton_count > 0 || stats.eviction_count > 0,
"should have evictions under pressure: skeleton={}, eviction={}",
stats.skeleton_count,
stats.eviction_count
);
}
// A buffered piece that fails its hash check is never written: storage still
// reads back as all zeroes afterwards.
#[test]
fn hash_from_cache_fail_no_disk_write() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(46);
let storage = make_storage(50);
backend.register(ih, storage);
let data = vec![0xDD; 50];
backend
.write_chunk(ih, 0, 0, Bytes::from(data), false)
.expect("buffered write");
let wrong_hash = Id20([0xFF; 20]);
assert!(
!backend
.hash_piece(ih, 0, &wrong_hash)
.expect("hash_piece should not error"),
"hash should fail with wrong expected value"
);
let piece = backend.read_piece(ih, 0).expect("read_piece");
assert!(
piece.iter().all(|&b| b == 0),
"failed piece should not be written to disk"
);
}
// 64 single-chunk pieces are written through and read twice; more than 80% of
// the second-pass reads must be cache hits.
#[test]
fn seeding_throughput_with_cache() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(47);
let chunk = 16384_u32;
let piece_len = u64::from(chunk);
let num_pieces = 64_u32;
let storage = make_storage_full(piece_len * u64::from(num_pieces), piece_len, chunk);
backend.register(ih, storage);
for p in 0..num_pieces {
let data = vec![p as u8; chunk as usize];
backend
.write_chunk(ih, p, 0, Bytes::from(data), true)
.unwrap_or_else(|e| panic!("write piece {p}: {e}"));
}
for p in 0..num_pieces {
backend
.read_chunk(ih, p, 0, chunk, false)
.unwrap_or_else(|e| panic!("first read piece {p}: {e}"));
}
let stats_before = backend.stats();
for p in 0..num_pieces {
backend
.read_chunk(ih, p, 0, chunk, false)
.unwrap_or_else(|e| panic!("second read piece {p}: {e}"));
}
let stats_after = backend.stats();
let hits = stats_after
.cache_hits
.saturating_sub(stats_before.cache_hits);
let total = u64::from(num_pieces);
let hit_rate = hits as f64 / total as f64;
assert!(
hit_rate > 0.8,
"cache hit rate {hit_rate:.1} should be >80% (hits={hits}, total={total})"
);
}
// A direct write with an empty second slice lands in storage and is counted once.
#[test]
fn write_block_direct_contiguous() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(60);
let storage = make_storage(100);
backend.register(ih, Arc::clone(&storage));
let data = vec![0xCDu8; 50];
backend.write_block_direct(ih, 0, 0, &data, &[]).unwrap();
let read = storage.read_chunk(0, 0, 50).unwrap();
assert_eq!(read, data);
let stats = backend.stats();
assert_eq!(stats.write_bytes, 50);
}
// A direct write split into a 30-byte and a 20-byte slice lands contiguously at
// offset 10 and is counted as 50 bytes.
#[test]
fn write_block_direct_split() {
let backend = PosixDiskIo::new(&test_config());
let ih = make_hash_n(61);
let storage = make_storage(100);
backend.register(ih, Arc::clone(&storage));
let s0: Vec<u8> = (0..30).collect();
let s1: Vec<u8> = (30..50).collect();
backend.write_block_direct(ih, 0, 10, &s0, &s1).unwrap();
let read = storage.read_chunk(0, 10, 50).unwrap();
let expected: Vec<u8> = (0..50).collect();
assert_eq!(read, expected);
let stats = backend.stats();
assert_eq!(stats.write_bytes, 50);
}
}