// irontide_engine/disk_backend.rs

1#![allow(
2    clippy::cast_possible_truncation,
3    clippy::cast_precision_loss,
4    clippy::cast_possible_wrap,
5    clippy::cast_sign_loss,
6    reason = "M175: disk I/O sizes bounded by piece_length (u32 by construction in Lengths::new); offset arithmetic stays within file lengths"
7)]
8
9//! Pluggable disk I/O backend abstraction.
10//!
11//! The [`DiskIoBackend`] trait defines the interface for session-level disk I/O,
12//! allowing custom storage backends (POSIX, disabled/null, etc.).
13//! [`DisabledDiskIo`] is a no-op backend useful for network throughput benchmarking.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use parking_lot::{Mutex, RwLock};
19
20use bytes::Bytes;
21use irontide_core::{Id20, Id32};
22use irontide_storage::TorrentStorage;
23
24use crate::buffer_pool::BufferPool;
25use crate::disk::DiskConfig;
26
/// Aggregate I/O statistics for a disk backend.
///
/// A plain value snapshot: every field is an integer counter or gauge, so
/// two snapshots can be cloned and compared (`==`) cheaply, which is handy
/// for change detection and for `assert_eq!` in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DiskIoStats {
    /// Total bytes read from storage.
    pub read_bytes: u64,
    /// Total bytes written to storage.
    pub write_bytes: u64,
    /// Number of read requests satisfied from cache.
    pub cache_hits: u64,
    /// Number of read requests that required disk access.
    pub cache_misses: u64,
    /// Current size of the write buffer in bytes.
    pub write_buffer_bytes: usize,
    /// Current size of the read cache in bytes (M102).
    pub read_cache_bytes: usize,
    /// Total number of entries in the buffer pool (M102).
    pub pool_entries: usize,
    /// Number of prefetch insertions into the read cache (M102).
    pub prefetch_count: u64,
    /// Number of ARC evictions from the read cache (M102).
    pub eviction_count: u64,
    /// Number of Writing-to-Skeleton demotions (M102).
    pub skeleton_count: u64,
}
51
/// Trait for pluggable disk I/O backends.
///
/// Implementations are shared across the session via `Arc<dyn DiskIoBackend>`.
/// All methods must be safe to call from multiple threads concurrently.
///
/// Every per-torrent method takes the torrent's `info_hash`; `piece` is a
/// piece index and `begin` / `length` are byte offsets and byte counts
/// within that piece.
pub trait DiskIoBackend: Send + Sync {
    /// Human-readable backend name (e.g., "posix", "disabled").
    fn name(&self) -> &str;

    /// Register a torrent's storage for I/O operations.
    ///
    /// `info_hash` is the key used by every later call that targets this
    /// torrent.
    fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>);

    /// Unregister a torrent's storage.
    fn unregister(&self, info_hash: Id20);

    /// Write a chunk of piece data. If `flush` is true, persist immediately.
    ///
    /// `data` is placed at byte offset `begin` of `piece`. With
    /// `flush == false` a backend may hold the data in memory until a later
    /// flush or hash call.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage write fails.
    fn write_chunk(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        data: Bytes,
        flush: bool,
    ) -> crate::Result<()>;

    /// Read a chunk of piece data. `volatile` hints the data won't be re-read soon.
    ///
    /// Returns `length` bytes starting at byte offset `begin` of `piece`.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage read fails.
    fn read_chunk(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        length: u32,
        volatile: bool,
    ) -> crate::Result<Bytes>;

    /// Read an entire piece from storage, returning its raw bytes.
    ///
    /// For backends with a write buffer, the piece is flushed before reading.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage read fails.
    fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>>;

    /// Verify a piece against its expected SHA-1 hash.
    ///
    /// Returns `Ok(true)` when the piece matches `expected` and `Ok(false)`
    /// when it does not; a mismatch is not an error.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage read fails.
    fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool>;

    /// Verify a piece against its expected SHA-256 hash (BEP 52).
    ///
    /// Returns `Ok(true)` when the piece matches `expected` and `Ok(false)`
    /// when it does not; a mismatch is not an error.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage read fails.
    fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool>;

    /// Compute SHA-256 hash of a single block within a piece (BEP 52 Merkle).
    ///
    /// The block is the `length` bytes starting at byte offset `begin` of
    /// `piece`.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage read fails.
    fn hash_block(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        length: u32,
    ) -> crate::Result<Id32>;

    /// Discard buffered data for a piece (e.g., after hash failure).
    fn clear_piece(&self, info_hash: Id20, piece: u32);

    /// Flush buffered writes for a piece to persistent storage.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage flush fails.
    fn flush_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<()>;

    /// Flush all buffered writes across all torrents.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage flush fails.
    fn flush_all(&self) -> crate::Result<()>;

    /// Return piece indices currently held in cache (for `SuggestPiece`, M44).
    fn cached_pieces(&self, info_hash: Id20) -> Vec<u32>;

    /// Return current I/O statistics.
    fn stats(&self) -> DiskIoStats;

    /// Write a block directly to storage from two slices (vectored write).
    ///
    /// Bypasses the buffer pool entirely — data goes straight to the
    /// underlying `TorrentStorage`. The two slices represent contiguous
    /// block data split across a ring-buffer wrap boundary: `s0` is the
    /// first part and `s1` the remainder.
    ///
    /// This is the zero-copy direct-pwrite path used by peer tasks.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage write fails.
    fn write_block_direct(
        &self,
        info_hash: Id20,
        piece: u32,
        begin: u32,
        s0: &[u8],
        s1: &[u8],
    ) -> crate::Result<()>;
}
173
/// No-op disk I/O backend for network throughput benchmarking.
///
/// All writes succeed silently and hash verifications always pass.
/// `read_chunk` returns zero-filled bytes of the requested length, while
/// `read_piece` returns an empty buffer because the backend has no notion
/// of a piece size.
///
/// The type is a stateless unit struct, so it is `Copy` and `Default` and
/// can be created or duplicated freely.
#[derive(Debug, Clone, Copy, Default)]
pub struct DisabledDiskIo;
179
180impl DiskIoBackend for DisabledDiskIo {
181    fn name(&self) -> &'static str {
182        "disabled"
183    }
184
185    fn register(&self, _info_hash: Id20, _storage: Arc<dyn TorrentStorage>) {}
186
187    fn unregister(&self, _info_hash: Id20) {}
188
189    fn write_chunk(
190        &self,
191        _info_hash: Id20,
192        _piece: u32,
193        _begin: u32,
194        _data: Bytes,
195        _flush: bool,
196    ) -> crate::Result<()> {
197        Ok(())
198    }
199
200    fn read_chunk(
201        &self,
202        _info_hash: Id20,
203        _piece: u32,
204        _begin: u32,
205        length: u32,
206        _volatile: bool,
207    ) -> crate::Result<Bytes> {
208        Ok(Bytes::from(vec![0u8; length as usize]))
209    }
210
211    fn read_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<Vec<u8>> {
212        Ok(Vec::new())
213    }
214
215    fn hash_piece(&self, _info_hash: Id20, _piece: u32, _expected: &Id20) -> crate::Result<bool> {
216        Ok(true)
217    }
218
219    fn hash_piece_v2(
220        &self,
221        _info_hash: Id20,
222        _piece: u32,
223        _expected: &Id32,
224    ) -> crate::Result<bool> {
225        Ok(true)
226    }
227
228    fn hash_block(
229        &self,
230        _info_hash: Id20,
231        _piece: u32,
232        _begin: u32,
233        _length: u32,
234    ) -> crate::Result<Id32> {
235        Ok(Id32([0u8; 32]))
236    }
237
238    fn clear_piece(&self, _info_hash: Id20, _piece: u32) {}
239
240    fn flush_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<()> {
241        Ok(())
242    }
243
244    fn flush_all(&self) -> crate::Result<()> {
245        Ok(())
246    }
247
248    fn cached_pieces(&self, _info_hash: Id20) -> Vec<u32> {
249        vec![]
250    }
251
252    fn stats(&self) -> DiskIoStats {
253        DiskIoStats::default()
254    }
255
256    fn write_block_direct(
257        &self,
258        _info_hash: Id20,
259        _piece: u32,
260        _begin: u32,
261        _s0: &[u8],
262        _s1: &[u8],
263    ) -> crate::Result<()> {
264        Ok(())
265    }
266}
267
268// ---------------------------------------------------------------------------
269// PosixDiskIo — unified BufferPool backend
270// ---------------------------------------------------------------------------
271
/// POSIX disk I/O backend with unified buffer pool (combined read cache +
/// write buffer under a single byte budget).
///
/// Replaces the previous separate `ArcCache` + `WriteBuffer` design with
/// a single [`BufferPool`] that uses a byte-budget ARC for read caching and
/// tracks in-flight writes as `Writing` entries.
///
/// Each field is guarded by its own lock, so the struct can be shared
/// behind an `Arc` and used through `&self`.
pub struct PosixDiskIo {
    /// Storage handle of every registered torrent, keyed by info-hash.
    storages: RwLock<HashMap<Id20, Arc<dyn TorrentStorage>>>,
    /// Buffer pool holding buffered writes and cached pieces.
    pool: Mutex<BufferPool>,
    /// Backend-level counters (bytes read/written, cache hits/misses);
    /// the pool's own gauges are merged in when `stats()` is called.
    stats: Mutex<DiskIoStats>,
}
283
284impl PosixDiskIo {
285    /// Create a new POSIX disk I/O backend with the given configuration.
286    #[must_use]
287    pub fn new(config: &DiskConfig) -> Self {
288        let mut pool = BufferPool::with_capacity(config.buffer_pool_capacity);
289        pool.set_mlock(config.enable_mlock);
290        Self {
291            storages: RwLock::new(HashMap::new()),
292            pool: Mutex::new(pool),
293            stats: Mutex::new(DiskIoStats::default()),
294        }
295    }
296
297    fn get_storage(&self, info_hash: Id20) -> crate::Result<Arc<dyn TorrentStorage>> {
298        self.storages
299            .read()
300            .get(&info_hash)
301            .cloned()
302            .ok_or_else(|| {
303                crate::Error::Storage(irontide_storage::Error::Io(std::io::Error::new(
304                    std::io::ErrorKind::NotFound,
305                    "torrent not registered",
306                )))
307            })
308    }
309}
310
311impl DiskIoBackend for PosixDiskIo {
312    fn name(&self) -> &'static str {
313        "posix"
314    }
315
316    fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>) {
317        self.storages.write().insert(info_hash, storage);
318    }
319
320    fn unregister(&self, info_hash: Id20) {
321        self.storages.write().remove(&info_hash);
322        self.pool.lock().clear_torrent(info_hash);
323    }
324
325    fn write_chunk(
326        &self,
327        info_hash: Id20,
328        piece: u32,
329        begin: u32,
330        data: Bytes,
331        flush: bool,
332    ) -> crate::Result<()> {
333        let len = data.len();
334
335        if flush {
336            let storage = self.get_storage(info_hash)?;
337            storage.write_chunk(piece, begin, &data)?;
338            self.stats.lock().write_bytes += len as u64;
339            return Ok(());
340        }
341
342        // Buffer the write in the pool. Pass piece_size=0 since we don't know
343        // it here — completion detection is handled by ChunkTracker in
344        // torrent.rs, not by the buffer pool.
345        let key = (info_hash, piece);
346        self.pool.lock().write_block(key, begin, data, 0);
347        self.stats.lock().write_bytes += len as u64;
348        Ok(())
349    }
350
351    fn read_chunk(
352        &self,
353        info_hash: Id20,
354        piece: u32,
355        begin: u32,
356        length: u32,
357        volatile: bool,
358    ) -> crate::Result<Bytes> {
359        let key = (info_hash, piece);
360
361        // 1. Check buffer pool (Writing entries + Cached entries)
362        if !volatile {
363            let mut pool = self.pool.lock();
364            if let Some(data) = pool.read_block(key, begin, length as usize) {
365                drop(pool);
366                self.stats.lock().cache_hits += 1;
367                return Ok(data);
368            }
369        }
370        self.stats.lock().cache_misses += 1;
371
372        // 2. Read full piece from storage and cache it (unless volatile)
373        let storage = self.get_storage(info_hash)?;
374        if !volatile {
375            if let Ok(piece_data) = storage.read_piece(piece) {
376                let piece_bytes = Bytes::from(piece_data);
377                self.stats.lock().read_bytes += piece_bytes.len() as u64;
378
379                let mut pool = self.pool.lock();
380                pool.prefetch_piece(key, piece_bytes.clone());
381                drop(pool);
382
383                // Return the requested slice
384                let end = (begin as usize).saturating_add(length as usize);
385                if end <= piece_bytes.len() {
386                    return Ok(piece_bytes.slice(begin as usize..end));
387                }
388                // Fall through to chunk read if slice is out of bounds
389                // (shouldn't happen in practice, but defensive)
390            } else {
391                // Fall through to chunk read on piece-level failure
392            }
393        }
394
395        // 3. Volatile path or fallback: read just the requested chunk
396        let data = storage.read_chunk(piece, begin, length)?;
397        let bytes = Bytes::from(data);
398        self.stats.lock().read_bytes += bytes.len() as u64;
399        Ok(bytes)
400    }
401
402    fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>> {
403        self.flush_piece(info_hash, piece)?;
404        let storage = self.get_storage(info_hash)?;
405        let data = storage.read_piece(piece)?;
406        self.stats.lock().read_bytes += data.len() as u64;
407        Ok(data)
408    }
409
410    fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool> {
411        let key = (info_hash, piece);
412
413        // Try hash from cache: take all blocks from the Writing entry and hash
414        // them in memory, avoiding a disk round-trip. This works because
415        // hash_piece is only called after ChunkTracker confirms the piece is
416        // complete, so all blocks should be present.
417        let cached_data = {
418            let mut pool = self.pool.lock();
419            pool.take_all_blocks(key)
420        };
421
422        if let Some(data) = cached_data {
423            let hash = irontide_core::sha1(&data);
424            if hash == *expected {
425                // Hash pass: flush to disk + promote to read cache.
426                let storage = self.get_storage(info_hash)?;
427                storage.write_chunk(piece, 0, &data)?;
428                self.stats.lock().write_bytes += data.len() as u64;
429
430                let mut pool = self.pool.lock();
431                pool.promote_to_cached(key, Bytes::from(data));
432                return Ok(true);
433            }
434            // Hash fail: data already removed from pool by take_all_blocks.
435            return Ok(false);
436        }
437
438        // No cached data — flush and verify from disk (legacy path).
439        self.flush_piece(info_hash, piece)?;
440        let storage = self.get_storage(info_hash)?;
441        Ok(storage.verify_piece(piece, expected)?)
442    }
443
444    fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool> {
445        let key = (info_hash, piece);
446
447        // Try hash from cache (same pattern as hash_piece but SHA-256).
448        let cached_data = {
449            let mut pool = self.pool.lock();
450            pool.take_all_blocks(key)
451        };
452
453        if let Some(data) = cached_data {
454            let hash = irontide_core::sha256(&data);
455            if hash == *expected {
456                let storage = self.get_storage(info_hash)?;
457                storage.write_chunk(piece, 0, &data)?;
458                self.stats.lock().write_bytes += data.len() as u64;
459
460                let mut pool = self.pool.lock();
461                pool.promote_to_cached(key, Bytes::from(data));
462                return Ok(true);
463            }
464            return Ok(false);
465        }
466
467        self.flush_piece(info_hash, piece)?;
468        let storage = self.get_storage(info_hash)?;
469        Ok(storage.verify_piece_v2(piece, expected)?)
470    }
471
472    fn hash_block(
473        &self,
474        info_hash: Id20,
475        piece: u32,
476        begin: u32,
477        length: u32,
478    ) -> crate::Result<Id32> {
479        self.flush_piece(info_hash, piece)?;
480        let storage = self.get_storage(info_hash)?;
481        Ok(storage.hash_block(piece, begin, length)?)
482    }
483
484    fn clear_piece(&self, info_hash: Id20, piece: u32) {
485        self.pool.lock().clear_piece((info_hash, piece));
486    }
487
488    fn flush_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<()> {
489        let blocks = {
490            let mut pool = self.pool.lock();
491            match pool.flush_piece((info_hash, piece)) {
492                Some(b) => b,
493                None => return Ok(()),
494            }
495        };
496        let storage = self.get_storage(info_hash)?;
497        for (begin, data) in blocks {
498            storage.write_chunk(piece, begin, &data)?;
499        }
500        Ok(())
501    }
502
503    fn flush_all(&self) -> crate::Result<()> {
504        let all_blocks = {
505            let mut pool = self.pool.lock();
506            pool.flush_all()
507        };
508        for ((info_hash, piece), blocks) in all_blocks {
509            let storage = self.get_storage(info_hash)?;
510            for (begin, data) in blocks {
511                storage.write_chunk(piece, begin, &data)?;
512            }
513        }
514        Ok(())
515    }
516
517    fn cached_pieces(&self, info_hash: Id20) -> Vec<u32> {
518        let pool = self.pool.lock();
519        pool.hot_pieces(info_hash)
520    }
521
522    fn stats(&self) -> DiskIoStats {
523        let mut s = self.stats.lock().clone();
524        let pool = self.pool.lock();
525        let ps = pool.stats();
526        s.write_buffer_bytes = ps.write_buffer_bytes;
527        s.read_cache_bytes = ps.read_cache_bytes;
528        s.pool_entries = ps.total_entries;
529        s.prefetch_count = ps.prefetch_count;
530        s.eviction_count = ps.eviction_count;
531        s.skeleton_count = ps.skeleton_count;
532        s
533    }
534
535    fn write_block_direct(
536        &self,
537        info_hash: Id20,
538        piece: u32,
539        begin: u32,
540        s0: &[u8],
541        s1: &[u8],
542    ) -> crate::Result<()> {
543        let storage = self.get_storage(info_hash)?;
544        storage.write_chunk_vectored(piece, begin, s0, s1)?;
545        let total = (s0.len() + s1.len()) as u64;
546        self.stats.lock().write_bytes += total;
547        Ok(())
548    }
549}
550
551// ---------------------------------------------------------------------------
552// Factory
553// ---------------------------------------------------------------------------
554
555/// Create the disk I/O backend.
556///
557/// M257a: the universal pwritev path ([`PosixDiskIo`]) is the only production
558/// backend — the optional mmap / `io_uring` / IOCP backends were deleted
559/// (decision record: `docs/audits/2026-06-11-reference-architecture-gap-review.md`,
560/// decision addendum). The `TorrentStorage` trait remains the seam for the
561/// in-memory test backend.
562#[must_use]
563pub fn create_backend_from_config(config: &DiskConfig) -> Arc<dyn DiskIoBackend> {
564    Arc::new(PosixDiskIo::new(config))
565}
566
567#[cfg(test)]
568mod tests {
569    use super::*;
570    use irontide_core::Lengths;
571    use irontide_storage::MemoryStorage;
572
573    fn make_hash() -> Id20 {
574        Id20([0xAB; 20])
575    }
576
577    fn make_hash_n(n: u8) -> Id20 {
578        let mut b = [0u8; 20];
579        b[0] = n;
580        Id20(b)
581    }
582
583    fn test_config() -> DiskConfig {
584        DiskConfig {
585            io_threads: 2,
586            cache_size: 1024 * 1024, // 1 MiB
587            ..DiskConfig::default()
588        }
589    }
590
591    /// Create a `MemoryStorage` with a single piece of the given size.
592    fn make_storage(piece_size: u64) -> Arc<dyn TorrentStorage> {
593        let chunk = piece_size.min(16384) as u32;
594        let lengths = Lengths::new(piece_size, piece_size, chunk);
595        Arc::new(MemoryStorage::new(lengths))
596    }
597
598    /// Create a `MemoryStorage` with the given total/piece/chunk sizes.
599    fn make_storage_full(total: u64, piece_len: u64, chunk_size: u32) -> Arc<dyn TorrentStorage> {
600        let lengths = Lengths::new(total, piece_len, chunk_size);
601        Arc::new(MemoryStorage::new(lengths))
602    }
603
604    // -----------------------------------------------------------------------
605    // DisabledDiskIo tests
606    // -----------------------------------------------------------------------
607
608    #[test]
609    fn disabled_backend_name() {
610        let backend = DisabledDiskIo;
611        assert_eq!(backend.name(), "disabled");
612    }
613
614    #[test]
615    fn disabled_backend_write_succeeds() {
616        let backend = DisabledDiskIo;
617        let result =
618            backend.write_chunk(make_hash(), 0, 0, Bytes::from_static(&[1, 2, 3, 4]), false);
619        assert!(result.is_ok());
620    }
621
622    #[test]
623    fn disabled_backend_read_returns_zeroed() {
624        let backend = DisabledDiskIo;
625        let length = 16384u32;
626        let data = backend
627            .read_chunk(make_hash(), 0, 0, length, false)
628            .unwrap();
629        assert_eq!(data.len(), length as usize);
630        assert!(data.iter().all(|&b| b == 0));
631    }
632
633    #[test]
634    fn disabled_backend_hash_always_passes() {
635        let backend = DisabledDiskIo;
636        let expected = Id20([0xFF; 20]);
637        let result = backend.hash_piece(make_hash(), 42, &expected).unwrap();
638        assert!(result);
639    }
640
641    #[test]
642    fn disabled_backend_hash_v2_always_passes() {
643        let backend = DisabledDiskIo;
644        let expected = Id32([0xFF; 32]);
645        let result = backend.hash_piece_v2(make_hash(), 42, &expected).unwrap();
646        assert!(result);
647    }
648
649    #[test]
650    fn disabled_backend_stats_default() {
651        let backend = DisabledDiskIo;
652        let stats = backend.stats();
653        assert_eq!(stats.read_bytes, 0);
654        assert_eq!(stats.write_bytes, 0);
655        assert_eq!(stats.cache_hits, 0);
656        assert_eq!(stats.cache_misses, 0);
657        assert_eq!(stats.write_buffer_bytes, 0);
658    }
659
660    #[test]
661    fn disabled_backend_cached_pieces_empty() {
662        let backend = DisabledDiskIo;
663        let pieces = backend.cached_pieces(make_hash());
664        assert!(pieces.is_empty());
665    }
666
667    // -----------------------------------------------------------------------
668    // PosixDiskIo tests
669    // -----------------------------------------------------------------------
670
671    #[test]
672    fn posix_backend_name() {
673        let backend = PosixDiskIo::new(&test_config());
674        assert_eq!(backend.name(), "posix");
675    }
676
677    #[test]
678    fn posix_register_unregister() {
679        let backend = PosixDiskIo::new(&test_config());
680        let ih = make_hash_n(1);
681        let storage = make_storage(100);
682        backend.register(ih, storage);
683
684        // Should be able to read (empty data)
685        assert!(backend.read_chunk(ih, 0, 0, 10, false).is_ok());
686
687        backend.unregister(ih);
688
689        // Should fail after unregister
690        assert!(backend.read_chunk(ih, 0, 0, 10, false).is_err());
691    }
692
693    #[test]
694    fn posix_write_and_read_flush() {
695        let backend = PosixDiskIo::new(&test_config());
696        let ih = make_hash_n(2);
697        let storage = make_storage(100);
698        backend.register(ih, storage);
699
700        let data = vec![42u8; 50];
701        backend
702            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
703            .unwrap();
704        let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
705        assert_eq!(&read[..], &data[..]);
706    }
707
708    #[test]
709    fn posix_write_buffered_then_read() {
710        let backend = PosixDiskIo::new(&test_config());
711        let ih = make_hash_n(3);
712        let storage = make_storage(100);
713        backend.register(ih, storage);
714
715        let data = vec![99u8; 50];
716        // Write without flush — goes to buffer pool
717        backend
718            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), false)
719            .unwrap();
720
721        // Read should find it in buffer pool
722        let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
723        assert_eq!(&read[..], &data[..]);
724
725        // Should be a cache hit (from buffer pool)
726        let stats = backend.stats();
727        assert!(stats.cache_hits >= 1);
728    }
729
730    #[test]
731    fn posix_read_cache_hit() {
732        let backend = PosixDiskIo::new(&test_config());
733        let ih = make_hash_n(4);
734        let storage = make_storage(100);
735        backend.register(ih, storage);
736
737        let data = vec![7u8; 50];
738        backend
739            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
740            .unwrap();
741
742        // First read: cache miss, reads from storage and prefetches
743        let r1 = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
744        assert_eq!(&r1[..], &data[..]);
745        let s1 = backend.stats();
746        assert_eq!(s1.cache_misses, 1);
747
748        // Second read: should be cache hit (from prefetched Cached entry)
749        let r2 = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
750        assert_eq!(&r2[..], &data[..]);
751        let s2 = backend.stats();
752        assert_eq!(s2.cache_hits, 1);
753    }
754
755    #[test]
756    fn posix_hash_piece_correct() {
757        let backend = PosixDiskIo::new(&test_config());
758        let ih = make_hash_n(5);
759        let storage = make_storage(50);
760        backend.register(ih, storage);
761
762        let data = vec![9u8; 50];
763        backend
764            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
765            .unwrap();
766
767        let expected = irontide_core::sha1(&data);
768        assert!(backend.hash_piece(ih, 0, &expected).unwrap());
769    }
770
771    #[test]
772    fn posix_hash_piece_wrong() {
773        let backend = PosixDiskIo::new(&test_config());
774        let ih = make_hash_n(6);
775        let storage = make_storage(50);
776        backend.register(ih, storage);
777
778        let data = vec![9u8; 50];
779        backend
780            .write_chunk(ih, 0, 0, Bytes::from(data), true)
781            .unwrap();
782
783        let wrong = Id20([0xFF; 20]);
784        assert!(!backend.hash_piece(ih, 0, &wrong).unwrap());
785    }
786
787    #[test]
788    fn posix_hash_piece_v2() {
789        let backend = PosixDiskIo::new(&test_config());
790        let ih = make_hash_n(7);
791        let storage = make_storage(16384);
792        backend.register(ih, storage);
793
794        let data = vec![0xABu8; 16384];
795        backend
796            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
797            .unwrap();
798
799        let expected = irontide_core::sha256(&data);
800        assert!(backend.hash_piece_v2(ih, 0, &expected).unwrap());
801    }
802
803    #[test]
804    fn posix_clear_piece_drops_buffer() {
805        let backend = PosixDiskIo::new(&test_config());
806        let ih = make_hash_n(8);
807        let storage = make_storage(100);
808        backend.register(ih, storage);
809
810        let data = vec![55u8; 50];
811        // Write buffered (not flushed)
812        backend
813            .write_chunk(ih, 0, 0, Bytes::from(data), false)
814            .unwrap();
815        assert!(backend.stats().write_buffer_bytes > 0);
816
817        backend.clear_piece(ih, 0);
818        assert_eq!(backend.stats().write_buffer_bytes, 0);
819    }
820
821    #[test]
822    fn posix_cached_pieces() {
823        let backend = PosixDiskIo::new(&test_config());
824        let ih = make_hash_n(9);
825        // Two pieces: piece 0 and piece 1
826        let storage = make_storage_full(100, 50, 25);
827        backend.register(ih, storage);
828
829        let data = vec![1u8; 25];
830        backend
831            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
832            .unwrap();
833        backend
834            .write_chunk(ih, 1, 0, Bytes::from(data), true)
835            .unwrap();
836
837        // Read piece 0 twice to promote to T2 (hot_pieces requires ≥2 accesses)
838        backend.read_chunk(ih, 0, 0, 25, false).unwrap();
839        backend.read_chunk(ih, 0, 0, 25, false).unwrap();
840
841        let cached = backend.cached_pieces(ih);
842        assert!(cached.contains(&0));
843        // piece 1 was not read, should not be cached
844        assert!(!cached.contains(&1));
845    }
846
847    // -----------------------------------------------------------------------
848    // read_piece tests
849    // -----------------------------------------------------------------------
850
851    #[test]
852    fn read_piece_returns_full_piece() {
853        let backend = PosixDiskIo::new(&test_config());
854        let ih = make_hash_n(20);
855        let chunk_size = 16384u32;
856        let piece_size = u64::from(chunk_size) * 2; // 32768 — two chunks
857        let storage = make_storage_full(piece_size, piece_size, chunk_size);
858        backend.register(ih, storage);
859
860        // Write two chunks via the backend (buffered, not flushed)
861        let chunk0 = vec![0xAAu8; chunk_size as usize];
862        let chunk1 = vec![0xBBu8; chunk_size as usize];
863        backend
864            .write_chunk(ih, 0, 0, Bytes::from(chunk0.clone()), false)
865            .expect("write chunk 0");
866        backend
867            .write_chunk(ih, 0, chunk_size, Bytes::from(chunk1.clone()), false)
868            .expect("write chunk 1");
869
870        // read_piece should flush the buffer pool and return the full piece
871        let piece_data = backend.read_piece(ih, 0).expect("read_piece");
872        assert_eq!(piece_data.len(), piece_size as usize);
873        assert_eq!(&piece_data[..chunk_size as usize], &chunk0[..]);
874        assert_eq!(&piece_data[chunk_size as usize..], &chunk1[..]);
875
876        // Stats should reflect the read
877        let stats = backend.stats();
878        assert!(stats.read_bytes >= piece_size);
879    }
880
881    // -----------------------------------------------------------------------
882    // Hash-from-cache tests
883    // -----------------------------------------------------------------------
884
885    #[test]
886    fn posix_hash_from_cache_pass() {
887        let backend = PosixDiskIo::new(&test_config());
888        let ih = make_hash_n(30);
889        let storage = make_storage(100);
890        backend.register(ih, storage);
891
892        // Write two blocks into the buffer pool (not flushed)
893        let d1 = vec![0xAA_u8; 50];
894        let d2 = vec![0xBB_u8; 50];
895        backend
896            .write_chunk(ih, 0, 0, Bytes::from(d1.clone()), false)
897            .unwrap();
898        backend
899            .write_chunk(ih, 0, 50, Bytes::from(d2.clone()), false)
900            .unwrap();
901
902        // Compute expected hash of the full piece
903        let mut full = d1;
904        full.extend_from_slice(&d2);
905        let expected = irontide_core::sha1(&full);
906
907        // hash_piece should hash from cache, write to disk, and promote to Cached
908        assert!(backend.hash_piece(ih, 0, &expected).unwrap());
909
910        // The piece should now be readable from disk via read_piece
911        let piece_data = backend.read_piece(ih, 0).unwrap();
912        assert_eq!(piece_data.len(), 100);
913
914        // Read again to promote from T1 to T2 (hot_pieces requires ≥2 accesses)
915        backend.read_chunk(ih, 0, 0, 50, false).unwrap();
916
917        // The piece should now be in the hot pieces list (T2)
918        let cached = backend.cached_pieces(ih);
919        assert!(cached.contains(&0));
920    }
921
922    #[test]
923    fn posix_hash_from_cache_fail() {
924        let backend = PosixDiskIo::new(&test_config());
925        let ih = make_hash_n(31);
926        let storage = make_storage(50);
927        backend.register(ih, storage);
928
929        // Write one block into buffer pool
930        backend
931            .write_chunk(ih, 0, 0, Bytes::from(vec![0xCC_u8; 50]), false)
932            .unwrap();
933
934        // Wrong hash — should fail, data discarded from pool
935        let wrong = Id20([0xFF; 20]);
936        assert!(!backend.hash_piece(ih, 0, &wrong).unwrap());
937
938        // Buffer pool should no longer hold the data
939        assert_eq!(backend.stats().write_buffer_bytes, 0);
940    }
941
942    // -----------------------------------------------------------------------
943    // Factory tests
944    // -----------------------------------------------------------------------
945
946    #[test]
947    fn factory_creates_posix() {
948        let config = DiskConfig::default();
949        let backend = create_backend_from_config(&config);
950        assert_eq!(backend.name(), "posix");
951    }
952
953    // -----------------------------------------------------------------------
954    // Integration tests — full buffer pool lifecycle through PosixDiskIo
955    // -----------------------------------------------------------------------
956
957    #[test]
958    fn piece_completion_flows_to_hash_pool() {
959        let backend = PosixDiskIo::new(&test_config());
960        let ih = make_hash_n(40);
961        let storage = make_storage(100);
962        backend.register(ih, storage);
963
964        // Write two blocks (not flushed — buffered in pool)
965        let d1 = vec![0xAA; 50];
966        let d2 = vec![0xBB; 50];
967        backend
968            .write_chunk(ih, 0, 0, Bytes::from(d1.clone()), false)
969            .expect("write block 0");
970        backend
971            .write_chunk(ih, 0, 50, Bytes::from(d2.clone()), false)
972            .expect("write block 1");
973
974        let mut full = d1;
975        full.extend_from_slice(&d2);
976        let expected_hash = irontide_core::sha1(&full);
977
978        // Hash from cache should pass
979        assert!(
980            backend
981                .hash_piece(ih, 0, &expected_hash)
982                .expect("hash_piece"),
983            "hash should match"
984        );
985
986        // Data should now be on disk (read_piece flushes then reads from storage)
987        let piece = backend.read_piece(ih, 0).expect("read_piece after hash");
988        assert_eq!(piece, full);
989
990        // After hash_piece, the piece was promoted to Cached. A second read
991        // promotes it from T1 to T2, making it appear in hot_pieces.
992        let _ = backend
993            .read_chunk(ih, 0, 0, 100, false)
994            .expect("second read");
995        let cached = backend.cached_pieces(ih);
996        assert!(
997            cached.contains(&0),
998            "piece 0 should be in hot pieces after two accesses"
999        );
1000    }
1001
1002    #[test]
1003    fn back_pressure_flush_reaches_disk() {
1004        // Use a tiny buffer pool (64 KiB) to force eviction under pressure.
1005        let config = DiskConfig {
1006            buffer_pool_capacity: 64 * 1024,
1007            ..test_config()
1008        };
1009        let backend = PosixDiskIo::new(&config);
1010        let ih = make_hash_n(41);
1011        let piece_size = 16384_u64;
1012        let storage = make_storage_full(piece_size * 10, piece_size, 16384);
1013        backend.register(ih, storage);
1014
1015        // Write 8 pieces, flushing each before the next. This exercises the
1016        // back-pressure path: the pool never exceeds its budget because we
1017        // flush proactively, and every piece reaches disk.
1018        for p in 0..8_u32 {
1019            let data = vec![p as u8; piece_size as usize];
1020            backend
1021                .write_chunk(ih, p, 0, Bytes::from(data), false)
1022                .unwrap_or_else(|e| panic!("write piece {p}: {e}"));
1023            backend
1024                .flush_piece(ih, p)
1025                .unwrap_or_else(|e| panic!("flush piece {p}: {e}"));
1026        }
1027
1028        // All 8 pieces should be on disk and readable.
1029        for p in 0..8_u32 {
1030            let piece = backend
1031                .read_piece(ih, p)
1032                .unwrap_or_else(|e| panic!("read piece {p}: {e}"));
1033            assert_eq!(piece.len(), piece_size as usize);
1034            assert!(
1035                piece.iter().all(|&b| b == p as u8),
1036                "piece {p} data mismatch"
1037            );
1038        }
1039
1040        // Also verify: writing beyond budget without flushing causes eviction.
1041        // Write 6 more pieces without flushing (96 KiB > 64 KiB).
1042        for p in 0..6_u32 {
1043            let data = vec![(p + 10) as u8; piece_size as usize];
1044            backend
1045                .write_chunk(ih, p, 0, Bytes::from(data), false)
1046                .unwrap_or_else(|e| panic!("overwrite piece {p}: {e}"));
1047        }
1048        let stats = backend.stats();
1049        assert!(
1050            stats.skeleton_count > 0 || stats.eviction_count > 0,
1051            "should see evictions when exceeding budget: skeleton={}, eviction={}",
1052            stats.skeleton_count,
1053            stats.eviction_count
1054        );
1055    }
1056
1057    #[test]
1058    fn prefetch_then_suggest_then_serve() {
1059        let backend = PosixDiskIo::new(&test_config());
1060        let ih = make_hash_n(42);
1061        let storage = make_storage(100);
1062        backend.register(ih, storage);
1063
1064        // Write data and flush to disk first (simulating a completed piece on disk).
1065        let data = vec![0xCC; 100];
1066        backend
1067            .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
1068            .expect("write flushed");
1069
1070        // Read piece (triggers prefetch into cache — cache miss path).
1071        let r1 = backend
1072            .read_chunk(ih, 0, 0, 100, false)
1073            .expect("first read");
1074        assert_eq!(&r1[..], &data[..]);
1075
1076        // Second read promotes to T2 in ARC.
1077        let r2 = backend
1078            .read_chunk(ih, 0, 0, 100, false)
1079            .expect("second read");
1080        assert_eq!(&r2[..], &data[..]);
1081
1082        // Should appear in hot pieces now (T2).
1083        let cached = backend.cached_pieces(ih);
1084        assert!(cached.contains(&0), "piece 0 should be hot after two reads");
1085
1086        // Third read should be a pure cache hit.
1087        let s1 = backend.stats();
1088        let _ = backend
1089            .read_chunk(ih, 0, 0, 100, false)
1090            .expect("third read");
1091        let s2 = backend.stats();
1092        assert!(
1093            s2.cache_hits > s1.cache_hits,
1094            "third read should be a cache hit: before={}, after={}",
1095            s1.cache_hits,
1096            s2.cache_hits
1097        );
1098    }
1099
1100    #[test]
1101    fn full_download_verify_cycle() {
1102        let backend = PosixDiskIo::new(&test_config());
1103        let ih = make_hash_n(43);
1104        let chunk = 16384_u32;
1105        let piece_len = u64::from(chunk) * 4; // 4 blocks per piece
1106        let total = piece_len * 3; // 3 pieces
1107        let storage = make_storage_full(total, piece_len, chunk);
1108        backend.register(ih, storage);
1109
1110        // Download 3 pieces: write all blocks then hash from cache.
1111        for p in 0..3_u32 {
1112            let mut full_piece = Vec::new();
1113            for b in 0..4_u32 {
1114                let block = vec![(p * 4 + b) as u8; chunk as usize];
1115                full_piece.extend_from_slice(&block);
1116                backend
1117                    .write_chunk(ih, p, b * chunk, Bytes::from(block), false)
1118                    .unwrap_or_else(|e| panic!("write piece {p} block {b}: {e}"));
1119            }
1120            let expected = irontide_core::sha1(&full_piece);
1121            assert!(
1122                backend
1123                    .hash_piece(ih, p, &expected)
1124                    .unwrap_or_else(|e| panic!("hash piece {p}: {e}")),
1125                "piece {p} hash should pass"
1126            );
1127        }
1128
1129        // Verify all 3 pieces on disk.
1130        for p in 0..3_u32 {
1131            let data = backend
1132                .read_piece(ih, p)
1133                .unwrap_or_else(|e| panic!("read piece {p}: {e}"));
1134            assert_eq!(data.len(), piece_len as usize);
1135            for b in 0..4_u32 {
1136                let expected_byte = (p * 4 + b) as u8;
1137                let start = (b * chunk) as usize;
1138                let end = start + chunk as usize;
1139                assert!(
1140                    data[start..end].iter().all(|&x| x == expected_byte),
1141                    "piece {p} block {b}: expected 0x{expected_byte:02X}"
1142                );
1143            }
1144        }
1145    }
1146
1147    #[test]
1148    fn skeleton_eviction_then_completion() {
1149        // Tiny pool: 32 KiB — forces Writing-to-Skeleton demotion.
1150        let config = DiskConfig {
1151            buffer_pool_capacity: 32 * 1024,
1152            ..test_config()
1153        };
1154        let backend = PosixDiskIo::new(&config);
1155        let ih = make_hash_n(44);
1156        // 4 pieces of 16 KiB each = 64 KiB > 32 KiB budget.
1157        let storage = make_storage_full(16384 * 4, 16384, 16384);
1158        backend.register(ih, storage);
1159
1160        for p in 0..4_u32 {
1161            let data = vec![p as u8; 16384];
1162            backend
1163                .write_chunk(ih, p, 0, Bytes::from(data), false)
1164                .unwrap_or_else(|e| panic!("write piece {p}: {e}"));
1165        }
1166
1167        // Flush and verify all pieces can complete via disk path.
1168        for p in 0..4_u32 {
1169            backend
1170                .flush_piece(ih, p)
1171                .unwrap_or_else(|e| panic!("flush piece {p}: {e}"));
1172            let piece = backend
1173                .read_piece(ih, p)
1174                .unwrap_or_else(|e| panic!("read piece {p}: {e}"));
1175            assert_eq!(piece.len(), 16384);
1176        }
1177
1178        // Verify eviction/skeleton stats reflect memory pressure.
1179        let stats = backend.stats();
1180        assert!(
1181            stats.skeleton_count > 0 || stats.eviction_count > 0,
1182            "should have evictions under pressure: skeleton={}, eviction={}",
1183            stats.skeleton_count,
1184            stats.eviction_count
1185        );
1186    }
1187
1188    #[test]
1189    fn hash_from_cache_fail_no_disk_write() {
1190        let backend = PosixDiskIo::new(&test_config());
1191        let ih = make_hash_n(46);
1192        let storage = make_storage(50);
1193        backend.register(ih, storage);
1194
1195        let data = vec![0xDD; 50];
1196        backend
1197            .write_chunk(ih, 0, 0, Bytes::from(data), false)
1198            .expect("buffered write");
1199
1200        // Hash with wrong expected value — should fail.
1201        let wrong_hash = Id20([0xFF; 20]);
1202        assert!(
1203            !backend
1204                .hash_piece(ih, 0, &wrong_hash)
1205                .expect("hash_piece should not error"),
1206            "hash should fail with wrong expected value"
1207        );
1208
1209        // The piece should NOT be on disk. MemoryStorage is zero-initialized,
1210        // so reading back should yield all zeros (no write occurred).
1211        let piece = backend.read_piece(ih, 0).expect("read_piece");
1212        assert!(
1213            piece.iter().all(|&b| b == 0),
1214            "failed piece should not be written to disk"
1215        );
1216    }
1217
1218    #[test]
1219    fn seeding_throughput_with_cache() {
1220        let backend = PosixDiskIo::new(&test_config());
1221        let ih = make_hash_n(47);
1222        let chunk = 16384_u32;
1223        let piece_len = u64::from(chunk);
1224        let num_pieces = 64_u32;
1225        let storage = make_storage_full(piece_len * u64::from(num_pieces), piece_len, chunk);
1226        backend.register(ih, storage);
1227
1228        // Write all pieces to disk (flush=true simulates completed seeded data).
1229        for p in 0..num_pieces {
1230            let data = vec![p as u8; chunk as usize];
1231            backend
1232                .write_chunk(ih, p, 0, Bytes::from(data), true)
1233                .unwrap_or_else(|e| panic!("write piece {p}: {e}"));
1234        }
1235
1236        // First read pass: cache miss + prefetch for each piece.
1237        for p in 0..num_pieces {
1238            backend
1239                .read_chunk(ih, p, 0, chunk, false)
1240                .unwrap_or_else(|e| panic!("first read piece {p}: {e}"));
1241        }
1242
1243        // Second read pass: should be mostly cache hits.
1244        let stats_before = backend.stats();
1245        for p in 0..num_pieces {
1246            backend
1247                .read_chunk(ih, p, 0, chunk, false)
1248                .unwrap_or_else(|e| panic!("second read piece {p}: {e}"));
1249        }
1250        let stats_after = backend.stats();
1251
1252        let hits = stats_after
1253            .cache_hits
1254            .saturating_sub(stats_before.cache_hits);
1255        let total = u64::from(num_pieces);
1256        let hit_rate = hits as f64 / total as f64;
1257
1258        // Should have >80% cache hit rate with 64 x 16 KiB pieces in 1 MiB cache.
1259        assert!(
1260            hit_rate > 0.8,
1261            "cache hit rate {hit_rate:.1} should be >80% (hits={hits}, total={total})"
1262        );
1263    }
1264
1265    // -----------------------------------------------------------------------
1266    // write_block_direct tests
1267    // -----------------------------------------------------------------------
1268
1269    #[test]
1270    fn write_block_direct_contiguous() {
1271        let backend = PosixDiskIo::new(&test_config());
1272        let ih = make_hash_n(60);
1273        let storage = make_storage(100);
1274        backend.register(ih, Arc::clone(&storage));
1275
1276        // Common case: all data in s0, empty s1.
1277        let data = vec![0xCDu8; 50];
1278        backend.write_block_direct(ih, 0, 0, &data, &[]).unwrap();
1279
1280        let read = storage.read_chunk(0, 0, 50).unwrap();
1281        assert_eq!(read, data);
1282
1283        // Stats should reflect the write.
1284        let stats = backend.stats();
1285        assert_eq!(stats.write_bytes, 50);
1286    }
1287
1288    #[test]
1289    fn write_block_direct_split() {
1290        let backend = PosixDiskIo::new(&test_config());
1291        let ih = make_hash_n(61);
1292        let storage = make_storage(100);
1293        backend.register(ih, Arc::clone(&storage));
1294
1295        // Ring-buffer wrap: 30 bytes in s0, 20 bytes in s1.
1296        let s0: Vec<u8> = (0..30).collect();
1297        let s1: Vec<u8> = (30..50).collect();
1298        backend.write_block_direct(ih, 0, 10, &s0, &s1).unwrap();
1299
1300        let read = storage.read_chunk(0, 10, 50).unwrap();
1301        let expected: Vec<u8> = (0..50).collect();
1302        assert_eq!(read, expected);
1303
1304        let stats = backend.stats();
1305        assert_eq!(stats.write_bytes, 50);
1306    }
1307}