// irontide_engine/disk.rs

1#![allow(
2    clippy::cast_possible_truncation,
3    clippy::cast_precision_loss,
4    clippy::cast_possible_wrap,
5    clippy::cast_sign_loss,
6    reason = "M175: disk job sizes bounded by piece_length (u32 by construction in Lengths::new)"
7)]
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use parking_lot::Mutex;
13
14use bitflags::bitflags;
15use bytes::Bytes;
16use irontide_core::{Id20, Id32};
17use irontide_storage::TorrentStorage;
18use tokio::sync::{mpsc, oneshot};
19use tracing::warn;
20
21bitflags! {
22    /// Hint flags for disk I/O operations.
23    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
24    pub struct DiskJobFlags: u8 {
25        /// Copy cached blocks rather than sharing references.
26        const FORCE_COPY      = 0x01;
27        /// Hint: sequential file access pattern (read-ahead friendly).
28        const SEQUENTIAL      = 0x02;
29        /// Don't cache this read result.
30        const VOLATILE_READ   = 0x04;
31        /// Flush completed piece to disk immediately.
32        const FLUSH_PIECE     = 0x08;
33    }
34}
35
/// Error reported asynchronously from a non-blocking disk write.
///
/// Identifies the failed block by piece index and byte offset, and carries
/// the storage-layer error that caused the failure.
#[derive(Debug)]
pub struct DiskWriteError {
    /// Piece index that failed to write.
    pub piece: u32,
    /// Byte offset of the failed block within the piece.
    pub begin: u32,
    /// The underlying storage error.
    pub error: irontide_storage::Error,
}
46
/// Result of an asynchronous piece hash verification.
///
/// Delivered on the `result_tx` channel given to `DiskHandle::enqueue_verify`
/// (when no hash pool is in use) and `DiskHandle::enqueue_verify_v2`.
/// Read/hash errors and a missing backend are reported as `passed: false`
/// rather than as a separate error.
#[derive(Debug)]
pub struct VerifyResult {
    /// Piece index that was verified.
    pub piece: u32,
    /// Whether the piece hash matched the expected value.
    pub passed: bool,
}
55
/// A single block write job for the deferred writer task.
///
/// Produced by `DiskHandle::write_block_deferred` and consumed by the
/// per-torrent writer task, which passes it to `TorrentStorage::write_chunk`.
pub(crate) struct WriteJob {
    /// Piece index the block belongs to.
    piece: u32,
    /// Byte offset of the block within the piece.
    begin: u32,
    /// Block payload.
    data: Bytes,
}
62
/// State for the per-torrent deferred write queue.
///
/// Peers enqueue writes via a bounded MPSC channel; a dedicated writer task
/// drains the channel in batches and runs `storage.write_chunk()` on the
/// blocking spawner. A per-piece pending counter + `Notify` allows callers to
/// wait until all writes for a piece are flushed before hash verification.
pub(crate) struct DiskWriteState {
    /// Sending half of the writer task's job queue.
    tx: mpsc::Sender<WriteJob>,
    /// Per-piece outstanding write count. A piece's entry is removed once its
    /// count drops to zero, so an absent key means "nothing pending".
    pending: Mutex<HashMap<u32, u32>>,
    /// Woken with `notify_waiters()` after every batch the writer task
    /// completes, and by the fallback write paths when a piece's count
    /// reaches zero. A wake-up does not imply that any particular piece is
    /// done, so waiters must re-check `pending` after waking.
    notify: tokio::sync::Notify,
    /// M120: Lock timing diagnostics for the pending mutex.
    lock_timing: crate::timed_lock::LockTimingSettings,
}
78
/// A request for the background disk actor.
///
/// Variants that expect an answer carry a `oneshot` `reply` sender; the
/// `DiskHandle` methods turn a reply that never arrives (actor gone or job
/// dropped) into a "disk actor gone" error or an empty/default value.
pub(crate) enum DiskJob {
    /// Attach a torrent's storage. `register_torrent` waits on `reply`
    /// before setting up the torrent's deferred write queue.
    Register {
        info_hash: Id20,
        storage: Arc<dyn TorrentStorage>,
        reply: oneshot::Sender<()>,
    },
    /// Detach a torrent; sent fire-and-forget by `unregister_torrent`.
    Unregister {
        info_hash: Id20,
    },

    /// Write one block (the write may be buffered).
    Write {
        info_hash: Id20,
        piece: u32,
        begin: u32,
        data: Bytes,
        flags: DiskJobFlags,
        reply: oneshot::Sender<irontide_storage::Result<()>>,
    },
    /// Read `length` bytes at offset `begin` within `piece` (may be served
    /// from the cache or the write buffer).
    Read {
        info_hash: Id20,
        piece: u32,
        begin: u32,
        length: u32,
        flags: DiskJobFlags,
        reply: oneshot::Sender<irontide_storage::Result<Bytes>>,
    },
    /// Verify a whole piece against its expected 20-byte (v1) hash.
    Hash {
        info_hash: Id20,
        piece: u32,
        expected: Id20,
        // `flags` is not read anywhere yet, hence the dead-code allowance.
        #[allow(dead_code)]
        flags: DiskJobFlags,
        reply: oneshot::Sender<irontide_storage::Result<bool>>,
    },
    /// Verify a whole piece against its expected SHA-256 (v2) hash.
    HashV2 {
        info_hash: Id20,
        piece: u32,
        expected: Id32,
        // `flags` is not read anywhere yet, hence the dead-code allowance.
        #[allow(dead_code)]
        flags: DiskJobFlags,
        reply: oneshot::Sender<irontide_storage::Result<bool>>,
    },
    /// Hash a single block with SHA-256 for Merkle verification (v2).
    BlockHash {
        info_hash: Id20,
        piece: u32,
        begin: u32,
        length: u32,
        // `flags` is not read anywhere yet, hence the dead-code allowance.
        #[allow(dead_code)]
        flags: DiskJobFlags,
        reply: oneshot::Sender<irontide_storage::Result<Id32>>,
    },

    /// Drop a piece from the cache and write buffer (e.g. on hash failure).
    /// Fire-and-forget: there is no reply.
    ClearPiece {
        info_hash: Id20,
        piece: u32,
    },
    /// Flush one piece's buffered writes to disk.
    FlushWriteBuffer {
        info_hash: Id20,
        piece: u32,
        reply: oneshot::Sender<irontide_storage::Result<()>>,
    },

    /// List the pieces of this torrent currently held in the read cache.
    CachedPieces {
        info_hash: Id20,
        reply: oneshot::Sender<Vec<u32>>,
    },

    /// Flush all buffered writes across all torrents.
    FlushAll {
        reply: oneshot::Sender<irontide_storage::Result<()>>,
    },

    /// Shut the disk subsystem down; `DiskManagerHandle::shutdown` waits on
    /// `reply`.
    Shutdown {
        reply: oneshot::Sender<()>,
    },
}
155
/// Configuration for the disk I/O subsystem.
///
/// Every field is public so callers can override individual settings on top
/// of [`DiskConfig::default`]; the defaults are listed on each field.
#[derive(Debug, Clone)]
pub struct DiskConfig {
    /// Number of concurrent I/O threads (semaphore permits). Default: 4.
    pub io_threads: usize,
    /// Total cache size in bytes, read and write combined. Default: 16 MiB.
    /// Deprecated: superseded by `buffer_pool_capacity`.
    pub cache_size: usize,
    /// Share of `cache_size` set aside for write buffering. Default: 0.5.
    /// Deprecated: the buffer pool splits reads and writes implicitly.
    pub write_cache_ratio: f32,
    /// Capacity of the bounded disk-actor job channel. Default: 512.
    pub channel_capacity: usize,
    /// Capacity in bytes of the unified buffer pool. Default: 64 MiB.
    pub buffer_pool_capacity: usize,
    /// Lock cached piece data in physical memory.
    /// Default: `true` on Unix, `false` elsewhere.
    pub enable_mlock: bool,
    /// M120: Lock timing warning threshold, in milliseconds (0 = disabled).
    /// Default: 50.
    pub lock_warn_threshold_ms: u64,
    /// Use direct I/O for filesystem storage (`O_DIRECT` / `F_NOCACHE`).
    /// Default: false.
    pub filesystem_direct_io: bool,
}

impl Default for DiskConfig {
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        Self {
            io_threads: 4,
            channel_capacity: 512,
            // Legacy split cache settings (deprecated, kept for compatibility).
            cache_size: 16 * MIB,
            write_cache_ratio: 0.5,
            buffer_pool_capacity: 64 * MIB,
            enable_mlock: cfg!(unix),
            lock_warn_threshold_ms: 50,
            filesystem_direct_io: false,
        }
    }
}
193
/// Disk I/O performance counters.
///
/// Serializable snapshot of the disk subsystem's statistics. Fields marked
/// `#[serde(default)]` (the M102 additions) deserialize to zero when absent
/// from the input, so payloads without them still parse.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct DiskStats {
    /// Total bytes read from disk.
    pub read_bytes: u64,
    /// Total bytes written to disk.
    pub write_bytes: u64,
    /// Number of read requests served from cache.
    pub cache_hits: u64,
    /// Number of read requests that required disk I/O.
    pub cache_misses: u64,
    /// Current size of the write buffer in bytes.
    pub write_buffer_bytes: usize,
    /// Number of pending disk I/O jobs in the queue.
    /// The `From<DiskIoStats>` conversion always reports 0 here.
    pub queued_jobs: usize,
    /// Current size of the read cache in bytes (M102).
    #[serde(default)]
    pub read_cache_bytes: usize,
    /// Total number of entries in the buffer pool (M102).
    #[serde(default)]
    pub pool_entries: usize,
    /// Number of prefetch insertions into the read cache (M102).
    #[serde(default)]
    pub prefetch_count: u64,
    /// Number of ARC evictions from the read cache (M102).
    #[serde(default)]
    pub eviction_count: u64,
    /// Number of Writing-to-Skeleton demotions (M102).
    #[serde(default)]
    pub skeleton_count: u64,
}
225
226impl From<crate::disk_backend::DiskIoStats> for DiskStats {
227    fn from(s: crate::disk_backend::DiskIoStats) -> Self {
228        Self {
229            read_bytes: s.read_bytes,
230            write_bytes: s.write_bytes,
231            cache_hits: s.cache_hits,
232            cache_misses: s.cache_misses,
233            write_buffer_bytes: s.write_buffer_bytes,
234            queued_jobs: 0,
235            read_cache_bytes: s.read_cache_bytes,
236            pool_entries: s.pool_entries,
237            prefetch_count: s.prefetch_count,
238            eviction_count: s.eviction_count,
239            skeleton_count: s.skeleton_count,
240        }
241    }
242}
243
244// ---------------------------------------------------------------------------
245// DiskManagerHandle — session-level handle
246// ---------------------------------------------------------------------------
247
/// Session-level handle for managing the disk subsystem.
///
/// Owns the sending side of the disk actor's job queue and hands out
/// per-torrent [`DiskHandle`]s via `register_torrent`. Clones share the same
/// actor queue and backend `Arc`.
#[derive(Clone)]
pub struct DiskManagerHandle {
    /// Job queue of the background disk actor.
    tx: mpsc::Sender<DiskJob>,
    /// Backend reference for per-torrent deferred writes (M100).
    backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
    /// Bounded blocking-task spawner (M116).
    spawner: crate::blocking_spawner::BlockingSpawner,
}
257
258impl DiskManagerHandle {
259    /// Create a new disk manager with the default backend selected from config.
260    /// Returns the handle and a `JoinHandle` for the background actor task.
261    #[must_use]
262    pub fn new(config: DiskConfig) -> (Self, tokio::task::JoinHandle<()>) {
263        let backend = crate::disk_backend::create_backend_from_config(&config);
264        let spawner = crate::blocking_spawner::BlockingSpawner::new(config.io_threads);
265        Self::new_with_backend(config, backend, spawner)
266    }
267
268    /// Create a new disk manager with a custom disk I/O backend.
269    /// Returns the handle and a `JoinHandle` for the background actor task.
270    pub fn new_with_backend(
271        config: DiskConfig,
272        backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
273        spawner: crate::blocking_spawner::BlockingSpawner,
274    ) -> (Self, tokio::task::JoinHandle<()>) {
275        let (tx, rx) = mpsc::channel(config.channel_capacity);
276        let backend_for_actor = Arc::clone(&backend);
277        let actor = DiskActor::new(rx, config, backend_for_actor, spawner.clone());
278        let join = tokio::spawn(actor.run());
279        (
280            Self {
281                tx,
282                backend,
283                spawner,
284            },
285            join,
286        )
287    }
288
289    /// Register a torrent's storage with the disk subsystem and return a
290    /// per-torrent `DiskHandle`.
291    pub async fn register_torrent(
292        &self,
293        info_hash: Id20,
294        storage: Arc<dyn TorrentStorage>,
295    ) -> DiskHandle {
296        // Clone storage: one for the DiskJob::Register, one for the DiskHandle.
297        let storage_for_handle = Arc::clone(&storage);
298
299        let (reply_tx, reply_rx) = oneshot::channel();
300        let _ = self
301            .tx
302            .send(DiskJob::Register {
303                info_hash,
304                storage,
305                reply: reply_tx,
306            })
307            .await;
308        let _ = reply_rx.await;
309
310        // Create the deferred write queue (M100).
311        let (write_tx, mut write_rx) = mpsc::channel::<WriteJob>(512);
312        let write_state = Arc::new(DiskWriteState {
313            tx: write_tx,
314            pending: Mutex::new(HashMap::new()),
315            notify: tokio::sync::Notify::new(),
316            lock_timing: crate::timed_lock::LockTimingSettings::default(),
317        });
318
319        // Spawn the per-torrent writer task.
320        let writer_storage = Arc::clone(&storage_for_handle);
321        let writer_state = Arc::clone(&write_state);
322        let writer_spawner = self.spawner.clone();
323        tokio::spawn(async move {
324            while let Some(first) = write_rx.recv().await {
325                // Drain up to 64 jobs into a batch, then execute all in a
326                // single block_in_place call to reduce thread pool overhead.
327                let mut batch = vec![first];
328                while batch.len() < 64 {
329                    match write_rx.try_recv() {
330                        Ok(job) => batch.push(job),
331                        Err(_) => break,
332                    }
333                }
334
335                // Collect piece indices before moving batch into the closure.
336                let pieces: Vec<u32> = batch.iter().map(|j| j.piece).collect();
337
338                let ws = Arc::clone(&writer_storage);
339                let spawner = writer_spawner.clone();
340                spawner
341                    .block_in_place(move || {
342                        for WriteJob { piece, begin, data } in &batch {
343                            if let Err(e) = ws.write_chunk(*piece, *begin, data) {
344                                tracing::warn!(piece, begin, %e, "deferred write failed");
345                            }
346                        }
347                    })
348                    .await;
349
350                // Decrement pending counts for all jobs in batch, notify once.
351                {
352                    let mut pending = crate::timed_lock::TimedGuard::new(
353                        writer_state.pending.lock(),
354                        &writer_state.lock_timing,
355                        "disk_pending",
356                    );
357                    for piece in &pieces {
358                        if let Some(count) = pending.get_mut(piece) {
359                            *count = count.saturating_sub(1);
360                            if *count == 0 {
361                                pending.remove(piece);
362                            }
363                        }
364                    }
365                }
366                writer_state.notify.notify_waiters();
367            }
368        });
369
370        DiskHandle {
371            tx: self.tx.clone(),
372            info_hash,
373            hash_pool: None,
374            hash_result_tx: None,
375            storage: Some(storage_for_handle),
376            backend: Some(Arc::clone(&self.backend)),
377            write_state: Some(write_state),
378            spawner: Some(self.spawner.clone()),
379        }
380    }
381
382    /// Unregister a torrent, flushing and clearing its write buffer and cache.
383    pub async fn unregister_torrent(&self, info_hash: Id20) {
384        let _ = self.tx.send(DiskJob::Unregister { info_hash }).await;
385    }
386
387    /// Gracefully shut down the disk subsystem, flushing all buffers.
388    pub async fn shutdown(&self) {
389        let (tx, rx) = oneshot::channel();
390        let _ = self.tx.send(DiskJob::Shutdown { reply: tx }).await;
391        let _ = rx.await;
392    }
393}
394
395// ---------------------------------------------------------------------------
396// DiskHandle — per-torrent handle
397// ---------------------------------------------------------------------------
398
/// Per-torrent handle for async disk I/O.
///
/// Request/response operations go through the shared disk actor queue
/// (`tx`), tagged with this torrent's `info_hash`. Handles built by
/// `DiskManagerHandle::register_torrent` also carry `storage`, `backend`,
/// `write_state` and `spawner`; handles built by `DiskHandle::new` leave every
/// `Option` field `None`. The hash-pool fields are attached separately via
/// `set_hash_pool` / `set_hash_result_tx`.
#[derive(Clone)]
pub struct DiskHandle {
    /// Job queue of the shared disk actor.
    tx: mpsc::Sender<DiskJob>,
    /// Torrent this handle operates on.
    info_hash: Id20,
    /// Hash pool for parallel piece verification (M96).
    hash_pool: Option<std::sync::Arc<crate::hash_pool::HashPool>>,
    /// Per-torrent hash result sender (M96).
    hash_result_tx: Option<tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>>,
    /// Direct storage reference for deferred writes (M100).
    storage: Option<Arc<dyn TorrentStorage>>,
    /// Backend reference for disk-based verify (M100).
    backend: Option<Arc<dyn crate::disk_backend::DiskIoBackend>>,
    /// Deferred write queue state (M100).
    write_state: Option<Arc<DiskWriteState>>,
    /// Bounded blocking-task spawner (M116).
    spawner: Option<crate::blocking_spawner::BlockingSpawner>,
}
417
418impl std::fmt::Debug for DiskHandle {
419    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
420        f.debug_struct("DiskHandle")
421            .field("info_hash", &self.info_hash)
422            .finish_non_exhaustive()
423    }
424}
425
426impl DiskHandle {
427    /// Create a `DiskHandle` from raw parts (for internal/test use).
428    #[cfg_attr(not(test), allow(dead_code))]
429    pub(crate) fn new(tx: mpsc::Sender<DiskJob>, info_hash: Id20) -> Self {
430        Self {
431            tx,
432            info_hash,
433            hash_pool: None,
434            hash_result_tx: None,
435            storage: None,
436            backend: None,
437            write_state: None,
438            spawner: None,
439        }
440    }
441
442    /// Set the hash pool reference (M96).
443    pub fn set_hash_pool(&mut self, pool: std::sync::Arc<crate::hash_pool::HashPool>) {
444        self.hash_pool = Some(pool);
445    }
446
447    /// Set the per-torrent hash result sender (M96).
448    pub fn set_hash_result_tx(
449        &mut self,
450        tx: tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>,
451    ) {
452        self.hash_result_tx = Some(tx);
453    }
454
455    /// Write a chunk to disk (may be buffered).
456    ///
457    /// # Errors
458    ///
459    /// Returns an error if the disk I/O fails or the disk actor is gone.
460    pub async fn write_chunk(
461        &self,
462        piece: u32,
463        begin: u32,
464        data: Bytes,
465        flags: DiskJobFlags,
466    ) -> irontide_storage::Result<()> {
467        let (tx, rx) = oneshot::channel();
468        let _ = self
469            .tx
470            .send(DiskJob::Write {
471                info_hash: self.info_hash,
472                piece,
473                begin,
474                data,
475                flags,
476                reply: tx,
477            })
478            .await;
479        rx.await.unwrap_or_else(|_| {
480            Err(irontide_storage::Error::Io(std::io::Error::new(
481                std::io::ErrorKind::BrokenPipe,
482                "disk actor gone",
483            )))
484        })
485    }
486
487    /// Read a chunk from disk (may hit cache or write buffer).
488    ///
489    /// # Errors
490    ///
491    /// Returns an error if the disk I/O fails or the disk actor is gone.
492    pub async fn read_chunk(
493        &self,
494        piece: u32,
495        begin: u32,
496        length: u32,
497        flags: DiskJobFlags,
498    ) -> irontide_storage::Result<Bytes> {
499        let (tx, rx) = oneshot::channel();
500        let _ = self
501            .tx
502            .send(DiskJob::Read {
503                info_hash: self.info_hash,
504                piece,
505                begin,
506                length,
507                flags,
508                reply: tx,
509            })
510            .await;
511        rx.await.unwrap_or_else(|_| {
512            Err(irontide_storage::Error::Io(std::io::Error::new(
513                std::io::ErrorKind::BrokenPipe,
514                "disk actor gone",
515            )))
516        })
517    }
518
519    /// Verify a piece hash against an expected value.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if the disk I/O fails or the disk actor is gone.
524    pub async fn verify_piece(
525        &self,
526        piece: u32,
527        expected: Id20,
528        flags: DiskJobFlags,
529    ) -> irontide_storage::Result<bool> {
530        let (tx, rx) = oneshot::channel();
531        let _ = self
532            .tx
533            .send(DiskJob::Hash {
534                info_hash: self.info_hash,
535                piece,
536                expected,
537                flags,
538                reply: tx,
539            })
540            .await;
541        rx.await.unwrap_or_else(|_| {
542            Err(irontide_storage::Error::Io(std::io::Error::new(
543                std::io::ErrorKind::BrokenPipe,
544                "disk actor gone",
545            )))
546        })
547    }
548
549    /// Verify a piece hash against an expected SHA-256 value (v2).
550    ///
551    /// # Errors
552    ///
553    /// Returns an error if the disk I/O fails or the disk actor is gone.
554    pub async fn verify_piece_v2(
555        &self,
556        piece: u32,
557        expected: Id32,
558        flags: DiskJobFlags,
559    ) -> irontide_storage::Result<bool> {
560        let (tx, rx) = oneshot::channel();
561        let _ = self
562            .tx
563            .send(DiskJob::HashV2 {
564                info_hash: self.info_hash,
565                piece,
566                expected,
567                flags,
568                reply: tx,
569            })
570            .await;
571        rx.await.unwrap_or_else(|_| {
572            Err(irontide_storage::Error::Io(std::io::Error::new(
573                std::io::ErrorKind::BrokenPipe,
574                "disk actor gone",
575            )))
576        })
577    }
578
579    /// Hash a single block with SHA-256 for Merkle verification (v2).
580    ///
581    /// # Errors
582    ///
583    /// Returns an error if the disk I/O fails or the disk actor is gone.
584    pub async fn hash_block(
585        &self,
586        piece: u32,
587        begin: u32,
588        length: u32,
589        flags: DiskJobFlags,
590    ) -> irontide_storage::Result<Id32> {
591        let (tx, rx) = oneshot::channel();
592        let _ = self
593            .tx
594            .send(DiskJob::BlockHash {
595                info_hash: self.info_hash,
596                piece,
597                begin,
598                length,
599                flags,
600                reply: tx,
601            })
602            .await;
603        rx.await.unwrap_or_else(|_| {
604            Err(irontide_storage::Error::Io(std::io::Error::new(
605                std::io::ErrorKind::BrokenPipe,
606                "disk actor gone",
607            )))
608        })
609    }
610
611    /// Clear a piece from cache and write buffer (e.g. on hash failure).
612    pub async fn clear_piece(&self, piece: u32) {
613        let _ = self
614            .tx
615            .send(DiskJob::ClearPiece {
616                info_hash: self.info_hash,
617                piece,
618            })
619            .await;
620    }
621
622    /// Flush a specific piece from the write buffer to disk.
623    ///
624    /// # Errors
625    ///
626    /// Returns an error if the disk I/O fails or the disk actor is gone.
627    pub async fn flush_piece(&self, piece: u32) -> irontide_storage::Result<()> {
628        let (tx, rx) = oneshot::channel();
629        let _ = self
630            .tx
631            .send(DiskJob::FlushWriteBuffer {
632                info_hash: self.info_hash,
633                piece,
634                reply: tx,
635            })
636            .await;
637        rx.await.unwrap_or_else(|_| {
638            Err(irontide_storage::Error::Io(std::io::Error::new(
639                std::io::ErrorKind::BrokenPipe,
640                "disk actor gone",
641            )))
642        })
643    }
644
645    /// Query which pieces are currently in the read cache for this torrent.
646    pub async fn cached_pieces(&self) -> Vec<u32> {
647        let (tx, rx) = oneshot::channel();
648        let _ = self
649            .tx
650            .send(DiskJob::CachedPieces {
651                info_hash: self.info_hash,
652                reply: tx,
653            })
654            .await;
655        rx.await.unwrap_or_default()
656    }
657
658    /// Flush all buffered writes to persistent storage.
659    ///
660    /// # Errors
661    ///
662    /// Returns an error if the disk I/O fails or the disk actor is gone.
663    pub async fn flush_cache(&self) -> irontide_storage::Result<()> {
664        let (tx, rx) = oneshot::channel();
665        let _ = self.tx.send(DiskJob::FlushAll { reply: tx }).await;
666        rx.await.unwrap_or_else(|_| {
667            Err(irontide_storage::Error::Io(std::io::Error::new(
668                std::io::ErrorKind::BrokenPipe,
669                "disk actor gone",
670            )))
671        })
672    }
673
674    /// Spawn a non-blocking v1 piece hash verification.
675    ///
676    /// M96: If a hash pool is configured, submits the job to the pool.
677    /// M101: Uses `HashJob::Streaming` to delegate reading + hashing to the
678    /// backend, eliminating the full-piece allocation on the caller side.
679    ///
680    /// The `generation` parameter enables staleness detection by the caller.
681    pub fn enqueue_verify(
682        &self,
683        piece: u32,
684        expected: Id20,
685        generation: u64,
686        result_tx: &mpsc::Sender<VerifyResult>,
687    ) {
688        // M96/M101: If hash pool is available, submit streaming job.
689        if let (Some(pool), Some(hash_tx)) = (&self.hash_pool, &self.hash_result_tx) {
690            if let Some(backend) = &self.backend {
691                let pool = pool.clone();
692                let hash_tx = hash_tx.clone();
693                let backend = Arc::clone(backend);
694                let info_hash = self.info_hash;
695                let job = crate::hash_pool::HashJob::Streaming {
696                    piece,
697                    expected,
698                    generation,
699                    info_hash,
700                    backend,
701                    result_tx: hash_tx,
702                };
703                tokio::spawn(async move {
704                    if pool.submit(job).await.is_err() {
705                        tracing::warn!(piece, "hash pool shut down, treating as failed");
706                    }
707                });
708                return;
709            }
710
711            // No backend — send failure.
712            let hash_tx = hash_tx.clone();
713            tokio::spawn(async move {
714                tracing::warn!(piece, "verify: no backend (hash pool path)");
715                let _ = hash_tx
716                    .send(crate::hash_pool::HashResult {
717                        piece,
718                        passed: false,
719                        generation,
720                    })
721                    .await;
722            });
723            return;
724        }
725
726        // Non-pool path: delegate to backend.hash_piece().
727        if let Some(backend) = &self.backend {
728            let backend = Arc::clone(backend);
729            let info_hash = self.info_hash;
730            let result_tx = result_tx.clone();
731            let spawner = self.spawner.clone().unwrap();
732            tokio::spawn(async move {
733                let passed = spawner
734                    .block_in_place(move || {
735                        backend
736                            .hash_piece(info_hash, piece, &expected)
737                            .unwrap_or_else(|e| {
738                                warn!(piece, %e, "verify: hash_piece failed");
739                                false
740                            })
741                    })
742                    .await;
743                let _ = result_tx.send(VerifyResult { piece, passed }).await;
744            });
745            return;
746        }
747
748        // No data source at all — treat as failure.
749        let result_tx = result_tx.clone();
750        tokio::spawn(async move {
751            warn!(piece, "verify: no data source, treating as failed");
752            let _ = result_tx
753                .send(VerifyResult {
754                    piece,
755                    passed: false,
756                })
757                .await;
758        });
759    }
760
761    /// Spawn a non-blocking v2 piece hash verification (SHA-256).
762    ///
763    /// Reads the piece from disk via the backend's `read_piece()` method.
764    pub fn enqueue_verify_v2(
765        &self,
766        piece: u32,
767        expected: Id32,
768        result_tx: &mpsc::Sender<VerifyResult>,
769    ) {
770        if let Some(backend) = &self.backend {
771            let backend = Arc::clone(backend);
772            let info_hash = self.info_hash;
773            let result_tx = result_tx.clone();
774            let spawner = self.spawner.clone().unwrap();
775            tokio::spawn(async move {
776                let passed = spawner
777                    .block_in_place(move || match backend.read_piece(info_hash, piece) {
778                        Ok(data) => {
779                            let actual = irontide_core::sha256(&data);
780                            actual == expected
781                        }
782                        Err(e) => {
783                            warn!(piece, %e, "verify v2: read_piece failed");
784                            false
785                        }
786                    })
787                    .await;
788                let _ = result_tx.send(VerifyResult { piece, passed }).await;
789            });
790            return;
791        }
792
793        // No backend — treat as failure.
794        let result_tx = result_tx.clone();
795        tokio::spawn(async move {
796            warn!(piece, "verify v2: no data source, treating as failed");
797            let _ = result_tx
798                .send(VerifyResult {
799                    piece,
800                    passed: false,
801                })
802                .await;
803        });
804    }
805
806    /// Enqueue a block write via the deferred writer task (M100).
807    ///
808    /// The write is sent to a dedicated per-torrent writer task that calls
809    /// `block_in_place(storage.write_chunk())`. If the channel is full, falls
810    /// back to a synchronous `block_in_place` write from the calling task.
811    ///
812    /// Returns early (no-op) if `write_state` is `None` (pre-M100 code path).
813    #[allow(
814        clippy::needless_pass_by_value,
815        reason = "Bytes is refcounted, pass-by-value is the bytes-crate idiom"
816    )]
817    pub(crate) fn write_block_deferred(&self, piece: u32, begin: u32, data: Bytes) {
818        let (Some(write_state), Some(storage)) = (&self.write_state, &self.storage) else {
819            return;
820        };
821
822        // Increment pending count before sending.
823        {
824            let mut pending = crate::timed_lock::TimedGuard::new(
825                write_state.pending.lock(),
826                &write_state.lock_timing,
827                "disk_pending",
828            );
829            *pending.entry(piece).or_insert(0) += 1;
830        }
831
832        match write_state.tx.try_send(WriteJob {
833            piece,
834            begin,
835            data: data.clone(),
836        }) {
837            Ok(()) => {}
838            Err(mpsc::error::TrySendError::Full(_)) => {
839                // Channel full: write synchronously to avoid unbounded backlog.
840                let storage = Arc::clone(storage);
841                if let Some(ref spawner) = self.spawner {
842                    spawner.block_in_place_sync(|| {
843                        if let Err(e) = storage.write_chunk(piece, begin, &data) {
844                            tracing::warn!(piece, begin, %e, "deferred write fallback failed");
845                        }
846                    });
847                } else {
848                    // Fallback: direct call (pre-M116 path)
849                    if let Err(e) = storage.write_chunk(piece, begin, &data) {
850                        tracing::warn!(piece, begin, %e, "deferred write fallback failed");
851                    }
852                }
853                // Decrement pending + notify.
854                let mut pending = crate::timed_lock::TimedGuard::new(
855                    write_state.pending.lock(),
856                    &write_state.lock_timing,
857                    "disk_pending",
858                );
859                if let Some(count) = pending.get_mut(&piece) {
860                    *count = count.saturating_sub(1);
861                    if *count == 0 {
862                        pending.remove(&piece);
863                        drop(pending);
864                        write_state.notify.notify_waiters();
865                    }
866                }
867            }
868            Err(mpsc::error::TrySendError::Closed(_)) => {
869                // Writer task gone — decrement pending to avoid stuck waiters.
870                let mut pending = crate::timed_lock::TimedGuard::new(
871                    write_state.pending.lock(),
872                    &write_state.lock_timing,
873                    "disk_pending",
874                );
875                if let Some(count) = pending.get_mut(&piece) {
876                    *count = count.saturating_sub(1);
877                    if *count == 0 {
878                        pending.remove(&piece);
879                        drop(pending);
880                        write_state.notify.notify_waiters();
881                    }
882                }
883            }
884        }
885    }
886
887    /// Write a block directly to storage from two slices (vectored write).
888    ///
889    /// Bypasses the buffer pool and deferred write queue — data goes straight
890    /// to the underlying storage via `DiskIoBackend::write_block_direct`.
891    /// This is the zero-copy direct-pwrite path used by peer tasks when
892    /// the ring buffer wraps and produces two slices.
893    ///
894    /// Returns `Ok(())` on success, or an error if the backend is not set
895    /// or the underlying write fails.
896    pub(crate) fn write_block_direct(
897        &self,
898        piece: u32,
899        begin: u32,
900        s0: &[u8],
901        s1: &[u8],
902    ) -> crate::Result<()> {
903        let Some(backend) = &self.backend else {
904            return Ok(());
905        };
906        backend.write_block_direct(self.info_hash, piece, begin, s0, s1)
907    }
908
909    /// Wait until all deferred writes for `piece` have been flushed to storage.
910    ///
911    /// Returns immediately if `write_state` is `None` (pre-M100 path) or if
912    /// there are no pending writes for the given piece.
913    pub(crate) async fn flush_piece_writes(&self, piece: u32) {
914        let Some(write_state) = &self.write_state else {
915            return;
916        };
917
918        loop {
919            {
920                let pending = crate::timed_lock::TimedGuard::new(
921                    write_state.pending.lock(),
922                    &write_state.lock_timing,
923                    "disk_pending",
924                );
925                if !pending.contains_key(&piece) {
926                    return;
927                }
928            }
929            write_state.notify.notified().await;
930        }
931    }
932
933    /// Direct storage reference (M100).
934    #[allow(dead_code)]
935    pub(crate) fn storage(&self) -> Option<Arc<dyn TorrentStorage>> {
936        self.storage.clone()
937    }
938}
939
940// ---------------------------------------------------------------------------
941// DiskActor — dispatcher loop (all I/O runs on tokio's blocking thread pool)
942// ---------------------------------------------------------------------------
943
944struct DiskActor {
945    rx: mpsc::Receiver<DiskJob>,
946    backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
947    spawner: crate::blocking_spawner::BlockingSpawner,
948    #[allow(dead_code)]
949    config: DiskConfig,
950}
951
952impl DiskActor {
953    fn new(
954        rx: mpsc::Receiver<DiskJob>,
955        config: DiskConfig,
956        backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
957        spawner: crate::blocking_spawner::BlockingSpawner,
958    ) -> Self {
959        Self {
960            rx,
961            backend,
962            spawner,
963            config,
964        }
965    }
966
967    async fn run(mut self) {
968        loop {
969            // Block on first job
970            let Some(first) = self.rx.recv().await else {
971                break;
972            };
973
974            // Drain remaining pending jobs (batch processing)
975            let mut batch = vec![first];
976            while let Ok(job) = self.rx.try_recv() {
977                batch.push(job);
978            }
979
980            for job in batch {
981                if let DiskJob::Shutdown { reply } = job {
982                    // Flush on blocking thread to avoid stalling tokio runtime.
983                    let backend = Arc::clone(&self.backend);
984                    let spawner = self.spawner.clone();
985                    let flush_result = spawner.block_in_place(move || backend.flush_all()).await;
986                    if let Err(e) = flush_result {
987                        warn!("flush_all on shutdown failed: {e}");
988                    }
989                    let _ = reply.send(());
990                    return;
991                }
992                self.dispatch_job(job);
993            }
994        }
995    }
996
997    /// Dispatch a job for execution. Fast metadata ops run inline;
998    /// all I/O ops are spawned as fire-and-forget tasks bounded by the
999    /// semaphore, so the dispatcher loop never blocks on disk operations.
1000    fn dispatch_job(&self, job: DiskJob) {
1001        match job {
1002            // --- Fast metadata ops (inline) ---
1003            DiskJob::Register {
1004                info_hash,
1005                storage,
1006                reply,
1007            } => {
1008                self.backend.register(info_hash, storage);
1009                let _ = reply.send(());
1010            }
1011            DiskJob::Unregister { info_hash } => {
1012                self.backend.unregister(info_hash);
1013            }
1014            DiskJob::ClearPiece { info_hash, piece } => {
1015                self.backend.clear_piece(info_hash, piece);
1016            }
1017            DiskJob::CachedPieces { info_hash, reply } => {
1018                let pieces = self.backend.cached_pieces(info_hash);
1019                let _ = reply.send(pieces);
1020            }
1021
1022            // --- Synchronous write (caller awaits reply) ---
1023            DiskJob::Write {
1024                info_hash,
1025                piece,
1026                begin,
1027                data,
1028                flags,
1029                reply,
1030            } => {
1031                let flush = flags.contains(DiskJobFlags::FLUSH_PIECE);
1032                let backend = Arc::clone(&self.backend);
1033                let spawner = self.spawner.clone();
1034                tokio::spawn(async move {
1035                    let result = spawner
1036                        .block_in_place(move || {
1037                            backend.write_chunk(info_hash, piece, begin, data, flush)
1038                        })
1039                        .await;
1040                    let _ = reply.send(to_storage_result(result));
1041                });
1042            }
1043
1044            // --- Read ---
1045            DiskJob::Read {
1046                info_hash,
1047                piece,
1048                begin,
1049                length,
1050                flags,
1051                reply,
1052            } => {
1053                let volatile = flags.contains(DiskJobFlags::VOLATILE_READ);
1054                let backend = Arc::clone(&self.backend);
1055                let spawner = self.spawner.clone();
1056                tokio::spawn(async move {
1057                    let result = spawner
1058                        .block_in_place(move || {
1059                            backend.read_chunk(info_hash, piece, begin, length, volatile)
1060                        })
1061                        .await;
1062                    let _ = reply.send(to_storage_result(result));
1063                });
1064            }
1065
1066            // --- Synchronous hash (v1) ---
1067            DiskJob::Hash {
1068                info_hash,
1069                piece,
1070                expected,
1071                reply,
1072                ..
1073            } => {
1074                let backend = Arc::clone(&self.backend);
1075                let spawner = self.spawner.clone();
1076                tokio::spawn(async move {
1077                    let result = spawner
1078                        .block_in_place(move || backend.hash_piece(info_hash, piece, &expected))
1079                        .await;
1080                    let _ = reply.send(to_storage_result(result));
1081                });
1082            }
1083
1084            // --- Synchronous hash (v2) ---
1085            DiskJob::HashV2 {
1086                info_hash,
1087                piece,
1088                expected,
1089                reply,
1090                ..
1091            } => {
1092                let backend = Arc::clone(&self.backend);
1093                let spawner = self.spawner.clone();
1094                tokio::spawn(async move {
1095                    let result = spawner
1096                        .block_in_place(move || backend.hash_piece_v2(info_hash, piece, &expected))
1097                        .await;
1098                    let _ = reply.send(to_storage_result(result));
1099                });
1100            }
1101
1102            // --- Block hash (v2 Merkle) ---
1103            DiskJob::BlockHash {
1104                info_hash,
1105                piece,
1106                begin,
1107                length,
1108                reply,
1109                ..
1110            } => {
1111                let backend = Arc::clone(&self.backend);
1112                let spawner = self.spawner.clone();
1113                tokio::spawn(async move {
1114                    let result = spawner
1115                        .block_in_place(move || backend.hash_block(info_hash, piece, begin, length))
1116                        .await;
1117                    let _ = reply.send(to_storage_result(result));
1118                });
1119            }
1120
1121            // --- Flush piece ---
1122            DiskJob::FlushWriteBuffer {
1123                info_hash,
1124                piece,
1125                reply,
1126            } => {
1127                let backend = Arc::clone(&self.backend);
1128                let spawner = self.spawner.clone();
1129                tokio::spawn(async move {
1130                    let result = spawner
1131                        .block_in_place(move || backend.flush_piece(info_hash, piece))
1132                        .await;
1133                    let _ = reply.send(to_storage_result(result));
1134                });
1135            }
1136
1137            // --- Flush all ---
1138            DiskJob::FlushAll { reply } => {
1139                let backend = Arc::clone(&self.backend);
1140                let spawner = self.spawner.clone();
1141                tokio::spawn(async move {
1142                    let result = spawner.block_in_place(move || backend.flush_all()).await;
1143                    let _ = reply.send(to_storage_result(result));
1144                });
1145            }
1146
1147            DiskJob::Shutdown { .. } => unreachable!(),
1148        }
1149    }
1150}
1151
1152/// Convert `crate::Result<T>` to `irontide_storage::Result<T>` for reply channels.
1153fn to_storage_result<T>(r: crate::Result<T>) -> irontide_storage::Result<T> {
1154    r.map_err(|e| match e {
1155        crate::Error::Storage(se) => se,
1156        other => irontide_storage::Error::Io(std::io::Error::other(other.to_string())),
1157    })
1158}
1159
1160// `From<&Settings>` lifted here from settings.rs (M242) so settings.rs carries
1161// no reference to session-local config types before the irontide-settings extract.
1162impl From<&irontide_settings::Settings> for DiskConfig {
1163    fn from(s: &irontide_settings::Settings) -> Self {
1164        Self {
1165            io_threads: s.disk_io_threads,
1166            cache_size: s.disk_cache_size,
1167            write_cache_ratio: s.disk_write_cache_ratio,
1168            channel_capacity: s.disk_channel_capacity,
1169            buffer_pool_capacity: s.buffer_pool_capacity,
1170            enable_mlock: s.enable_mlock,
1171            lock_warn_threshold_ms: s.lock_warn_threshold_ms,
1172            filesystem_direct_io: s.filesystem_direct_io,
1173        }
1174    }
1175}
1176
1177#[cfg(test)]
1178mod tests {
1179    use super::*;
1180    use irontide_core::Lengths;
1181    use irontide_settings::Settings;
1182    use irontide_storage::MemoryStorage;
1183
1184    #[test]
1185    fn disk_config_from_settings() {
1186        let s = Settings::default();
1187        let dc = crate::disk::DiskConfig::from(&s);
1188        assert_eq!(dc.io_threads, s.disk_io_threads);
1189        assert_eq!(dc.cache_size, 16 * 1024 * 1024);
1190        assert!((dc.write_cache_ratio - 0.5).abs() < f32::EPSILON);
1191        assert_eq!(dc.channel_capacity, 512);
1192    }
1193
1194    // ── DiskActor integration tests ──────────────────────────────────
1195
1196    fn test_config() -> DiskConfig {
1197        DiskConfig {
1198            io_threads: 2,
1199            cache_size: 1024 * 1024,
1200            ..DiskConfig::default()
1201        }
1202    }
1203
1204    fn make_hash(n: u8) -> Id20 {
1205        let mut b = [0u8; 20];
1206        b[0] = n;
1207        Id20(b)
1208    }
1209
1210    #[tokio::test]
1211    async fn async_write_read() {
1212        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1213        let ih = make_hash(1);
1214        let lengths = Lengths::new(100, 50, 25);
1215        let storage = Arc::new(MemoryStorage::new(lengths));
1216        let disk = mgr.register_torrent(ih, storage).await;
1217
1218        let data = Bytes::from(vec![42u8; 25]);
1219        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1220            .await
1221            .unwrap();
1222        let read = disk
1223            .read_chunk(0, 0, 25, DiskJobFlags::empty())
1224            .await
1225            .unwrap();
1226        assert_eq!(read, data);
1227
1228        mgr.shutdown().await;
1229    }
1230
1231    #[tokio::test]
1232    async fn verify_through_handle() {
1233        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1234        let ih = make_hash(2);
1235        let lengths = Lengths::new(100, 50, 25);
1236        let storage = Arc::new(MemoryStorage::new(lengths));
1237        let disk = mgr.register_torrent(ih, storage).await;
1238
1239        let piece_data = vec![9u8; 50];
1240        disk.write_chunk(
1241            0,
1242            0,
1243            Bytes::from(piece_data.clone()),
1244            DiskJobFlags::FLUSH_PIECE,
1245        )
1246        .await
1247        .unwrap();
1248        disk.write_chunk(0, 25, Bytes::from(vec![9u8; 25]), DiskJobFlags::FLUSH_PIECE)
1249            .await
1250            .unwrap();
1251
1252        let expected = irontide_core::sha1(&piece_data);
1253        assert!(
1254            disk.verify_piece(0, expected, DiskJobFlags::empty())
1255                .await
1256                .unwrap()
1257        );
1258        assert!(
1259            !disk
1260                .verify_piece(0, Id20::ZERO, DiskJobFlags::empty())
1261                .await
1262                .unwrap()
1263        );
1264
1265        mgr.shutdown().await;
1266    }
1267
1268    #[tokio::test]
1269    async fn cache_hit_avoids_io() {
1270        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1271        let ih = make_hash(3);
1272        let lengths = Lengths::new(100, 50, 25);
1273        let storage = Arc::new(MemoryStorage::new(lengths));
1274        let disk = mgr.register_torrent(ih, storage).await;
1275
1276        let data = Bytes::from(vec![7u8; 25]);
1277        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1278            .await
1279            .unwrap();
1280
1281        // First read: cache miss, reads from storage
1282        let r1 = disk
1283            .read_chunk(0, 0, 25, DiskJobFlags::empty())
1284            .await
1285            .unwrap();
1286        assert_eq!(r1, data);
1287
1288        // Second read: should be cache hit
1289        let r2 = disk
1290            .read_chunk(0, 0, 25, DiskJobFlags::empty())
1291            .await
1292            .unwrap();
1293        assert_eq!(r2, data);
1294
1295        mgr.shutdown().await;
1296    }
1297
1298    #[tokio::test]
1299    async fn volatile_read_bypasses_cache() {
1300        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1301        let ih = make_hash(4);
1302        let lengths = Lengths::new(100, 50, 25);
1303        let storage = Arc::new(MemoryStorage::new(lengths));
1304        let disk = mgr.register_torrent(ih, storage).await;
1305
1306        let data = Bytes::from(vec![5u8; 25]);
1307        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1308            .await
1309            .unwrap();
1310
1311        // Volatile read: should not cache
1312        let r = disk
1313            .read_chunk(0, 0, 25, DiskJobFlags::VOLATILE_READ)
1314            .await
1315            .unwrap();
1316        assert_eq!(r, data);
1317
1318        mgr.shutdown().await;
1319    }
1320
1321    #[tokio::test]
1322    async fn clear_piece_evicts_cache() {
1323        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1324        let ih = make_hash(5);
1325        let lengths = Lengths::new(100, 50, 25);
1326        let storage = Arc::new(MemoryStorage::new(lengths));
1327        let disk = mgr.register_torrent(ih, storage).await;
1328
1329        let data = Bytes::from(vec![5u8; 25]);
1330        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1331            .await
1332            .unwrap();
1333        // Populate cache
1334        disk.read_chunk(0, 0, 25, DiskJobFlags::empty())
1335            .await
1336            .unwrap();
1337        // Clear
1338        disk.clear_piece(0).await;
1339
1340        // Can still read (from storage, not cache)
1341        let r = disk
1342            .read_chunk(0, 0, 25, DiskJobFlags::empty())
1343            .await
1344            .unwrap();
1345        assert_eq!(r, data);
1346
1347        mgr.shutdown().await;
1348    }
1349
1350    #[tokio::test]
1351    async fn write_buffer_flush() {
1352        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1353        let ih = make_hash(6);
1354        let lengths = Lengths::new(100, 50, 25);
1355        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1356        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1357
1358        // Write to buffer (no FLUSH_PIECE)
1359        disk.write_chunk(0, 0, Bytes::from(vec![1u8; 25]), DiskJobFlags::empty())
1360            .await
1361            .unwrap();
1362        disk.write_chunk(0, 25, Bytes::from(vec![2u8; 25]), DiskJobFlags::empty())
1363            .await
1364            .unwrap();
1365
1366        // Explicitly flush
1367        disk.flush_piece(0).await.unwrap();
1368
1369        // Read back from storage directly to verify flush happened
1370        let piece = storage.read_piece(0).unwrap();
1371        assert_eq!(&piece[..25], &[1u8; 25]);
1372        assert_eq!(&piece[25..], &[2u8; 25]);
1373
1374        mgr.shutdown().await;
1375    }
1376
1377    #[tokio::test]
1378    async fn verify_piece_v2_via_disk_handle() {
1379        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1380        let ih = make_hash(11);
1381        let data = vec![0xABu8; 16384];
1382        let expected = irontide_core::sha256(&data);
1383        let lengths = Lengths::new(16384, 16384, 16384);
1384        let storage = Arc::new(MemoryStorage::new(lengths));
1385        storage.write_chunk(0, 0, &data).unwrap();
1386
1387        let disk = mgr.register_torrent(ih, storage).await;
1388        let result = disk
1389            .verify_piece_v2(0, expected, DiskJobFlags::empty())
1390            .await;
1391        assert!(result.unwrap());
1392        mgr.shutdown().await;
1393    }
1394
1395    #[tokio::test]
1396    async fn hash_block_via_disk_handle() {
1397        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1398        let ih = make_hash(12);
1399        let data = vec![0xCDu8; 16384];
1400        let lengths = Lengths::new(16384, 16384, 16384);
1401        let storage = Arc::new(MemoryStorage::new(lengths));
1402        storage.write_chunk(0, 0, &data).unwrap();
1403
1404        let disk = mgr.register_torrent(ih, storage).await;
1405        let hash = disk.hash_block(0, 0, 16384, DiskJobFlags::empty()).await;
1406        assert_eq!(hash.unwrap(), irontide_core::sha256(&data));
1407        mgr.shutdown().await;
1408    }
1409
1410    #[tokio::test]
1411    async fn concurrent_verify_multiple_pieces() {
1412        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1413        let ih = make_hash(10);
1414
1415        // 8 pieces of 50 bytes each = 400 bytes total
1416        let data: Vec<u8> = (0..400).map(|i| (i % 256) as u8).collect();
1417        let piece_len = 50u64;
1418        let lengths = Lengths::new(data.len() as u64, piece_len, 25);
1419        let storage = Arc::new(MemoryStorage::new(lengths.clone()));
1420
1421        // Write all piece data
1422        let num_pieces = lengths.num_pieces();
1423        for p in 0..num_pieces {
1424            let offset = lengths.piece_offset(p) as usize;
1425            let size = lengths.piece_size(p) as usize;
1426            storage
1427                .write_chunk(p, 0, &data[offset..offset + size])
1428                .unwrap();
1429        }
1430
1431        let disk = mgr.register_torrent(ih, storage).await;
1432
1433        // Compute expected hashes
1434        let mut expected_hashes = Vec::new();
1435        for p in 0..num_pieces {
1436            let offset = lengths.piece_offset(p) as usize;
1437            let size = lengths.piece_size(p) as usize;
1438            expected_hashes.push(irontide_core::sha1(&data[offset..offset + size]));
1439        }
1440
1441        // Verify all 8 pieces concurrently via JoinSet
1442        let mut js = tokio::task::JoinSet::new();
1443        for p in 0..num_pieces {
1444            let d = disk.clone();
1445            let hash = expected_hashes[p as usize];
1446            js.spawn(async move {
1447                let valid = d
1448                    .verify_piece(p, hash, DiskJobFlags::empty())
1449                    .await
1450                    .unwrap();
1451                (p, valid)
1452            });
1453        }
1454
1455        let mut results = Vec::new();
1456        while let Some(r) = js.join_next().await {
1457            results.push(r.unwrap());
1458        }
1459        results.sort_by_key(|&(p, _)| p);
1460
1461        assert_eq!(results.len(), num_pieces as usize);
1462        for (p, valid) in &results {
1463            assert!(valid, "piece {p} should be valid");
1464        }
1465
1466        mgr.shutdown().await;
1467    }
1468
1469    // ── Deferred write queue tests (M100) ────────────────────────────
1470
1471    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1472    async fn write_block_deferred_writes_to_storage() {
1473        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1474        let ih = make_hash(30);
1475        let lengths = Lengths::new(100, 50, 25);
1476        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1477        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1478
1479        let block0 = Bytes::from(vec![0xAAu8; 25]);
1480        let block1 = Bytes::from(vec![0xBBu8; 25]);
1481
1482        disk.write_block_deferred(0, 0, block0.clone());
1483        disk.write_block_deferred(0, 25, block1.clone());
1484
1485        // Wait for all writes to piece 0 to flush.
1486        disk.flush_piece_writes(0).await;
1487
1488        // Read back from storage to verify data landed on disk.
1489        let read0 = storage.read_chunk(0, 0, 25).unwrap();
1490        assert_eq!(&read0[..], &block0[..], "block 0 should match");
1491        let read1 = storage.read_chunk(0, 25, 25).unwrap();
1492        assert_eq!(&read1[..], &block1[..], "block 1 should match");
1493
1494        mgr.shutdown().await;
1495    }
1496
1497    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1498    async fn flush_piece_writes_waits_for_completion() {
1499        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1500        let ih = make_hash(31);
1501        let lengths = Lengths::new(200, 100, 25);
1502        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1503        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1504
1505        // Enqueue 4 blocks for piece 0.
1506        for i in 0u32..4 {
1507            let data = Bytes::from(vec![(i as u8) + 1; 25]);
1508            disk.write_block_deferred(0, i * 25, data);
1509        }
1510
1511        // flush_piece_writes must block until all 4 writes complete.
1512        disk.flush_piece_writes(0).await;
1513
1514        // Verify all blocks are visible on storage.
1515        let piece = storage.read_piece(0).unwrap();
1516        assert_eq!(&piece[0..25], &[1u8; 25]);
1517        assert_eq!(&piece[25..50], &[2u8; 25]);
1518        assert_eq!(&piece[50..75], &[3u8; 25]);
1519        assert_eq!(&piece[75..100], &[4u8; 25]);
1520
1521        mgr.shutdown().await;
1522    }
1523
1524    // ── M101: Batch writer tests ──────────────────────────────────────
1525
1526    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1527    async fn batch_writer_drains_multiple_jobs() {
1528        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1529        let ih = make_hash(50);
1530        // 10 blocks of 25 bytes each across a single piece of 250 bytes.
1531        let lengths = Lengths::new(250, 250, 25);
1532        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1533        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1534
1535        // Enqueue 10 writes (well within the 64-job batch cap).
1536        for i in 0u32..10 {
1537            let data = Bytes::from(vec![i as u8 + 1; 25]);
1538            disk.write_block_deferred(0, i * 25, data);
1539        }
1540
1541        // Wait for all writes to piece 0 to flush.
1542        disk.flush_piece_writes(0).await;
1543
1544        // Verify all 10 blocks landed correctly.
1545        for i in 0u32..10 {
1546            let chunk = storage.read_chunk(0, i * 25, 25).unwrap();
1547            assert_eq!(
1548                &chunk[..],
1549                vec![i as u8 + 1; 25].as_slice(),
1550                "block {i} mismatch"
1551            );
1552        }
1553
1554        mgr.shutdown().await;
1555    }
1556
1557    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1558    async fn batch_writer_caps_at_64() {
1559        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1560        let ih = make_hash(51);
1561        // 100 blocks of 16 bytes each across a single piece of 1600 bytes.
1562        let lengths = Lengths::new(1600, 1600, 16);
1563        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1564        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1565
1566        // Enqueue 100 writes — more than the 64-job batch cap, so at least
1567        // two batches are needed to drain the channel.
1568        for i in 0u32..100 {
1569            let data = Bytes::from(vec![i as u8; 16]);
1570            disk.write_block_deferred(0, i * 16, data);
1571        }
1572
1573        // Wait for all 100 writes to complete (requires multiple batches).
1574        disk.flush_piece_writes(0).await;
1575
1576        // Verify every block landed correctly.
1577        for i in 0u32..100 {
1578            let chunk = storage.read_chunk(0, i * 16, 16).unwrap();
1579            assert_eq!(
1580                &chunk[..],
1581                vec![i as u8; 16].as_slice(),
1582                "block {i} mismatch after overflow to next batch"
1583            );
1584        }
1585
1586        mgr.shutdown().await;
1587    }
1588
1589    // ── M100: Disk-based verify tests ────────────────────────────────
1590
1591    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1592    async fn verify_from_disk_after_deferred_write() {
1593        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1594        let ih = make_hash(40);
1595        let chunk_size = 16384u32;
1596        let piece_size = u64::from(chunk_size) * 2;
1597        let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1598        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1599        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1600
1601        // Write both chunks via deferred queue.
1602        let chunk0 = vec![0xAAu8; chunk_size as usize];
1603        let chunk1 = vec![0xBBu8; chunk_size as usize];
1604        disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1605        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1606        disk.flush_piece_writes(0).await;
1607
1608        // Compute expected SHA-1 hash.
1609        let mut full_piece = Vec::with_capacity(piece_size as usize);
1610        full_piece.extend_from_slice(&chunk0);
1611        full_piece.extend_from_slice(&chunk1);
1612        let expected_hash = irontide_core::sha1(&full_piece);
1613
1614        // Verify via disk-read path.
1615        let (result_tx, mut result_rx) = mpsc::channel(4);
1616        disk.enqueue_verify(0, expected_hash, 0, &result_tx);
1617        let result = result_rx
1618            .recv()
1619            .await
1620            .expect("should receive verify result");
1621        assert_eq!(result.piece, 0);
1622        assert!(result.passed, "disk-based SHA-1 verify should pass");
1623
1624        // Wrong hash should fail.
1625        disk.write_block_deferred(0, 0, Bytes::from(chunk0));
1626        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
1627        disk.flush_piece_writes(0).await;
1628        disk.enqueue_verify(0, Id20::ZERO, 0, &result_tx);
1629        let result = result_rx
1630            .recv()
1631            .await
1632            .expect("should receive verify result");
1633        assert!(!result.passed, "wrong hash should fail disk-based verify");
1634
1635        mgr.shutdown().await;
1636    }
1637
1638    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1639    async fn verify_v2_from_disk_after_deferred_write() {
1640        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1641        let ih = make_hash(41);
1642        let chunk_size = 16384u32;
1643        let piece_size = u64::from(chunk_size) * 2;
1644        let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1645        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1646        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1647
1648        // Write both chunks via deferred queue.
1649        let chunk0 = vec![0xCCu8; chunk_size as usize];
1650        let chunk1 = vec![0xDDu8; chunk_size as usize];
1651        disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1652        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1653        disk.flush_piece_writes(0).await;
1654
1655        // Compute expected SHA-256 hash.
1656        let mut full_piece = Vec::with_capacity(piece_size as usize);
1657        full_piece.extend_from_slice(&chunk0);
1658        full_piece.extend_from_slice(&chunk1);
1659        let expected_hash = irontide_core::sha256(&full_piece);
1660
1661        // Verify via disk-read path.
1662        let (result_tx, mut result_rx) = mpsc::channel(4);
1663        disk.enqueue_verify_v2(0, expected_hash, &result_tx);
1664        let result = result_rx
1665            .recv()
1666            .await
1667            .expect("should receive v2 verify result");
1668        assert_eq!(result.piece, 0);
1669        assert!(result.passed, "disk-based SHA-256 verify should pass");
1670
1671        // Wrong hash should fail.
1672        disk.write_block_deferred(0, 0, Bytes::from(chunk0));
1673        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
1674        disk.flush_piece_writes(0).await;
1675        disk.enqueue_verify_v2(0, Id32::ZERO, &result_tx);
1676        let result = result_rx
1677            .recv()
1678            .await
1679            .expect("should receive v2 verify result");
1680        assert!(
1681            !result.passed,
1682            "wrong hash should fail disk-based v2 verify"
1683        );
1684
1685        mgr.shutdown().await;
1686    }
1687
1688    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1689    async fn verify_with_hash_pool_from_disk() {
1690        let (mgr, _actor) = DiskManagerHandle::new(test_config());
1691        let ih = make_hash(42);
1692        let chunk_size = 16384u32;
1693        let piece_size = u64::from(chunk_size) * 2;
1694        let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1695        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1696        let mut disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1697
1698        // Configure hash pool.
1699        let (hash_result_tx, mut hash_result_rx) = mpsc::channel(4);
1700        disk.set_hash_result_tx(hash_result_tx);
1701        let hash_pool = std::sync::Arc::new(crate::hash_pool::HashPool::new(2, 16));
1702        disk.set_hash_pool(hash_pool);
1703
1704        // Write both chunks via deferred queue.
1705        let chunk0 = vec![0xEEu8; chunk_size as usize];
1706        let chunk1 = vec![0xFFu8; chunk_size as usize];
1707        disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1708        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1709        disk.flush_piece_writes(0).await;
1710
1711        // Compute expected SHA-1 hash.
1712        let mut full_piece = Vec::with_capacity(piece_size as usize);
1713        full_piece.extend_from_slice(&chunk0);
1714        full_piece.extend_from_slice(&chunk1);
1715        let expected_hash = irontide_core::sha1(&full_piece);
1716
1717        // Verify via hash pool path (reads from disk, submits to pool).
1718        let (verify_result_tx, _) = mpsc::channel(4); // not used for pool path
1719        disk.enqueue_verify(0, expected_hash, 42, &verify_result_tx);
1720        let result = hash_result_rx
1721            .recv()
1722            .await
1723            .expect("should receive hash pool result");
1724        assert!(result.passed, "hash pool disk-based verify should pass");
1725        assert_eq!(result.piece, 0);
1726        assert_eq!(result.generation, 42);
1727
1728        mgr.shutdown().await;
1729    }
1730}