Skip to main content

irontide_session/
disk.rs

1#![allow(
2    clippy::cast_possible_truncation,
3    clippy::cast_precision_loss,
4    clippy::cast_possible_wrap,
5    clippy::cast_sign_loss,
6    reason = "M175: disk job sizes bounded by piece_length (u32 by construction in Lengths::new)"
7)]
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use parking_lot::Mutex;
13
14use bitflags::bitflags;
15use bytes::Bytes;
16use irontide_core::{Id20, Id32};
17use irontide_storage::TorrentStorage;
18use tokio::sync::{mpsc, oneshot};
19use tracing::warn;
20
21bitflags! {
22    /// Hint flags for disk I/O operations.
23    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
24    pub struct DiskJobFlags: u8 {
25        /// Copy cached blocks rather than sharing references.
26        const FORCE_COPY      = 0x01;
27        /// Hint: sequential file access pattern (read-ahead friendly).
28        const SEQUENTIAL      = 0x02;
29        /// Don't cache this read result.
30        const VOLATILE_READ   = 0x04;
31        /// Flush completed piece to disk immediately.
32        const FLUSH_PIECE     = 0x08;
33    }
34}
35
36/// Error reported asynchronously from a non-blocking disk write.
37#[derive(Debug)]
38pub struct DiskWriteError {
39    /// Piece index that failed to write.
40    pub piece: u32,
41    /// Byte offset within the piece.
42    pub begin: u32,
43    /// The underlying storage error.
44    pub error: irontide_storage::Error,
45}
46
/// Outcome of an asynchronous piece hash check.
#[derive(Debug)]
pub struct VerifyResult {
    /// Index of the piece that was checked.
    pub piece: u32,
    /// `true` when the computed hash equalled the expected one.
    pub passed: bool,
}
55
56/// A single block write job for the deferred writer task.
57pub(crate) struct WriteJob {
58    piece: u32,
59    begin: u32,
60    data: Bytes,
61}
62
63/// State for the per-torrent deferred write queue.
64///
65/// Peers enqueue writes via an MPSC channel; a dedicated writer task
66/// drains the channel and calls `block_in_place(storage.write_chunk())`.
67/// A per-piece pending counter + Notify allows callers to wait until
68/// all writes for a piece are flushed before hash verification.
69pub(crate) struct DiskWriteState {
70    tx: mpsc::Sender<WriteJob>,
71    /// Per-piece outstanding write count.
72    pending: Mutex<HashMap<u32, u32>>,
73    /// Signalled whenever any piece's pending count hits zero.
74    notify: tokio::sync::Notify,
75    /// M120: Lock timing diagnostics for the pending mutex.
76    lock_timing: crate::timed_lock::LockTimingSettings,
77}
78
79pub(crate) enum DiskJob {
80    Register {
81        info_hash: Id20,
82        storage: Arc<dyn TorrentStorage>,
83        reply: oneshot::Sender<()>,
84    },
85    Unregister {
86        info_hash: Id20,
87    },
88
89    Write {
90        info_hash: Id20,
91        piece: u32,
92        begin: u32,
93        data: Bytes,
94        flags: DiskJobFlags,
95        reply: oneshot::Sender<irontide_storage::Result<()>>,
96    },
97    Read {
98        info_hash: Id20,
99        piece: u32,
100        begin: u32,
101        length: u32,
102        flags: DiskJobFlags,
103        reply: oneshot::Sender<irontide_storage::Result<Bytes>>,
104    },
105    Hash {
106        info_hash: Id20,
107        piece: u32,
108        expected: Id20,
109        #[allow(dead_code)]
110        flags: DiskJobFlags,
111        reply: oneshot::Sender<irontide_storage::Result<bool>>,
112    },
113    HashV2 {
114        info_hash: Id20,
115        piece: u32,
116        expected: Id32,
117        #[allow(dead_code)]
118        flags: DiskJobFlags,
119        reply: oneshot::Sender<irontide_storage::Result<bool>>,
120    },
121    BlockHash {
122        info_hash: Id20,
123        piece: u32,
124        begin: u32,
125        length: u32,
126        #[allow(dead_code)]
127        flags: DiskJobFlags,
128        reply: oneshot::Sender<irontide_storage::Result<Id32>>,
129    },
130
131    ClearPiece {
132        info_hash: Id20,
133        piece: u32,
134    },
135    FlushWriteBuffer {
136        info_hash: Id20,
137        piece: u32,
138        reply: oneshot::Sender<irontide_storage::Result<()>>,
139    },
140
141    CachedPieces {
142        info_hash: Id20,
143        reply: oneshot::Sender<Vec<u32>>,
144    },
145
146    /// Flush all buffered writes across all torrents.
147    FlushAll {
148        reply: oneshot::Sender<irontide_storage::Result<()>>,
149    },
150
151    Shutdown {
152        reply: oneshot::Sender<()>,
153    },
154}
155
156/// Configuration for the disk I/O subsystem.
157#[derive(Debug, Clone)]
158pub struct DiskConfig {
159    /// Number of concurrent I/O threads (semaphore permits). Default: 4.
160    pub io_threads: usize,
161    /// Storage allocation mode. Default: Auto.
162    pub storage_mode: irontide_core::StorageMode,
163    /// Total cache size in bytes (read + write). Default: 16 MiB.
164    /// Deprecated: use `buffer_pool_capacity` instead.
165    pub cache_size: usize,
166    /// Fraction of `cache_size` reserved for write buffering. Default: 0.5.
167    /// Deprecated: buffer pool handles write/read split implicitly.
168    pub write_cache_ratio: f32,
169    /// Bounded channel capacity. Default: 512.
170    pub channel_capacity: usize,
171    /// Unified buffer pool capacity in bytes. Default: 64 MiB.
172    pub buffer_pool_capacity: usize,
173    /// Lock cached piece data in physical memory. Default: true on Unix.
174    pub enable_mlock: bool,
175    /// M120: Lock timing warning threshold in milliseconds (0 = disabled).
176    pub lock_warn_threshold_ms: u64,
177    /// `io_uring` submission queue depth. Default: 256.
178    pub io_uring_sq_depth: u32,
179    /// Enable `O_DIRECT` for `io_uring` writes. Default: false.
180    pub io_uring_direct_io: bool,
181    /// Enable direct I/O for filesystem storage (`O_DIRECT` / `F_NOCACHE`). Default: false.
182    pub filesystem_direct_io: bool,
183    /// Minimum segments to batch for `io_uring`. Default: 4.
184    pub io_uring_batch_threshold: usize,
185    /// IOCP concurrent thread count (0 = system default). Default: 0.
186    pub iocp_concurrent_threads: u32,
187    /// Enable `FILE_FLAG_NO_BUFFERING` for IOCP I/O. Default: false.
188    pub iocp_direct_io: bool,
189}
190
191impl Default for DiskConfig {
192    fn default() -> Self {
193        Self {
194            io_threads: 4,
195            storage_mode: irontide_core::StorageMode::Auto,
196            cache_size: 16 * 1024 * 1024,
197            write_cache_ratio: 0.5,
198            channel_capacity: 512,
199            buffer_pool_capacity: 64 * 1024 * 1024,
200            enable_mlock: cfg!(unix),
201            lock_warn_threshold_ms: 50,
202            io_uring_sq_depth: 256,
203            io_uring_direct_io: false,
204            filesystem_direct_io: false,
205            io_uring_batch_threshold: 4,
206            iocp_concurrent_threads: 0,
207            iocp_direct_io: false,
208        }
209    }
210}
211
212/// Disk I/O performance counters.
213#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
214pub struct DiskStats {
215    /// Total bytes read from disk.
216    pub read_bytes: u64,
217    /// Total bytes written to disk.
218    pub write_bytes: u64,
219    /// Number of read requests served from cache.
220    pub cache_hits: u64,
221    /// Number of read requests that required disk I/O.
222    pub cache_misses: u64,
223    /// Current size of the write buffer in bytes.
224    pub write_buffer_bytes: usize,
225    /// Number of pending disk I/O jobs in the queue.
226    pub queued_jobs: usize,
227    /// Current size of the read cache in bytes (M102).
228    #[serde(default)]
229    pub read_cache_bytes: usize,
230    /// Total number of entries in the buffer pool (M102).
231    #[serde(default)]
232    pub pool_entries: usize,
233    /// Number of prefetch insertions into the read cache (M102).
234    #[serde(default)]
235    pub prefetch_count: u64,
236    /// Number of ARC evictions from the read cache (M102).
237    #[serde(default)]
238    pub eviction_count: u64,
239    /// Number of Writing-to-Skeleton demotions (M102).
240    #[serde(default)]
241    pub skeleton_count: u64,
242}
243
244impl From<crate::disk_backend::DiskIoStats> for DiskStats {
245    fn from(s: crate::disk_backend::DiskIoStats) -> Self {
246        Self {
247            read_bytes: s.read_bytes,
248            write_bytes: s.write_bytes,
249            cache_hits: s.cache_hits,
250            cache_misses: s.cache_misses,
251            write_buffer_bytes: s.write_buffer_bytes,
252            queued_jobs: 0,
253            read_cache_bytes: s.read_cache_bytes,
254            pool_entries: s.pool_entries,
255            prefetch_count: s.prefetch_count,
256            eviction_count: s.eviction_count,
257            skeleton_count: s.skeleton_count,
258        }
259    }
260}
261
262// ---------------------------------------------------------------------------
263// DiskManagerHandle — session-level handle
264// ---------------------------------------------------------------------------
265
266/// Session-level handle for managing the disk subsystem.
267#[derive(Clone)]
268pub struct DiskManagerHandle {
269    tx: mpsc::Sender<DiskJob>,
270    /// Backend reference for per-torrent deferred writes (M100).
271    backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
272    /// Bounded blocking-task spawner (M116).
273    spawner: crate::blocking_spawner::BlockingSpawner,
274}
275
276impl DiskManagerHandle {
277    /// Create a new disk manager with the default backend selected from config.
278    /// Returns the handle and a `JoinHandle` for the background actor task.
279    #[must_use]
280    pub fn new(config: DiskConfig) -> (Self, tokio::task::JoinHandle<()>) {
281        let backend = crate::disk_backend::create_backend_from_config(&config);
282        let spawner = crate::blocking_spawner::BlockingSpawner::new(config.io_threads);
283        Self::new_with_backend(config, backend, spawner)
284    }
285
286    /// Create a new disk manager with a custom disk I/O backend.
287    /// Returns the handle and a `JoinHandle` for the background actor task.
288    pub(crate) fn new_with_backend(
289        config: DiskConfig,
290        backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
291        spawner: crate::blocking_spawner::BlockingSpawner,
292    ) -> (Self, tokio::task::JoinHandle<()>) {
293        let (tx, rx) = mpsc::channel(config.channel_capacity);
294        let backend_for_actor = Arc::clone(&backend);
295        let actor = DiskActor::new(rx, config, backend_for_actor, spawner.clone());
296        let join = tokio::spawn(actor.run());
297        (
298            Self {
299                tx,
300                backend,
301                spawner,
302            },
303            join,
304        )
305    }
306
307    /// Register a torrent's storage with the disk subsystem and return a
308    /// per-torrent `DiskHandle`.
309    pub async fn register_torrent(
310        &self,
311        info_hash: Id20,
312        storage: Arc<dyn TorrentStorage>,
313    ) -> DiskHandle {
314        // Clone storage: one for the DiskJob::Register, one for the DiskHandle.
315        let storage_for_handle = Arc::clone(&storage);
316
317        let (reply_tx, reply_rx) = oneshot::channel();
318        let _ = self
319            .tx
320            .send(DiskJob::Register {
321                info_hash,
322                storage,
323                reply: reply_tx,
324            })
325            .await;
326        let _ = reply_rx.await;
327
328        // Create the deferred write queue (M100).
329        let (write_tx, mut write_rx) = mpsc::channel::<WriteJob>(512);
330        let write_state = Arc::new(DiskWriteState {
331            tx: write_tx,
332            pending: Mutex::new(HashMap::new()),
333            notify: tokio::sync::Notify::new(),
334            lock_timing: crate::timed_lock::LockTimingSettings::default(),
335        });
336
337        // Spawn the per-torrent writer task.
338        let writer_storage = Arc::clone(&storage_for_handle);
339        let writer_state = Arc::clone(&write_state);
340        let writer_spawner = self.spawner.clone();
341        tokio::spawn(async move {
342            while let Some(first) = write_rx.recv().await {
343                // Drain up to 64 jobs into a batch, then execute all in a
344                // single block_in_place call to reduce thread pool overhead.
345                let mut batch = vec![first];
346                while batch.len() < 64 {
347                    match write_rx.try_recv() {
348                        Ok(job) => batch.push(job),
349                        Err(_) => break,
350                    }
351                }
352
353                // Collect piece indices before moving batch into the closure.
354                let pieces: Vec<u32> = batch.iter().map(|j| j.piece).collect();
355
356                let ws = Arc::clone(&writer_storage);
357                let spawner = writer_spawner.clone();
358                spawner
359                    .block_in_place(move || {
360                        for WriteJob { piece, begin, data } in &batch {
361                            if let Err(e) = ws.write_chunk(*piece, *begin, data) {
362                                tracing::warn!(piece, begin, %e, "deferred write failed");
363                            }
364                        }
365                    })
366                    .await;
367
368                // Decrement pending counts for all jobs in batch, notify once.
369                {
370                    let mut pending = crate::timed_lock::TimedGuard::new(
371                        writer_state.pending.lock(),
372                        &writer_state.lock_timing,
373                        "disk_pending",
374                    );
375                    for piece in &pieces {
376                        if let Some(count) = pending.get_mut(piece) {
377                            *count = count.saturating_sub(1);
378                            if *count == 0 {
379                                pending.remove(piece);
380                            }
381                        }
382                    }
383                }
384                writer_state.notify.notify_waiters();
385            }
386        });
387
388        DiskHandle {
389            tx: self.tx.clone(),
390            info_hash,
391            hash_pool: None,
392            hash_result_tx: None,
393            storage: Some(storage_for_handle),
394            backend: Some(Arc::clone(&self.backend)),
395            write_state: Some(write_state),
396            spawner: Some(self.spawner.clone()),
397        }
398    }
399
400    /// Unregister a torrent, flushing and clearing its write buffer and cache.
401    pub async fn unregister_torrent(&self, info_hash: Id20) {
402        let _ = self.tx.send(DiskJob::Unregister { info_hash }).await;
403    }
404
405    /// Gracefully shut down the disk subsystem, flushing all buffers.
406    pub async fn shutdown(&self) {
407        let (tx, rx) = oneshot::channel();
408        let _ = self.tx.send(DiskJob::Shutdown { reply: tx }).await;
409        let _ = rx.await;
410    }
411}
412
413// ---------------------------------------------------------------------------
414// DiskHandle — per-torrent handle
415// ---------------------------------------------------------------------------
416
417/// Per-torrent handle for async disk I/O.
418#[derive(Clone)]
419pub struct DiskHandle {
420    tx: mpsc::Sender<DiskJob>,
421    info_hash: Id20,
422    /// Hash pool for parallel piece verification (M96).
423    hash_pool: Option<std::sync::Arc<crate::hash_pool::HashPool>>,
424    /// Per-torrent hash result sender (M96).
425    hash_result_tx: Option<tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>>,
426    /// Direct storage reference for deferred writes (M100).
427    storage: Option<Arc<dyn TorrentStorage>>,
428    /// Backend reference for disk-based verify (M100).
429    backend: Option<Arc<dyn crate::disk_backend::DiskIoBackend>>,
430    /// Deferred write queue state (M100).
431    write_state: Option<Arc<DiskWriteState>>,
432    /// Bounded blocking-task spawner (M116).
433    spawner: Option<crate::blocking_spawner::BlockingSpawner>,
434}
435
436impl std::fmt::Debug for DiskHandle {
437    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438        f.debug_struct("DiskHandle")
439            .field("info_hash", &self.info_hash)
440            .finish_non_exhaustive()
441    }
442}
443
444impl DiskHandle {
445    /// Create a `DiskHandle` from raw parts (for internal/test use).
446    #[cfg_attr(not(test), allow(dead_code))]
447    pub(crate) fn new(tx: mpsc::Sender<DiskJob>, info_hash: Id20) -> Self {
448        Self {
449            tx,
450            info_hash,
451            hash_pool: None,
452            hash_result_tx: None,
453            storage: None,
454            backend: None,
455            write_state: None,
456            spawner: None,
457        }
458    }
459
460    /// Set the hash pool reference (M96).
461    pub fn set_hash_pool(&mut self, pool: std::sync::Arc<crate::hash_pool::HashPool>) {
462        self.hash_pool = Some(pool);
463    }
464
465    /// Set the per-torrent hash result sender (M96).
466    pub fn set_hash_result_tx(
467        &mut self,
468        tx: tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>,
469    ) {
470        self.hash_result_tx = Some(tx);
471    }
472
473    /// Write a chunk to disk (may be buffered).
474    ///
475    /// # Errors
476    ///
477    /// Returns an error if the disk I/O fails or the disk actor is gone.
478    pub async fn write_chunk(
479        &self,
480        piece: u32,
481        begin: u32,
482        data: Bytes,
483        flags: DiskJobFlags,
484    ) -> irontide_storage::Result<()> {
485        let (tx, rx) = oneshot::channel();
486        let _ = self
487            .tx
488            .send(DiskJob::Write {
489                info_hash: self.info_hash,
490                piece,
491                begin,
492                data,
493                flags,
494                reply: tx,
495            })
496            .await;
497        rx.await.unwrap_or_else(|_| {
498            Err(irontide_storage::Error::Io(std::io::Error::new(
499                std::io::ErrorKind::BrokenPipe,
500                "disk actor gone",
501            )))
502        })
503    }
504
505    /// Read a chunk from disk (may hit cache or write buffer).
506    ///
507    /// # Errors
508    ///
509    /// Returns an error if the disk I/O fails or the disk actor is gone.
510    pub async fn read_chunk(
511        &self,
512        piece: u32,
513        begin: u32,
514        length: u32,
515        flags: DiskJobFlags,
516    ) -> irontide_storage::Result<Bytes> {
517        let (tx, rx) = oneshot::channel();
518        let _ = self
519            .tx
520            .send(DiskJob::Read {
521                info_hash: self.info_hash,
522                piece,
523                begin,
524                length,
525                flags,
526                reply: tx,
527            })
528            .await;
529        rx.await.unwrap_or_else(|_| {
530            Err(irontide_storage::Error::Io(std::io::Error::new(
531                std::io::ErrorKind::BrokenPipe,
532                "disk actor gone",
533            )))
534        })
535    }
536
537    /// Verify a piece hash against an expected value.
538    ///
539    /// # Errors
540    ///
541    /// Returns an error if the disk I/O fails or the disk actor is gone.
542    pub async fn verify_piece(
543        &self,
544        piece: u32,
545        expected: Id20,
546        flags: DiskJobFlags,
547    ) -> irontide_storage::Result<bool> {
548        let (tx, rx) = oneshot::channel();
549        let _ = self
550            .tx
551            .send(DiskJob::Hash {
552                info_hash: self.info_hash,
553                piece,
554                expected,
555                flags,
556                reply: tx,
557            })
558            .await;
559        rx.await.unwrap_or_else(|_| {
560            Err(irontide_storage::Error::Io(std::io::Error::new(
561                std::io::ErrorKind::BrokenPipe,
562                "disk actor gone",
563            )))
564        })
565    }
566
567    /// Verify a piece hash against an expected SHA-256 value (v2).
568    ///
569    /// # Errors
570    ///
571    /// Returns an error if the disk I/O fails or the disk actor is gone.
572    pub async fn verify_piece_v2(
573        &self,
574        piece: u32,
575        expected: Id32,
576        flags: DiskJobFlags,
577    ) -> irontide_storage::Result<bool> {
578        let (tx, rx) = oneshot::channel();
579        let _ = self
580            .tx
581            .send(DiskJob::HashV2 {
582                info_hash: self.info_hash,
583                piece,
584                expected,
585                flags,
586                reply: tx,
587            })
588            .await;
589        rx.await.unwrap_or_else(|_| {
590            Err(irontide_storage::Error::Io(std::io::Error::new(
591                std::io::ErrorKind::BrokenPipe,
592                "disk actor gone",
593            )))
594        })
595    }
596
597    /// Hash a single block with SHA-256 for Merkle verification (v2).
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if the disk I/O fails or the disk actor is gone.
602    pub async fn hash_block(
603        &self,
604        piece: u32,
605        begin: u32,
606        length: u32,
607        flags: DiskJobFlags,
608    ) -> irontide_storage::Result<Id32> {
609        let (tx, rx) = oneshot::channel();
610        let _ = self
611            .tx
612            .send(DiskJob::BlockHash {
613                info_hash: self.info_hash,
614                piece,
615                begin,
616                length,
617                flags,
618                reply: tx,
619            })
620            .await;
621        rx.await.unwrap_or_else(|_| {
622            Err(irontide_storage::Error::Io(std::io::Error::new(
623                std::io::ErrorKind::BrokenPipe,
624                "disk actor gone",
625            )))
626        })
627    }
628
629    /// Clear a piece from cache and write buffer (e.g. on hash failure).
630    pub async fn clear_piece(&self, piece: u32) {
631        let _ = self
632            .tx
633            .send(DiskJob::ClearPiece {
634                info_hash: self.info_hash,
635                piece,
636            })
637            .await;
638    }
639
640    /// Flush a specific piece from the write buffer to disk.
641    ///
642    /// # Errors
643    ///
644    /// Returns an error if the disk I/O fails or the disk actor is gone.
645    pub async fn flush_piece(&self, piece: u32) -> irontide_storage::Result<()> {
646        let (tx, rx) = oneshot::channel();
647        let _ = self
648            .tx
649            .send(DiskJob::FlushWriteBuffer {
650                info_hash: self.info_hash,
651                piece,
652                reply: tx,
653            })
654            .await;
655        rx.await.unwrap_or_else(|_| {
656            Err(irontide_storage::Error::Io(std::io::Error::new(
657                std::io::ErrorKind::BrokenPipe,
658                "disk actor gone",
659            )))
660        })
661    }
662
663    /// Query which pieces are currently in the read cache for this torrent.
664    pub async fn cached_pieces(&self) -> Vec<u32> {
665        let (tx, rx) = oneshot::channel();
666        let _ = self
667            .tx
668            .send(DiskJob::CachedPieces {
669                info_hash: self.info_hash,
670                reply: tx,
671            })
672            .await;
673        rx.await.unwrap_or_default()
674    }
675
676    /// Flush all buffered writes to persistent storage.
677    ///
678    /// # Errors
679    ///
680    /// Returns an error if the disk I/O fails or the disk actor is gone.
681    pub async fn flush_cache(&self) -> irontide_storage::Result<()> {
682        let (tx, rx) = oneshot::channel();
683        let _ = self.tx.send(DiskJob::FlushAll { reply: tx }).await;
684        rx.await.unwrap_or_else(|_| {
685            Err(irontide_storage::Error::Io(std::io::Error::new(
686                std::io::ErrorKind::BrokenPipe,
687                "disk actor gone",
688            )))
689        })
690    }
691
692    /// Spawn a non-blocking v1 piece hash verification.
693    ///
694    /// M96: If a hash pool is configured, submits the job to the pool.
695    /// M101: Uses `HashJob::Streaming` to delegate reading + hashing to the
696    /// backend, eliminating the full-piece allocation on the caller side.
697    ///
698    /// The `generation` parameter enables staleness detection by the caller.
699    pub fn enqueue_verify(
700        &self,
701        piece: u32,
702        expected: Id20,
703        generation: u64,
704        result_tx: &mpsc::Sender<VerifyResult>,
705    ) {
706        // M96/M101: If hash pool is available, submit streaming job.
707        if let (Some(pool), Some(hash_tx)) = (&self.hash_pool, &self.hash_result_tx) {
708            if let Some(backend) = &self.backend {
709                let pool = pool.clone();
710                let hash_tx = hash_tx.clone();
711                let backend = Arc::clone(backend);
712                let info_hash = self.info_hash;
713                let job = crate::hash_pool::HashJob::Streaming {
714                    piece,
715                    expected,
716                    generation,
717                    info_hash,
718                    backend,
719                    result_tx: hash_tx,
720                };
721                tokio::spawn(async move {
722                    if pool.submit(job).await.is_err() {
723                        tracing::warn!(piece, "hash pool shut down, treating as failed");
724                    }
725                });
726                return;
727            }
728
729            // No backend — send failure.
730            let hash_tx = hash_tx.clone();
731            tokio::spawn(async move {
732                tracing::warn!(piece, "verify: no backend (hash pool path)");
733                let _ = hash_tx
734                    .send(crate::hash_pool::HashResult {
735                        piece,
736                        passed: false,
737                        generation,
738                    })
739                    .await;
740            });
741            return;
742        }
743
744        // Non-pool path: delegate to backend.hash_piece().
745        if let Some(backend) = &self.backend {
746            let backend = Arc::clone(backend);
747            let info_hash = self.info_hash;
748            let result_tx = result_tx.clone();
749            let spawner = self.spawner.clone().unwrap();
750            tokio::spawn(async move {
751                let passed = spawner
752                    .block_in_place(move || {
753                        backend
754                            .hash_piece(info_hash, piece, &expected)
755                            .unwrap_or_else(|e| {
756                                warn!(piece, %e, "verify: hash_piece failed");
757                                false
758                            })
759                    })
760                    .await;
761                let _ = result_tx.send(VerifyResult { piece, passed }).await;
762            });
763            return;
764        }
765
766        // No data source at all — treat as failure.
767        let result_tx = result_tx.clone();
768        tokio::spawn(async move {
769            warn!(piece, "verify: no data source, treating as failed");
770            let _ = result_tx
771                .send(VerifyResult {
772                    piece,
773                    passed: false,
774                })
775                .await;
776        });
777    }
778
779    /// Spawn a non-blocking v2 piece hash verification (SHA-256).
780    ///
781    /// Reads the piece from disk via the backend's `read_piece()` method.
782    pub fn enqueue_verify_v2(
783        &self,
784        piece: u32,
785        expected: Id32,
786        result_tx: &mpsc::Sender<VerifyResult>,
787    ) {
788        if let Some(backend) = &self.backend {
789            let backend = Arc::clone(backend);
790            let info_hash = self.info_hash;
791            let result_tx = result_tx.clone();
792            let spawner = self.spawner.clone().unwrap();
793            tokio::spawn(async move {
794                let passed = spawner
795                    .block_in_place(move || match backend.read_piece(info_hash, piece) {
796                        Ok(data) => {
797                            let actual = irontide_core::sha256(&data);
798                            actual == expected
799                        }
800                        Err(e) => {
801                            warn!(piece, %e, "verify v2: read_piece failed");
802                            false
803                        }
804                    })
805                    .await;
806                let _ = result_tx.send(VerifyResult { piece, passed }).await;
807            });
808            return;
809        }
810
811        // No backend — treat as failure.
812        let result_tx = result_tx.clone();
813        tokio::spawn(async move {
814            warn!(piece, "verify v2: no data source, treating as failed");
815            let _ = result_tx
816                .send(VerifyResult {
817                    piece,
818                    passed: false,
819                })
820                .await;
821        });
822    }
823
824    /// Enqueue a block write via the deferred writer task (M100).
825    ///
826    /// The write is sent to a dedicated per-torrent writer task that calls
827    /// `block_in_place(storage.write_chunk())`. If the channel is full, falls
828    /// back to a synchronous `block_in_place` write from the calling task.
829    ///
830    /// Returns early (no-op) if `write_state` is `None` (pre-M100 code path).
831    #[allow(
832        clippy::needless_pass_by_value,
833        reason = "Bytes is refcounted, pass-by-value is the bytes-crate idiom"
834    )]
835    pub(crate) fn write_block_deferred(&self, piece: u32, begin: u32, data: Bytes) {
836        let (Some(write_state), Some(storage)) = (&self.write_state, &self.storage) else {
837            return;
838        };
839
840        // Increment pending count before sending.
841        {
842            let mut pending = crate::timed_lock::TimedGuard::new(
843                write_state.pending.lock(),
844                &write_state.lock_timing,
845                "disk_pending",
846            );
847            *pending.entry(piece).or_insert(0) += 1;
848        }
849
850        match write_state.tx.try_send(WriteJob {
851            piece,
852            begin,
853            data: data.clone(),
854        }) {
855            Ok(()) => {}
856            Err(mpsc::error::TrySendError::Full(_)) => {
857                // Channel full: write synchronously to avoid unbounded backlog.
858                let storage = Arc::clone(storage);
859                if let Some(ref spawner) = self.spawner {
860                    spawner.block_in_place_sync(|| {
861                        if let Err(e) = storage.write_chunk(piece, begin, &data) {
862                            tracing::warn!(piece, begin, %e, "deferred write fallback failed");
863                        }
864                    });
865                } else {
866                    // Fallback: direct call (pre-M116 path)
867                    if let Err(e) = storage.write_chunk(piece, begin, &data) {
868                        tracing::warn!(piece, begin, %e, "deferred write fallback failed");
869                    }
870                }
871                // Decrement pending + notify.
872                let mut pending = crate::timed_lock::TimedGuard::new(
873                    write_state.pending.lock(),
874                    &write_state.lock_timing,
875                    "disk_pending",
876                );
877                if let Some(count) = pending.get_mut(&piece) {
878                    *count = count.saturating_sub(1);
879                    if *count == 0 {
880                        pending.remove(&piece);
881                        drop(pending);
882                        write_state.notify.notify_waiters();
883                    }
884                }
885            }
886            Err(mpsc::error::TrySendError::Closed(_)) => {
887                // Writer task gone — decrement pending to avoid stuck waiters.
888                let mut pending = crate::timed_lock::TimedGuard::new(
889                    write_state.pending.lock(),
890                    &write_state.lock_timing,
891                    "disk_pending",
892                );
893                if let Some(count) = pending.get_mut(&piece) {
894                    *count = count.saturating_sub(1);
895                    if *count == 0 {
896                        pending.remove(&piece);
897                        drop(pending);
898                        write_state.notify.notify_waiters();
899                    }
900                }
901            }
902        }
903    }
904
905    /// Write a block directly to storage from two slices (vectored write).
906    ///
907    /// Bypasses the buffer pool and deferred write queue — data goes straight
908    /// to the underlying storage via `DiskIoBackend::write_block_direct`.
909    /// This is the zero-copy direct-pwrite path used by peer tasks when
910    /// the ring buffer wraps and produces two slices.
911    ///
912    /// Returns `Ok(())` on success, or an error if the backend is not set
913    /// or the underlying write fails.
914    pub(crate) fn write_block_direct(
915        &self,
916        piece: u32,
917        begin: u32,
918        s0: &[u8],
919        s1: &[u8],
920    ) -> crate::Result<()> {
921        let Some(backend) = &self.backend else {
922            return Ok(());
923        };
924        backend.write_block_direct(self.info_hash, piece, begin, s0, s1)
925    }
926
927    /// Wait until all deferred writes for `piece` have been flushed to storage.
928    ///
929    /// Returns immediately if `write_state` is `None` (pre-M100 path) or if
930    /// there are no pending writes for the given piece.
931    pub(crate) async fn flush_piece_writes(&self, piece: u32) {
932        let Some(write_state) = &self.write_state else {
933            return;
934        };
935
936        loop {
937            {
938                let pending = crate::timed_lock::TimedGuard::new(
939                    write_state.pending.lock(),
940                    &write_state.lock_timing,
941                    "disk_pending",
942                );
943                if !pending.contains_key(&piece) {
944                    return;
945                }
946            }
947            write_state.notify.notified().await;
948        }
949    }
950
951    /// Direct storage reference (M100).
952    #[allow(dead_code)]
953    pub(crate) fn storage(&self) -> Option<Arc<dyn TorrentStorage>> {
954        self.storage.clone()
955    }
956}
957
958// ---------------------------------------------------------------------------
959// DiskActor — dispatcher loop (all I/O runs on tokio's blocking thread pool)
960// ---------------------------------------------------------------------------
961
/// Dispatcher that receives `DiskJob`s and forwards them to the I/O backend.
struct DiskActor {
    /// Incoming job queue drained by `run`.
    rx: mpsc::Receiver<DiskJob>,
    /// Backend that performs the actual register/read/write/hash/flush work.
    backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
    /// Used to run backend calls through `block_in_place`.
    spawner: crate::blocking_spawner::BlockingSpawner,
    /// Stored at construction; not read by any code in this impl (hence `dead_code`).
    #[allow(dead_code)]
    config: DiskConfig,
}
969
970impl DiskActor {
971    fn new(
972        rx: mpsc::Receiver<DiskJob>,
973        config: DiskConfig,
974        backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
975        spawner: crate::blocking_spawner::BlockingSpawner,
976    ) -> Self {
977        Self {
978            rx,
979            backend,
980            spawner,
981            config,
982        }
983    }
984
985    async fn run(mut self) {
986        loop {
987            // Block on first job
988            let Some(first) = self.rx.recv().await else {
989                break;
990            };
991
992            // Drain remaining pending jobs (batch processing)
993            let mut batch = vec![first];
994            while let Ok(job) = self.rx.try_recv() {
995                batch.push(job);
996            }
997
998            for job in batch {
999                if let DiskJob::Shutdown { reply } = job {
1000                    // Flush on blocking thread to avoid stalling tokio runtime.
1001                    let backend = Arc::clone(&self.backend);
1002                    let spawner = self.spawner.clone();
1003                    let flush_result = spawner.block_in_place(move || backend.flush_all()).await;
1004                    if let Err(e) = flush_result {
1005                        warn!("flush_all on shutdown failed: {e}");
1006                    }
1007                    let _ = reply.send(());
1008                    return;
1009                }
1010                self.dispatch_job(job);
1011            }
1012        }
1013    }
1014
1015    /// Dispatch a job for execution. Fast metadata ops run inline;
1016    /// all I/O ops are spawned as fire-and-forget tasks bounded by the
1017    /// semaphore, so the dispatcher loop never blocks on disk operations.
1018    fn dispatch_job(&self, job: DiskJob) {
1019        match job {
1020            // --- Fast metadata ops (inline) ---
1021            DiskJob::Register {
1022                info_hash,
1023                storage,
1024                reply,
1025            } => {
1026                self.backend.register(info_hash, storage);
1027                let _ = reply.send(());
1028            }
1029            DiskJob::Unregister { info_hash } => {
1030                self.backend.unregister(info_hash);
1031            }
1032            DiskJob::ClearPiece { info_hash, piece } => {
1033                self.backend.clear_piece(info_hash, piece);
1034            }
1035            DiskJob::CachedPieces { info_hash, reply } => {
1036                let pieces = self.backend.cached_pieces(info_hash);
1037                let _ = reply.send(pieces);
1038            }
1039
1040            // --- Synchronous write (caller awaits reply) ---
1041            DiskJob::Write {
1042                info_hash,
1043                piece,
1044                begin,
1045                data,
1046                flags,
1047                reply,
1048            } => {
1049                let flush = flags.contains(DiskJobFlags::FLUSH_PIECE);
1050                let backend = Arc::clone(&self.backend);
1051                let spawner = self.spawner.clone();
1052                tokio::spawn(async move {
1053                    let result = spawner
1054                        .block_in_place(move || {
1055                            backend.write_chunk(info_hash, piece, begin, data, flush)
1056                        })
1057                        .await;
1058                    let _ = reply.send(to_storage_result(result));
1059                });
1060            }
1061
1062            // --- Read ---
1063            DiskJob::Read {
1064                info_hash,
1065                piece,
1066                begin,
1067                length,
1068                flags,
1069                reply,
1070            } => {
1071                let volatile = flags.contains(DiskJobFlags::VOLATILE_READ);
1072                let backend = Arc::clone(&self.backend);
1073                let spawner = self.spawner.clone();
1074                tokio::spawn(async move {
1075                    let result = spawner
1076                        .block_in_place(move || {
1077                            backend.read_chunk(info_hash, piece, begin, length, volatile)
1078                        })
1079                        .await;
1080                    let _ = reply.send(to_storage_result(result));
1081                });
1082            }
1083
1084            // --- Synchronous hash (v1) ---
1085            DiskJob::Hash {
1086                info_hash,
1087                piece,
1088                expected,
1089                reply,
1090                ..
1091            } => {
1092                let backend = Arc::clone(&self.backend);
1093                let spawner = self.spawner.clone();
1094                tokio::spawn(async move {
1095                    let result = spawner
1096                        .block_in_place(move || backend.hash_piece(info_hash, piece, &expected))
1097                        .await;
1098                    let _ = reply.send(to_storage_result(result));
1099                });
1100            }
1101
1102            // --- Synchronous hash (v2) ---
1103            DiskJob::HashV2 {
1104                info_hash,
1105                piece,
1106                expected,
1107                reply,
1108                ..
1109            } => {
1110                let backend = Arc::clone(&self.backend);
1111                let spawner = self.spawner.clone();
1112                tokio::spawn(async move {
1113                    let result = spawner
1114                        .block_in_place(move || backend.hash_piece_v2(info_hash, piece, &expected))
1115                        .await;
1116                    let _ = reply.send(to_storage_result(result));
1117                });
1118            }
1119
1120            // --- Block hash (v2 Merkle) ---
1121            DiskJob::BlockHash {
1122                info_hash,
1123                piece,
1124                begin,
1125                length,
1126                reply,
1127                ..
1128            } => {
1129                let backend = Arc::clone(&self.backend);
1130                let spawner = self.spawner.clone();
1131                tokio::spawn(async move {
1132                    let result = spawner
1133                        .block_in_place(move || backend.hash_block(info_hash, piece, begin, length))
1134                        .await;
1135                    let _ = reply.send(to_storage_result(result));
1136                });
1137            }
1138
1139            // --- Flush piece ---
1140            DiskJob::FlushWriteBuffer {
1141                info_hash,
1142                piece,
1143                reply,
1144            } => {
1145                let backend = Arc::clone(&self.backend);
1146                let spawner = self.spawner.clone();
1147                tokio::spawn(async move {
1148                    let result = spawner
1149                        .block_in_place(move || backend.flush_piece(info_hash, piece))
1150                        .await;
1151                    let _ = reply.send(to_storage_result(result));
1152                });
1153            }
1154
1155            // --- Flush all ---
1156            DiskJob::FlushAll { reply } => {
1157                let backend = Arc::clone(&self.backend);
1158                let spawner = self.spawner.clone();
1159                tokio::spawn(async move {
1160                    let result = spawner.block_in_place(move || backend.flush_all()).await;
1161                    let _ = reply.send(to_storage_result(result));
1162                });
1163            }
1164
1165            DiskJob::Shutdown { .. } => unreachable!(),
1166        }
1167    }
1168}
1169
1170/// Convert `crate::Result<T>` to `irontide_storage::Result<T>` for reply channels.
1171fn to_storage_result<T>(r: crate::Result<T>) -> irontide_storage::Result<T> {
1172    r.map_err(|e| match e {
1173        crate::Error::Storage(se) => se,
1174        other => irontide_storage::Error::Io(std::io::Error::other(other.to_string())),
1175    })
1176}
1177
1178#[cfg(test)]
1179mod tests {
1180    use super::*;
1181    use irontide_core::Lengths;
1182    use irontide_storage::MemoryStorage;
1183
1184    // ── DiskActor integration tests ──────────────────────────────────
1185
1186    fn test_config() -> DiskConfig {
1187        DiskConfig {
1188            io_threads: 2,
1189            cache_size: 1024 * 1024,
1190            ..DiskConfig::default()
1191        }
1192    }
1193
1194    fn make_hash(n: u8) -> Id20 {
1195        let mut b = [0u8; 20];
1196        b[0] = n;
1197        Id20(b)
1198    }
1199
1200    #[tokio::test]
1201    async fn async_write_read() {
1202        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1203        let ih = make_hash(1);
1204        let lengths = Lengths::new(100, 50, 25);
1205        let storage = Arc::new(MemoryStorage::new(lengths));
1206        let disk = mgr.register_torrent(ih, storage).await;
1207
1208        let data = Bytes::from(vec![42u8; 25]);
1209        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1210            .await
1211            .unwrap();
1212        let read = disk
1213            .read_chunk(0, 0, 25, DiskJobFlags::empty())
1214            .await
1215            .unwrap();
1216        assert_eq!(read, data);
1217
1218        mgr.shutdown().await;
1219    }
1220
1221    #[tokio::test]
1222    async fn verify_through_handle() {
1223        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1224        let ih = make_hash(2);
1225        let lengths = Lengths::new(100, 50, 25);
1226        let storage = Arc::new(MemoryStorage::new(lengths));
1227        let disk = mgr.register_torrent(ih, storage).await;
1228
1229        let piece_data = vec![9u8; 50];
1230        disk.write_chunk(
1231            0,
1232            0,
1233            Bytes::from(piece_data.clone()),
1234            DiskJobFlags::FLUSH_PIECE,
1235        )
1236        .await
1237        .unwrap();
1238        disk.write_chunk(0, 25, Bytes::from(vec![9u8; 25]), DiskJobFlags::FLUSH_PIECE)
1239            .await
1240            .unwrap();
1241
1242        let expected = irontide_core::sha1(&piece_data);
1243        assert!(
1244            disk.verify_piece(0, expected, DiskJobFlags::empty())
1245                .await
1246                .unwrap()
1247        );
1248        assert!(
1249            !disk
1250                .verify_piece(0, Id20::ZERO, DiskJobFlags::empty())
1251                .await
1252                .unwrap()
1253        );
1254
1255        mgr.shutdown().await;
1256    }
1257
1258    #[tokio::test]
1259    async fn cache_hit_avoids_io() {
1260        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1261        let ih = make_hash(3);
1262        let lengths = Lengths::new(100, 50, 25);
1263        let storage = Arc::new(MemoryStorage::new(lengths));
1264        let disk = mgr.register_torrent(ih, storage).await;
1265
1266        let data = Bytes::from(vec![7u8; 25]);
1267        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1268            .await
1269            .unwrap();
1270
1271        // First read: cache miss, reads from storage
1272        let r1 = disk
1273            .read_chunk(0, 0, 25, DiskJobFlags::empty())
1274            .await
1275            .unwrap();
1276        assert_eq!(r1, data);
1277
1278        // Second read: should be cache hit
1279        let r2 = disk
1280            .read_chunk(0, 0, 25, DiskJobFlags::empty())
1281            .await
1282            .unwrap();
1283        assert_eq!(r2, data);
1284
1285        mgr.shutdown().await;
1286    }
1287
1288    #[tokio::test]
1289    async fn volatile_read_bypasses_cache() {
1290        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1291        let ih = make_hash(4);
1292        let lengths = Lengths::new(100, 50, 25);
1293        let storage = Arc::new(MemoryStorage::new(lengths));
1294        let disk = mgr.register_torrent(ih, storage).await;
1295
1296        let data = Bytes::from(vec![5u8; 25]);
1297        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1298            .await
1299            .unwrap();
1300
1301        // Volatile read: should not cache
1302        let r = disk
1303            .read_chunk(0, 0, 25, DiskJobFlags::VOLATILE_READ)
1304            .await
1305            .unwrap();
1306        assert_eq!(r, data);
1307
1308        mgr.shutdown().await;
1309    }
1310
1311    #[tokio::test]
1312    async fn clear_piece_evicts_cache() {
1313        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1314        let ih = make_hash(5);
1315        let lengths = Lengths::new(100, 50, 25);
1316        let storage = Arc::new(MemoryStorage::new(lengths));
1317        let disk = mgr.register_torrent(ih, storage).await;
1318
1319        let data = Bytes::from(vec![5u8; 25]);
1320        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1321            .await
1322            .unwrap();
1323        // Populate cache
1324        disk.read_chunk(0, 0, 25, DiskJobFlags::empty())
1325            .await
1326            .unwrap();
1327        // Clear
1328        disk.clear_piece(0).await;
1329
1330        // Can still read (from storage, not cache)
1331        let r = disk
1332            .read_chunk(0, 0, 25, DiskJobFlags::empty())
1333            .await
1334            .unwrap();
1335        assert_eq!(r, data);
1336
1337        mgr.shutdown().await;
1338    }
1339
1340    #[tokio::test]
1341    async fn write_buffer_flush() {
1342        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1343        let ih = make_hash(6);
1344        let lengths = Lengths::new(100, 50, 25);
1345        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1346        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1347
1348        // Write to buffer (no FLUSH_PIECE)
1349        disk.write_chunk(0, 0, Bytes::from(vec![1u8; 25]), DiskJobFlags::empty())
1350            .await
1351            .unwrap();
1352        disk.write_chunk(0, 25, Bytes::from(vec![2u8; 25]), DiskJobFlags::empty())
1353            .await
1354            .unwrap();
1355
1356        // Explicitly flush
1357        disk.flush_piece(0).await.unwrap();
1358
1359        // Read back from storage directly to verify flush happened
1360        let piece = storage.read_piece(0).unwrap();
1361        assert_eq!(&piece[..25], &[1u8; 25]);
1362        assert_eq!(&piece[25..], &[2u8; 25]);
1363
1364        mgr.shutdown().await;
1365    }
1366
1367    #[tokio::test]
1368    async fn verify_piece_v2_via_disk_handle() {
1369        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1370        let ih = make_hash(11);
1371        let data = vec![0xABu8; 16384];
1372        let expected = irontide_core::sha256(&data);
1373        let lengths = Lengths::new(16384, 16384, 16384);
1374        let storage = Arc::new(MemoryStorage::new(lengths));
1375        storage.write_chunk(0, 0, &data).unwrap();
1376
1377        let disk = mgr.register_torrent(ih, storage).await;
1378        let result = disk
1379            .verify_piece_v2(0, expected, DiskJobFlags::empty())
1380            .await;
1381        assert!(result.unwrap());
1382        mgr.shutdown().await;
1383    }
1384
1385    #[tokio::test]
1386    async fn hash_block_via_disk_handle() {
1387        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1388        let ih = make_hash(12);
1389        let data = vec![0xCDu8; 16384];
1390        let lengths = Lengths::new(16384, 16384, 16384);
1391        let storage = Arc::new(MemoryStorage::new(lengths));
1392        storage.write_chunk(0, 0, &data).unwrap();
1393
1394        let disk = mgr.register_torrent(ih, storage).await;
1395        let hash = disk.hash_block(0, 0, 16384, DiskJobFlags::empty()).await;
1396        assert_eq!(hash.unwrap(), irontide_core::sha256(&data));
1397        mgr.shutdown().await;
1398    }
1399
1400    #[tokio::test]
1401    async fn concurrent_verify_multiple_pieces() {
1402        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1403        let ih = make_hash(10);
1404
1405        // 8 pieces of 50 bytes each = 400 bytes total
1406        let data: Vec<u8> = (0..400).map(|i| (i % 256) as u8).collect();
1407        let piece_len = 50u64;
1408        let lengths = Lengths::new(data.len() as u64, piece_len, 25);
1409        let storage = Arc::new(MemoryStorage::new(lengths.clone()));
1410
1411        // Write all piece data
1412        let num_pieces = lengths.num_pieces();
1413        for p in 0..num_pieces {
1414            let offset = lengths.piece_offset(p) as usize;
1415            let size = lengths.piece_size(p) as usize;
1416            storage
1417                .write_chunk(p, 0, &data[offset..offset + size])
1418                .unwrap();
1419        }
1420
1421        let disk = mgr.register_torrent(ih, storage).await;
1422
1423        // Compute expected hashes
1424        let mut expected_hashes = Vec::new();
1425        for p in 0..num_pieces {
1426            let offset = lengths.piece_offset(p) as usize;
1427            let size = lengths.piece_size(p) as usize;
1428            expected_hashes.push(irontide_core::sha1(&data[offset..offset + size]));
1429        }
1430
1431        // Verify all 8 pieces concurrently via JoinSet
1432        let mut js = tokio::task::JoinSet::new();
1433        for p in 0..num_pieces {
1434            let d = disk.clone();
1435            let hash = expected_hashes[p as usize];
1436            js.spawn(async move {
1437                let valid = d
1438                    .verify_piece(p, hash, DiskJobFlags::empty())
1439                    .await
1440                    .unwrap();
1441                (p, valid)
1442            });
1443        }
1444
1445        let mut results = Vec::new();
1446        while let Some(r) = js.join_next().await {
1447            results.push(r.unwrap());
1448        }
1449        results.sort_by_key(|&(p, _)| p);
1450
1451        assert_eq!(results.len(), num_pieces as usize);
1452        for (p, valid) in &results {
1453            assert!(valid, "piece {p} should be valid");
1454        }
1455
1456        mgr.shutdown().await;
1457    }
1458
1459    // ── Deferred write queue tests (M100) ────────────────────────────
1460
1461    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1462    async fn write_block_deferred_writes_to_storage() {
1463        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1464        let ih = make_hash(30);
1465        let lengths = Lengths::new(100, 50, 25);
1466        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1467        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1468
1469        let block0 = Bytes::from(vec![0xAAu8; 25]);
1470        let block1 = Bytes::from(vec![0xBBu8; 25]);
1471
1472        disk.write_block_deferred(0, 0, block0.clone());
1473        disk.write_block_deferred(0, 25, block1.clone());
1474
1475        // Wait for all writes to piece 0 to flush.
1476        disk.flush_piece_writes(0).await;
1477
1478        // Read back from storage to verify data landed on disk.
1479        let read0 = storage.read_chunk(0, 0, 25).unwrap();
1480        assert_eq!(&read0[..], &block0[..], "block 0 should match");
1481        let read1 = storage.read_chunk(0, 25, 25).unwrap();
1482        assert_eq!(&read1[..], &block1[..], "block 1 should match");
1483
1484        mgr.shutdown().await;
1485    }
1486
1487    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1488    async fn flush_piece_writes_waits_for_completion() {
1489        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1490        let ih = make_hash(31);
1491        let lengths = Lengths::new(200, 100, 25);
1492        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1493        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1494
1495        // Enqueue 4 blocks for piece 0.
1496        for i in 0u32..4 {
1497            let data = Bytes::from(vec![(i as u8) + 1; 25]);
1498            disk.write_block_deferred(0, i * 25, data);
1499        }
1500
1501        // flush_piece_writes must block until all 4 writes complete.
1502        disk.flush_piece_writes(0).await;
1503
1504        // Verify all blocks are visible on storage.
1505        let piece = storage.read_piece(0).unwrap();
1506        assert_eq!(&piece[0..25], &[1u8; 25]);
1507        assert_eq!(&piece[25..50], &[2u8; 25]);
1508        assert_eq!(&piece[50..75], &[3u8; 25]);
1509        assert_eq!(&piece[75..100], &[4u8; 25]);
1510
1511        mgr.shutdown().await;
1512    }
1513
1514    // ── M101: Batch writer tests ──────────────────────────────────────
1515
1516    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1517    async fn batch_writer_drains_multiple_jobs() {
1518        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1519        let ih = make_hash(50);
1520        // 10 blocks of 25 bytes each across a single piece of 250 bytes.
1521        let lengths = Lengths::new(250, 250, 25);
1522        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1523        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1524
1525        // Enqueue 10 writes (well within the 64-job batch cap).
1526        for i in 0u32..10 {
1527            let data = Bytes::from(vec![i as u8 + 1; 25]);
1528            disk.write_block_deferred(0, i * 25, data);
1529        }
1530
1531        // Wait for all writes to piece 0 to flush.
1532        disk.flush_piece_writes(0).await;
1533
1534        // Verify all 10 blocks landed correctly.
1535        for i in 0u32..10 {
1536            let chunk = storage.read_chunk(0, i * 25, 25).unwrap();
1537            assert_eq!(
1538                &chunk[..],
1539                vec![i as u8 + 1; 25].as_slice(),
1540                "block {i} mismatch"
1541            );
1542        }
1543
1544        mgr.shutdown().await;
1545    }
1546
1547    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1548    async fn batch_writer_caps_at_64() {
1549        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1550        let ih = make_hash(51);
1551        // 100 blocks of 16 bytes each across a single piece of 1600 bytes.
1552        let lengths = Lengths::new(1600, 1600, 16);
1553        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1554        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1555
1556        // Enqueue 100 writes — more than the 64-job batch cap, so at least
1557        // two batches are needed to drain the channel.
1558        for i in 0u32..100 {
1559            let data = Bytes::from(vec![i as u8; 16]);
1560            disk.write_block_deferred(0, i * 16, data);
1561        }
1562
1563        // Wait for all 100 writes to complete (requires multiple batches).
1564        disk.flush_piece_writes(0).await;
1565
1566        // Verify every block landed correctly.
1567        for i in 0u32..100 {
1568            let chunk = storage.read_chunk(0, i * 16, 16).unwrap();
1569            assert_eq!(
1570                &chunk[..],
1571                vec![i as u8; 16].as_slice(),
1572                "block {i} mismatch after overflow to next batch"
1573            );
1574        }
1575
1576        mgr.shutdown().await;
1577    }
1578
1579    // ── M100: Disk-based verify tests ────────────────────────────────
1580
1581    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1582    async fn verify_from_disk_after_deferred_write() {
1583        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1584        let ih = make_hash(40);
1585        let chunk_size = 16384u32;
1586        let piece_size = u64::from(chunk_size) * 2;
1587        let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1588        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1589        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1590
1591        // Write both chunks via deferred queue.
1592        let chunk0 = vec![0xAAu8; chunk_size as usize];
1593        let chunk1 = vec![0xBBu8; chunk_size as usize];
1594        disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1595        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1596        disk.flush_piece_writes(0).await;
1597
1598        // Compute expected SHA-1 hash.
1599        let mut full_piece = Vec::with_capacity(piece_size as usize);
1600        full_piece.extend_from_slice(&chunk0);
1601        full_piece.extend_from_slice(&chunk1);
1602        let expected_hash = irontide_core::sha1(&full_piece);
1603
1604        // Verify via disk-read path.
1605        let (result_tx, mut result_rx) = mpsc::channel(4);
1606        disk.enqueue_verify(0, expected_hash, 0, &result_tx);
1607        let result = result_rx
1608            .recv()
1609            .await
1610            .expect("should receive verify result");
1611        assert_eq!(result.piece, 0);
1612        assert!(result.passed, "disk-based SHA-1 verify should pass");
1613
1614        // Wrong hash should fail.
1615        disk.write_block_deferred(0, 0, Bytes::from(chunk0));
1616        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
1617        disk.flush_piece_writes(0).await;
1618        disk.enqueue_verify(0, Id20::ZERO, 0, &result_tx);
1619        let result = result_rx
1620            .recv()
1621            .await
1622            .expect("should receive verify result");
1623        assert!(!result.passed, "wrong hash should fail disk-based verify");
1624
1625        mgr.shutdown().await;
1626    }
1627
1628    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1629    async fn verify_v2_from_disk_after_deferred_write() {
1630        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1631        let ih = make_hash(41);
1632        let chunk_size = 16384u32;
1633        let piece_size = u64::from(chunk_size) * 2;
1634        let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1635        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1636        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1637
1638        // Write both chunks via deferred queue.
1639        let chunk0 = vec![0xCCu8; chunk_size as usize];
1640        let chunk1 = vec![0xDDu8; chunk_size as usize];
1641        disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1642        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1643        disk.flush_piece_writes(0).await;
1644
1645        // Compute expected SHA-256 hash.
1646        let mut full_piece = Vec::with_capacity(piece_size as usize);
1647        full_piece.extend_from_slice(&chunk0);
1648        full_piece.extend_from_slice(&chunk1);
1649        let expected_hash = irontide_core::sha256(&full_piece);
1650
1651        // Verify via disk-read path.
1652        let (result_tx, mut result_rx) = mpsc::channel(4);
1653        disk.enqueue_verify_v2(0, expected_hash, &result_tx);
1654        let result = result_rx
1655            .recv()
1656            .await
1657            .expect("should receive v2 verify result");
1658        assert_eq!(result.piece, 0);
1659        assert!(result.passed, "disk-based SHA-256 verify should pass");
1660
1661        // Wrong hash should fail.
1662        disk.write_block_deferred(0, 0, Bytes::from(chunk0));
1663        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
1664        disk.flush_piece_writes(0).await;
1665        disk.enqueue_verify_v2(0, Id32::ZERO, &result_tx);
1666        let result = result_rx
1667            .recv()
1668            .await
1669            .expect("should receive v2 verify result");
1670        assert!(
1671            !result.passed,
1672            "wrong hash should fail disk-based v2 verify"
1673        );
1674
1675        mgr.shutdown().await;
1676    }
1677
1678    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1679    async fn verify_with_hash_pool_from_disk() {
1680        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1681        let ih = make_hash(42);
1682        let chunk_size = 16384u32;
1683        let piece_size = u64::from(chunk_size) * 2;
1684        let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1685        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1686        let mut disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1687
1688        // Configure hash pool.
1689        let (hash_result_tx, mut hash_result_rx) = mpsc::channel(4);
1690        disk.set_hash_result_tx(hash_result_tx);
1691        let hash_pool = std::sync::Arc::new(crate::hash_pool::HashPool::new(2, 16));
1692        disk.set_hash_pool(hash_pool);
1693
1694        // Write both chunks via deferred queue.
1695        let chunk0 = vec![0xEEu8; chunk_size as usize];
1696        let chunk1 = vec![0xFFu8; chunk_size as usize];
1697        disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1698        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1699        disk.flush_piece_writes(0).await;
1700
1701        // Compute expected SHA-1 hash.
1702        let mut full_piece = Vec::with_capacity(piece_size as usize);
1703        full_piece.extend_from_slice(&chunk0);
1704        full_piece.extend_from_slice(&chunk1);
1705        let expected_hash = irontide_core::sha1(&full_piece);
1706
1707        // Verify via hash pool path (reads from disk, submits to pool).
1708        let (verify_result_tx, _) = mpsc::channel(4); // not used for pool path
1709        disk.enqueue_verify(0, expected_hash, 42, &verify_result_tx);
1710        let result = hash_result_rx
1711            .recv()
1712            .await
1713            .expect("should receive hash pool result");
1714        assert!(result.passed, "hash pool disk-based verify should pass");
1715        assert_eq!(result.piece, 0);
1716        assert_eq!(result.generation, 42);
1717
1718        mgr.shutdown().await;
1719    }
1720}