#![allow(
clippy::cast_possible_truncation,
clippy::cast_precision_loss,
clippy::cast_possible_wrap,
clippy::cast_sign_loss,
reason = "M175: disk job sizes bounded by piece_length (u32 by construction in Lengths::new)"
)]
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::Mutex;
use bitflags::bitflags;
use bytes::Bytes;
use irontide_core::{Id20, Id32};
use irontide_storage::TorrentStorage;
use tokio::sync::{mpsc, oneshot};
use tracing::warn;
bitflags! {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DiskJobFlags: u8 {
const FORCE_COPY = 0x01;
const SEQUENTIAL = 0x02;
const VOLATILE_READ = 0x04;
const FLUSH_PIECE = 0x08;
}
}
#[derive(Debug)]
pub struct DiskWriteError {
pub piece: u32,
pub begin: u32,
pub error: irontide_storage::Error,
}
#[derive(Debug)]
pub struct VerifyResult {
pub piece: u32,
pub passed: bool,
}
pub(crate) struct WriteJob {
piece: u32,
begin: u32,
data: Bytes,
}
pub(crate) struct DiskWriteState {
tx: mpsc::Sender<WriteJob>,
pending: Mutex<HashMap<u32, u32>>,
notify: tokio::sync::Notify,
lock_timing: crate::timed_lock::LockTimingSettings,
}
pub(crate) enum DiskJob {
Register {
info_hash: Id20,
storage: Arc<dyn TorrentStorage>,
reply: oneshot::Sender<()>,
},
Unregister {
info_hash: Id20,
},
Write {
info_hash: Id20,
piece: u32,
begin: u32,
data: Bytes,
flags: DiskJobFlags,
reply: oneshot::Sender<irontide_storage::Result<()>>,
},
Read {
info_hash: Id20,
piece: u32,
begin: u32,
length: u32,
flags: DiskJobFlags,
reply: oneshot::Sender<irontide_storage::Result<Bytes>>,
},
Hash {
info_hash: Id20,
piece: u32,
expected: Id20,
#[allow(dead_code)]
flags: DiskJobFlags,
reply: oneshot::Sender<irontide_storage::Result<bool>>,
},
HashV2 {
info_hash: Id20,
piece: u32,
expected: Id32,
#[allow(dead_code)]
flags: DiskJobFlags,
reply: oneshot::Sender<irontide_storage::Result<bool>>,
},
BlockHash {
info_hash: Id20,
piece: u32,
begin: u32,
length: u32,
#[allow(dead_code)]
flags: DiskJobFlags,
reply: oneshot::Sender<irontide_storage::Result<Id32>>,
},
ClearPiece {
info_hash: Id20,
piece: u32,
},
FlushWriteBuffer {
info_hash: Id20,
piece: u32,
reply: oneshot::Sender<irontide_storage::Result<()>>,
},
CachedPieces {
info_hash: Id20,
reply: oneshot::Sender<Vec<u32>>,
},
FlushAll {
reply: oneshot::Sender<irontide_storage::Result<()>>,
},
Shutdown {
reply: oneshot::Sender<()>,
},
}
#[derive(Debug, Clone)]
pub struct DiskConfig {
pub io_threads: usize,
pub storage_mode: irontide_core::StorageMode,
pub cache_size: usize,
pub write_cache_ratio: f32,
pub channel_capacity: usize,
pub buffer_pool_capacity: usize,
pub enable_mlock: bool,
pub lock_warn_threshold_ms: u64,
pub io_uring_sq_depth: u32,
pub io_uring_direct_io: bool,
pub filesystem_direct_io: bool,
pub io_uring_batch_threshold: usize,
pub iocp_concurrent_threads: u32,
pub iocp_direct_io: bool,
}
impl Default for DiskConfig {
fn default() -> Self {
Self {
io_threads: 4,
storage_mode: irontide_core::StorageMode::Auto,
cache_size: 16 * 1024 * 1024,
write_cache_ratio: 0.5,
channel_capacity: 512,
buffer_pool_capacity: 64 * 1024 * 1024,
enable_mlock: cfg!(unix),
lock_warn_threshold_ms: 50,
io_uring_sq_depth: 256,
io_uring_direct_io: false,
filesystem_direct_io: false,
io_uring_batch_threshold: 4,
iocp_concurrent_threads: 0,
iocp_direct_io: false,
}
}
}
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct DiskStats {
pub read_bytes: u64,
pub write_bytes: u64,
pub cache_hits: u64,
pub cache_misses: u64,
pub write_buffer_bytes: usize,
pub queued_jobs: usize,
#[serde(default)]
pub read_cache_bytes: usize,
#[serde(default)]
pub pool_entries: usize,
#[serde(default)]
pub prefetch_count: u64,
#[serde(default)]
pub eviction_count: u64,
#[serde(default)]
pub skeleton_count: u64,
}
impl From<crate::disk_backend::DiskIoStats> for DiskStats {
fn from(s: crate::disk_backend::DiskIoStats) -> Self {
Self {
read_bytes: s.read_bytes,
write_bytes: s.write_bytes,
cache_hits: s.cache_hits,
cache_misses: s.cache_misses,
write_buffer_bytes: s.write_buffer_bytes,
queued_jobs: 0,
read_cache_bytes: s.read_cache_bytes,
pool_entries: s.pool_entries,
prefetch_count: s.prefetch_count,
eviction_count: s.eviction_count,
skeleton_count: s.skeleton_count,
}
}
}
#[derive(Clone)]
pub struct DiskManagerHandle {
tx: mpsc::Sender<DiskJob>,
backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
spawner: crate::blocking_spawner::BlockingSpawner,
}
impl DiskManagerHandle {
#[must_use]
pub fn new(config: DiskConfig) -> (Self, tokio::task::JoinHandle<()>) {
let backend = crate::disk_backend::create_backend_from_config(&config);
let spawner = crate::blocking_spawner::BlockingSpawner::new(config.io_threads);
Self::new_with_backend(config, backend, spawner)
}
pub(crate) fn new_with_backend(
config: DiskConfig,
backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
spawner: crate::blocking_spawner::BlockingSpawner,
) -> (Self, tokio::task::JoinHandle<()>) {
let (tx, rx) = mpsc::channel(config.channel_capacity);
let backend_for_actor = Arc::clone(&backend);
let actor = DiskActor::new(rx, config, backend_for_actor, spawner.clone());
let join = tokio::spawn(actor.run());
(
Self {
tx,
backend,
spawner,
},
join,
)
}
pub async fn register_torrent(
&self,
info_hash: Id20,
storage: Arc<dyn TorrentStorage>,
) -> DiskHandle {
let storage_for_handle = Arc::clone(&storage);
let (reply_tx, reply_rx) = oneshot::channel();
let _ = self
.tx
.send(DiskJob::Register {
info_hash,
storage,
reply: reply_tx,
})
.await;
let _ = reply_rx.await;
let (write_tx, mut write_rx) = mpsc::channel::<WriteJob>(512);
let write_state = Arc::new(DiskWriteState {
tx: write_tx,
pending: Mutex::new(HashMap::new()),
notify: tokio::sync::Notify::new(),
lock_timing: crate::timed_lock::LockTimingSettings::default(),
});
let writer_storage = Arc::clone(&storage_for_handle);
let writer_state = Arc::clone(&write_state);
let writer_spawner = self.spawner.clone();
tokio::spawn(async move {
while let Some(first) = write_rx.recv().await {
let mut batch = vec![first];
while batch.len() < 64 {
match write_rx.try_recv() {
Ok(job) => batch.push(job),
Err(_) => break,
}
}
let pieces: Vec<u32> = batch.iter().map(|j| j.piece).collect();
let ws = Arc::clone(&writer_storage);
let spawner = writer_spawner.clone();
spawner
.block_in_place(move || {
for WriteJob { piece, begin, data } in &batch {
if let Err(e) = ws.write_chunk(*piece, *begin, data) {
tracing::warn!(piece, begin, %e, "deferred write failed");
}
}
})
.await;
{
let mut pending = crate::timed_lock::TimedGuard::new(
writer_state.pending.lock(),
&writer_state.lock_timing,
"disk_pending",
);
for piece in &pieces {
if let Some(count) = pending.get_mut(piece) {
*count = count.saturating_sub(1);
if *count == 0 {
pending.remove(piece);
}
}
}
}
writer_state.notify.notify_waiters();
}
});
DiskHandle {
tx: self.tx.clone(),
info_hash,
hash_pool: None,
hash_result_tx: None,
storage: Some(storage_for_handle),
backend: Some(Arc::clone(&self.backend)),
write_state: Some(write_state),
spawner: Some(self.spawner.clone()),
}
}
pub async fn unregister_torrent(&self, info_hash: Id20) {
let _ = self.tx.send(DiskJob::Unregister { info_hash }).await;
}
pub async fn shutdown(&self) {
let (tx, rx) = oneshot::channel();
let _ = self.tx.send(DiskJob::Shutdown { reply: tx }).await;
let _ = rx.await;
}
}
#[derive(Clone)]
pub struct DiskHandle {
tx: mpsc::Sender<DiskJob>,
info_hash: Id20,
hash_pool: Option<std::sync::Arc<crate::hash_pool::HashPool>>,
hash_result_tx: Option<tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>>,
storage: Option<Arc<dyn TorrentStorage>>,
backend: Option<Arc<dyn crate::disk_backend::DiskIoBackend>>,
write_state: Option<Arc<DiskWriteState>>,
spawner: Option<crate::blocking_spawner::BlockingSpawner>,
}
impl std::fmt::Debug for DiskHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DiskHandle")
.field("info_hash", &self.info_hash)
.finish_non_exhaustive()
}
}
impl DiskHandle {
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn new(tx: mpsc::Sender<DiskJob>, info_hash: Id20) -> Self {
Self {
tx,
info_hash,
hash_pool: None,
hash_result_tx: None,
storage: None,
backend: None,
write_state: None,
spawner: None,
}
}
pub fn set_hash_pool(&mut self, pool: std::sync::Arc<crate::hash_pool::HashPool>) {
self.hash_pool = Some(pool);
}
pub fn set_hash_result_tx(
&mut self,
tx: tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>,
) {
self.hash_result_tx = Some(tx);
}
pub async fn write_chunk(
&self,
piece: u32,
begin: u32,
data: Bytes,
flags: DiskJobFlags,
) -> irontide_storage::Result<()> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.send(DiskJob::Write {
info_hash: self.info_hash,
piece,
begin,
data,
flags,
reply: tx,
})
.await;
rx.await.unwrap_or_else(|_| {
Err(irontide_storage::Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"disk actor gone",
)))
})
}
pub async fn read_chunk(
&self,
piece: u32,
begin: u32,
length: u32,
flags: DiskJobFlags,
) -> irontide_storage::Result<Bytes> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.send(DiskJob::Read {
info_hash: self.info_hash,
piece,
begin,
length,
flags,
reply: tx,
})
.await;
rx.await.unwrap_or_else(|_| {
Err(irontide_storage::Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"disk actor gone",
)))
})
}
pub async fn verify_piece(
&self,
piece: u32,
expected: Id20,
flags: DiskJobFlags,
) -> irontide_storage::Result<bool> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.send(DiskJob::Hash {
info_hash: self.info_hash,
piece,
expected,
flags,
reply: tx,
})
.await;
rx.await.unwrap_or_else(|_| {
Err(irontide_storage::Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"disk actor gone",
)))
})
}
pub async fn verify_piece_v2(
&self,
piece: u32,
expected: Id32,
flags: DiskJobFlags,
) -> irontide_storage::Result<bool> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.send(DiskJob::HashV2 {
info_hash: self.info_hash,
piece,
expected,
flags,
reply: tx,
})
.await;
rx.await.unwrap_or_else(|_| {
Err(irontide_storage::Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"disk actor gone",
)))
})
}
pub async fn hash_block(
&self,
piece: u32,
begin: u32,
length: u32,
flags: DiskJobFlags,
) -> irontide_storage::Result<Id32> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.send(DiskJob::BlockHash {
info_hash: self.info_hash,
piece,
begin,
length,
flags,
reply: tx,
})
.await;
rx.await.unwrap_or_else(|_| {
Err(irontide_storage::Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"disk actor gone",
)))
})
}
pub async fn clear_piece(&self, piece: u32) {
let _ = self
.tx
.send(DiskJob::ClearPiece {
info_hash: self.info_hash,
piece,
})
.await;
}
pub async fn flush_piece(&self, piece: u32) -> irontide_storage::Result<()> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.send(DiskJob::FlushWriteBuffer {
info_hash: self.info_hash,
piece,
reply: tx,
})
.await;
rx.await.unwrap_or_else(|_| {
Err(irontide_storage::Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"disk actor gone",
)))
})
}
pub async fn cached_pieces(&self) -> Vec<u32> {
let (tx, rx) = oneshot::channel();
let _ = self
.tx
.send(DiskJob::CachedPieces {
info_hash: self.info_hash,
reply: tx,
})
.await;
rx.await.unwrap_or_default()
}
pub async fn flush_cache(&self) -> irontide_storage::Result<()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.send(DiskJob::FlushAll { reply: tx }).await;
rx.await.unwrap_or_else(|_| {
Err(irontide_storage::Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"disk actor gone",
)))
})
}
pub fn enqueue_verify(
&self,
piece: u32,
expected: Id20,
generation: u64,
result_tx: &mpsc::Sender<VerifyResult>,
) {
if let (Some(pool), Some(hash_tx)) = (&self.hash_pool, &self.hash_result_tx) {
if let Some(backend) = &self.backend {
let pool = pool.clone();
let hash_tx = hash_tx.clone();
let backend = Arc::clone(backend);
let info_hash = self.info_hash;
let job = crate::hash_pool::HashJob::Streaming {
piece,
expected,
generation,
info_hash,
backend,
result_tx: hash_tx,
};
tokio::spawn(async move {
if pool.submit(job).await.is_err() {
tracing::warn!(piece, "hash pool shut down, treating as failed");
}
});
return;
}
let hash_tx = hash_tx.clone();
tokio::spawn(async move {
tracing::warn!(piece, "verify: no backend (hash pool path)");
let _ = hash_tx
.send(crate::hash_pool::HashResult {
piece,
passed: false,
generation,
})
.await;
});
return;
}
if let Some(backend) = &self.backend {
let backend = Arc::clone(backend);
let info_hash = self.info_hash;
let result_tx = result_tx.clone();
let spawner = self.spawner.clone().unwrap();
tokio::spawn(async move {
let passed = spawner
.block_in_place(move || {
backend
.hash_piece(info_hash, piece, &expected)
.unwrap_or_else(|e| {
warn!(piece, %e, "verify: hash_piece failed");
false
})
})
.await;
let _ = result_tx.send(VerifyResult { piece, passed }).await;
});
return;
}
let result_tx = result_tx.clone();
tokio::spawn(async move {
warn!(piece, "verify: no data source, treating as failed");
let _ = result_tx
.send(VerifyResult {
piece,
passed: false,
})
.await;
});
}
pub fn enqueue_verify_v2(
&self,
piece: u32,
expected: Id32,
result_tx: &mpsc::Sender<VerifyResult>,
) {
if let Some(backend) = &self.backend {
let backend = Arc::clone(backend);
let info_hash = self.info_hash;
let result_tx = result_tx.clone();
let spawner = self.spawner.clone().unwrap();
tokio::spawn(async move {
let passed = spawner
.block_in_place(move || match backend.read_piece(info_hash, piece) {
Ok(data) => {
let actual = irontide_core::sha256(&data);
actual == expected
}
Err(e) => {
warn!(piece, %e, "verify v2: read_piece failed");
false
}
})
.await;
let _ = result_tx.send(VerifyResult { piece, passed }).await;
});
return;
}
let result_tx = result_tx.clone();
tokio::spawn(async move {
warn!(piece, "verify v2: no data source, treating as failed");
let _ = result_tx
.send(VerifyResult {
piece,
passed: false,
})
.await;
});
}
#[allow(
clippy::needless_pass_by_value,
reason = "Bytes is refcounted, pass-by-value is the bytes-crate idiom"
)]
pub(crate) fn write_block_deferred(&self, piece: u32, begin: u32, data: Bytes) {
let (Some(write_state), Some(storage)) = (&self.write_state, &self.storage) else {
return;
};
{
let mut pending = crate::timed_lock::TimedGuard::new(
write_state.pending.lock(),
&write_state.lock_timing,
"disk_pending",
);
*pending.entry(piece).or_insert(0) += 1;
}
match write_state.tx.try_send(WriteJob {
piece,
begin,
data: data.clone(),
}) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
let storage = Arc::clone(storage);
if let Some(ref spawner) = self.spawner {
spawner.block_in_place_sync(|| {
if let Err(e) = storage.write_chunk(piece, begin, &data) {
tracing::warn!(piece, begin, %e, "deferred write fallback failed");
}
});
} else {
if let Err(e) = storage.write_chunk(piece, begin, &data) {
tracing::warn!(piece, begin, %e, "deferred write fallback failed");
}
}
let mut pending = crate::timed_lock::TimedGuard::new(
write_state.pending.lock(),
&write_state.lock_timing,
"disk_pending",
);
if let Some(count) = pending.get_mut(&piece) {
*count = count.saturating_sub(1);
if *count == 0 {
pending.remove(&piece);
drop(pending);
write_state.notify.notify_waiters();
}
}
}
Err(mpsc::error::TrySendError::Closed(_)) => {
let mut pending = crate::timed_lock::TimedGuard::new(
write_state.pending.lock(),
&write_state.lock_timing,
"disk_pending",
);
if let Some(count) = pending.get_mut(&piece) {
*count = count.saturating_sub(1);
if *count == 0 {
pending.remove(&piece);
drop(pending);
write_state.notify.notify_waiters();
}
}
}
}
}
pub(crate) fn write_block_direct(
&self,
piece: u32,
begin: u32,
s0: &[u8],
s1: &[u8],
) -> crate::Result<()> {
let Some(backend) = &self.backend else {
return Ok(());
};
backend.write_block_direct(self.info_hash, piece, begin, s0, s1)
}
pub(crate) async fn flush_piece_writes(&self, piece: u32) {
let Some(write_state) = &self.write_state else {
return;
};
loop {
{
let pending = crate::timed_lock::TimedGuard::new(
write_state.pending.lock(),
&write_state.lock_timing,
"disk_pending",
);
if !pending.contains_key(&piece) {
return;
}
}
write_state.notify.notified().await;
}
}
#[allow(dead_code)]
pub(crate) fn storage(&self) -> Option<Arc<dyn TorrentStorage>> {
self.storage.clone()
}
}
struct DiskActor {
rx: mpsc::Receiver<DiskJob>,
backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
spawner: crate::blocking_spawner::BlockingSpawner,
#[allow(dead_code)]
config: DiskConfig,
}
impl DiskActor {
fn new(
rx: mpsc::Receiver<DiskJob>,
config: DiskConfig,
backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
spawner: crate::blocking_spawner::BlockingSpawner,
) -> Self {
Self {
rx,
backend,
spawner,
config,
}
}
async fn run(mut self) {
loop {
let Some(first) = self.rx.recv().await else {
break;
};
let mut batch = vec![first];
while let Ok(job) = self.rx.try_recv() {
batch.push(job);
}
for job in batch {
if let DiskJob::Shutdown { reply } = job {
let backend = Arc::clone(&self.backend);
let spawner = self.spawner.clone();
let flush_result = spawner.block_in_place(move || backend.flush_all()).await;
if let Err(e) = flush_result {
warn!("flush_all on shutdown failed: {e}");
}
let _ = reply.send(());
return;
}
self.dispatch_job(job);
}
}
}
fn dispatch_job(&self, job: DiskJob) {
match job {
DiskJob::Register {
info_hash,
storage,
reply,
} => {
self.backend.register(info_hash, storage);
let _ = reply.send(());
}
DiskJob::Unregister { info_hash } => {
self.backend.unregister(info_hash);
}
DiskJob::ClearPiece { info_hash, piece } => {
self.backend.clear_piece(info_hash, piece);
}
DiskJob::CachedPieces { info_hash, reply } => {
let pieces = self.backend.cached_pieces(info_hash);
let _ = reply.send(pieces);
}
DiskJob::Write {
info_hash,
piece,
begin,
data,
flags,
reply,
} => {
let flush = flags.contains(DiskJobFlags::FLUSH_PIECE);
let backend = Arc::clone(&self.backend);
let spawner = self.spawner.clone();
tokio::spawn(async move {
let result = spawner
.block_in_place(move || {
backend.write_chunk(info_hash, piece, begin, data, flush)
})
.await;
let _ = reply.send(to_storage_result(result));
});
}
DiskJob::Read {
info_hash,
piece,
begin,
length,
flags,
reply,
} => {
let volatile = flags.contains(DiskJobFlags::VOLATILE_READ);
let backend = Arc::clone(&self.backend);
let spawner = self.spawner.clone();
tokio::spawn(async move {
let result = spawner
.block_in_place(move || {
backend.read_chunk(info_hash, piece, begin, length, volatile)
})
.await;
let _ = reply.send(to_storage_result(result));
});
}
DiskJob::Hash {
info_hash,
piece,
expected,
reply,
..
} => {
let backend = Arc::clone(&self.backend);
let spawner = self.spawner.clone();
tokio::spawn(async move {
let result = spawner
.block_in_place(move || backend.hash_piece(info_hash, piece, &expected))
.await;
let _ = reply.send(to_storage_result(result));
});
}
DiskJob::HashV2 {
info_hash,
piece,
expected,
reply,
..
} => {
let backend = Arc::clone(&self.backend);
let spawner = self.spawner.clone();
tokio::spawn(async move {
let result = spawner
.block_in_place(move || backend.hash_piece_v2(info_hash, piece, &expected))
.await;
let _ = reply.send(to_storage_result(result));
});
}
DiskJob::BlockHash {
info_hash,
piece,
begin,
length,
reply,
..
} => {
let backend = Arc::clone(&self.backend);
let spawner = self.spawner.clone();
tokio::spawn(async move {
let result = spawner
.block_in_place(move || backend.hash_block(info_hash, piece, begin, length))
.await;
let _ = reply.send(to_storage_result(result));
});
}
DiskJob::FlushWriteBuffer {
info_hash,
piece,
reply,
} => {
let backend = Arc::clone(&self.backend);
let spawner = self.spawner.clone();
tokio::spawn(async move {
let result = spawner
.block_in_place(move || backend.flush_piece(info_hash, piece))
.await;
let _ = reply.send(to_storage_result(result));
});
}
DiskJob::FlushAll { reply } => {
let backend = Arc::clone(&self.backend);
let spawner = self.spawner.clone();
tokio::spawn(async move {
let result = spawner.block_in_place(move || backend.flush_all()).await;
let _ = reply.send(to_storage_result(result));
});
}
DiskJob::Shutdown { .. } => unreachable!(),
}
}
}
fn to_storage_result<T>(r: crate::Result<T>) -> irontide_storage::Result<T> {
r.map_err(|e| match e {
crate::Error::Storage(se) => se,
other => irontide_storage::Error::Io(std::io::Error::other(other.to_string())),
})
}
#[cfg(test)]
mod tests {
use super::*;
use irontide_core::Lengths;
use irontide_storage::MemoryStorage;
fn test_config() -> DiskConfig {
DiskConfig {
io_threads: 2,
cache_size: 1024 * 1024,
..DiskConfig::default()
}
}
fn make_hash(n: u8) -> Id20 {
let mut b = [0u8; 20];
b[0] = n;
Id20(b)
}
#[tokio::test]
async fn async_write_read() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(1);
let lengths = Lengths::new(100, 50, 25);
let storage = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, storage).await;
let data = Bytes::from(vec![42u8; 25]);
disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
.await
.unwrap();
let read = disk
.read_chunk(0, 0, 25, DiskJobFlags::empty())
.await
.unwrap();
assert_eq!(read, data);
mgr.shutdown().await;
}
#[tokio::test]
async fn verify_through_handle() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(2);
let lengths = Lengths::new(100, 50, 25);
let storage = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, storage).await;
let piece_data = vec![9u8; 50];
disk.write_chunk(
0,
0,
Bytes::from(piece_data.clone()),
DiskJobFlags::FLUSH_PIECE,
)
.await
.unwrap();
disk.write_chunk(0, 25, Bytes::from(vec![9u8; 25]), DiskJobFlags::FLUSH_PIECE)
.await
.unwrap();
let expected = irontide_core::sha1(&piece_data);
assert!(
disk.verify_piece(0, expected, DiskJobFlags::empty())
.await
.unwrap()
);
assert!(
!disk
.verify_piece(0, Id20::ZERO, DiskJobFlags::empty())
.await
.unwrap()
);
mgr.shutdown().await;
}
#[tokio::test]
async fn cache_hit_avoids_io() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(3);
let lengths = Lengths::new(100, 50, 25);
let storage = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, storage).await;
let data = Bytes::from(vec![7u8; 25]);
disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
.await
.unwrap();
let r1 = disk
.read_chunk(0, 0, 25, DiskJobFlags::empty())
.await
.unwrap();
assert_eq!(r1, data);
let r2 = disk
.read_chunk(0, 0, 25, DiskJobFlags::empty())
.await
.unwrap();
assert_eq!(r2, data);
mgr.shutdown().await;
}
#[tokio::test]
async fn volatile_read_bypasses_cache() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(4);
let lengths = Lengths::new(100, 50, 25);
let storage = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, storage).await;
let data = Bytes::from(vec![5u8; 25]);
disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
.await
.unwrap();
let r = disk
.read_chunk(0, 0, 25, DiskJobFlags::VOLATILE_READ)
.await
.unwrap();
assert_eq!(r, data);
mgr.shutdown().await;
}
#[tokio::test]
async fn clear_piece_evicts_cache() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(5);
let lengths = Lengths::new(100, 50, 25);
let storage = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, storage).await;
let data = Bytes::from(vec![5u8; 25]);
disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
.await
.unwrap();
disk.read_chunk(0, 0, 25, DiskJobFlags::empty())
.await
.unwrap();
disk.clear_piece(0).await;
let r = disk
.read_chunk(0, 0, 25, DiskJobFlags::empty())
.await
.unwrap();
assert_eq!(r, data);
mgr.shutdown().await;
}
#[tokio::test]
async fn write_buffer_flush() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(6);
let lengths = Lengths::new(100, 50, 25);
let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
disk.write_chunk(0, 0, Bytes::from(vec![1u8; 25]), DiskJobFlags::empty())
.await
.unwrap();
disk.write_chunk(0, 25, Bytes::from(vec![2u8; 25]), DiskJobFlags::empty())
.await
.unwrap();
disk.flush_piece(0).await.unwrap();
let piece = storage.read_piece(0).unwrap();
assert_eq!(&piece[..25], &[1u8; 25]);
assert_eq!(&piece[25..], &[2u8; 25]);
mgr.shutdown().await;
}
#[tokio::test]
async fn verify_piece_v2_via_disk_handle() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(11);
let data = vec![0xABu8; 16384];
let expected = irontide_core::sha256(&data);
let lengths = Lengths::new(16384, 16384, 16384);
let storage = Arc::new(MemoryStorage::new(lengths));
storage.write_chunk(0, 0, &data).unwrap();
let disk = mgr.register_torrent(ih, storage).await;
let result = disk
.verify_piece_v2(0, expected, DiskJobFlags::empty())
.await;
assert!(result.unwrap());
mgr.shutdown().await;
}
#[tokio::test]
async fn hash_block_via_disk_handle() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(12);
let data = vec![0xCDu8; 16384];
let lengths = Lengths::new(16384, 16384, 16384);
let storage = Arc::new(MemoryStorage::new(lengths));
storage.write_chunk(0, 0, &data).unwrap();
let disk = mgr.register_torrent(ih, storage).await;
let hash = disk.hash_block(0, 0, 16384, DiskJobFlags::empty()).await;
assert_eq!(hash.unwrap(), irontide_core::sha256(&data));
mgr.shutdown().await;
}
#[tokio::test]
async fn concurrent_verify_multiple_pieces() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(10);
let data: Vec<u8> = (0..400).map(|i| (i % 256) as u8).collect();
let piece_len = 50u64;
let lengths = Lengths::new(data.len() as u64, piece_len, 25);
let storage = Arc::new(MemoryStorage::new(lengths.clone()));
let num_pieces = lengths.num_pieces();
for p in 0..num_pieces {
let offset = lengths.piece_offset(p) as usize;
let size = lengths.piece_size(p) as usize;
storage
.write_chunk(p, 0, &data[offset..offset + size])
.unwrap();
}
let disk = mgr.register_torrent(ih, storage).await;
let mut expected_hashes = Vec::new();
for p in 0..num_pieces {
let offset = lengths.piece_offset(p) as usize;
let size = lengths.piece_size(p) as usize;
expected_hashes.push(irontide_core::sha1(&data[offset..offset + size]));
}
let mut js = tokio::task::JoinSet::new();
for p in 0..num_pieces {
let d = disk.clone();
let hash = expected_hashes[p as usize];
js.spawn(async move {
let valid = d
.verify_piece(p, hash, DiskJobFlags::empty())
.await
.unwrap();
(p, valid)
});
}
let mut results = Vec::new();
while let Some(r) = js.join_next().await {
results.push(r.unwrap());
}
results.sort_by_key(|&(p, _)| p);
assert_eq!(results.len(), num_pieces as usize);
for (p, valid) in &results {
assert!(valid, "piece {p} should be valid");
}
mgr.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_block_deferred_writes_to_storage() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(30);
let lengths = Lengths::new(100, 50, 25);
let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
let block0 = Bytes::from(vec![0xAAu8; 25]);
let block1 = Bytes::from(vec![0xBBu8; 25]);
disk.write_block_deferred(0, 0, block0.clone());
disk.write_block_deferred(0, 25, block1.clone());
disk.flush_piece_writes(0).await;
let read0 = storage.read_chunk(0, 0, 25).unwrap();
assert_eq!(&read0[..], &block0[..], "block 0 should match");
let read1 = storage.read_chunk(0, 25, 25).unwrap();
assert_eq!(&read1[..], &block1[..], "block 1 should match");
mgr.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn flush_piece_writes_waits_for_completion() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(31);
let lengths = Lengths::new(200, 100, 25);
let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
for i in 0u32..4 {
let data = Bytes::from(vec![(i as u8) + 1; 25]);
disk.write_block_deferred(0, i * 25, data);
}
disk.flush_piece_writes(0).await;
let piece = storage.read_piece(0).unwrap();
assert_eq!(&piece[0..25], &[1u8; 25]);
assert_eq!(&piece[25..50], &[2u8; 25]);
assert_eq!(&piece[50..75], &[3u8; 25]);
assert_eq!(&piece[75..100], &[4u8; 25]);
mgr.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_writer_drains_multiple_jobs() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(50);
let lengths = Lengths::new(250, 250, 25);
let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
for i in 0u32..10 {
let data = Bytes::from(vec![i as u8 + 1; 25]);
disk.write_block_deferred(0, i * 25, data);
}
disk.flush_piece_writes(0).await;
for i in 0u32..10 {
let chunk = storage.read_chunk(0, i * 25, 25).unwrap();
assert_eq!(
&chunk[..],
vec![i as u8 + 1; 25].as_slice(),
"block {i} mismatch"
);
}
mgr.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn batch_writer_caps_at_64() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(51);
let lengths = Lengths::new(1600, 1600, 16);
let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
for i in 0u32..100 {
let data = Bytes::from(vec![i as u8; 16]);
disk.write_block_deferred(0, i * 16, data);
}
disk.flush_piece_writes(0).await;
for i in 0u32..100 {
let chunk = storage.read_chunk(0, i * 16, 16).unwrap();
assert_eq!(
&chunk[..],
vec![i as u8; 16].as_slice(),
"block {i} mismatch after overflow to next batch"
);
}
mgr.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn verify_from_disk_after_deferred_write() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(40);
let chunk_size = 16384u32;
let piece_size = u64::from(chunk_size) * 2;
let lengths = Lengths::new(piece_size, piece_size, chunk_size);
let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
let chunk0 = vec![0xAAu8; chunk_size as usize];
let chunk1 = vec![0xBBu8; chunk_size as usize];
disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
disk.flush_piece_writes(0).await;
let mut full_piece = Vec::with_capacity(piece_size as usize);
full_piece.extend_from_slice(&chunk0);
full_piece.extend_from_slice(&chunk1);
let expected_hash = irontide_core::sha1(&full_piece);
let (result_tx, mut result_rx) = mpsc::channel(4);
disk.enqueue_verify(0, expected_hash, 0, &result_tx);
let result = result_rx
.recv()
.await
.expect("should receive verify result");
assert_eq!(result.piece, 0);
assert!(result.passed, "disk-based SHA-1 verify should pass");
disk.write_block_deferred(0, 0, Bytes::from(chunk0));
disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
disk.flush_piece_writes(0).await;
disk.enqueue_verify(0, Id20::ZERO, 0, &result_tx);
let result = result_rx
.recv()
.await
.expect("should receive verify result");
assert!(!result.passed, "wrong hash should fail disk-based verify");
mgr.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn verify_v2_from_disk_after_deferred_write() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(41);
let chunk_size = 16384u32;
let piece_size = u64::from(chunk_size) * 2;
let lengths = Lengths::new(piece_size, piece_size, chunk_size);
let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
let chunk0 = vec![0xCCu8; chunk_size as usize];
let chunk1 = vec![0xDDu8; chunk_size as usize];
disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
disk.flush_piece_writes(0).await;
let mut full_piece = Vec::with_capacity(piece_size as usize);
full_piece.extend_from_slice(&chunk0);
full_piece.extend_from_slice(&chunk1);
let expected_hash = irontide_core::sha256(&full_piece);
let (result_tx, mut result_rx) = mpsc::channel(4);
disk.enqueue_verify_v2(0, expected_hash, &result_tx);
let result = result_rx
.recv()
.await
.expect("should receive v2 verify result");
assert_eq!(result.piece, 0);
assert!(result.passed, "disk-based SHA-256 verify should pass");
disk.write_block_deferred(0, 0, Bytes::from(chunk0));
disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
disk.flush_piece_writes(0).await;
disk.enqueue_verify_v2(0, Id32::ZERO, &result_tx);
let result = result_rx
.recv()
.await
.expect("should receive v2 verify result");
assert!(
!result.passed,
"wrong hash should fail disk-based v2 verify"
);
mgr.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn verify_with_hash_pool_from_disk() {
let (mgr, _actor) = DiskManagerHandle::new(test_config());
let ih = make_hash(42);
let chunk_size = 16384u32;
let piece_size = u64::from(chunk_size) * 2;
let lengths = Lengths::new(piece_size, piece_size, chunk_size);
let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
let mut disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
let (hash_result_tx, mut hash_result_rx) = mpsc::channel(4);
disk.set_hash_result_tx(hash_result_tx);
let hash_pool = std::sync::Arc::new(crate::hash_pool::HashPool::new(2, 16));
disk.set_hash_pool(hash_pool);
let chunk0 = vec![0xEEu8; chunk_size as usize];
let chunk1 = vec![0xFFu8; chunk_size as usize];
disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
disk.flush_piece_writes(0).await;
let mut full_piece = Vec::with_capacity(piece_size as usize);
full_piece.extend_from_slice(&chunk0);
full_piece.extend_from_slice(&chunk1);
let expected_hash = irontide_core::sha1(&full_piece);
let (verify_result_tx, _) = mpsc::channel(4); disk.enqueue_verify(0, expected_hash, 42, &verify_result_tx);
let result = hash_result_rx
.recv()
.await
.expect("should receive hash pool result");
assert!(result.passed, "hash pool disk-based verify should pass");
assert_eq!(result.piece, 0);
assert_eq!(result.generation, 42);
mgr.shutdown().await;
}
}