Skip to main content

irontide_session/
disk.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use parking_lot::Mutex;
5
6use bitflags::bitflags;
7use bytes::Bytes;
8use irontide_core::{Id20, Id32};
9use irontide_storage::TorrentStorage;
10use tokio::sync::{mpsc, oneshot};
11use tracing::warn;
12
13bitflags! {
14    /// Hint flags for disk I/O operations.
15    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
16    pub struct DiskJobFlags: u8 {
17        /// Copy cached blocks rather than sharing references.
18        const FORCE_COPY      = 0x01;
19        /// Hint: sequential file access pattern (read-ahead friendly).
20        const SEQUENTIAL      = 0x02;
21        /// Don't cache this read result.
22        const VOLATILE_READ   = 0x04;
23        /// Flush completed piece to disk immediately.
24        const FLUSH_PIECE     = 0x08;
25    }
26}
27
28/// Error reported asynchronously from a non-blocking disk write.
29#[derive(Debug)]
30pub struct DiskWriteError {
31    /// Piece index that failed to write.
32    pub piece: u32,
33    /// Byte offset within the piece.
34    pub begin: u32,
35    /// The underlying storage error.
36    pub error: irontide_storage::Error,
37}
38
/// Outcome of a piece hash check that ran off the caller's task.
#[derive(Debug)]
pub struct VerifyResult {
    /// Index of the piece that was checked.
    pub piece: u32,
    /// `true` only if the computed hash equalled the expected one; a piece
    /// that could not be read or hashed is reported as `false`.
    pub passed: bool,
}
47
48/// A single block write job for the deferred writer task.
49pub(crate) struct WriteJob {
50    piece: u32,
51    begin: u32,
52    data: Bytes,
53}
54
55/// State for the per-torrent deferred write queue.
56///
57/// Peers enqueue writes via an MPSC channel; a dedicated writer task
58/// drains the channel and calls `block_in_place(storage.write_chunk())`.
59/// A per-piece pending counter + Notify allows callers to wait until
60/// all writes for a piece are flushed before hash verification.
61pub(crate) struct DiskWriteState {
62    tx: mpsc::Sender<WriteJob>,
63    /// Per-piece outstanding write count.
64    pending: Mutex<HashMap<u32, u32>>,
65    /// Signalled whenever any piece's pending count hits zero.
66    notify: tokio::sync::Notify,
67    /// M120: Lock timing diagnostics for the pending mutex.
68    lock_timing: crate::timed_lock::LockTimingSettings,
69}
70
71pub(crate) enum DiskJob {
72    Register {
73        info_hash: Id20,
74        storage: Arc<dyn TorrentStorage>,
75        reply: oneshot::Sender<()>,
76    },
77    Unregister {
78        info_hash: Id20,
79    },
80
81    Write {
82        info_hash: Id20,
83        piece: u32,
84        begin: u32,
85        data: Bytes,
86        flags: DiskJobFlags,
87        reply: oneshot::Sender<irontide_storage::Result<()>>,
88    },
89    Read {
90        info_hash: Id20,
91        piece: u32,
92        begin: u32,
93        length: u32,
94        flags: DiskJobFlags,
95        reply: oneshot::Sender<irontide_storage::Result<Bytes>>,
96    },
97    Hash {
98        info_hash: Id20,
99        piece: u32,
100        expected: Id20,
101        #[allow(dead_code)]
102        flags: DiskJobFlags,
103        reply: oneshot::Sender<irontide_storage::Result<bool>>,
104    },
105    HashV2 {
106        info_hash: Id20,
107        piece: u32,
108        expected: Id32,
109        #[allow(dead_code)]
110        flags: DiskJobFlags,
111        reply: oneshot::Sender<irontide_storage::Result<bool>>,
112    },
113    BlockHash {
114        info_hash: Id20,
115        piece: u32,
116        begin: u32,
117        length: u32,
118        #[allow(dead_code)]
119        flags: DiskJobFlags,
120        reply: oneshot::Sender<irontide_storage::Result<Id32>>,
121    },
122
123    ClearPiece {
124        info_hash: Id20,
125        piece: u32,
126    },
127    FlushWriteBuffer {
128        info_hash: Id20,
129        piece: u32,
130        reply: oneshot::Sender<irontide_storage::Result<()>>,
131    },
132
133    CachedPieces {
134        info_hash: Id20,
135        reply: oneshot::Sender<Vec<u32>>,
136    },
137
138    /// Flush all buffered writes across all torrents.
139    FlushAll {
140        reply: oneshot::Sender<irontide_storage::Result<()>>,
141    },
142
143    Shutdown {
144        reply: oneshot::Sender<()>,
145    },
146}
147
148/// Configuration for the disk I/O subsystem.
149#[derive(Debug, Clone)]
150pub struct DiskConfig {
151    /// Number of concurrent I/O threads (semaphore permits). Default: 4.
152    pub io_threads: usize,
153    /// Storage allocation mode. Default: Auto.
154    pub storage_mode: irontide_core::StorageMode,
155    /// Total cache size in bytes (read + write). Default: 16 MiB.
156    /// Deprecated: use `buffer_pool_capacity` instead.
157    pub cache_size: usize,
158    /// Fraction of cache_size reserved for write buffering. Default: 0.5.
159    /// Deprecated: buffer pool handles write/read split implicitly.
160    pub write_cache_ratio: f32,
161    /// Bounded channel capacity. Default: 512.
162    pub channel_capacity: usize,
163    /// Unified buffer pool capacity in bytes. Default: 64 MiB.
164    pub buffer_pool_capacity: usize,
165    /// Lock cached piece data in physical memory. Default: true on Unix.
166    pub enable_mlock: bool,
167    /// M120: Lock timing warning threshold in milliseconds (0 = disabled).
168    pub lock_warn_threshold_ms: u64,
169    /// io_uring submission queue depth. Default: 256.
170    pub io_uring_sq_depth: u32,
171    /// Enable O_DIRECT for io_uring writes. Default: false.
172    pub io_uring_direct_io: bool,
173    /// Enable direct I/O for filesystem storage (O_DIRECT / F_NOCACHE). Default: false.
174    pub filesystem_direct_io: bool,
175    /// Minimum segments to batch for io_uring. Default: 4.
176    pub io_uring_batch_threshold: usize,
177    /// IOCP concurrent thread count (0 = system default). Default: 0.
178    pub iocp_concurrent_threads: u32,
179    /// Enable FILE_FLAG_NO_BUFFERING for IOCP I/O. Default: false.
180    pub iocp_direct_io: bool,
181}
182
183impl Default for DiskConfig {
184    fn default() -> Self {
185        DiskConfig {
186            io_threads: 4,
187            storage_mode: irontide_core::StorageMode::Auto,
188            cache_size: 16 * 1024 * 1024,
189            write_cache_ratio: 0.5,
190            channel_capacity: 512,
191            buffer_pool_capacity: 64 * 1024 * 1024,
192            enable_mlock: cfg!(unix),
193            lock_warn_threshold_ms: 50,
194            io_uring_sq_depth: 256,
195            io_uring_direct_io: false,
196            filesystem_direct_io: false,
197            io_uring_batch_threshold: 4,
198            iocp_concurrent_threads: 0,
199            iocp_direct_io: false,
200        }
201    }
202}
203
204/// Disk I/O performance counters.
205#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
206pub struct DiskStats {
207    /// Total bytes read from disk.
208    pub read_bytes: u64,
209    /// Total bytes written to disk.
210    pub write_bytes: u64,
211    /// Number of read requests served from cache.
212    pub cache_hits: u64,
213    /// Number of read requests that required disk I/O.
214    pub cache_misses: u64,
215    /// Current size of the write buffer in bytes.
216    pub write_buffer_bytes: usize,
217    /// Number of pending disk I/O jobs in the queue.
218    pub queued_jobs: usize,
219    /// Current size of the read cache in bytes (M102).
220    #[serde(default)]
221    pub read_cache_bytes: usize,
222    /// Total number of entries in the buffer pool (M102).
223    #[serde(default)]
224    pub pool_entries: usize,
225    /// Number of prefetch insertions into the read cache (M102).
226    #[serde(default)]
227    pub prefetch_count: u64,
228    /// Number of ARC evictions from the read cache (M102).
229    #[serde(default)]
230    pub eviction_count: u64,
231    /// Number of Writing-to-Skeleton demotions (M102).
232    #[serde(default)]
233    pub skeleton_count: u64,
234}
235
236impl From<crate::disk_backend::DiskIoStats> for DiskStats {
237    fn from(s: crate::disk_backend::DiskIoStats) -> Self {
238        DiskStats {
239            read_bytes: s.read_bytes,
240            write_bytes: s.write_bytes,
241            cache_hits: s.cache_hits,
242            cache_misses: s.cache_misses,
243            write_buffer_bytes: s.write_buffer_bytes,
244            queued_jobs: 0,
245            read_cache_bytes: s.read_cache_bytes,
246            pool_entries: s.pool_entries,
247            prefetch_count: s.prefetch_count,
248            eviction_count: s.eviction_count,
249            skeleton_count: s.skeleton_count,
250        }
251    }
252}
253
// ---------------------------------------------------------------------------
// DiskManagerHandle — session-level handle
// ---------------------------------------------------------------------------
257
258/// Session-level handle for managing the disk subsystem.
259#[derive(Clone)]
260pub struct DiskManagerHandle {
261    tx: mpsc::Sender<DiskJob>,
262    /// Backend reference for per-torrent deferred writes (M100).
263    backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
264    /// Bounded blocking-task spawner (M116).
265    spawner: crate::blocking_spawner::BlockingSpawner,
266}
267
268impl DiskManagerHandle {
269    /// Create a new disk manager with the default backend selected from config.
270    /// Returns the handle and a `JoinHandle` for the background actor task.
271    pub fn new(config: DiskConfig) -> (Self, tokio::task::JoinHandle<()>) {
272        let backend = crate::disk_backend::create_backend_from_config(&config);
273        let spawner = crate::blocking_spawner::BlockingSpawner::new(config.io_threads);
274        Self::new_with_backend(config, backend, spawner)
275    }
276
277    /// Create a new disk manager with a custom disk I/O backend.
278    /// Returns the handle and a `JoinHandle` for the background actor task.
279    pub(crate) fn new_with_backend(
280        config: DiskConfig,
281        backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
282        spawner: crate::blocking_spawner::BlockingSpawner,
283    ) -> (Self, tokio::task::JoinHandle<()>) {
284        let (tx, rx) = mpsc::channel(config.channel_capacity);
285        let backend_for_actor = Arc::clone(&backend);
286        let actor = DiskActor::new(rx, config, backend_for_actor, spawner.clone());
287        let join = tokio::spawn(actor.run());
288        (
289            DiskManagerHandle {
290                tx,
291                backend,
292                spawner,
293            },
294            join,
295        )
296    }
297
298    /// Register a torrent's storage with the disk subsystem and return a
299    /// per-torrent `DiskHandle`.
300    pub async fn register_torrent(
301        &self,
302        info_hash: Id20,
303        storage: Arc<dyn TorrentStorage>,
304    ) -> DiskHandle {
305        // Clone storage: one for the DiskJob::Register, one for the DiskHandle.
306        let storage_for_handle = Arc::clone(&storage);
307
308        let (reply_tx, reply_rx) = oneshot::channel();
309        let _ = self
310            .tx
311            .send(DiskJob::Register {
312                info_hash,
313                storage,
314                reply: reply_tx,
315            })
316            .await;
317        let _ = reply_rx.await;
318
319        // Create the deferred write queue (M100).
320        let (write_tx, mut write_rx) = mpsc::channel::<WriteJob>(512);
321        let write_state = Arc::new(DiskWriteState {
322            tx: write_tx,
323            pending: Mutex::new(HashMap::new()),
324            notify: tokio::sync::Notify::new(),
325            lock_timing: crate::timed_lock::LockTimingSettings::default(),
326        });
327
328        // Spawn the per-torrent writer task.
329        let writer_storage = Arc::clone(&storage_for_handle);
330        let writer_state = Arc::clone(&write_state);
331        let writer_spawner = self.spawner.clone();
332        tokio::spawn(async move {
333            while let Some(first) = write_rx.recv().await {
334                // Drain up to 64 jobs into a batch, then execute all in a
335                // single block_in_place call to reduce thread pool overhead.
336                let mut batch = vec![first];
337                while batch.len() < 64 {
338                    match write_rx.try_recv() {
339                        Ok(job) => batch.push(job),
340                        Err(_) => break,
341                    }
342                }
343
344                // Collect piece indices before moving batch into the closure.
345                let pieces: Vec<u32> = batch.iter().map(|j| j.piece).collect();
346
347                let ws = Arc::clone(&writer_storage);
348                let spawner = writer_spawner.clone();
349                spawner
350                    .block_in_place(move || {
351                        for WriteJob { piece, begin, data } in &batch {
352                            if let Err(e) = ws.write_chunk(*piece, *begin, data) {
353                                tracing::warn!(piece, begin, %e, "deferred write failed");
354                            }
355                        }
356                    })
357                    .await;
358
359                // Decrement pending counts for all jobs in batch, notify once.
360                {
361                    let mut pending = crate::timed_lock::TimedGuard::new(
362                        writer_state.pending.lock(),
363                        &writer_state.lock_timing,
364                        "disk_pending",
365                    );
366                    for piece in &pieces {
367                        if let Some(count) = pending.get_mut(piece) {
368                            *count = count.saturating_sub(1);
369                            if *count == 0 {
370                                pending.remove(piece);
371                            }
372                        }
373                    }
374                }
375                writer_state.notify.notify_waiters();
376            }
377        });
378
379        DiskHandle {
380            tx: self.tx.clone(),
381            info_hash,
382            hash_pool: None,
383            hash_result_tx: None,
384            storage: Some(storage_for_handle),
385            backend: Some(Arc::clone(&self.backend)),
386            write_state: Some(write_state),
387            spawner: Some(self.spawner.clone()),
388        }
389    }
390
391    /// Unregister a torrent, flushing and clearing its write buffer and cache.
392    pub async fn unregister_torrent(&self, info_hash: Id20) {
393        let _ = self.tx.send(DiskJob::Unregister { info_hash }).await;
394    }
395
396    /// Gracefully shut down the disk subsystem, flushing all buffers.
397    pub async fn shutdown(&self) {
398        let (tx, rx) = oneshot::channel();
399        let _ = self.tx.send(DiskJob::Shutdown { reply: tx }).await;
400        let _ = rx.await;
401    }
402}
403
// ---------------------------------------------------------------------------
// DiskHandle — per-torrent handle
// ---------------------------------------------------------------------------
407
408/// Per-torrent handle for async disk I/O.
409#[derive(Clone)]
410pub struct DiskHandle {
411    tx: mpsc::Sender<DiskJob>,
412    info_hash: Id20,
413    /// Hash pool for parallel piece verification (M96).
414    hash_pool: Option<std::sync::Arc<crate::hash_pool::HashPool>>,
415    /// Per-torrent hash result sender (M96).
416    hash_result_tx: Option<tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>>,
417    /// Direct storage reference for deferred writes (M100).
418    storage: Option<Arc<dyn TorrentStorage>>,
419    /// Backend reference for disk-based verify (M100).
420    backend: Option<Arc<dyn crate::disk_backend::DiskIoBackend>>,
421    /// Deferred write queue state (M100).
422    write_state: Option<Arc<DiskWriteState>>,
423    /// Bounded blocking-task spawner (M116).
424    spawner: Option<crate::blocking_spawner::BlockingSpawner>,
425}
426
427impl std::fmt::Debug for DiskHandle {
428    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
429        f.debug_struct("DiskHandle")
430            .field("info_hash", &self.info_hash)
431            .finish_non_exhaustive()
432    }
433}
434
435impl DiskHandle {
436    /// Create a DiskHandle from raw parts (for internal/test use).
437    #[cfg_attr(not(test), allow(dead_code))]
438    pub(crate) fn new(tx: mpsc::Sender<DiskJob>, info_hash: Id20) -> Self {
439        Self {
440            tx,
441            info_hash,
442            hash_pool: None,
443            hash_result_tx: None,
444            storage: None,
445            backend: None,
446            write_state: None,
447            spawner: None,
448        }
449    }
450
451    /// Set the hash pool reference (M96).
452    pub fn set_hash_pool(&mut self, pool: std::sync::Arc<crate::hash_pool::HashPool>) {
453        self.hash_pool = Some(pool);
454    }
455
456    /// Set the per-torrent hash result sender (M96).
457    pub fn set_hash_result_tx(
458        &mut self,
459        tx: tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>,
460    ) {
461        self.hash_result_tx = Some(tx);
462    }
463
464    /// Write a chunk to disk (may be buffered).
465    pub async fn write_chunk(
466        &self,
467        piece: u32,
468        begin: u32,
469        data: Bytes,
470        flags: DiskJobFlags,
471    ) -> irontide_storage::Result<()> {
472        let (tx, rx) = oneshot::channel();
473        let _ = self
474            .tx
475            .send(DiskJob::Write {
476                info_hash: self.info_hash,
477                piece,
478                begin,
479                data,
480                flags,
481                reply: tx,
482            })
483            .await;
484        rx.await
485            .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
486                std::io::ErrorKind::BrokenPipe,
487                "disk actor gone",
488            ))))
489    }
490
491    /// Read a chunk from disk (may hit cache or write buffer).
492    pub async fn read_chunk(
493        &self,
494        piece: u32,
495        begin: u32,
496        length: u32,
497        flags: DiskJobFlags,
498    ) -> irontide_storage::Result<Bytes> {
499        let (tx, rx) = oneshot::channel();
500        let _ = self
501            .tx
502            .send(DiskJob::Read {
503                info_hash: self.info_hash,
504                piece,
505                begin,
506                length,
507                flags,
508                reply: tx,
509            })
510            .await;
511        rx.await
512            .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
513                std::io::ErrorKind::BrokenPipe,
514                "disk actor gone",
515            ))))
516    }
517
518    /// Verify a piece hash against an expected value.
519    pub async fn verify_piece(
520        &self,
521        piece: u32,
522        expected: Id20,
523        flags: DiskJobFlags,
524    ) -> irontide_storage::Result<bool> {
525        let (tx, rx) = oneshot::channel();
526        let _ = self
527            .tx
528            .send(DiskJob::Hash {
529                info_hash: self.info_hash,
530                piece,
531                expected,
532                flags,
533                reply: tx,
534            })
535            .await;
536        rx.await
537            .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
538                std::io::ErrorKind::BrokenPipe,
539                "disk actor gone",
540            ))))
541    }
542
543    /// Verify a piece hash against an expected SHA-256 value (v2).
544    pub async fn verify_piece_v2(
545        &self,
546        piece: u32,
547        expected: Id32,
548        flags: DiskJobFlags,
549    ) -> irontide_storage::Result<bool> {
550        let (tx, rx) = oneshot::channel();
551        let _ = self
552            .tx
553            .send(DiskJob::HashV2 {
554                info_hash: self.info_hash,
555                piece,
556                expected,
557                flags,
558                reply: tx,
559            })
560            .await;
561        rx.await
562            .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
563                std::io::ErrorKind::BrokenPipe,
564                "disk actor gone",
565            ))))
566    }
567
568    /// Hash a single block with SHA-256 for Merkle verification (v2).
569    pub async fn hash_block(
570        &self,
571        piece: u32,
572        begin: u32,
573        length: u32,
574        flags: DiskJobFlags,
575    ) -> irontide_storage::Result<Id32> {
576        let (tx, rx) = oneshot::channel();
577        let _ = self
578            .tx
579            .send(DiskJob::BlockHash {
580                info_hash: self.info_hash,
581                piece,
582                begin,
583                length,
584                flags,
585                reply: tx,
586            })
587            .await;
588        rx.await
589            .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
590                std::io::ErrorKind::BrokenPipe,
591                "disk actor gone",
592            ))))
593    }
594
595    /// Clear a piece from cache and write buffer (e.g. on hash failure).
596    pub async fn clear_piece(&self, piece: u32) {
597        let _ = self
598            .tx
599            .send(DiskJob::ClearPiece {
600                info_hash: self.info_hash,
601                piece,
602            })
603            .await;
604    }
605
606    /// Flush a specific piece from the write buffer to disk.
607    pub async fn flush_piece(&self, piece: u32) -> irontide_storage::Result<()> {
608        let (tx, rx) = oneshot::channel();
609        let _ = self
610            .tx
611            .send(DiskJob::FlushWriteBuffer {
612                info_hash: self.info_hash,
613                piece,
614                reply: tx,
615            })
616            .await;
617        rx.await
618            .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
619                std::io::ErrorKind::BrokenPipe,
620                "disk actor gone",
621            ))))
622    }
623
624    /// Query which pieces are currently in the read cache for this torrent.
625    pub async fn cached_pieces(&self) -> Vec<u32> {
626        let (tx, rx) = oneshot::channel();
627        let _ = self
628            .tx
629            .send(DiskJob::CachedPieces {
630                info_hash: self.info_hash,
631                reply: tx,
632            })
633            .await;
634        rx.await.unwrap_or_default()
635    }
636
637    /// Flush all buffered writes to persistent storage.
638    pub async fn flush_cache(&self) -> irontide_storage::Result<()> {
639        let (tx, rx) = oneshot::channel();
640        let _ = self.tx.send(DiskJob::FlushAll { reply: tx }).await;
641        rx.await
642            .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
643                std::io::ErrorKind::BrokenPipe,
644                "disk actor gone",
645            ))))
646    }
647
648    /// Spawn a non-blocking v1 piece hash verification.
649    ///
650    /// M96: If a hash pool is configured, submits the job to the pool.
651    /// M101: Uses `HashJob::Streaming` to delegate reading + hashing to the
652    /// backend, eliminating the full-piece allocation on the caller side.
653    ///
654    /// The `generation` parameter enables staleness detection by the caller.
655    pub fn enqueue_verify(
656        &self,
657        piece: u32,
658        expected: Id20,
659        generation: u64,
660        result_tx: &mpsc::Sender<VerifyResult>,
661    ) {
662        // M96/M101: If hash pool is available, submit streaming job.
663        if let (Some(pool), Some(hash_tx)) = (&self.hash_pool, &self.hash_result_tx) {
664            if let Some(backend) = &self.backend {
665                let pool = pool.clone();
666                let hash_tx = hash_tx.clone();
667                let backend = Arc::clone(backend);
668                let info_hash = self.info_hash;
669                let job = crate::hash_pool::HashJob::Streaming {
670                    piece,
671                    expected,
672                    generation,
673                    info_hash,
674                    backend,
675                    result_tx: hash_tx,
676                };
677                tokio::spawn(async move {
678                    if pool.submit(job).await.is_err() {
679                        tracing::warn!(piece, "hash pool shut down, treating as failed");
680                    }
681                });
682                return;
683            }
684
685            // No backend — send failure.
686            let hash_tx = hash_tx.clone();
687            tokio::spawn(async move {
688                tracing::warn!(piece, "verify: no backend (hash pool path)");
689                let _ = hash_tx
690                    .send(crate::hash_pool::HashResult {
691                        piece,
692                        passed: false,
693                        generation,
694                    })
695                    .await;
696            });
697            return;
698        }
699
700        // Non-pool path: delegate to backend.hash_piece().
701        if let Some(backend) = &self.backend {
702            let backend = Arc::clone(backend);
703            let info_hash = self.info_hash;
704            let result_tx = result_tx.clone();
705            let spawner = self.spawner.clone().unwrap();
706            tokio::spawn(async move {
707                let passed = spawner
708                    .block_in_place(move || {
709                        backend
710                            .hash_piece(info_hash, piece, &expected)
711                            .unwrap_or_else(|e| {
712                                warn!(piece, %e, "verify: hash_piece failed");
713                                false
714                            })
715                    })
716                    .await;
717                let _ = result_tx.send(VerifyResult { piece, passed }).await;
718            });
719            return;
720        }
721
722        // No data source at all — treat as failure.
723        let result_tx = result_tx.clone();
724        tokio::spawn(async move {
725            warn!(piece, "verify: no data source, treating as failed");
726            let _ = result_tx
727                .send(VerifyResult {
728                    piece,
729                    passed: false,
730                })
731                .await;
732        });
733    }
734
735    /// Spawn a non-blocking v2 piece hash verification (SHA-256).
736    ///
737    /// Reads the piece from disk via the backend's `read_piece()` method.
738    pub fn enqueue_verify_v2(
739        &self,
740        piece: u32,
741        expected: Id32,
742        result_tx: &mpsc::Sender<VerifyResult>,
743    ) {
744        if let Some(backend) = &self.backend {
745            let backend = Arc::clone(backend);
746            let info_hash = self.info_hash;
747            let result_tx = result_tx.clone();
748            let spawner = self.spawner.clone().unwrap();
749            tokio::spawn(async move {
750                let passed = spawner
751                    .block_in_place(move || match backend.read_piece(info_hash, piece) {
752                        Ok(data) => {
753                            let actual = irontide_core::sha256(&data);
754                            actual == expected
755                        }
756                        Err(e) => {
757                            warn!(piece, %e, "verify v2: read_piece failed");
758                            false
759                        }
760                    })
761                    .await;
762                let _ = result_tx.send(VerifyResult { piece, passed }).await;
763            });
764            return;
765        }
766
767        // No backend — treat as failure.
768        let result_tx = result_tx.clone();
769        tokio::spawn(async move {
770            warn!(piece, "verify v2: no data source, treating as failed");
771            let _ = result_tx
772                .send(VerifyResult {
773                    piece,
774                    passed: false,
775                })
776                .await;
777        });
778    }
779
780    /// Enqueue a block write via the deferred writer task (M100).
781    ///
782    /// The write is sent to a dedicated per-torrent writer task that calls
783    /// `block_in_place(storage.write_chunk())`. If the channel is full, falls
784    /// back to a synchronous `block_in_place` write from the calling task.
785    ///
786    /// Returns early (no-op) if write_state is `None` (pre-M100 code path).
787    pub(crate) fn write_block_deferred(&self, piece: u32, begin: u32, data: Bytes) {
788        let (write_state, storage) = match (&self.write_state, &self.storage) {
789            (Some(ws), Some(s)) => (ws, s),
790            _ => return, // pre-M100 path
791        };
792
793        // Increment pending count before sending.
794        {
795            let mut pending = crate::timed_lock::TimedGuard::new(
796                write_state.pending.lock(),
797                &write_state.lock_timing,
798                "disk_pending",
799            );
800            *pending.entry(piece).or_insert(0) += 1;
801        }
802
803        match write_state.tx.try_send(WriteJob {
804            piece,
805            begin,
806            data: data.clone(),
807        }) {
808            Ok(()) => {}
809            Err(mpsc::error::TrySendError::Full(_)) => {
810                // Channel full: write synchronously to avoid unbounded backlog.
811                let storage = Arc::clone(storage);
812                if let Some(ref spawner) = self.spawner {
813                    spawner.block_in_place_sync(|| {
814                        if let Err(e) = storage.write_chunk(piece, begin, &data) {
815                            tracing::warn!(piece, begin, %e, "deferred write fallback failed");
816                        }
817                    });
818                } else {
819                    // Fallback: direct call (pre-M116 path)
820                    if let Err(e) = storage.write_chunk(piece, begin, &data) {
821                        tracing::warn!(piece, begin, %e, "deferred write fallback failed");
822                    }
823                }
824                // Decrement pending + notify.
825                let mut pending = crate::timed_lock::TimedGuard::new(
826                    write_state.pending.lock(),
827                    &write_state.lock_timing,
828                    "disk_pending",
829                );
830                if let Some(count) = pending.get_mut(&piece) {
831                    *count = count.saturating_sub(1);
832                    if *count == 0 {
833                        pending.remove(&piece);
834                        drop(pending);
835                        write_state.notify.notify_waiters();
836                    }
837                }
838            }
839            Err(mpsc::error::TrySendError::Closed(_)) => {
840                // Writer task gone — decrement pending to avoid stuck waiters.
841                let mut pending = crate::timed_lock::TimedGuard::new(
842                    write_state.pending.lock(),
843                    &write_state.lock_timing,
844                    "disk_pending",
845                );
846                if let Some(count) = pending.get_mut(&piece) {
847                    *count = count.saturating_sub(1);
848                    if *count == 0 {
849                        pending.remove(&piece);
850                        drop(pending);
851                        write_state.notify.notify_waiters();
852                    }
853                }
854            }
855        }
856    }
857
858    /// Write a block directly to storage from two slices (vectored write).
859    ///
860    /// Bypasses the buffer pool and deferred write queue — data goes straight
861    /// to the underlying storage via `DiskIoBackend::write_block_direct`.
862    /// This is the zero-copy direct-pwrite path used by peer tasks when
863    /// the ring buffer wraps and produces two slices.
864    ///
865    /// Returns `Ok(())` on success, or an error if the backend is not set
866    /// or the underlying write fails.
867    pub(crate) fn write_block_direct(
868        &self,
869        piece: u32,
870        begin: u32,
871        s0: &[u8],
872        s1: &[u8],
873    ) -> crate::Result<()> {
874        let backend = match &self.backend {
875            Some(b) => b,
876            None => return Ok(()), // pre-M100 path
877        };
878        backend.write_block_direct(self.info_hash, piece, begin, s0, s1)
879    }
880
881    /// Wait until all deferred writes for `piece` have been flushed to storage.
882    ///
883    /// Returns immediately if write_state is `None` (pre-M100 path) or if
884    /// there are no pending writes for the given piece.
885    pub(crate) async fn flush_piece_writes(&self, piece: u32) {
886        let write_state = match &self.write_state {
887            Some(ws) => ws,
888            None => return,
889        };
890
891        loop {
892            {
893                let pending = crate::timed_lock::TimedGuard::new(
894                    write_state.pending.lock(),
895                    &write_state.lock_timing,
896                    "disk_pending",
897                );
898                if !pending.contains_key(&piece) {
899                    return;
900                }
901            }
902            write_state.notify.notified().await;
903        }
904    }
905
906    /// Direct storage reference (M100).
907    #[allow(dead_code)]
908    pub(crate) fn storage(&self) -> Option<Arc<dyn TorrentStorage>> {
909        self.storage.clone()
910    }
911}
912
913// ---------------------------------------------------------------------------
914// DiskActor — dispatcher loop (all I/O runs on tokio's blocking thread pool)
915// ---------------------------------------------------------------------------
916
917struct DiskActor {
918    rx: mpsc::Receiver<DiskJob>,
919    backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
920    spawner: crate::blocking_spawner::BlockingSpawner,
921    #[allow(dead_code)]
922    config: DiskConfig,
923}
924
925impl DiskActor {
926    fn new(
927        rx: mpsc::Receiver<DiskJob>,
928        config: DiskConfig,
929        backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
930        spawner: crate::blocking_spawner::BlockingSpawner,
931    ) -> Self {
932        DiskActor {
933            rx,
934            backend,
935            spawner,
936            config,
937        }
938    }
939
940    async fn run(mut self) {
941        loop {
942            // Block on first job
943            let first = match self.rx.recv().await {
944                Some(job) => job,
945                None => break,
946            };
947
948            // Drain remaining pending jobs (batch processing)
949            let mut batch = vec![first];
950            while let Ok(job) = self.rx.try_recv() {
951                batch.push(job);
952            }
953
954            for job in batch {
955                if let DiskJob::Shutdown { reply } = job {
956                    // Flush on blocking thread to avoid stalling tokio runtime.
957                    let backend = Arc::clone(&self.backend);
958                    let spawner = self.spawner.clone();
959                    let flush_result = spawner.block_in_place(move || backend.flush_all()).await;
960                    if let Err(e) = flush_result {
961                        warn!("flush_all on shutdown failed: {e}");
962                    }
963                    let _ = reply.send(());
964                    return;
965                }
966                self.dispatch_job(job);
967            }
968        }
969    }
970
971    /// Dispatch a job for execution. Fast metadata ops run inline;
972    /// all I/O ops are spawned as fire-and-forget tasks bounded by the
973    /// semaphore, so the dispatcher loop never blocks on disk operations.
974    fn dispatch_job(&self, job: DiskJob) {
975        match job {
976            // --- Fast metadata ops (inline) ---
977            DiskJob::Register {
978                info_hash,
979                storage,
980                reply,
981            } => {
982                self.backend.register(info_hash, storage);
983                let _ = reply.send(());
984            }
985            DiskJob::Unregister { info_hash } => {
986                self.backend.unregister(info_hash);
987            }
988            DiskJob::ClearPiece { info_hash, piece } => {
989                self.backend.clear_piece(info_hash, piece);
990            }
991            DiskJob::CachedPieces { info_hash, reply } => {
992                let pieces = self.backend.cached_pieces(info_hash);
993                let _ = reply.send(pieces);
994            }
995
996            // --- Synchronous write (caller awaits reply) ---
997            DiskJob::Write {
998                info_hash,
999                piece,
1000                begin,
1001                data,
1002                flags,
1003                reply,
1004            } => {
1005                let flush = flags.contains(DiskJobFlags::FLUSH_PIECE);
1006                let backend = Arc::clone(&self.backend);
1007                let spawner = self.spawner.clone();
1008                tokio::spawn(async move {
1009                    let result = spawner
1010                        .block_in_place(move || {
1011                            backend.write_chunk(info_hash, piece, begin, data, flush)
1012                        })
1013                        .await;
1014                    let _ = reply.send(to_storage_result(result));
1015                });
1016            }
1017
1018            // --- Read ---
1019            DiskJob::Read {
1020                info_hash,
1021                piece,
1022                begin,
1023                length,
1024                flags,
1025                reply,
1026            } => {
1027                let volatile = flags.contains(DiskJobFlags::VOLATILE_READ);
1028                let backend = Arc::clone(&self.backend);
1029                let spawner = self.spawner.clone();
1030                tokio::spawn(async move {
1031                    let result = spawner
1032                        .block_in_place(move || {
1033                            backend.read_chunk(info_hash, piece, begin, length, volatile)
1034                        })
1035                        .await;
1036                    let _ = reply.send(to_storage_result(result));
1037                });
1038            }
1039
1040            // --- Synchronous hash (v1) ---
1041            DiskJob::Hash {
1042                info_hash,
1043                piece,
1044                expected,
1045                reply,
1046                ..
1047            } => {
1048                let backend = Arc::clone(&self.backend);
1049                let spawner = self.spawner.clone();
1050                tokio::spawn(async move {
1051                    let result = spawner
1052                        .block_in_place(move || backend.hash_piece(info_hash, piece, &expected))
1053                        .await;
1054                    let _ = reply.send(to_storage_result(result));
1055                });
1056            }
1057
1058            // --- Synchronous hash (v2) ---
1059            DiskJob::HashV2 {
1060                info_hash,
1061                piece,
1062                expected,
1063                reply,
1064                ..
1065            } => {
1066                let backend = Arc::clone(&self.backend);
1067                let spawner = self.spawner.clone();
1068                tokio::spawn(async move {
1069                    let result = spawner
1070                        .block_in_place(move || backend.hash_piece_v2(info_hash, piece, &expected))
1071                        .await;
1072                    let _ = reply.send(to_storage_result(result));
1073                });
1074            }
1075
1076            // --- Block hash (v2 Merkle) ---
1077            DiskJob::BlockHash {
1078                info_hash,
1079                piece,
1080                begin,
1081                length,
1082                reply,
1083                ..
1084            } => {
1085                let backend = Arc::clone(&self.backend);
1086                let spawner = self.spawner.clone();
1087                tokio::spawn(async move {
1088                    let result = spawner
1089                        .block_in_place(move || backend.hash_block(info_hash, piece, begin, length))
1090                        .await;
1091                    let _ = reply.send(to_storage_result(result));
1092                });
1093            }
1094
1095            // --- Flush piece ---
1096            DiskJob::FlushWriteBuffer {
1097                info_hash,
1098                piece,
1099                reply,
1100            } => {
1101                let backend = Arc::clone(&self.backend);
1102                let spawner = self.spawner.clone();
1103                tokio::spawn(async move {
1104                    let result = spawner
1105                        .block_in_place(move || backend.flush_piece(info_hash, piece))
1106                        .await;
1107                    let _ = reply.send(to_storage_result(result));
1108                });
1109            }
1110
1111            // --- Flush all ---
1112            DiskJob::FlushAll { reply } => {
1113                let backend = Arc::clone(&self.backend);
1114                let spawner = self.spawner.clone();
1115                tokio::spawn(async move {
1116                    let result = spawner.block_in_place(move || backend.flush_all()).await;
1117                    let _ = reply.send(to_storage_result(result));
1118                });
1119            }
1120
1121            DiskJob::Shutdown { .. } => unreachable!(),
1122        }
1123    }
1124}
1125
1126/// Convert `crate::Result<T>` to `irontide_storage::Result<T>` for reply channels.
1127fn to_storage_result<T>(r: crate::Result<T>) -> irontide_storage::Result<T> {
1128    r.map_err(|e| match e {
1129        crate::Error::Storage(se) => se,
1130        other => irontide_storage::Error::Io(std::io::Error::other(other.to_string())),
1131    })
1132}
1133
#[cfg(test)]
mod tests {
    //! Tests for the disk actor, the per-torrent disk handle, the deferred
    //! write queue, and disk-based piece verification. All tests run against
    //! `MemoryStorage`.
    use super::*;
    use irontide_core::Lengths;
    use irontide_storage::MemoryStorage;

    // ── DiskActor integration tests ──────────────────────────────────

    /// Small config for tests: 2 I/O threads and a 1 MiB cache; all other
    /// fields take `DiskConfig::default()`.
    fn test_config() -> DiskConfig {
        DiskConfig {
            io_threads: 2,
            cache_size: 1024 * 1024,
            ..DiskConfig::default()
        }
    }

    /// Build an info-hash whose first byte is `n` and the rest zero, so each
    /// test can register under a distinct hash.
    fn make_hash(n: u8) -> Id20 {
        let mut b = [0u8; 20];
        b[0] = n;
        Id20(b)
    }

    /// Round-trip: a flushed write is read back unchanged through the handle.
    #[tokio::test]
    async fn async_write_read() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(1);
        // Lengths::new(total, piece_len, chunk) — argument order as used by
        // `concurrent_verify_multiple_pieces` below.
        let lengths = Lengths::new(100, 50, 25);
        let storage = Arc::new(MemoryStorage::new(lengths));
        let disk = mgr.register_torrent(ih, storage).await;

        let data = Bytes::from(vec![42u8; 25]);
        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
            .await
            .unwrap();
        let read = disk
            .read_chunk(0, 0, 25, DiskJobFlags::empty())
            .await
            .unwrap();
        assert_eq!(read, data);

        mgr.shutdown().await;
    }

    /// v1 (SHA-1) verification through the handle: the correct hash passes,
    /// an all-zero hash fails.
    #[tokio::test]
    async fn verify_through_handle() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(2);
        let lengths = Lengths::new(100, 50, 25);
        let storage = Arc::new(MemoryStorage::new(lengths));
        let disk = mgr.register_torrent(ih, storage).await;

        // NOTE(review): the first write covers the whole 50-byte piece in one
        // call (larger than the 25-byte chunk); the second write then rewrites
        // bytes 25..50 with identical data, so it is redundant for the hash.
        let piece_data = vec![9u8; 50];
        disk.write_chunk(
            0,
            0,
            Bytes::from(piece_data.clone()),
            DiskJobFlags::FLUSH_PIECE,
        )
        .await
        .unwrap();
        disk.write_chunk(0, 25, Bytes::from(vec![9u8; 25]), DiskJobFlags::FLUSH_PIECE)
            .await
            .unwrap();

        let expected = irontide_core::sha1(&piece_data);
        assert!(
            disk.verify_piece(0, expected, DiskJobFlags::empty())
                .await
                .unwrap()
        );
        assert!(
            !disk
                .verify_piece(0, Id20::ZERO, DiskJobFlags::empty())
                .await
                .unwrap()
        );

        mgr.shutdown().await;
    }

    /// Two consecutive reads of the same chunk return the written data.
    /// NOTE(review): despite the name, nothing here observes whether the
    /// second read was served from cache; it only checks data equality.
    #[tokio::test]
    async fn cache_hit_avoids_io() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(3);
        let lengths = Lengths::new(100, 50, 25);
        let storage = Arc::new(MemoryStorage::new(lengths));
        let disk = mgr.register_torrent(ih, storage).await;

        let data = Bytes::from(vec![7u8; 25]);
        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
            .await
            .unwrap();

        // First read: cache miss, reads from storage
        let r1 = disk
            .read_chunk(0, 0, 25, DiskJobFlags::empty())
            .await
            .unwrap();
        assert_eq!(r1, data);

        // Second read: should be cache hit
        let r2 = disk
            .read_chunk(0, 0, 25, DiskJobFlags::empty())
            .await
            .unwrap();
        assert_eq!(r2, data);

        mgr.shutdown().await;
    }

    /// A `VOLATILE_READ` still returns the written data.
    /// NOTE(review): the test does not check that the result was left out of
    /// the cache; only the returned bytes are asserted.
    #[tokio::test]
    async fn volatile_read_bypasses_cache() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(4);
        let lengths = Lengths::new(100, 50, 25);
        let storage = Arc::new(MemoryStorage::new(lengths));
        let disk = mgr.register_torrent(ih, storage).await;

        let data = Bytes::from(vec![5u8; 25]);
        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
            .await
            .unwrap();

        // Volatile read: should not cache
        let r = disk
            .read_chunk(0, 0, 25, DiskJobFlags::VOLATILE_READ)
            .await
            .unwrap();
        assert_eq!(r, data);

        mgr.shutdown().await;
    }

    /// After `clear_piece`, the chunk is still readable with the same data.
    /// NOTE(review): eviction itself is not observed, only that reads keep
    /// working afterwards.
    #[tokio::test]
    async fn clear_piece_evicts_cache() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(5);
        let lengths = Lengths::new(100, 50, 25);
        let storage = Arc::new(MemoryStorage::new(lengths));
        let disk = mgr.register_torrent(ih, storage).await;

        let data = Bytes::from(vec![5u8; 25]);
        disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
            .await
            .unwrap();
        // Populate cache
        disk.read_chunk(0, 0, 25, DiskJobFlags::empty())
            .await
            .unwrap();
        // Clear
        disk.clear_piece(0).await;

        // Can still read (from storage, not cache)
        let r = disk
            .read_chunk(0, 0, 25, DiskJobFlags::empty())
            .await
            .unwrap();
        assert_eq!(r, data);

        mgr.shutdown().await;
    }

    /// Writes without `FLUSH_PIECE` become visible in the underlying storage
    /// after an explicit `flush_piece`.
    #[tokio::test]
    async fn write_buffer_flush() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(6);
        let lengths = Lengths::new(100, 50, 25);
        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
        // Keep a second reference so the test can inspect storage directly.
        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;

        // Write to buffer (no FLUSH_PIECE)
        disk.write_chunk(0, 0, Bytes::from(vec![1u8; 25]), DiskJobFlags::empty())
            .await
            .unwrap();
        disk.write_chunk(0, 25, Bytes::from(vec![2u8; 25]), DiskJobFlags::empty())
            .await
            .unwrap();

        // Explicitly flush
        disk.flush_piece(0).await.unwrap();

        // Read back from storage directly to verify flush happened
        let piece = storage.read_piece(0).unwrap();
        assert_eq!(&piece[..25], &[1u8; 25]);
        assert_eq!(&piece[25..], &[2u8; 25]);

        mgr.shutdown().await;
    }

    /// v2 (SHA-256) verification of data written straight into storage
    /// before registration.
    #[tokio::test]
    async fn verify_piece_v2_via_disk_handle() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(11);
        let data = vec![0xABu8; 16384];
        let expected = irontide_core::sha256(&data);
        let lengths = Lengths::new(16384, 16384, 16384);
        let storage = Arc::new(MemoryStorage::new(lengths));
        storage.write_chunk(0, 0, &data).unwrap();

        let disk = mgr.register_torrent(ih, storage).await;
        let result = disk
            .verify_piece_v2(0, expected, DiskJobFlags::empty())
            .await;
        assert!(result.unwrap());
        mgr.shutdown().await;
    }

    /// `hash_block` returns the SHA-256 of the requested block.
    #[tokio::test]
    async fn hash_block_via_disk_handle() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(12);
        let data = vec![0xCDu8; 16384];
        let lengths = Lengths::new(16384, 16384, 16384);
        let storage = Arc::new(MemoryStorage::new(lengths));
        storage.write_chunk(0, 0, &data).unwrap();

        let disk = mgr.register_torrent(ih, storage).await;
        let hash = disk.hash_block(0, 0, 16384, DiskJobFlags::empty()).await;
        assert_eq!(hash.unwrap(), irontide_core::sha256(&data));
        mgr.shutdown().await;
    }

    /// Verifying every piece concurrently from cloned handles: all pass.
    #[tokio::test]
    async fn concurrent_verify_multiple_pieces() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(10);

        // 8 pieces of 50 bytes each = 400 bytes total
        let data: Vec<u8> = (0..400).map(|i| (i % 256) as u8).collect();
        let piece_len = 50u64;
        let lengths = Lengths::new(data.len() as u64, piece_len, 25);
        let storage = Arc::new(MemoryStorage::new(lengths.clone()));

        // Write all piece data
        let num_pieces = lengths.num_pieces();
        for p in 0..num_pieces {
            let offset = lengths.piece_offset(p) as usize;
            let size = lengths.piece_size(p) as usize;
            storage
                .write_chunk(p, 0, &data[offset..offset + size])
                .unwrap();
        }

        let disk = mgr.register_torrent(ih, storage).await;

        // Compute expected hashes
        let mut expected_hashes = Vec::new();
        for p in 0..num_pieces {
            let offset = lengths.piece_offset(p) as usize;
            let size = lengths.piece_size(p) as usize;
            expected_hashes.push(irontide_core::sha1(&data[offset..offset + size]));
        }

        // Verify all 8 pieces concurrently via JoinSet
        let mut js = tokio::task::JoinSet::new();
        for p in 0..num_pieces {
            let d = disk.clone();
            let hash = expected_hashes[p as usize];
            js.spawn(async move {
                let valid = d
                    .verify_piece(p, hash, DiskJobFlags::empty())
                    .await
                    .unwrap();
                (p, valid)
            });
        }

        // Completion order is arbitrary; sort by piece before asserting.
        let mut results = Vec::new();
        while let Some(r) = js.join_next().await {
            results.push(r.unwrap());
        }
        results.sort_by_key(|&(p, _)| p);

        assert_eq!(results.len(), num_pieces as usize);
        for (p, valid) in &results {
            assert!(valid, "piece {p} should be valid");
        }

        mgr.shutdown().await;
    }

    // ── Deferred write queue tests (M100) ────────────────────────────
    // These use a multi-threaded runtime; the deferred-write path may call
    // `block_in_place`, which tokio does not allow on a current-thread runtime.

    /// Deferred writes reach storage once `flush_piece_writes` returns.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn write_block_deferred_writes_to_storage() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(30);
        let lengths = Lengths::new(100, 50, 25);
        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;

        let block0 = Bytes::from(vec![0xAAu8; 25]);
        let block1 = Bytes::from(vec![0xBBu8; 25]);

        disk.write_block_deferred(0, 0, block0.clone());
        disk.write_block_deferred(0, 25, block1.clone());

        // Wait for all writes to piece 0 to flush.
        disk.flush_piece_writes(0).await;

        // Read back from storage to verify data landed on disk.
        let read0 = storage.read_chunk(0, 0, 25).unwrap();
        assert_eq!(&read0[..], &block0[..], "block 0 should match");
        let read1 = storage.read_chunk(0, 25, 25).unwrap();
        assert_eq!(&read1[..], &block1[..], "block 1 should match");

        mgr.shutdown().await;
    }

    /// `flush_piece_writes` does not return until every queued block of the
    /// piece is visible in storage.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn flush_piece_writes_waits_for_completion() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(31);
        let lengths = Lengths::new(200, 100, 25);
        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;

        // Enqueue 4 blocks for piece 0.
        for i in 0u32..4 {
            let data = Bytes::from(vec![(i as u8) + 1; 25]);
            disk.write_block_deferred(0, i * 25, data);
        }

        // flush_piece_writes must block until all 4 writes complete.
        disk.flush_piece_writes(0).await;

        // Verify all blocks are visible on storage.
        let piece = storage.read_piece(0).unwrap();
        assert_eq!(&piece[0..25], &[1u8; 25]);
        assert_eq!(&piece[25..50], &[2u8; 25]);
        assert_eq!(&piece[50..75], &[3u8; 25]);
        assert_eq!(&piece[75..100], &[4u8; 25]);

        mgr.shutdown().await;
    }

    // ── M101: Batch writer tests ──────────────────────────────────────

    /// Ten deferred writes to one piece all land correctly.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn batch_writer_drains_multiple_jobs() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(50);
        // 10 blocks of 25 bytes each across a single piece of 250 bytes.
        let lengths = Lengths::new(250, 250, 25);
        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;

        // Enqueue 10 writes (well within the 64-job batch cap).
        for i in 0u32..10 {
            let data = Bytes::from(vec![i as u8 + 1; 25]);
            disk.write_block_deferred(0, i * 25, data);
        }

        // Wait for all writes to piece 0 to flush.
        disk.flush_piece_writes(0).await;

        // Verify all 10 blocks landed correctly.
        for i in 0u32..10 {
            let chunk = storage.read_chunk(0, i * 25, 25).unwrap();
            assert_eq!(
                &chunk[..],
                vec![i as u8 + 1; 25].as_slice(),
                "block {i} mismatch"
            );
        }

        mgr.shutdown().await;
    }

    /// 100 deferred writes (more than the writer's batch size) all land.
    /// NOTE(review): the 64-job cap itself is not observed; the test only
    /// checks that every write eventually reaches storage.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn batch_writer_caps_at_64() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(51);
        // 100 blocks of 16 bytes each across a single piece of 1600 bytes.
        let lengths = Lengths::new(1600, 1600, 16);
        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;

        // Enqueue 100 writes — more than the 64-job batch cap, so at least
        // two batches are needed to drain the channel.
        for i in 0u32..100 {
            let data = Bytes::from(vec![i as u8; 16]);
            disk.write_block_deferred(0, i * 16, data);
        }

        // Wait for all 100 writes to complete (requires multiple batches).
        disk.flush_piece_writes(0).await;

        // Verify every block landed correctly.
        for i in 0u32..100 {
            let chunk = storage.read_chunk(0, i * 16, 16).unwrap();
            assert_eq!(
                &chunk[..],
                vec![i as u8; 16].as_slice(),
                "block {i} mismatch after overflow to next batch"
            );
        }

        mgr.shutdown().await;
    }

    // ── M100: Disk-based verify tests ────────────────────────────────

    /// SHA-1 verification via `enqueue_verify` after deferred writes: correct
    /// hash passes, zero hash fails. Results arrive on the supplied channel.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn verify_from_disk_after_deferred_write() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(40);
        let chunk_size = 16384u32;
        let piece_size = u64::from(chunk_size) * 2;
        let lengths = Lengths::new(piece_size, piece_size, chunk_size);
        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;

        // Write both chunks via deferred queue.
        let chunk0 = vec![0xAAu8; chunk_size as usize];
        let chunk1 = vec![0xBBu8; chunk_size as usize];
        disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
        disk.flush_piece_writes(0).await;

        // Compute expected SHA-1 hash.
        let mut full_piece = Vec::with_capacity(piece_size as usize);
        full_piece.extend_from_slice(&chunk0);
        full_piece.extend_from_slice(&chunk1);
        let expected_hash = irontide_core::sha1(&full_piece);

        // Verify via disk-read path.
        let (result_tx, mut result_rx) = mpsc::channel(4);
        disk.enqueue_verify(0, expected_hash, 0, &result_tx);
        let result = result_rx
            .recv()
            .await
            .expect("should receive verify result");
        assert_eq!(result.piece, 0);
        assert!(result.passed, "disk-based SHA-1 verify should pass");

        // Wrong hash should fail.
        // The same data is re-written before the second verify, so only the
        // expected hash differs between the two checks.
        disk.write_block_deferred(0, 0, Bytes::from(chunk0));
        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
        disk.flush_piece_writes(0).await;
        disk.enqueue_verify(0, Id20::ZERO, 0, &result_tx);
        let result = result_rx
            .recv()
            .await
            .expect("should receive verify result");
        assert!(!result.passed, "wrong hash should fail disk-based verify");

        mgr.shutdown().await;
    }

    /// SHA-256 counterpart of the test above, via `enqueue_verify_v2`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn verify_v2_from_disk_after_deferred_write() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(41);
        let chunk_size = 16384u32;
        let piece_size = u64::from(chunk_size) * 2;
        let lengths = Lengths::new(piece_size, piece_size, chunk_size);
        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
        let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;

        // Write both chunks via deferred queue.
        let chunk0 = vec![0xCCu8; chunk_size as usize];
        let chunk1 = vec![0xDDu8; chunk_size as usize];
        disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
        disk.flush_piece_writes(0).await;

        // Compute expected SHA-256 hash.
        let mut full_piece = Vec::with_capacity(piece_size as usize);
        full_piece.extend_from_slice(&chunk0);
        full_piece.extend_from_slice(&chunk1);
        let expected_hash = irontide_core::sha256(&full_piece);

        // Verify via disk-read path.
        let (result_tx, mut result_rx) = mpsc::channel(4);
        disk.enqueue_verify_v2(0, expected_hash, &result_tx);
        let result = result_rx
            .recv()
            .await
            .expect("should receive v2 verify result");
        assert_eq!(result.piece, 0);
        assert!(result.passed, "disk-based SHA-256 verify should pass");

        // Wrong hash should fail.
        disk.write_block_deferred(0, 0, Bytes::from(chunk0));
        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
        disk.flush_piece_writes(0).await;
        disk.enqueue_verify_v2(0, Id32::ZERO, &result_tx);
        let result = result_rx
            .recv()
            .await
            .expect("should receive v2 verify result");
        assert!(
            !result.passed,
            "wrong hash should fail disk-based v2 verify"
        );

        mgr.shutdown().await;
    }

    /// With a hash pool configured, `enqueue_verify` reports through the
    /// pool's result channel (set via `set_hash_result_tx`) and carries the
    /// caller's generation value through.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn verify_with_hash_pool_from_disk() {
        let (mgr, _actor) = DiskManagerHandle::new(test_config());
        let ih = make_hash(42);
        let chunk_size = 16384u32;
        let piece_size = u64::from(chunk_size) * 2;
        let lengths = Lengths::new(piece_size, piece_size, chunk_size);
        let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
        let mut disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;

        // Configure hash pool.
        let (hash_result_tx, mut hash_result_rx) = mpsc::channel(4);
        disk.set_hash_result_tx(hash_result_tx);
        let hash_pool = std::sync::Arc::new(crate::hash_pool::HashPool::new(2, 16));
        disk.set_hash_pool(hash_pool);

        // Write both chunks via deferred queue.
        let chunk0 = vec![0xEEu8; chunk_size as usize];
        let chunk1 = vec![0xFFu8; chunk_size as usize];
        disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
        disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
        disk.flush_piece_writes(0).await;

        // Compute expected SHA-1 hash.
        let mut full_piece = Vec::with_capacity(piece_size as usize);
        full_piece.extend_from_slice(&chunk0);
        full_piece.extend_from_slice(&chunk1);
        let expected_hash = irontide_core::sha1(&full_piece);

        // Verify via hash pool path (reads from disk, submits to pool).
        // The receiver is dropped immediately, so this channel is closed; the
        // test relies on the pool path reporting via `hash_result_rx` instead.
        let (verify_result_tx, _) = mpsc::channel(4); // not used for pool path
        disk.enqueue_verify(0, expected_hash, 42, &verify_result_tx);
        let result = hash_result_rx
            .recv()
            .await
            .expect("should receive hash pool result");
        assert!(result.passed, "hash pool disk-based verify should pass");
        assert_eq!(result.piece, 0);
        assert_eq!(result.generation, 42);

        mgr.shutdown().await;
    }
}