Skip to main content

irontide_session/
disk_backend.rs

1#![allow(
2    clippy::cast_possible_truncation,
3    clippy::cast_precision_loss,
4    clippy::cast_possible_wrap,
5    clippy::cast_sign_loss,
6    reason = "M175: disk I/O sizes bounded by piece_length (u32 by construction in Lengths::new); offset arithmetic stays within file lengths"
7)]
8
9//! Pluggable disk I/O backend abstraction.
10//!
11//! The [`DiskIoBackend`] trait defines the interface for session-level disk I/O,
12//! allowing custom storage backends (POSIX, mmap, disabled/null, etc.).
13//! [`DisabledDiskIo`] is a no-op backend useful for network throughput benchmarking.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use parking_lot::{Mutex, RwLock};
19
20use bytes::Bytes;
21use irontide_core::{Id20, Id32};
22use irontide_storage::TorrentStorage;
23
24use crate::buffer_pool::BufferPool;
25use crate::disk::DiskConfig;
26
/// Aggregate I/O statistics for a disk backend.
///
/// This is a plain snapshot value: backends hand out a fresh copy from
/// [`DiskIoBackend::stats`], so two snapshots can be compared with `==` to
/// detect whether anything changed. The backends in this module that do not
/// track a given counter leave it at zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DiskIoStats {
    /// Total bytes read from storage.
    pub read_bytes: u64,
    /// Total bytes written to storage.
    pub write_bytes: u64,
    /// Number of read requests satisfied from cache.
    pub cache_hits: u64,
    /// Number of read requests that required disk access.
    pub cache_misses: u64,
    /// Current size of the write buffer in bytes.
    pub write_buffer_bytes: usize,
    /// Current size of the read cache in bytes (M102).
    pub read_cache_bytes: usize,
    /// Total number of entries in the buffer pool (M102).
    pub pool_entries: usize,
    /// Number of prefetch insertions into the read cache (M102).
    pub prefetch_count: u64,
    /// Number of ARC evictions from the read cache (M102).
    pub eviction_count: u64,
    /// Number of Writing-to-Skeleton demotions (M102).
    pub skeleton_count: u64,
}
51
52/// Trait for pluggable disk I/O backends.
53///
54/// Implementations are shared across the session via `Arc<dyn DiskIoBackend>`.
55/// All methods must be safe to call from multiple threads concurrently.
56pub trait DiskIoBackend: Send + Sync {
57    /// Human-readable backend name (e.g., "posix", "mmap", "disabled").
58    fn name(&self) -> &str;
59
60    /// Register a torrent's storage for I/O operations.
61    fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>);
62
63    /// Unregister a torrent's storage.
64    fn unregister(&self, info_hash: Id20);
65
66    /// Write a chunk of piece data. If `flush` is true, persist immediately.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if the underlying storage write fails.
71    fn write_chunk(
72        &self,
73        info_hash: Id20,
74        piece: u32,
75        begin: u32,
76        data: Bytes,
77        flush: bool,
78    ) -> crate::Result<()>;
79
80    /// Read a chunk of piece data. `volatile` hints the data won't be re-read soon.
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the underlying storage read fails.
85    fn read_chunk(
86        &self,
87        info_hash: Id20,
88        piece: u32,
89        begin: u32,
90        length: u32,
91        volatile: bool,
92    ) -> crate::Result<Bytes>;
93
94    /// Read an entire piece from storage, returning its raw bytes.
95    ///
96    /// For backends with a write buffer, the piece is flushed before reading.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the underlying storage read fails.
101    fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>>;
102
103    /// Verify a piece against its expected SHA-1 hash.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the underlying storage read fails.
108    fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool>;
109
110    /// Verify a piece against its expected SHA-256 hash (BEP 52).
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the underlying storage read fails.
115    fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool>;
116
117    /// Compute SHA-256 hash of a single block within a piece (BEP 52 Merkle).
118    ///
119    /// # Errors
120    ///
121    /// Returns an error if the underlying storage read fails.
122    fn hash_block(
123        &self,
124        info_hash: Id20,
125        piece: u32,
126        begin: u32,
127        length: u32,
128    ) -> crate::Result<Id32>;
129
130    /// Discard buffered data for a piece (e.g., after hash failure).
131    fn clear_piece(&self, info_hash: Id20, piece: u32);
132
133    /// Flush buffered writes for a piece to persistent storage.
134    ///
135    /// # Errors
136    ///
137    /// Returns an error if the underlying storage flush fails.
138    fn flush_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<()>;
139
140    /// Flush all buffered writes across all torrents.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the underlying storage flush fails.
145    fn flush_all(&self) -> crate::Result<()>;
146
147    /// Return piece indices currently held in cache (for `SuggestPiece`, M44).
148    fn cached_pieces(&self, info_hash: Id20) -> Vec<u32>;
149
150    /// Return current I/O statistics.
151    fn stats(&self) -> DiskIoStats;
152
153    /// Write a block directly to storage from two slices (vectored write).
154    ///
155    /// Bypasses the buffer pool entirely — data goes straight to the
156    /// underlying `TorrentStorage`. The two slices represent contiguous
157    /// block data split across a ring-buffer wrap boundary.
158    ///
159    /// This is the zero-copy direct-pwrite path used by peer tasks.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if the underlying storage write fails.
164    fn write_block_direct(
165        &self,
166        info_hash: Id20,
167        piece: u32,
168        begin: u32,
169        s0: &[u8],
170        s1: &[u8],
171    ) -> crate::Result<()>;
172}
173
/// No-op disk I/O backend for network throughput benchmarking.
///
/// Nothing is ever stored:
/// - writes and flushes succeed without doing anything;
/// - chunk reads return a zero-filled buffer of the requested length;
/// - whole-piece reads return an empty buffer;
/// - block hashes are all-zero;
/// - every piece hash verification passes.
#[derive(Debug, Clone, Copy, Default)]
pub struct DisabledDiskIo;
179
180impl DiskIoBackend for DisabledDiskIo {
181    fn name(&self) -> &'static str {
182        "disabled"
183    }
184
185    fn register(&self, _info_hash: Id20, _storage: Arc<dyn TorrentStorage>) {}
186
187    fn unregister(&self, _info_hash: Id20) {}
188
189    fn write_chunk(
190        &self,
191        _info_hash: Id20,
192        _piece: u32,
193        _begin: u32,
194        _data: Bytes,
195        _flush: bool,
196    ) -> crate::Result<()> {
197        Ok(())
198    }
199
200    fn read_chunk(
201        &self,
202        _info_hash: Id20,
203        _piece: u32,
204        _begin: u32,
205        length: u32,
206        _volatile: bool,
207    ) -> crate::Result<Bytes> {
208        Ok(Bytes::from(vec![0u8; length as usize]))
209    }
210
211    fn read_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<Vec<u8>> {
212        Ok(Vec::new())
213    }
214
215    fn hash_piece(&self, _info_hash: Id20, _piece: u32, _expected: &Id20) -> crate::Result<bool> {
216        Ok(true)
217    }
218
219    fn hash_piece_v2(
220        &self,
221        _info_hash: Id20,
222        _piece: u32,
223        _expected: &Id32,
224    ) -> crate::Result<bool> {
225        Ok(true)
226    }
227
228    fn hash_block(
229        &self,
230        _info_hash: Id20,
231        _piece: u32,
232        _begin: u32,
233        _length: u32,
234    ) -> crate::Result<Id32> {
235        Ok(Id32([0u8; 32]))
236    }
237
238    fn clear_piece(&self, _info_hash: Id20, _piece: u32) {}
239
240    fn flush_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<()> {
241        Ok(())
242    }
243
244    fn flush_all(&self) -> crate::Result<()> {
245        Ok(())
246    }
247
248    fn cached_pieces(&self, _info_hash: Id20) -> Vec<u32> {
249        vec![]
250    }
251
252    fn stats(&self) -> DiskIoStats {
253        DiskIoStats::default()
254    }
255
256    fn write_block_direct(
257        &self,
258        _info_hash: Id20,
259        _piece: u32,
260        _begin: u32,
261        _s0: &[u8],
262        _s1: &[u8],
263    ) -> crate::Result<()> {
264        Ok(())
265    }
266}
267
268// ---------------------------------------------------------------------------
269// PosixDiskIo — unified BufferPool backend
270// ---------------------------------------------------------------------------
271
272/// POSIX disk I/O backend with unified buffer pool (combined read cache +
273/// write buffer under a single byte budget).
274///
275/// Replaces the previous separate `ArcCache` + `WriteBuffer` design with
276/// a single [`BufferPool`] that uses a byte-budget ARC for read caching and
277/// tracks in-flight writes as `Writing` entries.
278pub struct PosixDiskIo {
279    storages: RwLock<HashMap<Id20, Arc<dyn TorrentStorage>>>,
280    pool: Mutex<BufferPool>,
281    stats: Mutex<DiskIoStats>,
282}
283
284impl PosixDiskIo {
285    /// Create a new POSIX disk I/O backend with the given configuration.
286    #[must_use]
287    pub fn new(config: &DiskConfig) -> Self {
288        let mut pool = BufferPool::with_capacity(config.buffer_pool_capacity);
289        pool.set_mlock(config.enable_mlock);
290        Self {
291            storages: RwLock::new(HashMap::new()),
292            pool: Mutex::new(pool),
293            stats: Mutex::new(DiskIoStats::default()),
294        }
295    }
296
297    fn get_storage(&self, info_hash: Id20) -> crate::Result<Arc<dyn TorrentStorage>> {
298        self.storages
299            .read()
300            .get(&info_hash)
301            .cloned()
302            .ok_or_else(|| {
303                crate::Error::Storage(irontide_storage::Error::Io(std::io::Error::new(
304                    std::io::ErrorKind::NotFound,
305                    "torrent not registered",
306                )))
307            })
308    }
309}
310
311impl DiskIoBackend for PosixDiskIo {
312    fn name(&self) -> &'static str {
313        "posix"
314    }
315
316    fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>) {
317        self.storages.write().insert(info_hash, storage);
318    }
319
320    fn unregister(&self, info_hash: Id20) {
321        self.storages.write().remove(&info_hash);
322        self.pool.lock().clear_torrent(info_hash);
323    }
324
325    fn write_chunk(
326        &self,
327        info_hash: Id20,
328        piece: u32,
329        begin: u32,
330        data: Bytes,
331        flush: bool,
332    ) -> crate::Result<()> {
333        let len = data.len();
334
335        if flush {
336            let storage = self.get_storage(info_hash)?;
337            storage.write_chunk(piece, begin, &data)?;
338            self.stats.lock().write_bytes += len as u64;
339            return Ok(());
340        }
341
342        // Buffer the write in the pool. Pass piece_size=0 since we don't know
343        // it here — completion detection is handled by ChunkTracker in
344        // torrent.rs, not by the buffer pool.
345        let key = (info_hash, piece);
346        self.pool.lock().write_block(key, begin, data, 0);
347        self.stats.lock().write_bytes += len as u64;
348        Ok(())
349    }
350
351    fn read_chunk(
352        &self,
353        info_hash: Id20,
354        piece: u32,
355        begin: u32,
356        length: u32,
357        volatile: bool,
358    ) -> crate::Result<Bytes> {
359        let key = (info_hash, piece);
360
361        // 1. Check buffer pool (Writing entries + Cached entries)
362        if !volatile {
363            let mut pool = self.pool.lock();
364            if let Some(data) = pool.read_block(key, begin, length as usize) {
365                drop(pool);
366                self.stats.lock().cache_hits += 1;
367                return Ok(data);
368            }
369        }
370        self.stats.lock().cache_misses += 1;
371
372        // 2. Read full piece from storage and cache it (unless volatile)
373        let storage = self.get_storage(info_hash)?;
374        if !volatile {
375            if let Ok(piece_data) = storage.read_piece(piece) {
376                let piece_bytes = Bytes::from(piece_data);
377                self.stats.lock().read_bytes += piece_bytes.len() as u64;
378
379                let mut pool = self.pool.lock();
380                pool.prefetch_piece(key, piece_bytes.clone());
381                drop(pool);
382
383                // Return the requested slice
384                let end = (begin as usize).saturating_add(length as usize);
385                if end <= piece_bytes.len() {
386                    return Ok(piece_bytes.slice(begin as usize..end));
387                }
388                // Fall through to chunk read if slice is out of bounds
389                // (shouldn't happen in practice, but defensive)
390            } else {
391                // Fall through to chunk read on piece-level failure
392            }
393        }
394
395        // 3. Volatile path or fallback: read just the requested chunk
396        let data = storage.read_chunk(piece, begin, length)?;
397        let bytes = Bytes::from(data);
398        self.stats.lock().read_bytes += bytes.len() as u64;
399        Ok(bytes)
400    }
401
402    fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>> {
403        self.flush_piece(info_hash, piece)?;
404        let storage = self.get_storage(info_hash)?;
405        let data = storage.read_piece(piece)?;
406        self.stats.lock().read_bytes += data.len() as u64;
407        Ok(data)
408    }
409
410    fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool> {
411        let key = (info_hash, piece);
412
413        // Try hash from cache: take all blocks from the Writing entry and hash
414        // them in memory, avoiding a disk round-trip. This works because
415        // hash_piece is only called after ChunkTracker confirms the piece is
416        // complete, so all blocks should be present.
417        let cached_data = {
418            let mut pool = self.pool.lock();
419            pool.take_all_blocks(key)
420        };
421
422        if let Some(data) = cached_data {
423            let hash = irontide_core::sha1(&data);
424            if hash == *expected {
425                // Hash pass: flush to disk + promote to read cache.
426                let storage = self.get_storage(info_hash)?;
427                storage.write_chunk(piece, 0, &data)?;
428                self.stats.lock().write_bytes += data.len() as u64;
429
430                let mut pool = self.pool.lock();
431                pool.promote_to_cached(key, Bytes::from(data));
432                return Ok(true);
433            }
434            // Hash fail: data already removed from pool by take_all_blocks.
435            return Ok(false);
436        }
437
438        // No cached data — flush and verify from disk (legacy path).
439        self.flush_piece(info_hash, piece)?;
440        let storage = self.get_storage(info_hash)?;
441        Ok(storage.verify_piece(piece, expected)?)
442    }
443
444    fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool> {
445        let key = (info_hash, piece);
446
447        // Try hash from cache (same pattern as hash_piece but SHA-256).
448        let cached_data = {
449            let mut pool = self.pool.lock();
450            pool.take_all_blocks(key)
451        };
452
453        if let Some(data) = cached_data {
454            let hash = irontide_core::sha256(&data);
455            if hash == *expected {
456                let storage = self.get_storage(info_hash)?;
457                storage.write_chunk(piece, 0, &data)?;
458                self.stats.lock().write_bytes += data.len() as u64;
459
460                let mut pool = self.pool.lock();
461                pool.promote_to_cached(key, Bytes::from(data));
462                return Ok(true);
463            }
464            return Ok(false);
465        }
466
467        self.flush_piece(info_hash, piece)?;
468        let storage = self.get_storage(info_hash)?;
469        Ok(storage.verify_piece_v2(piece, expected)?)
470    }
471
472    fn hash_block(
473        &self,
474        info_hash: Id20,
475        piece: u32,
476        begin: u32,
477        length: u32,
478    ) -> crate::Result<Id32> {
479        self.flush_piece(info_hash, piece)?;
480        let storage = self.get_storage(info_hash)?;
481        Ok(storage.hash_block(piece, begin, length)?)
482    }
483
484    fn clear_piece(&self, info_hash: Id20, piece: u32) {
485        self.pool.lock().clear_piece((info_hash, piece));
486    }
487
488    fn flush_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<()> {
489        let blocks = {
490            let mut pool = self.pool.lock();
491            match pool.flush_piece((info_hash, piece)) {
492                Some(b) => b,
493                None => return Ok(()),
494            }
495        };
496        let storage = self.get_storage(info_hash)?;
497        for (begin, data) in blocks {
498            storage.write_chunk(piece, begin, &data)?;
499        }
500        Ok(())
501    }
502
503    fn flush_all(&self) -> crate::Result<()> {
504        let all_blocks = {
505            let mut pool = self.pool.lock();
506            pool.flush_all()
507        };
508        for ((info_hash, piece), blocks) in all_blocks {
509            let storage = self.get_storage(info_hash)?;
510            for (begin, data) in blocks {
511                storage.write_chunk(piece, begin, &data)?;
512            }
513        }
514        Ok(())
515    }
516
517    fn cached_pieces(&self, info_hash: Id20) -> Vec<u32> {
518        let pool = self.pool.lock();
519        pool.hot_pieces(info_hash)
520    }
521
522    fn stats(&self) -> DiskIoStats {
523        let mut s = self.stats.lock().clone();
524        let pool = self.pool.lock();
525        let ps = pool.stats();
526        s.write_buffer_bytes = ps.write_buffer_bytes;
527        s.read_cache_bytes = ps.read_cache_bytes;
528        s.pool_entries = ps.total_entries;
529        s.prefetch_count = ps.prefetch_count;
530        s.eviction_count = ps.eviction_count;
531        s.skeleton_count = ps.skeleton_count;
532        s
533    }
534
535    fn write_block_direct(
536        &self,
537        info_hash: Id20,
538        piece: u32,
539        begin: u32,
540        s0: &[u8],
541        s1: &[u8],
542    ) -> crate::Result<()> {
543        let storage = self.get_storage(info_hash)?;
544        storage.write_chunk_vectored(piece, begin, s0, s1)?;
545        let total = (s0.len() + s1.len()) as u64;
546        self.stats.lock().write_bytes += total;
547        Ok(())
548    }
549}
550
551// ---------------------------------------------------------------------------
552// MmapDiskIo — direct I/O, no user-space cache
553// ---------------------------------------------------------------------------
554
555/// Memory-mapped disk I/O backend with no user-space cache.
556///
557/// Relies entirely on the kernel page cache via mmap. All reads and writes
558/// go directly to storage with no write buffering.
559pub struct MmapDiskIo {
560    storages: RwLock<HashMap<Id20, Arc<dyn TorrentStorage>>>,
561    stats: Mutex<DiskIoStats>,
562}
563
564impl MmapDiskIo {
565    /// Create a new mmap disk I/O backend.
566    #[must_use]
567    pub fn new() -> Self {
568        Self {
569            storages: RwLock::new(HashMap::new()),
570            stats: Mutex::new(DiskIoStats::default()),
571        }
572    }
573
574    fn get_storage(&self, info_hash: Id20) -> crate::Result<Arc<dyn TorrentStorage>> {
575        self.storages
576            .read()
577            .get(&info_hash)
578            .cloned()
579            .ok_or_else(|| {
580                crate::Error::Storage(irontide_storage::Error::Io(std::io::Error::new(
581                    std::io::ErrorKind::NotFound,
582                    "torrent not registered",
583                )))
584            })
585    }
586}
587
588impl Default for MmapDiskIo {
589    fn default() -> Self {
590        Self::new()
591    }
592}
593
594impl DiskIoBackend for MmapDiskIo {
595    fn name(&self) -> &'static str {
596        "mmap"
597    }
598
599    fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>) {
600        self.storages.write().insert(info_hash, storage);
601    }
602
603    fn unregister(&self, info_hash: Id20) {
604        self.storages.write().remove(&info_hash);
605    }
606
607    fn write_chunk(
608        &self,
609        info_hash: Id20,
610        piece: u32,
611        begin: u32,
612        data: Bytes,
613        _flush: bool,
614    ) -> crate::Result<()> {
615        let len = data.len();
616        let storage = self.get_storage(info_hash)?;
617        storage.write_chunk(piece, begin, &data)?;
618        self.stats.lock().write_bytes += len as u64;
619        Ok(())
620    }
621
622    fn read_chunk(
623        &self,
624        info_hash: Id20,
625        piece: u32,
626        begin: u32,
627        length: u32,
628        _volatile: bool,
629    ) -> crate::Result<Bytes> {
630        let storage = self.get_storage(info_hash)?;
631        let data = storage.read_chunk(piece, begin, length)?;
632        let bytes = Bytes::from(data);
633        self.stats.lock().read_bytes += bytes.len() as u64;
634        Ok(bytes)
635    }
636
637    fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>> {
638        let storage = self.get_storage(info_hash)?;
639        let data = storage.read_piece(piece)?;
640        self.stats.lock().read_bytes += data.len() as u64;
641        Ok(data)
642    }
643
644    fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool> {
645        let storage = self.get_storage(info_hash)?;
646        Ok(storage.verify_piece(piece, expected)?)
647    }
648
649    fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool> {
650        let storage = self.get_storage(info_hash)?;
651        Ok(storage.verify_piece_v2(piece, expected)?)
652    }
653
654    fn hash_block(
655        &self,
656        info_hash: Id20,
657        piece: u32,
658        begin: u32,
659        length: u32,
660    ) -> crate::Result<Id32> {
661        let storage = self.get_storage(info_hash)?;
662        Ok(storage.hash_block(piece, begin, length)?)
663    }
664
665    fn clear_piece(&self, _info_hash: Id20, _piece: u32) {
666        // No-op: no write buffer or cache
667    }
668
669    fn flush_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<()> {
670        // No-op: no write buffer
671        Ok(())
672    }
673
674    fn flush_all(&self) -> crate::Result<()> {
675        // No-op: no write buffer
676        Ok(())
677    }
678
679    fn cached_pieces(&self, _info_hash: Id20) -> Vec<u32> {
680        vec![]
681    }
682
683    fn stats(&self) -> DiskIoStats {
684        self.stats.lock().clone()
685    }
686
687    fn write_block_direct(
688        &self,
689        info_hash: Id20,
690        piece: u32,
691        begin: u32,
692        s0: &[u8],
693        s1: &[u8],
694    ) -> crate::Result<()> {
695        let storage = self.get_storage(info_hash)?;
696        storage.write_chunk_vectored(piece, begin, s0, s1)?;
697        let total = (s0.len() + s1.len()) as u64;
698        self.stats.lock().write_bytes += total;
699        Ok(())
700    }
701}
702
703// ---------------------------------------------------------------------------
704// Factory
705// ---------------------------------------------------------------------------
706
707/// Create a disk I/O backend based on the storage mode in the configuration.
708///
709/// Returns `IoUringDiskIo` when `config.storage_mode` is `IoUring` (Linux + `io-uring` feature),
710/// `MmapDiskIo` when `Mmap`, otherwise `PosixDiskIo` with unified buffer pool.
711/// If `io_uring` initialisation fails, falls back to `PosixDiskIo` with a warning.
712#[must_use]
713pub fn create_backend_from_config(config: &DiskConfig) -> Arc<dyn DiskIoBackend> {
714    #[cfg(all(target_os = "linux", feature = "io-uring"))]
715    if config.storage_mode == irontide_core::StorageMode::IoUring {
716        let uring_config = irontide_storage::IoUringConfig {
717            sq_depth: config.io_uring_sq_depth,
718            enable_direct_io: config.io_uring_direct_io,
719            batch_threshold: config.io_uring_batch_threshold,
720        };
721        match crate::io_uring_backend::IoUringDiskIo::new(config, uring_config) {
722            Ok(backend) => return Arc::new(backend),
723            Err(e) => {
724                tracing::warn!("io_uring init failed, falling back to posix: {e}");
725            }
726        }
727    }
728
729    #[cfg(all(target_os = "windows", feature = "iocp"))]
730    if config.storage_mode == irontide_core::StorageMode::Iocp {
731        let iocp_config = irontide_storage::IocpConfig {
732            concurrent_threads: config.iocp_concurrent_threads,
733            enable_direct_io: config.iocp_direct_io,
734        };
735        match crate::iocp_backend::IocpDiskIo::new(config, iocp_config) {
736            Ok(backend) => return Arc::new(backend),
737            Err(e) => {
738                tracing::warn!("IOCP init failed, falling back to posix: {e}");
739            }
740        }
741    }
742
743    if config.storage_mode == irontide_core::StorageMode::Mmap {
744        Arc::new(MmapDiskIo::new())
745    } else {
746        Arc::new(PosixDiskIo::new(config))
747    }
748}
749
750#[cfg(test)]
751mod tests {
752    use super::*;
753    use irontide_core::Lengths;
754    use irontide_storage::MemoryStorage;
755
756    fn make_hash() -> Id20 {
757        Id20([0xAB; 20])
758    }
759
760    fn make_hash_n(n: u8) -> Id20 {
761        let mut b = [0u8; 20];
762        b[0] = n;
763        Id20(b)
764    }
765
766    fn test_config() -> DiskConfig {
767        DiskConfig {
768            io_threads: 2,
769            cache_size: 1024 * 1024, // 1 MiB
770            ..DiskConfig::default()
771        }
772    }
773
774    /// Create a `MemoryStorage` with a single piece of the given size.
775    fn make_storage(piece_size: u64) -> Arc<dyn TorrentStorage> {
776        let chunk = piece_size.min(16384) as u32;
777        let lengths = Lengths::new(piece_size, piece_size, chunk);
778        Arc::new(MemoryStorage::new(lengths))
779    }
780
781    /// Create a `MemoryStorage` with the given total/piece/chunk sizes.
782    fn make_storage_full(total: u64, piece_len: u64, chunk_size: u32) -> Arc<dyn TorrentStorage> {
783        let lengths = Lengths::new(total, piece_len, chunk_size);
784        Arc::new(MemoryStorage::new(lengths))
785    }
786
787    // -----------------------------------------------------------------------
788    // DisabledDiskIo tests
789    // -----------------------------------------------------------------------
790
791    #[test]
792    fn disabled_backend_name() {
793        let backend = DisabledDiskIo;
794        assert_eq!(backend.name(), "disabled");
795    }
796
797    #[test]
798    fn disabled_backend_write_succeeds() {
799        let backend = DisabledDiskIo;
800        let result =
801            backend.write_chunk(make_hash(), 0, 0, Bytes::from_static(&[1, 2, 3, 4]), false);
802        assert!(result.is_ok());
803    }
804
805    #[test]
806    fn disabled_backend_read_returns_zeroed() {
807        let backend = DisabledDiskIo;
808        let length = 16384u32;
809        let data = backend
810            .read_chunk(make_hash(), 0, 0, length, false)
811            .unwrap();
812        assert_eq!(data.len(), length as usize);
813        assert!(data.iter().all(|&b| b == 0));
814    }
815
816    #[test]
817    fn disabled_backend_hash_always_passes() {
818        let backend = DisabledDiskIo;
819        let expected = Id20([0xFF; 20]);
820        let result = backend.hash_piece(make_hash(), 42, &expected).unwrap();
821        assert!(result);
822    }
823
824    #[test]
825    fn disabled_backend_hash_v2_always_passes() {
826        let backend = DisabledDiskIo;
827        let expected = Id32([0xFF; 32]);
828        let result = backend.hash_piece_v2(make_hash(), 42, &expected).unwrap();
829        assert!(result);
830    }
831
832    #[test]
833    fn disabled_backend_stats_default() {
834        let backend = DisabledDiskIo;
835        let stats = backend.stats();
836        assert_eq!(stats.read_bytes, 0);
837        assert_eq!(stats.write_bytes, 0);
838        assert_eq!(stats.cache_hits, 0);
839        assert_eq!(stats.cache_misses, 0);
840        assert_eq!(stats.write_buffer_bytes, 0);
841    }
842
843    #[test]
844    fn disabled_backend_cached_pieces_empty() {
845        let backend = DisabledDiskIo;
846        let pieces = backend.cached_pieces(make_hash());
847        assert!(pieces.is_empty());
848    }
849
850    // -----------------------------------------------------------------------
851    // PosixDiskIo tests
852    // -----------------------------------------------------------------------
853
854    #[test]
855    fn posix_backend_name() {
856        let backend = PosixDiskIo::new(&test_config());
857        assert_eq!(backend.name(), "posix");
858    }
859
860    #[test]
861    fn posix_register_unregister() {
862        let backend = PosixDiskIo::new(&test_config());
863        let ih = make_hash_n(1);
864        let storage = make_storage(100);
865        backend.register(ih, storage);
866
867        // Should be able to read (empty data)
868        assert!(backend.read_chunk(ih, 0, 0, 10, false).is_ok());
869
870        backend.unregister(ih);
871
872        // Should fail after unregister
873        assert!(backend.read_chunk(ih, 0, 0, 10, false).is_err());
874    }
875
876    #[test]
877    fn posix_write_and_read_flush() {
878        let backend = PosixDiskIo::new(&test_config());
879        let ih = make_hash_n(2);
880        let storage = make_storage(100);
881        backend.register(ih, storage);
882
883        let data = vec![42u8; 50];
884        backend
885            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
886            .unwrap();
887        let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
888        assert_eq!(&read[..], &data[..]);
889    }
890
891    #[test]
892    fn posix_write_buffered_then_read() {
893        let backend = PosixDiskIo::new(&test_config());
894        let ih = make_hash_n(3);
895        let storage = make_storage(100);
896        backend.register(ih, storage);
897
898        let data = vec![99u8; 50];
899        // Write without flush — goes to buffer pool
900        backend
901            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), false)
902            .unwrap();
903
904        // Read should find it in buffer pool
905        let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
906        assert_eq!(&read[..], &data[..]);
907
908        // Should be a cache hit (from buffer pool)
909        let stats = backend.stats();
910        assert!(stats.cache_hits >= 1);
911    }
912
913    #[test]
914    fn posix_read_cache_hit() {
915        let backend = PosixDiskIo::new(&test_config());
916        let ih = make_hash_n(4);
917        let storage = make_storage(100);
918        backend.register(ih, storage);
919
920        let data = vec![7u8; 50];
921        backend
922            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
923            .unwrap();
924
925        // First read: cache miss, reads from storage and prefetches
926        let r1 = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
927        assert_eq!(&r1[..], &data[..]);
928        let s1 = backend.stats();
929        assert_eq!(s1.cache_misses, 1);
930
931        // Second read: should be cache hit (from prefetched Cached entry)
932        let r2 = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
933        assert_eq!(&r2[..], &data[..]);
934        let s2 = backend.stats();
935        assert_eq!(s2.cache_hits, 1);
936    }
937
938    #[test]
939    fn posix_hash_piece_correct() {
940        let backend = PosixDiskIo::new(&test_config());
941        let ih = make_hash_n(5);
942        let storage = make_storage(50);
943        backend.register(ih, storage);
944
945        let data = vec![9u8; 50];
946        backend
947            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
948            .unwrap();
949
950        let expected = irontide_core::sha1(&data);
951        assert!(backend.hash_piece(ih, 0, &expected).unwrap());
952    }
953
954    #[test]
955    fn posix_hash_piece_wrong() {
956        let backend = PosixDiskIo::new(&test_config());
957        let ih = make_hash_n(6);
958        let storage = make_storage(50);
959        backend.register(ih, storage);
960
961        let data = vec![9u8; 50];
962        backend
963            .write_chunk(ih, 0, 0, Bytes::from(data), true)
964            .unwrap();
965
966        let wrong = Id20([0xFF; 20]);
967        assert!(!backend.hash_piece(ih, 0, &wrong).unwrap());
968    }
969
970    #[test]
971    fn posix_hash_piece_v2() {
972        let backend = PosixDiskIo::new(&test_config());
973        let ih = make_hash_n(7);
974        let storage = make_storage(16384);
975        backend.register(ih, storage);
976
977        let data = vec![0xABu8; 16384];
978        backend
979            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
980            .unwrap();
981
982        let expected = irontide_core::sha256(&data);
983        assert!(backend.hash_piece_v2(ih, 0, &expected).unwrap());
984    }
985
986    #[test]
987    fn posix_clear_piece_drops_buffer() {
988        let backend = PosixDiskIo::new(&test_config());
989        let ih = make_hash_n(8);
990        let storage = make_storage(100);
991        backend.register(ih, storage);
992
993        let data = vec![55u8; 50];
994        // Write buffered (not flushed)
995        backend
996            .write_chunk(ih, 0, 0, Bytes::from(data), false)
997            .unwrap();
998        assert!(backend.stats().write_buffer_bytes > 0);
999
1000        backend.clear_piece(ih, 0);
1001        assert_eq!(backend.stats().write_buffer_bytes, 0);
1002    }
1003
1004    #[test]
1005    fn posix_cached_pieces() {
1006        let backend = PosixDiskIo::new(&test_config());
1007        let ih = make_hash_n(9);
1008        // Two pieces: piece 0 and piece 1
1009        let storage = make_storage_full(100, 50, 25);
1010        backend.register(ih, storage);
1011
1012        let data = vec![1u8; 25];
1013        backend
1014            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
1015            .unwrap();
1016        backend
1017            .write_chunk(ih, 1, 0, Bytes::from(data), true)
1018            .unwrap();
1019
1020        // Read piece 0 twice to promote to T2 (hot_pieces requires ≥2 accesses)
1021        backend.read_chunk(ih, 0, 0, 25, false).unwrap();
1022        backend.read_chunk(ih, 0, 0, 25, false).unwrap();
1023
1024        let cached = backend.cached_pieces(ih);
1025        assert!(cached.contains(&0));
1026        // piece 1 was not read, should not be cached
1027        assert!(!cached.contains(&1));
1028    }
1029
1030    // -----------------------------------------------------------------------
1031    // MmapDiskIo tests
1032    // -----------------------------------------------------------------------
1033
1034    #[test]
1035    fn mmap_backend_name() {
1036        let backend = MmapDiskIo::new();
1037        assert_eq!(backend.name(), "mmap");
1038    }
1039
1040    #[test]
1041    fn mmap_write_and_read() {
1042        let backend = MmapDiskIo::new();
1043        let ih = make_hash_n(10);
1044        let storage = make_storage(100);
1045        backend.register(ih, storage);
1046
1047        let data = vec![42u8; 50];
1048        backend
1049            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), false)
1050            .unwrap();
1051        let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
1052        assert_eq!(&read[..], &data[..]);
1053    }
1054
1055    #[test]
1056    fn mmap_cached_pieces_always_empty() {
1057        let backend = MmapDiskIo::new();
1058        let ih = make_hash_n(11);
1059        let storage = make_storage(100);
1060        backend.register(ih, storage);
1061
1062        backend
1063            .write_chunk(ih, 0, 0, Bytes::from(vec![1u8; 50]), false)
1064            .unwrap();
1065        backend.read_chunk(ih, 0, 0, 50, false).unwrap();
1066
1067        assert!(backend.cached_pieces(ih).is_empty());
1068    }
1069
1070    #[test]
1071    fn mmap_stats_track_io() {
1072        let backend = MmapDiskIo::new();
1073        let ih = make_hash_n(12);
1074        let storage = make_storage(100);
1075        backend.register(ih, storage);
1076
1077        backend
1078            .write_chunk(ih, 0, 0, Bytes::from(vec![1u8; 50]), false)
1079            .unwrap();
1080        let stats = backend.stats();
1081        assert_eq!(stats.write_bytes, 50);
1082
1083        backend.read_chunk(ih, 0, 0, 50, false).unwrap();
1084        let stats = backend.stats();
1085        assert_eq!(stats.read_bytes, 50);
1086    }
1087
1088    // -----------------------------------------------------------------------
1089    // read_piece tests
1090    // -----------------------------------------------------------------------
1091
1092    #[test]
1093    fn read_piece_returns_full_piece() {
1094        let backend = PosixDiskIo::new(&test_config());
1095        let ih = make_hash_n(20);
1096        let chunk_size = 16384u32;
1097        let piece_size = u64::from(chunk_size) * 2; // 32768 — two chunks
1098        let storage = make_storage_full(piece_size, piece_size, chunk_size);
1099        backend.register(ih, storage);
1100
1101        // Write two chunks via the backend (buffered, not flushed)
1102        let chunk0 = vec![0xAAu8; chunk_size as usize];
1103        let chunk1 = vec![0xBBu8; chunk_size as usize];
1104        backend
1105            .write_chunk(ih, 0, 0, Bytes::from(chunk0.clone()), false)
1106            .expect("write chunk 0");
1107        backend
1108            .write_chunk(ih, 0, chunk_size, Bytes::from(chunk1.clone()), false)
1109            .expect("write chunk 1");
1110
1111        // read_piece should flush the buffer pool and return the full piece
1112        let piece_data = backend.read_piece(ih, 0).expect("read_piece");
1113        assert_eq!(piece_data.len(), piece_size as usize);
1114        assert_eq!(&piece_data[..chunk_size as usize], &chunk0[..]);
1115        assert_eq!(&piece_data[chunk_size as usize..], &chunk1[..]);
1116
1117        // Stats should reflect the read
1118        let stats = backend.stats();
1119        assert!(stats.read_bytes >= piece_size);
1120    }
1121
1122    // -----------------------------------------------------------------------
1123    // Hash-from-cache tests
1124    // -----------------------------------------------------------------------
1125
1126    #[test]
1127    fn posix_hash_from_cache_pass() {
1128        let backend = PosixDiskIo::new(&test_config());
1129        let ih = make_hash_n(30);
1130        let storage = make_storage(100);
1131        backend.register(ih, storage);
1132
1133        // Write two blocks into the buffer pool (not flushed)
1134        let d1 = vec![0xAA_u8; 50];
1135        let d2 = vec![0xBB_u8; 50];
1136        backend
1137            .write_chunk(ih, 0, 0, Bytes::from(d1.clone()), false)
1138            .unwrap();
1139        backend
1140            .write_chunk(ih, 0, 50, Bytes::from(d2.clone()), false)
1141            .unwrap();
1142
1143        // Compute expected hash of the full piece
1144        let mut full = d1;
1145        full.extend_from_slice(&d2);
1146        let expected = irontide_core::sha1(&full);
1147
1148        // hash_piece should hash from cache, write to disk, and promote to Cached
1149        assert!(backend.hash_piece(ih, 0, &expected).unwrap());
1150
1151        // The piece should now be readable from disk via read_piece
1152        let piece_data = backend.read_piece(ih, 0).unwrap();
1153        assert_eq!(piece_data.len(), 100);
1154
1155        // Read again to promote from T1 to T2 (hot_pieces requires ≥2 accesses)
1156        backend.read_chunk(ih, 0, 0, 50, false).unwrap();
1157
1158        // The piece should now be in the hot pieces list (T2)
1159        let cached = backend.cached_pieces(ih);
1160        assert!(cached.contains(&0));
1161    }
1162
1163    #[test]
1164    fn posix_hash_from_cache_fail() {
1165        let backend = PosixDiskIo::new(&test_config());
1166        let ih = make_hash_n(31);
1167        let storage = make_storage(50);
1168        backend.register(ih, storage);
1169
1170        // Write one block into buffer pool
1171        backend
1172            .write_chunk(ih, 0, 0, Bytes::from(vec![0xCC_u8; 50]), false)
1173            .unwrap();
1174
1175        // Wrong hash — should fail, data discarded from pool
1176        let wrong = Id20([0xFF; 20]);
1177        assert!(!backend.hash_piece(ih, 0, &wrong).unwrap());
1178
1179        // Buffer pool should no longer hold the data
1180        assert_eq!(backend.stats().write_buffer_bytes, 0);
1181    }
1182
1183    // -----------------------------------------------------------------------
1184    // Factory tests
1185    // -----------------------------------------------------------------------
1186
1187    #[test]
1188    fn factory_creates_mmap_for_mmap_mode() {
1189        let config = DiskConfig {
1190            storage_mode: irontide_core::StorageMode::Mmap,
1191            ..DiskConfig::default()
1192        };
1193        let backend = create_backend_from_config(&config);
1194        assert_eq!(backend.name(), "mmap");
1195    }
1196
1197    #[test]
1198    fn factory_creates_posix_for_auto_mode() {
1199        let config = DiskConfig {
1200            storage_mode: irontide_core::StorageMode::Auto,
1201            ..DiskConfig::default()
1202        };
1203        let backend = create_backend_from_config(&config);
1204        assert_eq!(backend.name(), "posix");
1205    }
1206
1207    // -----------------------------------------------------------------------
1208    // Integration tests — full buffer pool lifecycle through PosixDiskIo
1209    // -----------------------------------------------------------------------
1210
1211    #[test]
1212    fn piece_completion_flows_to_hash_pool() {
1213        let backend = PosixDiskIo::new(&test_config());
1214        let ih = make_hash_n(40);
1215        let storage = make_storage(100);
1216        backend.register(ih, storage);
1217
1218        // Write two blocks (not flushed — buffered in pool)
1219        let d1 = vec![0xAA; 50];
1220        let d2 = vec![0xBB; 50];
1221        backend
1222            .write_chunk(ih, 0, 0, Bytes::from(d1.clone()), false)
1223            .expect("write block 0");
1224        backend
1225            .write_chunk(ih, 0, 50, Bytes::from(d2.clone()), false)
1226            .expect("write block 1");
1227
1228        let mut full = d1;
1229        full.extend_from_slice(&d2);
1230        let expected_hash = irontide_core::sha1(&full);
1231
1232        // Hash from cache should pass
1233        assert!(
1234            backend
1235                .hash_piece(ih, 0, &expected_hash)
1236                .expect("hash_piece"),
1237            "hash should match"
1238        );
1239
1240        // Data should now be on disk (read_piece flushes then reads from storage)
1241        let piece = backend.read_piece(ih, 0).expect("read_piece after hash");
1242        assert_eq!(piece, full);
1243
1244        // After hash_piece, the piece was promoted to Cached. A second read
1245        // promotes it from T1 to T2, making it appear in hot_pieces.
1246        let _ = backend
1247            .read_chunk(ih, 0, 0, 100, false)
1248            .expect("second read");
1249        let cached = backend.cached_pieces(ih);
1250        assert!(
1251            cached.contains(&0),
1252            "piece 0 should be in hot pieces after two accesses"
1253        );
1254    }
1255
1256    #[test]
1257    fn back_pressure_flush_reaches_disk() {
1258        // Use a tiny buffer pool (64 KiB) to force eviction under pressure.
1259        let config = DiskConfig {
1260            buffer_pool_capacity: 64 * 1024,
1261            ..test_config()
1262        };
1263        let backend = PosixDiskIo::new(&config);
1264        let ih = make_hash_n(41);
1265        let piece_size = 16384_u64;
1266        let storage = make_storage_full(piece_size * 10, piece_size, 16384);
1267        backend.register(ih, storage);
1268
1269        // Write 8 pieces, flushing each before the next. This exercises the
1270        // back-pressure path: the pool never exceeds its budget because we
1271        // flush proactively, and every piece reaches disk.
1272        for p in 0..8_u32 {
1273            let data = vec![p as u8; piece_size as usize];
1274            backend
1275                .write_chunk(ih, p, 0, Bytes::from(data), false)
1276                .unwrap_or_else(|e| panic!("write piece {p}: {e}"));
1277            backend
1278                .flush_piece(ih, p)
1279                .unwrap_or_else(|e| panic!("flush piece {p}: {e}"));
1280        }
1281
1282        // All 8 pieces should be on disk and readable.
1283        for p in 0..8_u32 {
1284            let piece = backend
1285                .read_piece(ih, p)
1286                .unwrap_or_else(|e| panic!("read piece {p}: {e}"));
1287            assert_eq!(piece.len(), piece_size as usize);
1288            assert!(
1289                piece.iter().all(|&b| b == p as u8),
1290                "piece {p} data mismatch"
1291            );
1292        }
1293
1294        // Also verify: writing beyond budget without flushing causes eviction.
1295        // Write 6 more pieces without flushing (96 KiB > 64 KiB).
1296        for p in 0..6_u32 {
1297            let data = vec![(p + 10) as u8; piece_size as usize];
1298            backend
1299                .write_chunk(ih, p, 0, Bytes::from(data), false)
1300                .unwrap_or_else(|e| panic!("overwrite piece {p}: {e}"));
1301        }
1302        let stats = backend.stats();
1303        assert!(
1304            stats.skeleton_count > 0 || stats.eviction_count > 0,
1305            "should see evictions when exceeding budget: skeleton={}, eviction={}",
1306            stats.skeleton_count,
1307            stats.eviction_count
1308        );
1309    }
1310
1311    #[test]
1312    fn prefetch_then_suggest_then_serve() {
1313        let backend = PosixDiskIo::new(&test_config());
1314        let ih = make_hash_n(42);
1315        let storage = make_storage(100);
1316        backend.register(ih, storage);
1317
1318        // Write data and flush to disk first (simulating a completed piece on disk).
1319        let data = vec![0xCC; 100];
1320        backend
1321            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
1322            .expect("write flushed");
1323
1324        // Read piece (triggers prefetch into cache — cache miss path).
1325        let r1 = backend
1326            .read_chunk(ih, 0, 0, 100, false)
1327            .expect("first read");
1328        assert_eq!(&r1[..], &data[..]);
1329
1330        // Second read promotes to T2 in ARC.
1331        let r2 = backend
1332            .read_chunk(ih, 0, 0, 100, false)
1333            .expect("second read");
1334        assert_eq!(&r2[..], &data[..]);
1335
1336        // Should appear in hot pieces now (T2).
1337        let cached = backend.cached_pieces(ih);
1338        assert!(cached.contains(&0), "piece 0 should be hot after two reads");
1339
1340        // Third read should be a pure cache hit.
1341        let s1 = backend.stats();
1342        let _ = backend
1343            .read_chunk(ih, 0, 0, 100, false)
1344            .expect("third read");
1345        let s2 = backend.stats();
1346        assert!(
1347            s2.cache_hits > s1.cache_hits,
1348            "third read should be a cache hit: before={}, after={}",
1349            s1.cache_hits,
1350            s2.cache_hits
1351        );
1352    }
1353
1354    #[test]
1355    fn full_download_verify_cycle() {
1356        let backend = PosixDiskIo::new(&test_config());
1357        let ih = make_hash_n(43);
1358        let chunk = 16384_u32;
1359        let piece_len = u64::from(chunk) * 4; // 4 blocks per piece
1360        let total = piece_len * 3; // 3 pieces
1361        let storage = make_storage_full(total, piece_len, chunk);
1362        backend.register(ih, storage);
1363
1364        // Download 3 pieces: write all blocks then hash from cache.
1365        for p in 0..3_u32 {
1366            let mut full_piece = Vec::new();
1367            for b in 0..4_u32 {
1368                let block = vec![(p * 4 + b) as u8; chunk as usize];
1369                full_piece.extend_from_slice(&block);
1370                backend
1371                    .write_chunk(ih, p, b * chunk, Bytes::from(block), false)
1372                    .unwrap_or_else(|e| panic!("write piece {p} block {b}: {e}"));
1373            }
1374            let expected = irontide_core::sha1(&full_piece);
1375            assert!(
1376                backend
1377                    .hash_piece(ih, p, &expected)
1378                    .unwrap_or_else(|e| panic!("hash piece {p}: {e}")),
1379                "piece {p} hash should pass"
1380            );
1381        }
1382
1383        // Verify all 3 pieces on disk.
1384        for p in 0..3_u32 {
1385            let data = backend
1386                .read_piece(ih, p)
1387                .unwrap_or_else(|e| panic!("read piece {p}: {e}"));
1388            assert_eq!(data.len(), piece_len as usize);
1389            for b in 0..4_u32 {
1390                let expected_byte = (p * 4 + b) as u8;
1391                let start = (b * chunk) as usize;
1392                let end = start + chunk as usize;
1393                assert!(
1394                    data[start..end].iter().all(|&x| x == expected_byte),
1395                    "piece {p} block {b}: expected 0x{expected_byte:02X}"
1396                );
1397            }
1398        }
1399    }
1400
1401    #[test]
1402    fn skeleton_eviction_then_completion() {
1403        // Tiny pool: 32 KiB — forces Writing-to-Skeleton demotion.
1404        let config = DiskConfig {
1405            buffer_pool_capacity: 32 * 1024,
1406            ..test_config()
1407        };
1408        let backend = PosixDiskIo::new(&config);
1409        let ih = make_hash_n(44);
1410        // 4 pieces of 16 KiB each = 64 KiB > 32 KiB budget.
1411        let storage = make_storage_full(16384 * 4, 16384, 16384);
1412        backend.register(ih, storage);
1413
1414        for p in 0..4_u32 {
1415            let data = vec![p as u8; 16384];
1416            backend
1417                .write_chunk(ih, p, 0, Bytes::from(data), false)
1418                .unwrap_or_else(|e| panic!("write piece {p}: {e}"));
1419        }
1420
1421        // Flush and verify all pieces can complete via disk path.
1422        for p in 0..4_u32 {
1423            backend
1424                .flush_piece(ih, p)
1425                .unwrap_or_else(|e| panic!("flush piece {p}: {e}"));
1426            let piece = backend
1427                .read_piece(ih, p)
1428                .unwrap_or_else(|e| panic!("read piece {p}: {e}"));
1429            assert_eq!(piece.len(), 16384);
1430        }
1431
1432        // Verify eviction/skeleton stats reflect memory pressure.
1433        let stats = backend.stats();
1434        assert!(
1435            stats.skeleton_count > 0 || stats.eviction_count > 0,
1436            "should have evictions under pressure: skeleton={}, eviction={}",
1437            stats.skeleton_count,
1438            stats.eviction_count
1439        );
1440    }
1441
1442    #[test]
1443    fn hash_from_cache_fail_no_disk_write() {
1444        let backend = PosixDiskIo::new(&test_config());
1445        let ih = make_hash_n(46);
1446        let storage = make_storage(50);
1447        backend.register(ih, storage);
1448
1449        let data = vec![0xDD; 50];
1450        backend
1451            .write_chunk(ih, 0, 0, Bytes::from(data), false)
1452            .expect("buffered write");
1453
1454        // Hash with wrong expected value — should fail.
1455        let wrong_hash = Id20([0xFF; 20]);
1456        assert!(
1457            !backend
1458                .hash_piece(ih, 0, &wrong_hash)
1459                .expect("hash_piece should not error"),
1460            "hash should fail with wrong expected value"
1461        );
1462
1463        // The piece should NOT be on disk. MemoryStorage is zero-initialized,
1464        // so reading back should yield all zeros (no write occurred).
1465        let piece = backend.read_piece(ih, 0).expect("read_piece");
1466        assert!(
1467            piece.iter().all(|&b| b == 0),
1468            "failed piece should not be written to disk"
1469        );
1470    }
1471
1472    #[test]
1473    fn seeding_throughput_with_cache() {
1474        let backend = PosixDiskIo::new(&test_config());
1475        let ih = make_hash_n(47);
1476        let chunk = 16384_u32;
1477        let piece_len = u64::from(chunk);
1478        let num_pieces = 64_u32;
1479        let storage = make_storage_full(piece_len * u64::from(num_pieces), piece_len, chunk);
1480        backend.register(ih, storage);
1481
1482        // Write all pieces to disk (flush=true simulates completed seeded data).
1483        for p in 0..num_pieces {
1484            let data = vec![p as u8; chunk as usize];
1485            backend
1486                .write_chunk(ih, p, 0, Bytes::from(data), true)
1487                .unwrap_or_else(|e| panic!("write piece {p}: {e}"));
1488        }
1489
1490        // First read pass: cache miss + prefetch for each piece.
1491        for p in 0..num_pieces {
1492            backend
1493                .read_chunk(ih, p, 0, chunk, false)
1494                .unwrap_or_else(|e| panic!("first read piece {p}: {e}"));
1495        }
1496
1497        // Second read pass: should be mostly cache hits.
1498        let stats_before = backend.stats();
1499        for p in 0..num_pieces {
1500            backend
1501                .read_chunk(ih, p, 0, chunk, false)
1502                .unwrap_or_else(|e| panic!("second read piece {p}: {e}"));
1503        }
1504        let stats_after = backend.stats();
1505
1506        let hits = stats_after
1507            .cache_hits
1508            .saturating_sub(stats_before.cache_hits);
1509        let total = u64::from(num_pieces);
1510        let hit_rate = hits as f64 / total as f64;
1511
1512        // Should have >80% cache hit rate with 64 x 16 KiB pieces in 1 MiB cache.
1513        assert!(
1514            hit_rate > 0.8,
1515            "cache hit rate {hit_rate:.1} should be >80% (hits={hits}, total={total})"
1516        );
1517    }
1518
1519    // -----------------------------------------------------------------------
1520    // write_block_direct tests
1521    // -----------------------------------------------------------------------
1522
1523    #[test]
1524    fn write_block_direct_contiguous() {
1525        let backend = PosixDiskIo::new(&test_config());
1526        let ih = make_hash_n(60);
1527        let storage = make_storage(100);
1528        backend.register(ih, Arc::clone(&storage));
1529
1530        // Common case: all data in s0, empty s1.
1531        let data = vec![0xCDu8; 50];
1532        backend.write_block_direct(ih, 0, 0, &data, &[]).unwrap();
1533
1534        let read = storage.read_chunk(0, 0, 50).unwrap();
1535        assert_eq!(read, data);
1536
1537        // Stats should reflect the write.
1538        let stats = backend.stats();
1539        assert_eq!(stats.write_bytes, 50);
1540    }
1541
1542    #[test]
1543    fn write_block_direct_split() {
1544        let backend = PosixDiskIo::new(&test_config());
1545        let ih = make_hash_n(61);
1546        let storage = make_storage(100);
1547        backend.register(ih, Arc::clone(&storage));
1548
1549        // Ring-buffer wrap: 30 bytes in s0, 20 bytes in s1.
1550        let s0: Vec<u8> = (0..30).collect();
1551        let s1: Vec<u8> = (30..50).collect();
1552        backend.write_block_direct(ih, 0, 10, &s0, &s1).unwrap();
1553
1554        let read = storage.read_chunk(0, 10, 50).unwrap();
1555        let expected: Vec<u8> = (0..50).collect();
1556        assert_eq!(read, expected);
1557
1558        let stats = backend.stats();
1559        assert_eq!(stats.write_bytes, 50);
1560    }
1561}