1#![allow(
2 clippy::cast_possible_truncation,
3 clippy::cast_precision_loss,
4 clippy::cast_possible_wrap,
5 clippy::cast_sign_loss,
6 reason = "M175: disk job sizes bounded by piece_length (u32 by construction in Lengths::new)"
7)]
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use parking_lot::Mutex;
13
14use bitflags::bitflags;
15use bytes::Bytes;
16use irontide_core::{Id20, Id32};
17use irontide_storage::TorrentStorage;
18use tokio::sync::{mpsc, oneshot};
19use tracing::warn;
20
21bitflags! {
22 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
24 pub struct DiskJobFlags: u8 {
25 const FORCE_COPY = 0x01;
27 const SEQUENTIAL = 0x02;
29 const VOLATILE_READ = 0x04;
31 const FLUSH_PIECE = 0x08;
33 }
34}
35
36#[derive(Debug)]
38pub struct DiskWriteError {
39 pub piece: u32,
41 pub begin: u32,
43 pub error: irontide_storage::Error,
45}
46
47#[derive(Debug)]
49pub struct VerifyResult {
50 pub piece: u32,
52 pub passed: bool,
54}
55
56pub(crate) struct WriteJob {
58 piece: u32,
59 begin: u32,
60 data: Bytes,
61}
62
63pub(crate) struct DiskWriteState {
70 tx: mpsc::Sender<WriteJob>,
71 pending: Mutex<HashMap<u32, u32>>,
73 notify: tokio::sync::Notify,
75 lock_timing: crate::timed_lock::LockTimingSettings,
77}
78
79pub(crate) enum DiskJob {
80 Register {
81 info_hash: Id20,
82 storage: Arc<dyn TorrentStorage>,
83 reply: oneshot::Sender<()>,
84 },
85 Unregister {
86 info_hash: Id20,
87 },
88
89 Write {
90 info_hash: Id20,
91 piece: u32,
92 begin: u32,
93 data: Bytes,
94 flags: DiskJobFlags,
95 reply: oneshot::Sender<irontide_storage::Result<()>>,
96 },
97 Read {
98 info_hash: Id20,
99 piece: u32,
100 begin: u32,
101 length: u32,
102 flags: DiskJobFlags,
103 reply: oneshot::Sender<irontide_storage::Result<Bytes>>,
104 },
105 Hash {
106 info_hash: Id20,
107 piece: u32,
108 expected: Id20,
109 #[allow(dead_code)]
110 flags: DiskJobFlags,
111 reply: oneshot::Sender<irontide_storage::Result<bool>>,
112 },
113 HashV2 {
114 info_hash: Id20,
115 piece: u32,
116 expected: Id32,
117 #[allow(dead_code)]
118 flags: DiskJobFlags,
119 reply: oneshot::Sender<irontide_storage::Result<bool>>,
120 },
121 BlockHash {
122 info_hash: Id20,
123 piece: u32,
124 begin: u32,
125 length: u32,
126 #[allow(dead_code)]
127 flags: DiskJobFlags,
128 reply: oneshot::Sender<irontide_storage::Result<Id32>>,
129 },
130
131 ClearPiece {
132 info_hash: Id20,
133 piece: u32,
134 },
135 FlushWriteBuffer {
136 info_hash: Id20,
137 piece: u32,
138 reply: oneshot::Sender<irontide_storage::Result<()>>,
139 },
140
141 CachedPieces {
142 info_hash: Id20,
143 reply: oneshot::Sender<Vec<u32>>,
144 },
145
146 FlushAll {
148 reply: oneshot::Sender<irontide_storage::Result<()>>,
149 },
150
151 Shutdown {
152 reply: oneshot::Sender<()>,
153 },
154}
155
156#[derive(Debug, Clone)]
158pub struct DiskConfig {
159 pub io_threads: usize,
161 pub cache_size: usize,
164 pub write_cache_ratio: f32,
167 pub channel_capacity: usize,
169 pub buffer_pool_capacity: usize,
171 pub enable_mlock: bool,
173 pub lock_warn_threshold_ms: u64,
175 pub filesystem_direct_io: bool,
177}
178
179impl Default for DiskConfig {
180 fn default() -> Self {
181 Self {
182 io_threads: 4,
183 cache_size: 16 * 1024 * 1024,
184 write_cache_ratio: 0.5,
185 channel_capacity: 512,
186 buffer_pool_capacity: 64 * 1024 * 1024,
187 enable_mlock: cfg!(unix),
188 lock_warn_threshold_ms: 50,
189 filesystem_direct_io: false,
190 }
191 }
192}
193
194#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
196pub struct DiskStats {
197 pub read_bytes: u64,
199 pub write_bytes: u64,
201 pub cache_hits: u64,
203 pub cache_misses: u64,
205 pub write_buffer_bytes: usize,
207 pub queued_jobs: usize,
209 #[serde(default)]
211 pub read_cache_bytes: usize,
212 #[serde(default)]
214 pub pool_entries: usize,
215 #[serde(default)]
217 pub prefetch_count: u64,
218 #[serde(default)]
220 pub eviction_count: u64,
221 #[serde(default)]
223 pub skeleton_count: u64,
224}
225
226impl From<crate::disk_backend::DiskIoStats> for DiskStats {
227 fn from(s: crate::disk_backend::DiskIoStats) -> Self {
228 Self {
229 read_bytes: s.read_bytes,
230 write_bytes: s.write_bytes,
231 cache_hits: s.cache_hits,
232 cache_misses: s.cache_misses,
233 write_buffer_bytes: s.write_buffer_bytes,
234 queued_jobs: 0,
235 read_cache_bytes: s.read_cache_bytes,
236 pool_entries: s.pool_entries,
237 prefetch_count: s.prefetch_count,
238 eviction_count: s.eviction_count,
239 skeleton_count: s.skeleton_count,
240 }
241 }
242}
243
244#[derive(Clone)]
250pub struct DiskManagerHandle {
251 tx: mpsc::Sender<DiskJob>,
252 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
254 spawner: crate::blocking_spawner::BlockingSpawner,
256}
257
258impl DiskManagerHandle {
259 #[must_use]
262 pub fn new(config: DiskConfig) -> (Self, tokio::task::JoinHandle<()>) {
263 let backend = crate::disk_backend::create_backend_from_config(&config);
264 let spawner = crate::blocking_spawner::BlockingSpawner::new(config.io_threads);
265 Self::new_with_backend(config, backend, spawner)
266 }
267
268 pub fn new_with_backend(
271 config: DiskConfig,
272 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
273 spawner: crate::blocking_spawner::BlockingSpawner,
274 ) -> (Self, tokio::task::JoinHandle<()>) {
275 let (tx, rx) = mpsc::channel(config.channel_capacity);
276 let backend_for_actor = Arc::clone(&backend);
277 let actor = DiskActor::new(rx, config, backend_for_actor, spawner.clone());
278 let join = tokio::spawn(actor.run());
279 (
280 Self {
281 tx,
282 backend,
283 spawner,
284 },
285 join,
286 )
287 }
288
289 pub async fn register_torrent(
292 &self,
293 info_hash: Id20,
294 storage: Arc<dyn TorrentStorage>,
295 ) -> DiskHandle {
296 let storage_for_handle = Arc::clone(&storage);
298
299 let (reply_tx, reply_rx) = oneshot::channel();
300 let _ = self
301 .tx
302 .send(DiskJob::Register {
303 info_hash,
304 storage,
305 reply: reply_tx,
306 })
307 .await;
308 let _ = reply_rx.await;
309
310 let (write_tx, mut write_rx) = mpsc::channel::<WriteJob>(512);
312 let write_state = Arc::new(DiskWriteState {
313 tx: write_tx,
314 pending: Mutex::new(HashMap::new()),
315 notify: tokio::sync::Notify::new(),
316 lock_timing: crate::timed_lock::LockTimingSettings::default(),
317 });
318
319 let writer_storage = Arc::clone(&storage_for_handle);
321 let writer_state = Arc::clone(&write_state);
322 let writer_spawner = self.spawner.clone();
323 tokio::spawn(async move {
324 while let Some(first) = write_rx.recv().await {
325 let mut batch = vec![first];
328 while batch.len() < 64 {
329 match write_rx.try_recv() {
330 Ok(job) => batch.push(job),
331 Err(_) => break,
332 }
333 }
334
335 let pieces: Vec<u32> = batch.iter().map(|j| j.piece).collect();
337
338 let ws = Arc::clone(&writer_storage);
339 let spawner = writer_spawner.clone();
340 spawner
341 .block_in_place(move || {
342 for WriteJob { piece, begin, data } in &batch {
343 if let Err(e) = ws.write_chunk(*piece, *begin, data) {
344 tracing::warn!(piece, begin, %e, "deferred write failed");
345 }
346 }
347 })
348 .await;
349
350 {
352 let mut pending = crate::timed_lock::TimedGuard::new(
353 writer_state.pending.lock(),
354 &writer_state.lock_timing,
355 "disk_pending",
356 );
357 for piece in &pieces {
358 if let Some(count) = pending.get_mut(piece) {
359 *count = count.saturating_sub(1);
360 if *count == 0 {
361 pending.remove(piece);
362 }
363 }
364 }
365 }
366 writer_state.notify.notify_waiters();
367 }
368 });
369
370 DiskHandle {
371 tx: self.tx.clone(),
372 info_hash,
373 hash_pool: None,
374 hash_result_tx: None,
375 storage: Some(storage_for_handle),
376 backend: Some(Arc::clone(&self.backend)),
377 write_state: Some(write_state),
378 spawner: Some(self.spawner.clone()),
379 }
380 }
381
382 pub async fn unregister_torrent(&self, info_hash: Id20) {
384 let _ = self.tx.send(DiskJob::Unregister { info_hash }).await;
385 }
386
387 pub async fn shutdown(&self) {
389 let (tx, rx) = oneshot::channel();
390 let _ = self.tx.send(DiskJob::Shutdown { reply: tx }).await;
391 let _ = rx.await;
392 }
393}
394
395#[derive(Clone)]
401pub struct DiskHandle {
402 tx: mpsc::Sender<DiskJob>,
403 info_hash: Id20,
404 hash_pool: Option<std::sync::Arc<crate::hash_pool::HashPool>>,
406 hash_result_tx: Option<tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>>,
408 storage: Option<Arc<dyn TorrentStorage>>,
410 backend: Option<Arc<dyn crate::disk_backend::DiskIoBackend>>,
412 write_state: Option<Arc<DiskWriteState>>,
414 spawner: Option<crate::blocking_spawner::BlockingSpawner>,
416}
417
418impl std::fmt::Debug for DiskHandle {
419 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
420 f.debug_struct("DiskHandle")
421 .field("info_hash", &self.info_hash)
422 .finish_non_exhaustive()
423 }
424}
425
426impl DiskHandle {
427 #[cfg_attr(not(test), allow(dead_code))]
429 pub(crate) fn new(tx: mpsc::Sender<DiskJob>, info_hash: Id20) -> Self {
430 Self {
431 tx,
432 info_hash,
433 hash_pool: None,
434 hash_result_tx: None,
435 storage: None,
436 backend: None,
437 write_state: None,
438 spawner: None,
439 }
440 }
441
442 pub fn set_hash_pool(&mut self, pool: std::sync::Arc<crate::hash_pool::HashPool>) {
444 self.hash_pool = Some(pool);
445 }
446
447 pub fn set_hash_result_tx(
449 &mut self,
450 tx: tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>,
451 ) {
452 self.hash_result_tx = Some(tx);
453 }
454
455 pub async fn write_chunk(
461 &self,
462 piece: u32,
463 begin: u32,
464 data: Bytes,
465 flags: DiskJobFlags,
466 ) -> irontide_storage::Result<()> {
467 let (tx, rx) = oneshot::channel();
468 let _ = self
469 .tx
470 .send(DiskJob::Write {
471 info_hash: self.info_hash,
472 piece,
473 begin,
474 data,
475 flags,
476 reply: tx,
477 })
478 .await;
479 rx.await.unwrap_or_else(|_| {
480 Err(irontide_storage::Error::Io(std::io::Error::new(
481 std::io::ErrorKind::BrokenPipe,
482 "disk actor gone",
483 )))
484 })
485 }
486
487 pub async fn read_chunk(
493 &self,
494 piece: u32,
495 begin: u32,
496 length: u32,
497 flags: DiskJobFlags,
498 ) -> irontide_storage::Result<Bytes> {
499 let (tx, rx) = oneshot::channel();
500 let _ = self
501 .tx
502 .send(DiskJob::Read {
503 info_hash: self.info_hash,
504 piece,
505 begin,
506 length,
507 flags,
508 reply: tx,
509 })
510 .await;
511 rx.await.unwrap_or_else(|_| {
512 Err(irontide_storage::Error::Io(std::io::Error::new(
513 std::io::ErrorKind::BrokenPipe,
514 "disk actor gone",
515 )))
516 })
517 }
518
519 pub async fn verify_piece(
525 &self,
526 piece: u32,
527 expected: Id20,
528 flags: DiskJobFlags,
529 ) -> irontide_storage::Result<bool> {
530 let (tx, rx) = oneshot::channel();
531 let _ = self
532 .tx
533 .send(DiskJob::Hash {
534 info_hash: self.info_hash,
535 piece,
536 expected,
537 flags,
538 reply: tx,
539 })
540 .await;
541 rx.await.unwrap_or_else(|_| {
542 Err(irontide_storage::Error::Io(std::io::Error::new(
543 std::io::ErrorKind::BrokenPipe,
544 "disk actor gone",
545 )))
546 })
547 }
548
549 pub async fn verify_piece_v2(
555 &self,
556 piece: u32,
557 expected: Id32,
558 flags: DiskJobFlags,
559 ) -> irontide_storage::Result<bool> {
560 let (tx, rx) = oneshot::channel();
561 let _ = self
562 .tx
563 .send(DiskJob::HashV2 {
564 info_hash: self.info_hash,
565 piece,
566 expected,
567 flags,
568 reply: tx,
569 })
570 .await;
571 rx.await.unwrap_or_else(|_| {
572 Err(irontide_storage::Error::Io(std::io::Error::new(
573 std::io::ErrorKind::BrokenPipe,
574 "disk actor gone",
575 )))
576 })
577 }
578
579 pub async fn hash_block(
585 &self,
586 piece: u32,
587 begin: u32,
588 length: u32,
589 flags: DiskJobFlags,
590 ) -> irontide_storage::Result<Id32> {
591 let (tx, rx) = oneshot::channel();
592 let _ = self
593 .tx
594 .send(DiskJob::BlockHash {
595 info_hash: self.info_hash,
596 piece,
597 begin,
598 length,
599 flags,
600 reply: tx,
601 })
602 .await;
603 rx.await.unwrap_or_else(|_| {
604 Err(irontide_storage::Error::Io(std::io::Error::new(
605 std::io::ErrorKind::BrokenPipe,
606 "disk actor gone",
607 )))
608 })
609 }
610
611 pub async fn clear_piece(&self, piece: u32) {
613 let _ = self
614 .tx
615 .send(DiskJob::ClearPiece {
616 info_hash: self.info_hash,
617 piece,
618 })
619 .await;
620 }
621
622 pub async fn flush_piece(&self, piece: u32) -> irontide_storage::Result<()> {
628 let (tx, rx) = oneshot::channel();
629 let _ = self
630 .tx
631 .send(DiskJob::FlushWriteBuffer {
632 info_hash: self.info_hash,
633 piece,
634 reply: tx,
635 })
636 .await;
637 rx.await.unwrap_or_else(|_| {
638 Err(irontide_storage::Error::Io(std::io::Error::new(
639 std::io::ErrorKind::BrokenPipe,
640 "disk actor gone",
641 )))
642 })
643 }
644
645 pub async fn cached_pieces(&self) -> Vec<u32> {
647 let (tx, rx) = oneshot::channel();
648 let _ = self
649 .tx
650 .send(DiskJob::CachedPieces {
651 info_hash: self.info_hash,
652 reply: tx,
653 })
654 .await;
655 rx.await.unwrap_or_default()
656 }
657
658 pub async fn flush_cache(&self) -> irontide_storage::Result<()> {
664 let (tx, rx) = oneshot::channel();
665 let _ = self.tx.send(DiskJob::FlushAll { reply: tx }).await;
666 rx.await.unwrap_or_else(|_| {
667 Err(irontide_storage::Error::Io(std::io::Error::new(
668 std::io::ErrorKind::BrokenPipe,
669 "disk actor gone",
670 )))
671 })
672 }
673
674 pub fn enqueue_verify(
682 &self,
683 piece: u32,
684 expected: Id20,
685 generation: u64,
686 result_tx: &mpsc::Sender<VerifyResult>,
687 ) {
688 if let (Some(pool), Some(hash_tx)) = (&self.hash_pool, &self.hash_result_tx) {
690 if let Some(backend) = &self.backend {
691 let pool = pool.clone();
692 let hash_tx = hash_tx.clone();
693 let backend = Arc::clone(backend);
694 let info_hash = self.info_hash;
695 let job = crate::hash_pool::HashJob::Streaming {
696 piece,
697 expected,
698 generation,
699 info_hash,
700 backend,
701 result_tx: hash_tx,
702 };
703 tokio::spawn(async move {
704 if pool.submit(job).await.is_err() {
705 tracing::warn!(piece, "hash pool shut down, treating as failed");
706 }
707 });
708 return;
709 }
710
711 let hash_tx = hash_tx.clone();
713 tokio::spawn(async move {
714 tracing::warn!(piece, "verify: no backend (hash pool path)");
715 let _ = hash_tx
716 .send(crate::hash_pool::HashResult {
717 piece,
718 passed: false,
719 generation,
720 })
721 .await;
722 });
723 return;
724 }
725
726 if let Some(backend) = &self.backend {
728 let backend = Arc::clone(backend);
729 let info_hash = self.info_hash;
730 let result_tx = result_tx.clone();
731 let spawner = self.spawner.clone().unwrap();
732 tokio::spawn(async move {
733 let passed = spawner
734 .block_in_place(move || {
735 backend
736 .hash_piece(info_hash, piece, &expected)
737 .unwrap_or_else(|e| {
738 warn!(piece, %e, "verify: hash_piece failed");
739 false
740 })
741 })
742 .await;
743 let _ = result_tx.send(VerifyResult { piece, passed }).await;
744 });
745 return;
746 }
747
748 let result_tx = result_tx.clone();
750 tokio::spawn(async move {
751 warn!(piece, "verify: no data source, treating as failed");
752 let _ = result_tx
753 .send(VerifyResult {
754 piece,
755 passed: false,
756 })
757 .await;
758 });
759 }
760
761 pub fn enqueue_verify_v2(
765 &self,
766 piece: u32,
767 expected: Id32,
768 result_tx: &mpsc::Sender<VerifyResult>,
769 ) {
770 if let Some(backend) = &self.backend {
771 let backend = Arc::clone(backend);
772 let info_hash = self.info_hash;
773 let result_tx = result_tx.clone();
774 let spawner = self.spawner.clone().unwrap();
775 tokio::spawn(async move {
776 let passed = spawner
777 .block_in_place(move || match backend.read_piece(info_hash, piece) {
778 Ok(data) => {
779 let actual = irontide_core::sha256(&data);
780 actual == expected
781 }
782 Err(e) => {
783 warn!(piece, %e, "verify v2: read_piece failed");
784 false
785 }
786 })
787 .await;
788 let _ = result_tx.send(VerifyResult { piece, passed }).await;
789 });
790 return;
791 }
792
793 let result_tx = result_tx.clone();
795 tokio::spawn(async move {
796 warn!(piece, "verify v2: no data source, treating as failed");
797 let _ = result_tx
798 .send(VerifyResult {
799 piece,
800 passed: false,
801 })
802 .await;
803 });
804 }
805
806 #[allow(
814 clippy::needless_pass_by_value,
815 reason = "Bytes is refcounted, pass-by-value is the bytes-crate idiom"
816 )]
817 pub(crate) fn write_block_deferred(&self, piece: u32, begin: u32, data: Bytes) {
818 let (Some(write_state), Some(storage)) = (&self.write_state, &self.storage) else {
819 return;
820 };
821
822 {
824 let mut pending = crate::timed_lock::TimedGuard::new(
825 write_state.pending.lock(),
826 &write_state.lock_timing,
827 "disk_pending",
828 );
829 *pending.entry(piece).or_insert(0) += 1;
830 }
831
832 match write_state.tx.try_send(WriteJob {
833 piece,
834 begin,
835 data: data.clone(),
836 }) {
837 Ok(()) => {}
838 Err(mpsc::error::TrySendError::Full(_)) => {
839 let storage = Arc::clone(storage);
841 if let Some(ref spawner) = self.spawner {
842 spawner.block_in_place_sync(|| {
843 if let Err(e) = storage.write_chunk(piece, begin, &data) {
844 tracing::warn!(piece, begin, %e, "deferred write fallback failed");
845 }
846 });
847 } else {
848 if let Err(e) = storage.write_chunk(piece, begin, &data) {
850 tracing::warn!(piece, begin, %e, "deferred write fallback failed");
851 }
852 }
853 let mut pending = crate::timed_lock::TimedGuard::new(
855 write_state.pending.lock(),
856 &write_state.lock_timing,
857 "disk_pending",
858 );
859 if let Some(count) = pending.get_mut(&piece) {
860 *count = count.saturating_sub(1);
861 if *count == 0 {
862 pending.remove(&piece);
863 drop(pending);
864 write_state.notify.notify_waiters();
865 }
866 }
867 }
868 Err(mpsc::error::TrySendError::Closed(_)) => {
869 let mut pending = crate::timed_lock::TimedGuard::new(
871 write_state.pending.lock(),
872 &write_state.lock_timing,
873 "disk_pending",
874 );
875 if let Some(count) = pending.get_mut(&piece) {
876 *count = count.saturating_sub(1);
877 if *count == 0 {
878 pending.remove(&piece);
879 drop(pending);
880 write_state.notify.notify_waiters();
881 }
882 }
883 }
884 }
885 }
886
887 pub(crate) fn write_block_direct(
897 &self,
898 piece: u32,
899 begin: u32,
900 s0: &[u8],
901 s1: &[u8],
902 ) -> crate::Result<()> {
903 let Some(backend) = &self.backend else {
904 return Ok(());
905 };
906 backend.write_block_direct(self.info_hash, piece, begin, s0, s1)
907 }
908
909 pub(crate) async fn flush_piece_writes(&self, piece: u32) {
914 let Some(write_state) = &self.write_state else {
915 return;
916 };
917
918 loop {
919 {
920 let pending = crate::timed_lock::TimedGuard::new(
921 write_state.pending.lock(),
922 &write_state.lock_timing,
923 "disk_pending",
924 );
925 if !pending.contains_key(&piece) {
926 return;
927 }
928 }
929 write_state.notify.notified().await;
930 }
931 }
932
933 #[allow(dead_code)]
935 pub(crate) fn storage(&self) -> Option<Arc<dyn TorrentStorage>> {
936 self.storage.clone()
937 }
938}
939
940struct DiskActor {
945 rx: mpsc::Receiver<DiskJob>,
946 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
947 spawner: crate::blocking_spawner::BlockingSpawner,
948 #[allow(dead_code)]
949 config: DiskConfig,
950}
951
952impl DiskActor {
953 fn new(
954 rx: mpsc::Receiver<DiskJob>,
955 config: DiskConfig,
956 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
957 spawner: crate::blocking_spawner::BlockingSpawner,
958 ) -> Self {
959 Self {
960 rx,
961 backend,
962 spawner,
963 config,
964 }
965 }
966
967 async fn run(mut self) {
968 loop {
969 let Some(first) = self.rx.recv().await else {
971 break;
972 };
973
974 let mut batch = vec![first];
976 while let Ok(job) = self.rx.try_recv() {
977 batch.push(job);
978 }
979
980 for job in batch {
981 if let DiskJob::Shutdown { reply } = job {
982 let backend = Arc::clone(&self.backend);
984 let spawner = self.spawner.clone();
985 let flush_result = spawner.block_in_place(move || backend.flush_all()).await;
986 if let Err(e) = flush_result {
987 warn!("flush_all on shutdown failed: {e}");
988 }
989 let _ = reply.send(());
990 return;
991 }
992 self.dispatch_job(job);
993 }
994 }
995 }
996
997 fn dispatch_job(&self, job: DiskJob) {
1001 match job {
1002 DiskJob::Register {
1004 info_hash,
1005 storage,
1006 reply,
1007 } => {
1008 self.backend.register(info_hash, storage);
1009 let _ = reply.send(());
1010 }
1011 DiskJob::Unregister { info_hash } => {
1012 self.backend.unregister(info_hash);
1013 }
1014 DiskJob::ClearPiece { info_hash, piece } => {
1015 self.backend.clear_piece(info_hash, piece);
1016 }
1017 DiskJob::CachedPieces { info_hash, reply } => {
1018 let pieces = self.backend.cached_pieces(info_hash);
1019 let _ = reply.send(pieces);
1020 }
1021
1022 DiskJob::Write {
1024 info_hash,
1025 piece,
1026 begin,
1027 data,
1028 flags,
1029 reply,
1030 } => {
1031 let flush = flags.contains(DiskJobFlags::FLUSH_PIECE);
1032 let backend = Arc::clone(&self.backend);
1033 let spawner = self.spawner.clone();
1034 tokio::spawn(async move {
1035 let result = spawner
1036 .block_in_place(move || {
1037 backend.write_chunk(info_hash, piece, begin, data, flush)
1038 })
1039 .await;
1040 let _ = reply.send(to_storage_result(result));
1041 });
1042 }
1043
1044 DiskJob::Read {
1046 info_hash,
1047 piece,
1048 begin,
1049 length,
1050 flags,
1051 reply,
1052 } => {
1053 let volatile = flags.contains(DiskJobFlags::VOLATILE_READ);
1054 let backend = Arc::clone(&self.backend);
1055 let spawner = self.spawner.clone();
1056 tokio::spawn(async move {
1057 let result = spawner
1058 .block_in_place(move || {
1059 backend.read_chunk(info_hash, piece, begin, length, volatile)
1060 })
1061 .await;
1062 let _ = reply.send(to_storage_result(result));
1063 });
1064 }
1065
1066 DiskJob::Hash {
1068 info_hash,
1069 piece,
1070 expected,
1071 reply,
1072 ..
1073 } => {
1074 let backend = Arc::clone(&self.backend);
1075 let spawner = self.spawner.clone();
1076 tokio::spawn(async move {
1077 let result = spawner
1078 .block_in_place(move || backend.hash_piece(info_hash, piece, &expected))
1079 .await;
1080 let _ = reply.send(to_storage_result(result));
1081 });
1082 }
1083
1084 DiskJob::HashV2 {
1086 info_hash,
1087 piece,
1088 expected,
1089 reply,
1090 ..
1091 } => {
1092 let backend = Arc::clone(&self.backend);
1093 let spawner = self.spawner.clone();
1094 tokio::spawn(async move {
1095 let result = spawner
1096 .block_in_place(move || backend.hash_piece_v2(info_hash, piece, &expected))
1097 .await;
1098 let _ = reply.send(to_storage_result(result));
1099 });
1100 }
1101
1102 DiskJob::BlockHash {
1104 info_hash,
1105 piece,
1106 begin,
1107 length,
1108 reply,
1109 ..
1110 } => {
1111 let backend = Arc::clone(&self.backend);
1112 let spawner = self.spawner.clone();
1113 tokio::spawn(async move {
1114 let result = spawner
1115 .block_in_place(move || backend.hash_block(info_hash, piece, begin, length))
1116 .await;
1117 let _ = reply.send(to_storage_result(result));
1118 });
1119 }
1120
1121 DiskJob::FlushWriteBuffer {
1123 info_hash,
1124 piece,
1125 reply,
1126 } => {
1127 let backend = Arc::clone(&self.backend);
1128 let spawner = self.spawner.clone();
1129 tokio::spawn(async move {
1130 let result = spawner
1131 .block_in_place(move || backend.flush_piece(info_hash, piece))
1132 .await;
1133 let _ = reply.send(to_storage_result(result));
1134 });
1135 }
1136
1137 DiskJob::FlushAll { reply } => {
1139 let backend = Arc::clone(&self.backend);
1140 let spawner = self.spawner.clone();
1141 tokio::spawn(async move {
1142 let result = spawner.block_in_place(move || backend.flush_all()).await;
1143 let _ = reply.send(to_storage_result(result));
1144 });
1145 }
1146
1147 DiskJob::Shutdown { .. } => unreachable!(),
1148 }
1149 }
1150}
1151
1152fn to_storage_result<T>(r: crate::Result<T>) -> irontide_storage::Result<T> {
1154 r.map_err(|e| match e {
1155 crate::Error::Storage(se) => se,
1156 other => irontide_storage::Error::Io(std::io::Error::other(other.to_string())),
1157 })
1158}
1159
1160impl From<&irontide_settings::Settings> for DiskConfig {
1163 fn from(s: &irontide_settings::Settings) -> Self {
1164 Self {
1165 io_threads: s.disk_io_threads,
1166 cache_size: s.disk_cache_size,
1167 write_cache_ratio: s.disk_write_cache_ratio,
1168 channel_capacity: s.disk_channel_capacity,
1169 buffer_pool_capacity: s.buffer_pool_capacity,
1170 enable_mlock: s.enable_mlock,
1171 lock_warn_threshold_ms: s.lock_warn_threshold_ms,
1172 filesystem_direct_io: s.filesystem_direct_io,
1173 }
1174 }
1175}
1176
1177#[cfg(test)]
1178mod tests {
1179 use super::*;
1180 use irontide_core::Lengths;
1181 use irontide_settings::Settings;
1182 use irontide_storage::MemoryStorage;
1183
1184 #[test]
1185 fn disk_config_from_settings() {
1186 let s = Settings::default();
1187 let dc = crate::disk::DiskConfig::from(&s);
1188 assert_eq!(dc.io_threads, s.disk_io_threads);
1189 assert_eq!(dc.cache_size, 16 * 1024 * 1024);
1190 assert!((dc.write_cache_ratio - 0.5).abs() < f32::EPSILON);
1191 assert_eq!(dc.channel_capacity, 512);
1192 }
1193
1194 fn test_config() -> DiskConfig {
1197 DiskConfig {
1198 io_threads: 2,
1199 cache_size: 1024 * 1024,
1200 ..DiskConfig::default()
1201 }
1202 }
1203
1204 fn make_hash(n: u8) -> Id20 {
1205 let mut b = [0u8; 20];
1206 b[0] = n;
1207 Id20(b)
1208 }
1209
1210 #[tokio::test]
1211 async fn async_write_read() {
1212 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1213 let ih = make_hash(1);
1214 let lengths = Lengths::new(100, 50, 25);
1215 let storage = Arc::new(MemoryStorage::new(lengths));
1216 let disk = mgr.register_torrent(ih, storage).await;
1217
1218 let data = Bytes::from(vec![42u8; 25]);
1219 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1220 .await
1221 .unwrap();
1222 let read = disk
1223 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1224 .await
1225 .unwrap();
1226 assert_eq!(read, data);
1227
1228 mgr.shutdown().await;
1229 }
1230
1231 #[tokio::test]
1232 async fn verify_through_handle() {
1233 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1234 let ih = make_hash(2);
1235 let lengths = Lengths::new(100, 50, 25);
1236 let storage = Arc::new(MemoryStorage::new(lengths));
1237 let disk = mgr.register_torrent(ih, storage).await;
1238
1239 let piece_data = vec![9u8; 50];
1240 disk.write_chunk(
1241 0,
1242 0,
1243 Bytes::from(piece_data.clone()),
1244 DiskJobFlags::FLUSH_PIECE,
1245 )
1246 .await
1247 .unwrap();
1248 disk.write_chunk(0, 25, Bytes::from(vec![9u8; 25]), DiskJobFlags::FLUSH_PIECE)
1249 .await
1250 .unwrap();
1251
1252 let expected = irontide_core::sha1(&piece_data);
1253 assert!(
1254 disk.verify_piece(0, expected, DiskJobFlags::empty())
1255 .await
1256 .unwrap()
1257 );
1258 assert!(
1259 !disk
1260 .verify_piece(0, Id20::ZERO, DiskJobFlags::empty())
1261 .await
1262 .unwrap()
1263 );
1264
1265 mgr.shutdown().await;
1266 }
1267
1268 #[tokio::test]
1269 async fn cache_hit_avoids_io() {
1270 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1271 let ih = make_hash(3);
1272 let lengths = Lengths::new(100, 50, 25);
1273 let storage = Arc::new(MemoryStorage::new(lengths));
1274 let disk = mgr.register_torrent(ih, storage).await;
1275
1276 let data = Bytes::from(vec![7u8; 25]);
1277 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1278 .await
1279 .unwrap();
1280
1281 let r1 = disk
1283 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1284 .await
1285 .unwrap();
1286 assert_eq!(r1, data);
1287
1288 let r2 = disk
1290 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1291 .await
1292 .unwrap();
1293 assert_eq!(r2, data);
1294
1295 mgr.shutdown().await;
1296 }
1297
1298 #[tokio::test]
1299 async fn volatile_read_bypasses_cache() {
1300 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1301 let ih = make_hash(4);
1302 let lengths = Lengths::new(100, 50, 25);
1303 let storage = Arc::new(MemoryStorage::new(lengths));
1304 let disk = mgr.register_torrent(ih, storage).await;
1305
1306 let data = Bytes::from(vec![5u8; 25]);
1307 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1308 .await
1309 .unwrap();
1310
1311 let r = disk
1313 .read_chunk(0, 0, 25, DiskJobFlags::VOLATILE_READ)
1314 .await
1315 .unwrap();
1316 assert_eq!(r, data);
1317
1318 mgr.shutdown().await;
1319 }
1320
1321 #[tokio::test]
1322 async fn clear_piece_evicts_cache() {
1323 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1324 let ih = make_hash(5);
1325 let lengths = Lengths::new(100, 50, 25);
1326 let storage = Arc::new(MemoryStorage::new(lengths));
1327 let disk = mgr.register_torrent(ih, storage).await;
1328
1329 let data = Bytes::from(vec![5u8; 25]);
1330 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1331 .await
1332 .unwrap();
1333 disk.read_chunk(0, 0, 25, DiskJobFlags::empty())
1335 .await
1336 .unwrap();
1337 disk.clear_piece(0).await;
1339
1340 let r = disk
1342 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1343 .await
1344 .unwrap();
1345 assert_eq!(r, data);
1346
1347 mgr.shutdown().await;
1348 }
1349
1350 #[tokio::test]
1351 async fn write_buffer_flush() {
1352 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1353 let ih = make_hash(6);
1354 let lengths = Lengths::new(100, 50, 25);
1355 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1356 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1357
1358 disk.write_chunk(0, 0, Bytes::from(vec![1u8; 25]), DiskJobFlags::empty())
1360 .await
1361 .unwrap();
1362 disk.write_chunk(0, 25, Bytes::from(vec![2u8; 25]), DiskJobFlags::empty())
1363 .await
1364 .unwrap();
1365
1366 disk.flush_piece(0).await.unwrap();
1368
1369 let piece = storage.read_piece(0).unwrap();
1371 assert_eq!(&piece[..25], &[1u8; 25]);
1372 assert_eq!(&piece[25..], &[2u8; 25]);
1373
1374 mgr.shutdown().await;
1375 }
1376
1377 #[tokio::test]
1378 async fn verify_piece_v2_via_disk_handle() {
1379 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1380 let ih = make_hash(11);
1381 let data = vec![0xABu8; 16384];
1382 let expected = irontide_core::sha256(&data);
1383 let lengths = Lengths::new(16384, 16384, 16384);
1384 let storage = Arc::new(MemoryStorage::new(lengths));
1385 storage.write_chunk(0, 0, &data).unwrap();
1386
1387 let disk = mgr.register_torrent(ih, storage).await;
1388 let result = disk
1389 .verify_piece_v2(0, expected, DiskJobFlags::empty())
1390 .await;
1391 assert!(result.unwrap());
1392 mgr.shutdown().await;
1393 }
1394
1395 #[tokio::test]
1396 async fn hash_block_via_disk_handle() {
1397 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1398 let ih = make_hash(12);
1399 let data = vec![0xCDu8; 16384];
1400 let lengths = Lengths::new(16384, 16384, 16384);
1401 let storage = Arc::new(MemoryStorage::new(lengths));
1402 storage.write_chunk(0, 0, &data).unwrap();
1403
1404 let disk = mgr.register_torrent(ih, storage).await;
1405 let hash = disk.hash_block(0, 0, 16384, DiskJobFlags::empty()).await;
1406 assert_eq!(hash.unwrap(), irontide_core::sha256(&data));
1407 mgr.shutdown().await;
1408 }
1409
1410 #[tokio::test]
1411 async fn concurrent_verify_multiple_pieces() {
1412 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1413 let ih = make_hash(10);
1414
1415 let data: Vec<u8> = (0..400).map(|i| (i % 256) as u8).collect();
1417 let piece_len = 50u64;
1418 let lengths = Lengths::new(data.len() as u64, piece_len, 25);
1419 let storage = Arc::new(MemoryStorage::new(lengths.clone()));
1420
1421 let num_pieces = lengths.num_pieces();
1423 for p in 0..num_pieces {
1424 let offset = lengths.piece_offset(p) as usize;
1425 let size = lengths.piece_size(p) as usize;
1426 storage
1427 .write_chunk(p, 0, &data[offset..offset + size])
1428 .unwrap();
1429 }
1430
1431 let disk = mgr.register_torrent(ih, storage).await;
1432
1433 let mut expected_hashes = Vec::new();
1435 for p in 0..num_pieces {
1436 let offset = lengths.piece_offset(p) as usize;
1437 let size = lengths.piece_size(p) as usize;
1438 expected_hashes.push(irontide_core::sha1(&data[offset..offset + size]));
1439 }
1440
1441 let mut js = tokio::task::JoinSet::new();
1443 for p in 0..num_pieces {
1444 let d = disk.clone();
1445 let hash = expected_hashes[p as usize];
1446 js.spawn(async move {
1447 let valid = d
1448 .verify_piece(p, hash, DiskJobFlags::empty())
1449 .await
1450 .unwrap();
1451 (p, valid)
1452 });
1453 }
1454
1455 let mut results = Vec::new();
1456 while let Some(r) = js.join_next().await {
1457 results.push(r.unwrap());
1458 }
1459 results.sort_by_key(|&(p, _)| p);
1460
1461 assert_eq!(results.len(), num_pieces as usize);
1462 for (p, valid) in &results {
1463 assert!(valid, "piece {p} should be valid");
1464 }
1465
1466 mgr.shutdown().await;
1467 }
1468
1469 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1472 async fn write_block_deferred_writes_to_storage() {
1473 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1474 let ih = make_hash(30);
1475 let lengths = Lengths::new(100, 50, 25);
1476 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1477 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1478
1479 let block0 = Bytes::from(vec![0xAAu8; 25]);
1480 let block1 = Bytes::from(vec![0xBBu8; 25]);
1481
1482 disk.write_block_deferred(0, 0, block0.clone());
1483 disk.write_block_deferred(0, 25, block1.clone());
1484
1485 disk.flush_piece_writes(0).await;
1487
1488 let read0 = storage.read_chunk(0, 0, 25).unwrap();
1490 assert_eq!(&read0[..], &block0[..], "block 0 should match");
1491 let read1 = storage.read_chunk(0, 25, 25).unwrap();
1492 assert_eq!(&read1[..], &block1[..], "block 1 should match");
1493
1494 mgr.shutdown().await;
1495 }
1496
1497 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1498 async fn flush_piece_writes_waits_for_completion() {
1499 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1500 let ih = make_hash(31);
1501 let lengths = Lengths::new(200, 100, 25);
1502 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1503 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1504
1505 for i in 0u32..4 {
1507 let data = Bytes::from(vec![(i as u8) + 1; 25]);
1508 disk.write_block_deferred(0, i * 25, data);
1509 }
1510
1511 disk.flush_piece_writes(0).await;
1513
1514 let piece = storage.read_piece(0).unwrap();
1516 assert_eq!(&piece[0..25], &[1u8; 25]);
1517 assert_eq!(&piece[25..50], &[2u8; 25]);
1518 assert_eq!(&piece[50..75], &[3u8; 25]);
1519 assert_eq!(&piece[75..100], &[4u8; 25]);
1520
1521 mgr.shutdown().await;
1522 }
1523
1524 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1527 async fn batch_writer_drains_multiple_jobs() {
1528 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1529 let ih = make_hash(50);
1530 let lengths = Lengths::new(250, 250, 25);
1532 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1533 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1534
1535 for i in 0u32..10 {
1537 let data = Bytes::from(vec![i as u8 + 1; 25]);
1538 disk.write_block_deferred(0, i * 25, data);
1539 }
1540
1541 disk.flush_piece_writes(0).await;
1543
1544 for i in 0u32..10 {
1546 let chunk = storage.read_chunk(0, i * 25, 25).unwrap();
1547 assert_eq!(
1548 &chunk[..],
1549 vec![i as u8 + 1; 25].as_slice(),
1550 "block {i} mismatch"
1551 );
1552 }
1553
1554 mgr.shutdown().await;
1555 }
1556
1557 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1558 async fn batch_writer_caps_at_64() {
1559 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1560 let ih = make_hash(51);
1561 let lengths = Lengths::new(1600, 1600, 16);
1563 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1564 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1565
1566 for i in 0u32..100 {
1569 let data = Bytes::from(vec![i as u8; 16]);
1570 disk.write_block_deferred(0, i * 16, data);
1571 }
1572
1573 disk.flush_piece_writes(0).await;
1575
1576 for i in 0u32..100 {
1578 let chunk = storage.read_chunk(0, i * 16, 16).unwrap();
1579 assert_eq!(
1580 &chunk[..],
1581 vec![i as u8; 16].as_slice(),
1582 "block {i} mismatch after overflow to next batch"
1583 );
1584 }
1585
1586 mgr.shutdown().await;
1587 }
1588
1589 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1592 async fn verify_from_disk_after_deferred_write() {
1593 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1594 let ih = make_hash(40);
1595 let chunk_size = 16384u32;
1596 let piece_size = u64::from(chunk_size) * 2;
1597 let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1598 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1599 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1600
1601 let chunk0 = vec![0xAAu8; chunk_size as usize];
1603 let chunk1 = vec![0xBBu8; chunk_size as usize];
1604 disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1605 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1606 disk.flush_piece_writes(0).await;
1607
1608 let mut full_piece = Vec::with_capacity(piece_size as usize);
1610 full_piece.extend_from_slice(&chunk0);
1611 full_piece.extend_from_slice(&chunk1);
1612 let expected_hash = irontide_core::sha1(&full_piece);
1613
1614 let (result_tx, mut result_rx) = mpsc::channel(4);
1616 disk.enqueue_verify(0, expected_hash, 0, &result_tx);
1617 let result = result_rx
1618 .recv()
1619 .await
1620 .expect("should receive verify result");
1621 assert_eq!(result.piece, 0);
1622 assert!(result.passed, "disk-based SHA-1 verify should pass");
1623
1624 disk.write_block_deferred(0, 0, Bytes::from(chunk0));
1626 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
1627 disk.flush_piece_writes(0).await;
1628 disk.enqueue_verify(0, Id20::ZERO, 0, &result_tx);
1629 let result = result_rx
1630 .recv()
1631 .await
1632 .expect("should receive verify result");
1633 assert!(!result.passed, "wrong hash should fail disk-based verify");
1634
1635 mgr.shutdown().await;
1636 }
1637
1638 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1639 async fn verify_v2_from_disk_after_deferred_write() {
1640 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1641 let ih = make_hash(41);
1642 let chunk_size = 16384u32;
1643 let piece_size = u64::from(chunk_size) * 2;
1644 let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1645 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1646 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1647
1648 let chunk0 = vec![0xCCu8; chunk_size as usize];
1650 let chunk1 = vec![0xDDu8; chunk_size as usize];
1651 disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1652 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1653 disk.flush_piece_writes(0).await;
1654
1655 let mut full_piece = Vec::with_capacity(piece_size as usize);
1657 full_piece.extend_from_slice(&chunk0);
1658 full_piece.extend_from_slice(&chunk1);
1659 let expected_hash = irontide_core::sha256(&full_piece);
1660
1661 let (result_tx, mut result_rx) = mpsc::channel(4);
1663 disk.enqueue_verify_v2(0, expected_hash, &result_tx);
1664 let result = result_rx
1665 .recv()
1666 .await
1667 .expect("should receive v2 verify result");
1668 assert_eq!(result.piece, 0);
1669 assert!(result.passed, "disk-based SHA-256 verify should pass");
1670
1671 disk.write_block_deferred(0, 0, Bytes::from(chunk0));
1673 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
1674 disk.flush_piece_writes(0).await;
1675 disk.enqueue_verify_v2(0, Id32::ZERO, &result_tx);
1676 let result = result_rx
1677 .recv()
1678 .await
1679 .expect("should receive v2 verify result");
1680 assert!(
1681 !result.passed,
1682 "wrong hash should fail disk-based v2 verify"
1683 );
1684
1685 mgr.shutdown().await;
1686 }
1687
1688 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1689 async fn verify_with_hash_pool_from_disk() {
1690 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1691 let ih = make_hash(42);
1692 let chunk_size = 16384u32;
1693 let piece_size = u64::from(chunk_size) * 2;
1694 let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1695 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1696 let mut disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1697
1698 let (hash_result_tx, mut hash_result_rx) = mpsc::channel(4);
1700 disk.set_hash_result_tx(hash_result_tx);
1701 let hash_pool = std::sync::Arc::new(crate::hash_pool::HashPool::new(2, 16));
1702 disk.set_hash_pool(hash_pool);
1703
1704 let chunk0 = vec![0xEEu8; chunk_size as usize];
1706 let chunk1 = vec![0xFFu8; chunk_size as usize];
1707 disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1708 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1709 disk.flush_piece_writes(0).await;
1710
1711 let mut full_piece = Vec::with_capacity(piece_size as usize);
1713 full_piece.extend_from_slice(&chunk0);
1714 full_piece.extend_from_slice(&chunk1);
1715 let expected_hash = irontide_core::sha1(&full_piece);
1716
1717 let (verify_result_tx, _) = mpsc::channel(4); disk.enqueue_verify(0, expected_hash, 42, &verify_result_tx);
1720 let result = hash_result_rx
1721 .recv()
1722 .await
1723 .expect("should receive hash pool result");
1724 assert!(result.passed, "hash pool disk-based verify should pass");
1725 assert_eq!(result.piece, 0);
1726 assert_eq!(result.generation, 42);
1727
1728 mgr.shutdown().await;
1729 }
1730}