1#![allow(
2 clippy::cast_possible_truncation,
3 clippy::cast_precision_loss,
4 clippy::cast_possible_wrap,
5 clippy::cast_sign_loss,
6 reason = "M175: disk job sizes bounded by piece_length (u32 by construction in Lengths::new)"
7)]
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use parking_lot::Mutex;
13
14use bitflags::bitflags;
15use bytes::Bytes;
16use irontide_core::{Id20, Id32};
17use irontide_storage::TorrentStorage;
18use tokio::sync::{mpsc, oneshot};
19use tracing::warn;
20
bitflags! {
    /// Option bits attached to individual disk jobs.
    ///
    /// Within this file only `VOLATILE_READ` (reads) and `FLUSH_PIECE`
    /// (writes) are inspected by the actor's dispatcher; the other bits are
    /// carried along unchanged.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct DiskJobFlags: u8 {
        /// NOTE(review): not read anywhere in this file — presumably asks the
        /// backend to copy the buffer rather than share it; confirm.
        const FORCE_COPY = 0x01;
        /// NOTE(review): not read anywhere in this file — presumably a
        /// sequential-access hint; confirm.
        const SEQUENTIAL = 0x02;
        /// On `Read` jobs, forwarded to the backend's `read_chunk` as its
        /// `volatile` argument.
        const VOLATILE_READ = 0x04;
        /// On `Write` jobs, forwarded to the backend's `write_chunk` as its
        /// `flush` argument.
        const FLUSH_PIECE = 0x08;
    }
}
35
/// Describes a failed block write: which block it was and the storage error.
///
/// NOTE(review): this type is not constructed anywhere in this file; the
/// field meanings below are inferred from their names and types — confirm
/// against the producer.
#[derive(Debug)]
pub struct DiskWriteError {
    /// Piece index of the block (presumably the one whose write failed).
    pub piece: u32,
    /// Byte offset of the block within the piece (presumably).
    pub begin: u32,
    /// The underlying storage error.
    pub error: irontide_storage::Error,
}
46
/// Outcome of a piece verification, delivered on the `result_tx` channel
/// passed to `DiskHandle::enqueue_verify` / `DiskHandle::enqueue_verify_v2`.
#[derive(Debug)]
pub struct VerifyResult {
    /// Index of the piece that was checked.
    pub piece: u32,
    /// `true` when the computed hash matched the expected one. `false` is
    /// also reported when the piece could not be read/hashed or when the
    /// handle has no data source.
    pub passed: bool,
}
55
/// One deferred block write, queued by `DiskHandle::write_block_deferred` and
/// consumed by the writer task spawned in `DiskManagerHandle::register_torrent`.
pub(crate) struct WriteJob {
    /// Piece index the block belongs to.
    piece: u32,
    /// Byte offset of the block within the piece.
    begin: u32,
    /// Block payload handed to `TorrentStorage::write_chunk`.
    data: Bytes,
}
62
/// Shared state between a `DiskHandle` and its torrent's deferred-writer task.
pub(crate) struct DiskWriteState {
    /// Queue feeding the writer task.
    tx: mpsc::Sender<WriteJob>,
    /// Piece index -> number of deferred writes queued but not yet completed.
    /// An entry is removed as soon as its count reaches zero.
    pending: Mutex<HashMap<u32, u32>>,
    /// Signalled (via `notify_waiters`) after pending counts are decremented,
    /// so `flush_piece_writes` can re-check `pending`.
    notify: tokio::sync::Notify,
    /// Settings handed to `TimedGuard` whenever `pending` is locked.
    lock_timing: crate::timed_lock::LockTimingSettings,
}
78
/// Requests understood by the disk actor (`DiskActor`).
///
/// Variants carrying a `reply` sender are answered exactly once; if the actor
/// goes away first, the sender is dropped and the requester observes a closed
/// channel.
pub(crate) enum DiskJob {
    /// Register `storage` with the backend under `info_hash`, then
    /// acknowledge on `reply`.
    Register {
        info_hash: Id20,
        storage: Arc<dyn TorrentStorage>,
        reply: oneshot::Sender<()>,
    },
    /// Remove the torrent from the backend (fire-and-forget).
    Unregister {
        info_hash: Id20,
    },

    /// Write `data` at (`piece`, `begin`) via the backend's `write_chunk`.
    /// `DiskJobFlags::FLUSH_PIECE` in `flags` is passed on as the flush argument.
    Write {
        info_hash: Id20,
        piece: u32,
        begin: u32,
        data: Bytes,
        flags: DiskJobFlags,
        reply: oneshot::Sender<irontide_storage::Result<()>>,
    },
    /// Read `length` bytes at (`piece`, `begin`) via the backend's `read_chunk`.
    /// `DiskJobFlags::VOLATILE_READ` in `flags` is passed on as the volatile argument.
    Read {
        info_hash: Id20,
        piece: u32,
        begin: u32,
        length: u32,
        flags: DiskJobFlags,
        reply: oneshot::Sender<irontide_storage::Result<Bytes>>,
    },
    /// Check `piece` against the 20-byte `expected` hash via the backend's
    /// `hash_piece`. The dispatcher ignores `flags`.
    Hash {
        info_hash: Id20,
        piece: u32,
        expected: Id20,
        #[allow(dead_code)]
        flags: DiskJobFlags,
        reply: oneshot::Sender<irontide_storage::Result<bool>>,
    },
    /// Check `piece` against the 32-byte `expected` hash via the backend's
    /// `hash_piece_v2`. The dispatcher ignores `flags`.
    HashV2 {
        info_hash: Id20,
        piece: u32,
        expected: Id32,
        #[allow(dead_code)]
        flags: DiskJobFlags,
        reply: oneshot::Sender<irontide_storage::Result<bool>>,
    },
    /// Hash `length` bytes at (`piece`, `begin`) via the backend's
    /// `hash_block` and return the digest. The dispatcher ignores `flags`.
    BlockHash {
        info_hash: Id20,
        piece: u32,
        begin: u32,
        length: u32,
        #[allow(dead_code)]
        flags: DiskJobFlags,
        reply: oneshot::Sender<irontide_storage::Result<Id32>>,
    },

    /// Forwarded to the backend's `clear_piece` (fire-and-forget).
    ClearPiece {
        info_hash: Id20,
        piece: u32,
    },
    /// Forwarded to the backend's `flush_piece` for a single piece.
    FlushWriteBuffer {
        info_hash: Id20,
        piece: u32,
        reply: oneshot::Sender<irontide_storage::Result<()>>,
    },

    /// Ask the backend which pieces it reports from `cached_pieces`.
    CachedPieces {
        info_hash: Id20,
        reply: oneshot::Sender<Vec<u32>>,
    },

    /// Forwarded to the backend's `flush_all`; not scoped to one torrent.
    FlushAll {
        reply: oneshot::Sender<irontide_storage::Result<()>>,
    },

    /// Run `flush_all`, acknowledge on `reply`, then stop the actor loop.
    Shutdown {
        reply: oneshot::Sender<()>,
    },
}
155
/// Configuration for the disk subsystem.
///
/// Only `io_threads` and `channel_capacity` are read in this file; the whole
/// struct is also handed to `create_backend_from_config`, which presumably
/// consumes the remaining fields (not visible here — field notes marked
/// "presumably" are inferred from names and should be confirmed).
#[derive(Debug, Clone)]
pub struct DiskConfig {
    /// Passed to `BlockingSpawner::new` when the manager is created.
    pub io_threads: usize,
    /// Storage allocation mode (semantics defined by `irontide_core::StorageMode`).
    pub storage_mode: irontide_core::StorageMode,
    /// Presumably the cache budget in bytes (default is 16 MiB).
    pub cache_size: usize,
    /// Presumably the fraction of the cache reserved for writes (default 0.5).
    pub write_cache_ratio: f32,
    /// Capacity of the bounded job channel feeding the disk actor.
    pub channel_capacity: usize,
    /// Presumably the buffer pool budget in bytes (default is 64 MiB).
    pub buffer_pool_capacity: usize,
    /// Presumably enables `mlock` on buffers; defaults to `true` on Unix targets.
    pub enable_mlock: bool,
    /// Presumably the lock-hold duration (ms) above which a warning is emitted.
    pub lock_warn_threshold_ms: u64,
    /// Presumably the io_uring submission queue depth.
    pub io_uring_sq_depth: u32,
    /// Presumably enables direct I/O for the io_uring backend.
    pub io_uring_direct_io: bool,
    /// Presumably enables direct I/O for the plain filesystem backend.
    pub filesystem_direct_io: bool,
    /// Presumably the minimum batch size before io_uring submission is used.
    pub io_uring_batch_threshold: usize,
    /// Presumably the IOCP concurrency value (default 0).
    pub iocp_concurrent_threads: u32,
    /// Presumably enables direct I/O for the IOCP backend.
    pub iocp_direct_io: bool,
}
190
191impl Default for DiskConfig {
192 fn default() -> Self {
193 Self {
194 io_threads: 4,
195 storage_mode: irontide_core::StorageMode::Auto,
196 cache_size: 16 * 1024 * 1024,
197 write_cache_ratio: 0.5,
198 channel_capacity: 512,
199 buffer_pool_capacity: 64 * 1024 * 1024,
200 enable_mlock: cfg!(unix),
201 lock_warn_threshold_ms: 50,
202 io_uring_sq_depth: 256,
203 io_uring_direct_io: false,
204 filesystem_direct_io: false,
205 io_uring_batch_threshold: 4,
206 iocp_concurrent_threads: 0,
207 iocp_direct_io: false,
208 }
209 }
210}
211
/// Serializable snapshot of disk-subsystem counters.
///
/// Built from the backend's `DiskIoStats` via `From`, which copies every
/// field except `queued_jobs` (set to 0 by that conversion). Fields marked
/// `#[serde(default)]` may be absent when deserializing.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct DiskStats {
    /// Presumably total bytes read.
    pub read_bytes: u64,
    /// Presumably total bytes written.
    pub write_bytes: u64,
    /// Presumably the number of reads served from cache.
    pub cache_hits: u64,
    /// Presumably the number of reads that missed the cache.
    pub cache_misses: u64,
    /// Presumably bytes currently held in the write buffer.
    pub write_buffer_bytes: usize,
    /// Number of queued jobs; the `From<DiskIoStats>` conversion always
    /// writes 0 here, so a real value must be filled in by the caller.
    pub queued_jobs: usize,
    /// Presumably bytes currently held in the read cache.
    #[serde(default)]
    pub read_cache_bytes: usize,
    /// Presumably the number of entries in the buffer pool.
    #[serde(default)]
    pub pool_entries: usize,
    /// Presumably the number of prefetches performed.
    #[serde(default)]
    pub prefetch_count: u64,
    /// Presumably the number of cache evictions.
    #[serde(default)]
    pub eviction_count: u64,
    /// NOTE(review): meaning of "skeleton" is defined by the backend; confirm.
    #[serde(default)]
    pub skeleton_count: u64,
}
243
244impl From<crate::disk_backend::DiskIoStats> for DiskStats {
245 fn from(s: crate::disk_backend::DiskIoStats) -> Self {
246 Self {
247 read_bytes: s.read_bytes,
248 write_bytes: s.write_bytes,
249 cache_hits: s.cache_hits,
250 cache_misses: s.cache_misses,
251 write_buffer_bytes: s.write_buffer_bytes,
252 queued_jobs: 0,
253 read_cache_bytes: s.read_cache_bytes,
254 pool_entries: s.pool_entries,
255 prefetch_count: s.prefetch_count,
256 eviction_count: s.eviction_count,
257 skeleton_count: s.skeleton_count,
258 }
259 }
260}
261
/// Cloneable handle to the disk actor, used to register and unregister
/// torrents and to shut the actor down.
#[derive(Clone)]
pub struct DiskManagerHandle {
    /// Job channel into the disk actor.
    tx: mpsc::Sender<DiskJob>,
    /// The backend shared with the actor; also handed to each `DiskHandle`.
    backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
    /// Runs blocking storage work; cloned into the actor, the per-torrent
    /// writer tasks, and each `DiskHandle`.
    spawner: crate::blocking_spawner::BlockingSpawner,
}
275
276impl DiskManagerHandle {
277 #[must_use]
280 pub fn new(config: DiskConfig) -> (Self, tokio::task::JoinHandle<()>) {
281 let backend = crate::disk_backend::create_backend_from_config(&config);
282 let spawner = crate::blocking_spawner::BlockingSpawner::new(config.io_threads);
283 Self::new_with_backend(config, backend, spawner)
284 }
285
286 pub(crate) fn new_with_backend(
289 config: DiskConfig,
290 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
291 spawner: crate::blocking_spawner::BlockingSpawner,
292 ) -> (Self, tokio::task::JoinHandle<()>) {
293 let (tx, rx) = mpsc::channel(config.channel_capacity);
294 let backend_for_actor = Arc::clone(&backend);
295 let actor = DiskActor::new(rx, config, backend_for_actor, spawner.clone());
296 let join = tokio::spawn(actor.run());
297 (
298 Self {
299 tx,
300 backend,
301 spawner,
302 },
303 join,
304 )
305 }
306
307 pub async fn register_torrent(
310 &self,
311 info_hash: Id20,
312 storage: Arc<dyn TorrentStorage>,
313 ) -> DiskHandle {
314 let storage_for_handle = Arc::clone(&storage);
316
317 let (reply_tx, reply_rx) = oneshot::channel();
318 let _ = self
319 .tx
320 .send(DiskJob::Register {
321 info_hash,
322 storage,
323 reply: reply_tx,
324 })
325 .await;
326 let _ = reply_rx.await;
327
328 let (write_tx, mut write_rx) = mpsc::channel::<WriteJob>(512);
330 let write_state = Arc::new(DiskWriteState {
331 tx: write_tx,
332 pending: Mutex::new(HashMap::new()),
333 notify: tokio::sync::Notify::new(),
334 lock_timing: crate::timed_lock::LockTimingSettings::default(),
335 });
336
337 let writer_storage = Arc::clone(&storage_for_handle);
339 let writer_state = Arc::clone(&write_state);
340 let writer_spawner = self.spawner.clone();
341 tokio::spawn(async move {
342 while let Some(first) = write_rx.recv().await {
343 let mut batch = vec![first];
346 while batch.len() < 64 {
347 match write_rx.try_recv() {
348 Ok(job) => batch.push(job),
349 Err(_) => break,
350 }
351 }
352
353 let pieces: Vec<u32> = batch.iter().map(|j| j.piece).collect();
355
356 let ws = Arc::clone(&writer_storage);
357 let spawner = writer_spawner.clone();
358 spawner
359 .block_in_place(move || {
360 for WriteJob { piece, begin, data } in &batch {
361 if let Err(e) = ws.write_chunk(*piece, *begin, data) {
362 tracing::warn!(piece, begin, %e, "deferred write failed");
363 }
364 }
365 })
366 .await;
367
368 {
370 let mut pending = crate::timed_lock::TimedGuard::new(
371 writer_state.pending.lock(),
372 &writer_state.lock_timing,
373 "disk_pending",
374 );
375 for piece in &pieces {
376 if let Some(count) = pending.get_mut(piece) {
377 *count = count.saturating_sub(1);
378 if *count == 0 {
379 pending.remove(piece);
380 }
381 }
382 }
383 }
384 writer_state.notify.notify_waiters();
385 }
386 });
387
388 DiskHandle {
389 tx: self.tx.clone(),
390 info_hash,
391 hash_pool: None,
392 hash_result_tx: None,
393 storage: Some(storage_for_handle),
394 backend: Some(Arc::clone(&self.backend)),
395 write_state: Some(write_state),
396 spawner: Some(self.spawner.clone()),
397 }
398 }
399
400 pub async fn unregister_torrent(&self, info_hash: Id20) {
402 let _ = self.tx.send(DiskJob::Unregister { info_hash }).await;
403 }
404
405 pub async fn shutdown(&self) {
407 let (tx, rx) = oneshot::channel();
408 let _ = self.tx.send(DiskJob::Shutdown { reply: tx }).await;
409 let _ = rx.await;
410 }
411}
412
/// Cloneable per-torrent handle for disk operations.
///
/// Handles returned by `DiskManagerHandle::register_torrent` have `storage`,
/// `backend`, `write_state` and `spawner` populated; handles built with
/// `DiskHandle::new` have all optional fields set to `None`.
#[derive(Clone)]
pub struct DiskHandle {
    /// Job channel into the disk actor.
    tx: mpsc::Sender<DiskJob>,
    /// Torrent this handle operates on.
    info_hash: Id20,
    /// Optional hash pool used by `enqueue_verify` (set via `set_hash_pool`).
    hash_pool: Option<std::sync::Arc<crate::hash_pool::HashPool>>,
    /// Channel on which hash-pool results are delivered (set via
    /// `set_hash_result_tx`).
    hash_result_tx: Option<tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>>,
    /// Direct storage reference used by the deferred-write fallback path.
    storage: Option<Arc<dyn TorrentStorage>>,
    /// Backend used for direct writes and verification.
    backend: Option<Arc<dyn crate::disk_backend::DiskIoBackend>>,
    /// Shared state of the deferred-writer task.
    write_state: Option<Arc<DiskWriteState>>,
    /// Runs blocking storage/hash work off the async path.
    spawner: Option<crate::blocking_spawner::BlockingSpawner>,
}
435
436impl std::fmt::Debug for DiskHandle {
437 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438 f.debug_struct("DiskHandle")
439 .field("info_hash", &self.info_hash)
440 .finish_non_exhaustive()
441 }
442}
443
444impl DiskHandle {
445 #[cfg_attr(not(test), allow(dead_code))]
447 pub(crate) fn new(tx: mpsc::Sender<DiskJob>, info_hash: Id20) -> Self {
448 Self {
449 tx,
450 info_hash,
451 hash_pool: None,
452 hash_result_tx: None,
453 storage: None,
454 backend: None,
455 write_state: None,
456 spawner: None,
457 }
458 }
459
460 pub fn set_hash_pool(&mut self, pool: std::sync::Arc<crate::hash_pool::HashPool>) {
462 self.hash_pool = Some(pool);
463 }
464
465 pub fn set_hash_result_tx(
467 &mut self,
468 tx: tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>,
469 ) {
470 self.hash_result_tx = Some(tx);
471 }
472
473 pub async fn write_chunk(
479 &self,
480 piece: u32,
481 begin: u32,
482 data: Bytes,
483 flags: DiskJobFlags,
484 ) -> irontide_storage::Result<()> {
485 let (tx, rx) = oneshot::channel();
486 let _ = self
487 .tx
488 .send(DiskJob::Write {
489 info_hash: self.info_hash,
490 piece,
491 begin,
492 data,
493 flags,
494 reply: tx,
495 })
496 .await;
497 rx.await.unwrap_or_else(|_| {
498 Err(irontide_storage::Error::Io(std::io::Error::new(
499 std::io::ErrorKind::BrokenPipe,
500 "disk actor gone",
501 )))
502 })
503 }
504
505 pub async fn read_chunk(
511 &self,
512 piece: u32,
513 begin: u32,
514 length: u32,
515 flags: DiskJobFlags,
516 ) -> irontide_storage::Result<Bytes> {
517 let (tx, rx) = oneshot::channel();
518 let _ = self
519 .tx
520 .send(DiskJob::Read {
521 info_hash: self.info_hash,
522 piece,
523 begin,
524 length,
525 flags,
526 reply: tx,
527 })
528 .await;
529 rx.await.unwrap_or_else(|_| {
530 Err(irontide_storage::Error::Io(std::io::Error::new(
531 std::io::ErrorKind::BrokenPipe,
532 "disk actor gone",
533 )))
534 })
535 }
536
537 pub async fn verify_piece(
543 &self,
544 piece: u32,
545 expected: Id20,
546 flags: DiskJobFlags,
547 ) -> irontide_storage::Result<bool> {
548 let (tx, rx) = oneshot::channel();
549 let _ = self
550 .tx
551 .send(DiskJob::Hash {
552 info_hash: self.info_hash,
553 piece,
554 expected,
555 flags,
556 reply: tx,
557 })
558 .await;
559 rx.await.unwrap_or_else(|_| {
560 Err(irontide_storage::Error::Io(std::io::Error::new(
561 std::io::ErrorKind::BrokenPipe,
562 "disk actor gone",
563 )))
564 })
565 }
566
567 pub async fn verify_piece_v2(
573 &self,
574 piece: u32,
575 expected: Id32,
576 flags: DiskJobFlags,
577 ) -> irontide_storage::Result<bool> {
578 let (tx, rx) = oneshot::channel();
579 let _ = self
580 .tx
581 .send(DiskJob::HashV2 {
582 info_hash: self.info_hash,
583 piece,
584 expected,
585 flags,
586 reply: tx,
587 })
588 .await;
589 rx.await.unwrap_or_else(|_| {
590 Err(irontide_storage::Error::Io(std::io::Error::new(
591 std::io::ErrorKind::BrokenPipe,
592 "disk actor gone",
593 )))
594 })
595 }
596
597 pub async fn hash_block(
603 &self,
604 piece: u32,
605 begin: u32,
606 length: u32,
607 flags: DiskJobFlags,
608 ) -> irontide_storage::Result<Id32> {
609 let (tx, rx) = oneshot::channel();
610 let _ = self
611 .tx
612 .send(DiskJob::BlockHash {
613 info_hash: self.info_hash,
614 piece,
615 begin,
616 length,
617 flags,
618 reply: tx,
619 })
620 .await;
621 rx.await.unwrap_or_else(|_| {
622 Err(irontide_storage::Error::Io(std::io::Error::new(
623 std::io::ErrorKind::BrokenPipe,
624 "disk actor gone",
625 )))
626 })
627 }
628
629 pub async fn clear_piece(&self, piece: u32) {
631 let _ = self
632 .tx
633 .send(DiskJob::ClearPiece {
634 info_hash: self.info_hash,
635 piece,
636 })
637 .await;
638 }
639
640 pub async fn flush_piece(&self, piece: u32) -> irontide_storage::Result<()> {
646 let (tx, rx) = oneshot::channel();
647 let _ = self
648 .tx
649 .send(DiskJob::FlushWriteBuffer {
650 info_hash: self.info_hash,
651 piece,
652 reply: tx,
653 })
654 .await;
655 rx.await.unwrap_or_else(|_| {
656 Err(irontide_storage::Error::Io(std::io::Error::new(
657 std::io::ErrorKind::BrokenPipe,
658 "disk actor gone",
659 )))
660 })
661 }
662
663 pub async fn cached_pieces(&self) -> Vec<u32> {
665 let (tx, rx) = oneshot::channel();
666 let _ = self
667 .tx
668 .send(DiskJob::CachedPieces {
669 info_hash: self.info_hash,
670 reply: tx,
671 })
672 .await;
673 rx.await.unwrap_or_default()
674 }
675
676 pub async fn flush_cache(&self) -> irontide_storage::Result<()> {
682 let (tx, rx) = oneshot::channel();
683 let _ = self.tx.send(DiskJob::FlushAll { reply: tx }).await;
684 rx.await.unwrap_or_else(|_| {
685 Err(irontide_storage::Error::Io(std::io::Error::new(
686 std::io::ErrorKind::BrokenPipe,
687 "disk actor gone",
688 )))
689 })
690 }
691
692 pub fn enqueue_verify(
700 &self,
701 piece: u32,
702 expected: Id20,
703 generation: u64,
704 result_tx: &mpsc::Sender<VerifyResult>,
705 ) {
706 if let (Some(pool), Some(hash_tx)) = (&self.hash_pool, &self.hash_result_tx) {
708 if let Some(backend) = &self.backend {
709 let pool = pool.clone();
710 let hash_tx = hash_tx.clone();
711 let backend = Arc::clone(backend);
712 let info_hash = self.info_hash;
713 let job = crate::hash_pool::HashJob::Streaming {
714 piece,
715 expected,
716 generation,
717 info_hash,
718 backend,
719 result_tx: hash_tx,
720 };
721 tokio::spawn(async move {
722 if pool.submit(job).await.is_err() {
723 tracing::warn!(piece, "hash pool shut down, treating as failed");
724 }
725 });
726 return;
727 }
728
729 let hash_tx = hash_tx.clone();
731 tokio::spawn(async move {
732 tracing::warn!(piece, "verify: no backend (hash pool path)");
733 let _ = hash_tx
734 .send(crate::hash_pool::HashResult {
735 piece,
736 passed: false,
737 generation,
738 })
739 .await;
740 });
741 return;
742 }
743
744 if let Some(backend) = &self.backend {
746 let backend = Arc::clone(backend);
747 let info_hash = self.info_hash;
748 let result_tx = result_tx.clone();
749 let spawner = self.spawner.clone().unwrap();
750 tokio::spawn(async move {
751 let passed = spawner
752 .block_in_place(move || {
753 backend
754 .hash_piece(info_hash, piece, &expected)
755 .unwrap_or_else(|e| {
756 warn!(piece, %e, "verify: hash_piece failed");
757 false
758 })
759 })
760 .await;
761 let _ = result_tx.send(VerifyResult { piece, passed }).await;
762 });
763 return;
764 }
765
766 let result_tx = result_tx.clone();
768 tokio::spawn(async move {
769 warn!(piece, "verify: no data source, treating as failed");
770 let _ = result_tx
771 .send(VerifyResult {
772 piece,
773 passed: false,
774 })
775 .await;
776 });
777 }
778
779 pub fn enqueue_verify_v2(
783 &self,
784 piece: u32,
785 expected: Id32,
786 result_tx: &mpsc::Sender<VerifyResult>,
787 ) {
788 if let Some(backend) = &self.backend {
789 let backend = Arc::clone(backend);
790 let info_hash = self.info_hash;
791 let result_tx = result_tx.clone();
792 let spawner = self.spawner.clone().unwrap();
793 tokio::spawn(async move {
794 let passed = spawner
795 .block_in_place(move || match backend.read_piece(info_hash, piece) {
796 Ok(data) => {
797 let actual = irontide_core::sha256(&data);
798 actual == expected
799 }
800 Err(e) => {
801 warn!(piece, %e, "verify v2: read_piece failed");
802 false
803 }
804 })
805 .await;
806 let _ = result_tx.send(VerifyResult { piece, passed }).await;
807 });
808 return;
809 }
810
811 let result_tx = result_tx.clone();
813 tokio::spawn(async move {
814 warn!(piece, "verify v2: no data source, treating as failed");
815 let _ = result_tx
816 .send(VerifyResult {
817 piece,
818 passed: false,
819 })
820 .await;
821 });
822 }
823
824 #[allow(
832 clippy::needless_pass_by_value,
833 reason = "Bytes is refcounted, pass-by-value is the bytes-crate idiom"
834 )]
835 pub(crate) fn write_block_deferred(&self, piece: u32, begin: u32, data: Bytes) {
836 let (Some(write_state), Some(storage)) = (&self.write_state, &self.storage) else {
837 return;
838 };
839
840 {
842 let mut pending = crate::timed_lock::TimedGuard::new(
843 write_state.pending.lock(),
844 &write_state.lock_timing,
845 "disk_pending",
846 );
847 *pending.entry(piece).or_insert(0) += 1;
848 }
849
850 match write_state.tx.try_send(WriteJob {
851 piece,
852 begin,
853 data: data.clone(),
854 }) {
855 Ok(()) => {}
856 Err(mpsc::error::TrySendError::Full(_)) => {
857 let storage = Arc::clone(storage);
859 if let Some(ref spawner) = self.spawner {
860 spawner.block_in_place_sync(|| {
861 if let Err(e) = storage.write_chunk(piece, begin, &data) {
862 tracing::warn!(piece, begin, %e, "deferred write fallback failed");
863 }
864 });
865 } else {
866 if let Err(e) = storage.write_chunk(piece, begin, &data) {
868 tracing::warn!(piece, begin, %e, "deferred write fallback failed");
869 }
870 }
871 let mut pending = crate::timed_lock::TimedGuard::new(
873 write_state.pending.lock(),
874 &write_state.lock_timing,
875 "disk_pending",
876 );
877 if let Some(count) = pending.get_mut(&piece) {
878 *count = count.saturating_sub(1);
879 if *count == 0 {
880 pending.remove(&piece);
881 drop(pending);
882 write_state.notify.notify_waiters();
883 }
884 }
885 }
886 Err(mpsc::error::TrySendError::Closed(_)) => {
887 let mut pending = crate::timed_lock::TimedGuard::new(
889 write_state.pending.lock(),
890 &write_state.lock_timing,
891 "disk_pending",
892 );
893 if let Some(count) = pending.get_mut(&piece) {
894 *count = count.saturating_sub(1);
895 if *count == 0 {
896 pending.remove(&piece);
897 drop(pending);
898 write_state.notify.notify_waiters();
899 }
900 }
901 }
902 }
903 }
904
905 pub(crate) fn write_block_direct(
915 &self,
916 piece: u32,
917 begin: u32,
918 s0: &[u8],
919 s1: &[u8],
920 ) -> crate::Result<()> {
921 let Some(backend) = &self.backend else {
922 return Ok(());
923 };
924 backend.write_block_direct(self.info_hash, piece, begin, s0, s1)
925 }
926
927 pub(crate) async fn flush_piece_writes(&self, piece: u32) {
932 let Some(write_state) = &self.write_state else {
933 return;
934 };
935
936 loop {
937 {
938 let pending = crate::timed_lock::TimedGuard::new(
939 write_state.pending.lock(),
940 &write_state.lock_timing,
941 "disk_pending",
942 );
943 if !pending.contains_key(&piece) {
944 return;
945 }
946 }
947 write_state.notify.notified().await;
948 }
949 }
950
951 #[allow(dead_code)]
953 pub(crate) fn storage(&self) -> Option<Arc<dyn TorrentStorage>> {
954 self.storage.clone()
955 }
956}
957
/// The task-side half of the disk subsystem: receives `DiskJob`s and
/// forwards them to the backend.
struct DiskActor {
    /// Incoming job queue.
    rx: mpsc::Receiver<DiskJob>,
    /// Backend that performs the actual work.
    backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
    /// Runs blocking backend calls off the actor loop.
    spawner: crate::blocking_spawner::BlockingSpawner,
    /// Configuration the actor was created with (currently unread).
    #[allow(dead_code)]
    config: DiskConfig,
}
969
970impl DiskActor {
971 fn new(
972 rx: mpsc::Receiver<DiskJob>,
973 config: DiskConfig,
974 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
975 spawner: crate::blocking_spawner::BlockingSpawner,
976 ) -> Self {
977 Self {
978 rx,
979 backend,
980 spawner,
981 config,
982 }
983 }
984
985 async fn run(mut self) {
986 loop {
987 let Some(first) = self.rx.recv().await else {
989 break;
990 };
991
992 let mut batch = vec![first];
994 while let Ok(job) = self.rx.try_recv() {
995 batch.push(job);
996 }
997
998 for job in batch {
999 if let DiskJob::Shutdown { reply } = job {
1000 let backend = Arc::clone(&self.backend);
1002 let spawner = self.spawner.clone();
1003 let flush_result = spawner.block_in_place(move || backend.flush_all()).await;
1004 if let Err(e) = flush_result {
1005 warn!("flush_all on shutdown failed: {e}");
1006 }
1007 let _ = reply.send(());
1008 return;
1009 }
1010 self.dispatch_job(job);
1011 }
1012 }
1013 }
1014
1015 fn dispatch_job(&self, job: DiskJob) {
1019 match job {
1020 DiskJob::Register {
1022 info_hash,
1023 storage,
1024 reply,
1025 } => {
1026 self.backend.register(info_hash, storage);
1027 let _ = reply.send(());
1028 }
1029 DiskJob::Unregister { info_hash } => {
1030 self.backend.unregister(info_hash);
1031 }
1032 DiskJob::ClearPiece { info_hash, piece } => {
1033 self.backend.clear_piece(info_hash, piece);
1034 }
1035 DiskJob::CachedPieces { info_hash, reply } => {
1036 let pieces = self.backend.cached_pieces(info_hash);
1037 let _ = reply.send(pieces);
1038 }
1039
1040 DiskJob::Write {
1042 info_hash,
1043 piece,
1044 begin,
1045 data,
1046 flags,
1047 reply,
1048 } => {
1049 let flush = flags.contains(DiskJobFlags::FLUSH_PIECE);
1050 let backend = Arc::clone(&self.backend);
1051 let spawner = self.spawner.clone();
1052 tokio::spawn(async move {
1053 let result = spawner
1054 .block_in_place(move || {
1055 backend.write_chunk(info_hash, piece, begin, data, flush)
1056 })
1057 .await;
1058 let _ = reply.send(to_storage_result(result));
1059 });
1060 }
1061
1062 DiskJob::Read {
1064 info_hash,
1065 piece,
1066 begin,
1067 length,
1068 flags,
1069 reply,
1070 } => {
1071 let volatile = flags.contains(DiskJobFlags::VOLATILE_READ);
1072 let backend = Arc::clone(&self.backend);
1073 let spawner = self.spawner.clone();
1074 tokio::spawn(async move {
1075 let result = spawner
1076 .block_in_place(move || {
1077 backend.read_chunk(info_hash, piece, begin, length, volatile)
1078 })
1079 .await;
1080 let _ = reply.send(to_storage_result(result));
1081 });
1082 }
1083
1084 DiskJob::Hash {
1086 info_hash,
1087 piece,
1088 expected,
1089 reply,
1090 ..
1091 } => {
1092 let backend = Arc::clone(&self.backend);
1093 let spawner = self.spawner.clone();
1094 tokio::spawn(async move {
1095 let result = spawner
1096 .block_in_place(move || backend.hash_piece(info_hash, piece, &expected))
1097 .await;
1098 let _ = reply.send(to_storage_result(result));
1099 });
1100 }
1101
1102 DiskJob::HashV2 {
1104 info_hash,
1105 piece,
1106 expected,
1107 reply,
1108 ..
1109 } => {
1110 let backend = Arc::clone(&self.backend);
1111 let spawner = self.spawner.clone();
1112 tokio::spawn(async move {
1113 let result = spawner
1114 .block_in_place(move || backend.hash_piece_v2(info_hash, piece, &expected))
1115 .await;
1116 let _ = reply.send(to_storage_result(result));
1117 });
1118 }
1119
1120 DiskJob::BlockHash {
1122 info_hash,
1123 piece,
1124 begin,
1125 length,
1126 reply,
1127 ..
1128 } => {
1129 let backend = Arc::clone(&self.backend);
1130 let spawner = self.spawner.clone();
1131 tokio::spawn(async move {
1132 let result = spawner
1133 .block_in_place(move || backend.hash_block(info_hash, piece, begin, length))
1134 .await;
1135 let _ = reply.send(to_storage_result(result));
1136 });
1137 }
1138
1139 DiskJob::FlushWriteBuffer {
1141 info_hash,
1142 piece,
1143 reply,
1144 } => {
1145 let backend = Arc::clone(&self.backend);
1146 let spawner = self.spawner.clone();
1147 tokio::spawn(async move {
1148 let result = spawner
1149 .block_in_place(move || backend.flush_piece(info_hash, piece))
1150 .await;
1151 let _ = reply.send(to_storage_result(result));
1152 });
1153 }
1154
1155 DiskJob::FlushAll { reply } => {
1157 let backend = Arc::clone(&self.backend);
1158 let spawner = self.spawner.clone();
1159 tokio::spawn(async move {
1160 let result = spawner.block_in_place(move || backend.flush_all()).await;
1161 let _ = reply.send(to_storage_result(result));
1162 });
1163 }
1164
1165 DiskJob::Shutdown { .. } => unreachable!(),
1166 }
1167 }
1168}
1169
1170fn to_storage_result<T>(r: crate::Result<T>) -> irontide_storage::Result<T> {
1172 r.map_err(|e| match e {
1173 crate::Error::Storage(se) => se,
1174 other => irontide_storage::Error::Io(std::io::Error::other(other.to_string())),
1175 })
1176}
1177
1178#[cfg(test)]
1179mod tests {
1180 use super::*;
1181 use irontide_core::Lengths;
1182 use irontide_storage::MemoryStorage;
1183
1184 fn test_config() -> DiskConfig {
1187 DiskConfig {
1188 io_threads: 2,
1189 cache_size: 1024 * 1024,
1190 ..DiskConfig::default()
1191 }
1192 }
1193
1194 fn make_hash(n: u8) -> Id20 {
1195 let mut b = [0u8; 20];
1196 b[0] = n;
1197 Id20(b)
1198 }
1199
1200 #[tokio::test]
1201 async fn async_write_read() {
1202 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1203 let ih = make_hash(1);
1204 let lengths = Lengths::new(100, 50, 25);
1205 let storage = Arc::new(MemoryStorage::new(lengths));
1206 let disk = mgr.register_torrent(ih, storage).await;
1207
1208 let data = Bytes::from(vec![42u8; 25]);
1209 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1210 .await
1211 .unwrap();
1212 let read = disk
1213 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1214 .await
1215 .unwrap();
1216 assert_eq!(read, data);
1217
1218 mgr.shutdown().await;
1219 }
1220
1221 #[tokio::test]
1222 async fn verify_through_handle() {
1223 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1224 let ih = make_hash(2);
1225 let lengths = Lengths::new(100, 50, 25);
1226 let storage = Arc::new(MemoryStorage::new(lengths));
1227 let disk = mgr.register_torrent(ih, storage).await;
1228
1229 let piece_data = vec![9u8; 50];
1230 disk.write_chunk(
1231 0,
1232 0,
1233 Bytes::from(piece_data.clone()),
1234 DiskJobFlags::FLUSH_PIECE,
1235 )
1236 .await
1237 .unwrap();
1238 disk.write_chunk(0, 25, Bytes::from(vec![9u8; 25]), DiskJobFlags::FLUSH_PIECE)
1239 .await
1240 .unwrap();
1241
1242 let expected = irontide_core::sha1(&piece_data);
1243 assert!(
1244 disk.verify_piece(0, expected, DiskJobFlags::empty())
1245 .await
1246 .unwrap()
1247 );
1248 assert!(
1249 !disk
1250 .verify_piece(0, Id20::ZERO, DiskJobFlags::empty())
1251 .await
1252 .unwrap()
1253 );
1254
1255 mgr.shutdown().await;
1256 }
1257
1258 #[tokio::test]
1259 async fn cache_hit_avoids_io() {
1260 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1261 let ih = make_hash(3);
1262 let lengths = Lengths::new(100, 50, 25);
1263 let storage = Arc::new(MemoryStorage::new(lengths));
1264 let disk = mgr.register_torrent(ih, storage).await;
1265
1266 let data = Bytes::from(vec![7u8; 25]);
1267 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1268 .await
1269 .unwrap();
1270
1271 let r1 = disk
1273 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1274 .await
1275 .unwrap();
1276 assert_eq!(r1, data);
1277
1278 let r2 = disk
1280 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1281 .await
1282 .unwrap();
1283 assert_eq!(r2, data);
1284
1285 mgr.shutdown().await;
1286 }
1287
1288 #[tokio::test]
1289 async fn volatile_read_bypasses_cache() {
1290 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1291 let ih = make_hash(4);
1292 let lengths = Lengths::new(100, 50, 25);
1293 let storage = Arc::new(MemoryStorage::new(lengths));
1294 let disk = mgr.register_torrent(ih, storage).await;
1295
1296 let data = Bytes::from(vec![5u8; 25]);
1297 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1298 .await
1299 .unwrap();
1300
1301 let r = disk
1303 .read_chunk(0, 0, 25, DiskJobFlags::VOLATILE_READ)
1304 .await
1305 .unwrap();
1306 assert_eq!(r, data);
1307
1308 mgr.shutdown().await;
1309 }
1310
1311 #[tokio::test]
1312 async fn clear_piece_evicts_cache() {
1313 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1314 let ih = make_hash(5);
1315 let lengths = Lengths::new(100, 50, 25);
1316 let storage = Arc::new(MemoryStorage::new(lengths));
1317 let disk = mgr.register_torrent(ih, storage).await;
1318
1319 let data = Bytes::from(vec![5u8; 25]);
1320 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1321 .await
1322 .unwrap();
1323 disk.read_chunk(0, 0, 25, DiskJobFlags::empty())
1325 .await
1326 .unwrap();
1327 disk.clear_piece(0).await;
1329
1330 let r = disk
1332 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1333 .await
1334 .unwrap();
1335 assert_eq!(r, data);
1336
1337 mgr.shutdown().await;
1338 }
1339
1340 #[tokio::test]
1341 async fn write_buffer_flush() {
1342 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1343 let ih = make_hash(6);
1344 let lengths = Lengths::new(100, 50, 25);
1345 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1346 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1347
1348 disk.write_chunk(0, 0, Bytes::from(vec![1u8; 25]), DiskJobFlags::empty())
1350 .await
1351 .unwrap();
1352 disk.write_chunk(0, 25, Bytes::from(vec![2u8; 25]), DiskJobFlags::empty())
1353 .await
1354 .unwrap();
1355
1356 disk.flush_piece(0).await.unwrap();
1358
1359 let piece = storage.read_piece(0).unwrap();
1361 assert_eq!(&piece[..25], &[1u8; 25]);
1362 assert_eq!(&piece[25..], &[2u8; 25]);
1363
1364 mgr.shutdown().await;
1365 }
1366
1367 #[tokio::test]
1368 async fn verify_piece_v2_via_disk_handle() {
1369 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1370 let ih = make_hash(11);
1371 let data = vec![0xABu8; 16384];
1372 let expected = irontide_core::sha256(&data);
1373 let lengths = Lengths::new(16384, 16384, 16384);
1374 let storage = Arc::new(MemoryStorage::new(lengths));
1375 storage.write_chunk(0, 0, &data).unwrap();
1376
1377 let disk = mgr.register_torrent(ih, storage).await;
1378 let result = disk
1379 .verify_piece_v2(0, expected, DiskJobFlags::empty())
1380 .await;
1381 assert!(result.unwrap());
1382 mgr.shutdown().await;
1383 }
1384
1385 #[tokio::test]
1386 async fn hash_block_via_disk_handle() {
1387 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1388 let ih = make_hash(12);
1389 let data = vec![0xCDu8; 16384];
1390 let lengths = Lengths::new(16384, 16384, 16384);
1391 let storage = Arc::new(MemoryStorage::new(lengths));
1392 storage.write_chunk(0, 0, &data).unwrap();
1393
1394 let disk = mgr.register_torrent(ih, storage).await;
1395 let hash = disk.hash_block(0, 0, 16384, DiskJobFlags::empty()).await;
1396 assert_eq!(hash.unwrap(), irontide_core::sha256(&data));
1397 mgr.shutdown().await;
1398 }
1399
1400 #[tokio::test]
1401 async fn concurrent_verify_multiple_pieces() {
1402 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1403 let ih = make_hash(10);
1404
1405 let data: Vec<u8> = (0..400).map(|i| (i % 256) as u8).collect();
1407 let piece_len = 50u64;
1408 let lengths = Lengths::new(data.len() as u64, piece_len, 25);
1409 let storage = Arc::new(MemoryStorage::new(lengths.clone()));
1410
1411 let num_pieces = lengths.num_pieces();
1413 for p in 0..num_pieces {
1414 let offset = lengths.piece_offset(p) as usize;
1415 let size = lengths.piece_size(p) as usize;
1416 storage
1417 .write_chunk(p, 0, &data[offset..offset + size])
1418 .unwrap();
1419 }
1420
1421 let disk = mgr.register_torrent(ih, storage).await;
1422
1423 let mut expected_hashes = Vec::new();
1425 for p in 0..num_pieces {
1426 let offset = lengths.piece_offset(p) as usize;
1427 let size = lengths.piece_size(p) as usize;
1428 expected_hashes.push(irontide_core::sha1(&data[offset..offset + size]));
1429 }
1430
1431 let mut js = tokio::task::JoinSet::new();
1433 for p in 0..num_pieces {
1434 let d = disk.clone();
1435 let hash = expected_hashes[p as usize];
1436 js.spawn(async move {
1437 let valid = d
1438 .verify_piece(p, hash, DiskJobFlags::empty())
1439 .await
1440 .unwrap();
1441 (p, valid)
1442 });
1443 }
1444
1445 let mut results = Vec::new();
1446 while let Some(r) = js.join_next().await {
1447 results.push(r.unwrap());
1448 }
1449 results.sort_by_key(|&(p, _)| p);
1450
1451 assert_eq!(results.len(), num_pieces as usize);
1452 for (p, valid) in &results {
1453 assert!(valid, "piece {p} should be valid");
1454 }
1455
1456 mgr.shutdown().await;
1457 }
1458
1459 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1462 async fn write_block_deferred_writes_to_storage() {
1463 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1464 let ih = make_hash(30);
1465 let lengths = Lengths::new(100, 50, 25);
1466 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1467 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1468
1469 let block0 = Bytes::from(vec![0xAAu8; 25]);
1470 let block1 = Bytes::from(vec![0xBBu8; 25]);
1471
1472 disk.write_block_deferred(0, 0, block0.clone());
1473 disk.write_block_deferred(0, 25, block1.clone());
1474
1475 disk.flush_piece_writes(0).await;
1477
1478 let read0 = storage.read_chunk(0, 0, 25).unwrap();
1480 assert_eq!(&read0[..], &block0[..], "block 0 should match");
1481 let read1 = storage.read_chunk(0, 25, 25).unwrap();
1482 assert_eq!(&read1[..], &block1[..], "block 1 should match");
1483
1484 mgr.shutdown().await;
1485 }
1486
1487 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1488 async fn flush_piece_writes_waits_for_completion() {
1489 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1490 let ih = make_hash(31);
1491 let lengths = Lengths::new(200, 100, 25);
1492 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1493 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1494
1495 for i in 0u32..4 {
1497 let data = Bytes::from(vec![(i as u8) + 1; 25]);
1498 disk.write_block_deferred(0, i * 25, data);
1499 }
1500
1501 disk.flush_piece_writes(0).await;
1503
1504 let piece = storage.read_piece(0).unwrap();
1506 assert_eq!(&piece[0..25], &[1u8; 25]);
1507 assert_eq!(&piece[25..50], &[2u8; 25]);
1508 assert_eq!(&piece[50..75], &[3u8; 25]);
1509 assert_eq!(&piece[75..100], &[4u8; 25]);
1510
1511 mgr.shutdown().await;
1512 }
1513
1514 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1517 async fn batch_writer_drains_multiple_jobs() {
1518 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1519 let ih = make_hash(50);
1520 let lengths = Lengths::new(250, 250, 25);
1522 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1523 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1524
1525 for i in 0u32..10 {
1527 let data = Bytes::from(vec![i as u8 + 1; 25]);
1528 disk.write_block_deferred(0, i * 25, data);
1529 }
1530
1531 disk.flush_piece_writes(0).await;
1533
1534 for i in 0u32..10 {
1536 let chunk = storage.read_chunk(0, i * 25, 25).unwrap();
1537 assert_eq!(
1538 &chunk[..],
1539 vec![i as u8 + 1; 25].as_slice(),
1540 "block {i} mismatch"
1541 );
1542 }
1543
1544 mgr.shutdown().await;
1545 }
1546
1547 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1548 async fn batch_writer_caps_at_64() {
1549 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1550 let ih = make_hash(51);
1551 let lengths = Lengths::new(1600, 1600, 16);
1553 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1554 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1555
1556 for i in 0u32..100 {
1559 let data = Bytes::from(vec![i as u8; 16]);
1560 disk.write_block_deferred(0, i * 16, data);
1561 }
1562
1563 disk.flush_piece_writes(0).await;
1565
1566 for i in 0u32..100 {
1568 let chunk = storage.read_chunk(0, i * 16, 16).unwrap();
1569 assert_eq!(
1570 &chunk[..],
1571 vec![i as u8; 16].as_slice(),
1572 "block {i} mismatch after overflow to next batch"
1573 );
1574 }
1575
1576 mgr.shutdown().await;
1577 }
1578
1579 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1582 async fn verify_from_disk_after_deferred_write() {
1583 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1584 let ih = make_hash(40);
1585 let chunk_size = 16384u32;
1586 let piece_size = u64::from(chunk_size) * 2;
1587 let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1588 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1589 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1590
1591 let chunk0 = vec![0xAAu8; chunk_size as usize];
1593 let chunk1 = vec![0xBBu8; chunk_size as usize];
1594 disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1595 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1596 disk.flush_piece_writes(0).await;
1597
1598 let mut full_piece = Vec::with_capacity(piece_size as usize);
1600 full_piece.extend_from_slice(&chunk0);
1601 full_piece.extend_from_slice(&chunk1);
1602 let expected_hash = irontide_core::sha1(&full_piece);
1603
1604 let (result_tx, mut result_rx) = mpsc::channel(4);
1606 disk.enqueue_verify(0, expected_hash, 0, &result_tx);
1607 let result = result_rx
1608 .recv()
1609 .await
1610 .expect("should receive verify result");
1611 assert_eq!(result.piece, 0);
1612 assert!(result.passed, "disk-based SHA-1 verify should pass");
1613
1614 disk.write_block_deferred(0, 0, Bytes::from(chunk0));
1616 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
1617 disk.flush_piece_writes(0).await;
1618 disk.enqueue_verify(0, Id20::ZERO, 0, &result_tx);
1619 let result = result_rx
1620 .recv()
1621 .await
1622 .expect("should receive verify result");
1623 assert!(!result.passed, "wrong hash should fail disk-based verify");
1624
1625 mgr.shutdown().await;
1626 }
1627
1628 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1629 async fn verify_v2_from_disk_after_deferred_write() {
1630 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1631 let ih = make_hash(41);
1632 let chunk_size = 16384u32;
1633 let piece_size = u64::from(chunk_size) * 2;
1634 let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1635 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1636 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1637
1638 let chunk0 = vec![0xCCu8; chunk_size as usize];
1640 let chunk1 = vec![0xDDu8; chunk_size as usize];
1641 disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1642 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1643 disk.flush_piece_writes(0).await;
1644
1645 let mut full_piece = Vec::with_capacity(piece_size as usize);
1647 full_piece.extend_from_slice(&chunk0);
1648 full_piece.extend_from_slice(&chunk1);
1649 let expected_hash = irontide_core::sha256(&full_piece);
1650
1651 let (result_tx, mut result_rx) = mpsc::channel(4);
1653 disk.enqueue_verify_v2(0, expected_hash, &result_tx);
1654 let result = result_rx
1655 .recv()
1656 .await
1657 .expect("should receive v2 verify result");
1658 assert_eq!(result.piece, 0);
1659 assert!(result.passed, "disk-based SHA-256 verify should pass");
1660
1661 disk.write_block_deferred(0, 0, Bytes::from(chunk0));
1663 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
1664 disk.flush_piece_writes(0).await;
1665 disk.enqueue_verify_v2(0, Id32::ZERO, &result_tx);
1666 let result = result_rx
1667 .recv()
1668 .await
1669 .expect("should receive v2 verify result");
1670 assert!(
1671 !result.passed,
1672 "wrong hash should fail disk-based v2 verify"
1673 );
1674
1675 mgr.shutdown().await;
1676 }
1677
1678 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1679 async fn verify_with_hash_pool_from_disk() {
1680 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1681 let ih = make_hash(42);
1682 let chunk_size = 16384u32;
1683 let piece_size = u64::from(chunk_size) * 2;
1684 let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1685 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1686 let mut disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1687
1688 let (hash_result_tx, mut hash_result_rx) = mpsc::channel(4);
1690 disk.set_hash_result_tx(hash_result_tx);
1691 let hash_pool = std::sync::Arc::new(crate::hash_pool::HashPool::new(2, 16));
1692 disk.set_hash_pool(hash_pool);
1693
1694 let chunk0 = vec![0xEEu8; chunk_size as usize];
1696 let chunk1 = vec![0xFFu8; chunk_size as usize];
1697 disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1698 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1699 disk.flush_piece_writes(0).await;
1700
1701 let mut full_piece = Vec::with_capacity(piece_size as usize);
1703 full_piece.extend_from_slice(&chunk0);
1704 full_piece.extend_from_slice(&chunk1);
1705 let expected_hash = irontide_core::sha1(&full_piece);
1706
1707 let (verify_result_tx, _) = mpsc::channel(4); disk.enqueue_verify(0, expected_hash, 42, &verify_result_tx);
1710 let result = hash_result_rx
1711 .recv()
1712 .await
1713 .expect("should receive hash pool result");
1714 assert!(result.passed, "hash pool disk-based verify should pass");
1715 assert_eq!(result.piece, 0);
1716 assert_eq!(result.generation, 42);
1717
1718 mgr.shutdown().await;
1719 }
1720}