1use std::collections::HashMap;
2use std::sync::Arc;
3
4use parking_lot::Mutex;
5
6use bitflags::bitflags;
7use bytes::Bytes;
8use irontide_core::{Id20, Id32};
9use irontide_storage::TorrentStorage;
10use tokio::sync::{mpsc, oneshot};
11use tracing::warn;
12
13bitflags! {
14 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
16 pub struct DiskJobFlags: u8 {
17 const FORCE_COPY = 0x01;
19 const SEQUENTIAL = 0x02;
21 const VOLATILE_READ = 0x04;
23 const FLUSH_PIECE = 0x08;
25 }
26}
27
28#[derive(Debug)]
30pub struct DiskWriteError {
31 pub piece: u32,
33 pub begin: u32,
35 pub error: irontide_storage::Error,
37}
38
39#[derive(Debug)]
41pub struct VerifyResult {
42 pub piece: u32,
44 pub passed: bool,
46}
47
48pub(crate) struct WriteJob {
50 piece: u32,
51 begin: u32,
52 data: Bytes,
53}
54
55pub(crate) struct DiskWriteState {
62 tx: mpsc::Sender<WriteJob>,
63 pending: Mutex<HashMap<u32, u32>>,
65 notify: tokio::sync::Notify,
67 lock_timing: crate::timed_lock::LockTimingSettings,
69}
70
71pub(crate) enum DiskJob {
72 Register {
73 info_hash: Id20,
74 storage: Arc<dyn TorrentStorage>,
75 reply: oneshot::Sender<()>,
76 },
77 Unregister {
78 info_hash: Id20,
79 },
80
81 Write {
82 info_hash: Id20,
83 piece: u32,
84 begin: u32,
85 data: Bytes,
86 flags: DiskJobFlags,
87 reply: oneshot::Sender<irontide_storage::Result<()>>,
88 },
89 Read {
90 info_hash: Id20,
91 piece: u32,
92 begin: u32,
93 length: u32,
94 flags: DiskJobFlags,
95 reply: oneshot::Sender<irontide_storage::Result<Bytes>>,
96 },
97 Hash {
98 info_hash: Id20,
99 piece: u32,
100 expected: Id20,
101 #[allow(dead_code)]
102 flags: DiskJobFlags,
103 reply: oneshot::Sender<irontide_storage::Result<bool>>,
104 },
105 HashV2 {
106 info_hash: Id20,
107 piece: u32,
108 expected: Id32,
109 #[allow(dead_code)]
110 flags: DiskJobFlags,
111 reply: oneshot::Sender<irontide_storage::Result<bool>>,
112 },
113 BlockHash {
114 info_hash: Id20,
115 piece: u32,
116 begin: u32,
117 length: u32,
118 #[allow(dead_code)]
119 flags: DiskJobFlags,
120 reply: oneshot::Sender<irontide_storage::Result<Id32>>,
121 },
122
123 ClearPiece {
124 info_hash: Id20,
125 piece: u32,
126 },
127 FlushWriteBuffer {
128 info_hash: Id20,
129 piece: u32,
130 reply: oneshot::Sender<irontide_storage::Result<()>>,
131 },
132
133 CachedPieces {
134 info_hash: Id20,
135 reply: oneshot::Sender<Vec<u32>>,
136 },
137
138 FlushAll {
140 reply: oneshot::Sender<irontide_storage::Result<()>>,
141 },
142
143 Shutdown {
144 reply: oneshot::Sender<()>,
145 },
146}
147
148#[derive(Debug, Clone)]
150pub struct DiskConfig {
151 pub io_threads: usize,
153 pub storage_mode: irontide_core::StorageMode,
155 pub cache_size: usize,
158 pub write_cache_ratio: f32,
161 pub channel_capacity: usize,
163 pub buffer_pool_capacity: usize,
165 pub enable_mlock: bool,
167 pub lock_warn_threshold_ms: u64,
169 pub io_uring_sq_depth: u32,
171 pub io_uring_direct_io: bool,
173 pub filesystem_direct_io: bool,
175 pub io_uring_batch_threshold: usize,
177 pub iocp_concurrent_threads: u32,
179 pub iocp_direct_io: bool,
181}
182
183impl Default for DiskConfig {
184 fn default() -> Self {
185 DiskConfig {
186 io_threads: 4,
187 storage_mode: irontide_core::StorageMode::Auto,
188 cache_size: 16 * 1024 * 1024,
189 write_cache_ratio: 0.5,
190 channel_capacity: 512,
191 buffer_pool_capacity: 64 * 1024 * 1024,
192 enable_mlock: cfg!(unix),
193 lock_warn_threshold_ms: 50,
194 io_uring_sq_depth: 256,
195 io_uring_direct_io: false,
196 filesystem_direct_io: false,
197 io_uring_batch_threshold: 4,
198 iocp_concurrent_threads: 0,
199 iocp_direct_io: false,
200 }
201 }
202}
203
204#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
206pub struct DiskStats {
207 pub read_bytes: u64,
209 pub write_bytes: u64,
211 pub cache_hits: u64,
213 pub cache_misses: u64,
215 pub write_buffer_bytes: usize,
217 pub queued_jobs: usize,
219 #[serde(default)]
221 pub read_cache_bytes: usize,
222 #[serde(default)]
224 pub pool_entries: usize,
225 #[serde(default)]
227 pub prefetch_count: u64,
228 #[serde(default)]
230 pub eviction_count: u64,
231 #[serde(default)]
233 pub skeleton_count: u64,
234}
235
236impl From<crate::disk_backend::DiskIoStats> for DiskStats {
237 fn from(s: crate::disk_backend::DiskIoStats) -> Self {
238 DiskStats {
239 read_bytes: s.read_bytes,
240 write_bytes: s.write_bytes,
241 cache_hits: s.cache_hits,
242 cache_misses: s.cache_misses,
243 write_buffer_bytes: s.write_buffer_bytes,
244 queued_jobs: 0,
245 read_cache_bytes: s.read_cache_bytes,
246 pool_entries: s.pool_entries,
247 prefetch_count: s.prefetch_count,
248 eviction_count: s.eviction_count,
249 skeleton_count: s.skeleton_count,
250 }
251 }
252}
253
254#[derive(Clone)]
260pub struct DiskManagerHandle {
261 tx: mpsc::Sender<DiskJob>,
262 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
264 spawner: crate::blocking_spawner::BlockingSpawner,
266}
267
268impl DiskManagerHandle {
269 pub fn new(config: DiskConfig) -> (Self, tokio::task::JoinHandle<()>) {
272 let backend = crate::disk_backend::create_backend_from_config(&config);
273 let spawner = crate::blocking_spawner::BlockingSpawner::new(config.io_threads);
274 Self::new_with_backend(config, backend, spawner)
275 }
276
277 pub(crate) fn new_with_backend(
280 config: DiskConfig,
281 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
282 spawner: crate::blocking_spawner::BlockingSpawner,
283 ) -> (Self, tokio::task::JoinHandle<()>) {
284 let (tx, rx) = mpsc::channel(config.channel_capacity);
285 let backend_for_actor = Arc::clone(&backend);
286 let actor = DiskActor::new(rx, config, backend_for_actor, spawner.clone());
287 let join = tokio::spawn(actor.run());
288 (
289 DiskManagerHandle {
290 tx,
291 backend,
292 spawner,
293 },
294 join,
295 )
296 }
297
298 pub async fn register_torrent(
301 &self,
302 info_hash: Id20,
303 storage: Arc<dyn TorrentStorage>,
304 ) -> DiskHandle {
305 let storage_for_handle = Arc::clone(&storage);
307
308 let (reply_tx, reply_rx) = oneshot::channel();
309 let _ = self
310 .tx
311 .send(DiskJob::Register {
312 info_hash,
313 storage,
314 reply: reply_tx,
315 })
316 .await;
317 let _ = reply_rx.await;
318
319 let (write_tx, mut write_rx) = mpsc::channel::<WriteJob>(512);
321 let write_state = Arc::new(DiskWriteState {
322 tx: write_tx,
323 pending: Mutex::new(HashMap::new()),
324 notify: tokio::sync::Notify::new(),
325 lock_timing: crate::timed_lock::LockTimingSettings::default(),
326 });
327
328 let writer_storage = Arc::clone(&storage_for_handle);
330 let writer_state = Arc::clone(&write_state);
331 let writer_spawner = self.spawner.clone();
332 tokio::spawn(async move {
333 while let Some(first) = write_rx.recv().await {
334 let mut batch = vec![first];
337 while batch.len() < 64 {
338 match write_rx.try_recv() {
339 Ok(job) => batch.push(job),
340 Err(_) => break,
341 }
342 }
343
344 let pieces: Vec<u32> = batch.iter().map(|j| j.piece).collect();
346
347 let ws = Arc::clone(&writer_storage);
348 let spawner = writer_spawner.clone();
349 spawner
350 .block_in_place(move || {
351 for WriteJob { piece, begin, data } in &batch {
352 if let Err(e) = ws.write_chunk(*piece, *begin, data) {
353 tracing::warn!(piece, begin, %e, "deferred write failed");
354 }
355 }
356 })
357 .await;
358
359 {
361 let mut pending = crate::timed_lock::TimedGuard::new(
362 writer_state.pending.lock(),
363 &writer_state.lock_timing,
364 "disk_pending",
365 );
366 for piece in &pieces {
367 if let Some(count) = pending.get_mut(piece) {
368 *count = count.saturating_sub(1);
369 if *count == 0 {
370 pending.remove(piece);
371 }
372 }
373 }
374 }
375 writer_state.notify.notify_waiters();
376 }
377 });
378
379 DiskHandle {
380 tx: self.tx.clone(),
381 info_hash,
382 hash_pool: None,
383 hash_result_tx: None,
384 storage: Some(storage_for_handle),
385 backend: Some(Arc::clone(&self.backend)),
386 write_state: Some(write_state),
387 spawner: Some(self.spawner.clone()),
388 }
389 }
390
391 pub async fn unregister_torrent(&self, info_hash: Id20) {
393 let _ = self.tx.send(DiskJob::Unregister { info_hash }).await;
394 }
395
396 pub async fn shutdown(&self) {
398 let (tx, rx) = oneshot::channel();
399 let _ = self.tx.send(DiskJob::Shutdown { reply: tx }).await;
400 let _ = rx.await;
401 }
402}
403
404#[derive(Clone)]
410pub struct DiskHandle {
411 tx: mpsc::Sender<DiskJob>,
412 info_hash: Id20,
413 hash_pool: Option<std::sync::Arc<crate::hash_pool::HashPool>>,
415 hash_result_tx: Option<tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>>,
417 storage: Option<Arc<dyn TorrentStorage>>,
419 backend: Option<Arc<dyn crate::disk_backend::DiskIoBackend>>,
421 write_state: Option<Arc<DiskWriteState>>,
423 spawner: Option<crate::blocking_spawner::BlockingSpawner>,
425}
426
427impl std::fmt::Debug for DiskHandle {
428 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
429 f.debug_struct("DiskHandle")
430 .field("info_hash", &self.info_hash)
431 .finish_non_exhaustive()
432 }
433}
434
435impl DiskHandle {
436 #[cfg_attr(not(test), allow(dead_code))]
438 pub(crate) fn new(tx: mpsc::Sender<DiskJob>, info_hash: Id20) -> Self {
439 Self {
440 tx,
441 info_hash,
442 hash_pool: None,
443 hash_result_tx: None,
444 storage: None,
445 backend: None,
446 write_state: None,
447 spawner: None,
448 }
449 }
450
451 pub fn set_hash_pool(&mut self, pool: std::sync::Arc<crate::hash_pool::HashPool>) {
453 self.hash_pool = Some(pool);
454 }
455
456 pub fn set_hash_result_tx(
458 &mut self,
459 tx: tokio::sync::mpsc::Sender<crate::hash_pool::HashResult>,
460 ) {
461 self.hash_result_tx = Some(tx);
462 }
463
464 pub async fn write_chunk(
466 &self,
467 piece: u32,
468 begin: u32,
469 data: Bytes,
470 flags: DiskJobFlags,
471 ) -> irontide_storage::Result<()> {
472 let (tx, rx) = oneshot::channel();
473 let _ = self
474 .tx
475 .send(DiskJob::Write {
476 info_hash: self.info_hash,
477 piece,
478 begin,
479 data,
480 flags,
481 reply: tx,
482 })
483 .await;
484 rx.await
485 .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
486 std::io::ErrorKind::BrokenPipe,
487 "disk actor gone",
488 ))))
489 }
490
491 pub async fn read_chunk(
493 &self,
494 piece: u32,
495 begin: u32,
496 length: u32,
497 flags: DiskJobFlags,
498 ) -> irontide_storage::Result<Bytes> {
499 let (tx, rx) = oneshot::channel();
500 let _ = self
501 .tx
502 .send(DiskJob::Read {
503 info_hash: self.info_hash,
504 piece,
505 begin,
506 length,
507 flags,
508 reply: tx,
509 })
510 .await;
511 rx.await
512 .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
513 std::io::ErrorKind::BrokenPipe,
514 "disk actor gone",
515 ))))
516 }
517
518 pub async fn verify_piece(
520 &self,
521 piece: u32,
522 expected: Id20,
523 flags: DiskJobFlags,
524 ) -> irontide_storage::Result<bool> {
525 let (tx, rx) = oneshot::channel();
526 let _ = self
527 .tx
528 .send(DiskJob::Hash {
529 info_hash: self.info_hash,
530 piece,
531 expected,
532 flags,
533 reply: tx,
534 })
535 .await;
536 rx.await
537 .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
538 std::io::ErrorKind::BrokenPipe,
539 "disk actor gone",
540 ))))
541 }
542
543 pub async fn verify_piece_v2(
545 &self,
546 piece: u32,
547 expected: Id32,
548 flags: DiskJobFlags,
549 ) -> irontide_storage::Result<bool> {
550 let (tx, rx) = oneshot::channel();
551 let _ = self
552 .tx
553 .send(DiskJob::HashV2 {
554 info_hash: self.info_hash,
555 piece,
556 expected,
557 flags,
558 reply: tx,
559 })
560 .await;
561 rx.await
562 .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
563 std::io::ErrorKind::BrokenPipe,
564 "disk actor gone",
565 ))))
566 }
567
568 pub async fn hash_block(
570 &self,
571 piece: u32,
572 begin: u32,
573 length: u32,
574 flags: DiskJobFlags,
575 ) -> irontide_storage::Result<Id32> {
576 let (tx, rx) = oneshot::channel();
577 let _ = self
578 .tx
579 .send(DiskJob::BlockHash {
580 info_hash: self.info_hash,
581 piece,
582 begin,
583 length,
584 flags,
585 reply: tx,
586 })
587 .await;
588 rx.await
589 .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
590 std::io::ErrorKind::BrokenPipe,
591 "disk actor gone",
592 ))))
593 }
594
595 pub async fn clear_piece(&self, piece: u32) {
597 let _ = self
598 .tx
599 .send(DiskJob::ClearPiece {
600 info_hash: self.info_hash,
601 piece,
602 })
603 .await;
604 }
605
606 pub async fn flush_piece(&self, piece: u32) -> irontide_storage::Result<()> {
608 let (tx, rx) = oneshot::channel();
609 let _ = self
610 .tx
611 .send(DiskJob::FlushWriteBuffer {
612 info_hash: self.info_hash,
613 piece,
614 reply: tx,
615 })
616 .await;
617 rx.await
618 .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
619 std::io::ErrorKind::BrokenPipe,
620 "disk actor gone",
621 ))))
622 }
623
624 pub async fn cached_pieces(&self) -> Vec<u32> {
626 let (tx, rx) = oneshot::channel();
627 let _ = self
628 .tx
629 .send(DiskJob::CachedPieces {
630 info_hash: self.info_hash,
631 reply: tx,
632 })
633 .await;
634 rx.await.unwrap_or_default()
635 }
636
637 pub async fn flush_cache(&self) -> irontide_storage::Result<()> {
639 let (tx, rx) = oneshot::channel();
640 let _ = self.tx.send(DiskJob::FlushAll { reply: tx }).await;
641 rx.await
642 .unwrap_or(Err(irontide_storage::Error::Io(std::io::Error::new(
643 std::io::ErrorKind::BrokenPipe,
644 "disk actor gone",
645 ))))
646 }
647
648 pub fn enqueue_verify(
656 &self,
657 piece: u32,
658 expected: Id20,
659 generation: u64,
660 result_tx: &mpsc::Sender<VerifyResult>,
661 ) {
662 if let (Some(pool), Some(hash_tx)) = (&self.hash_pool, &self.hash_result_tx) {
664 if let Some(backend) = &self.backend {
665 let pool = pool.clone();
666 let hash_tx = hash_tx.clone();
667 let backend = Arc::clone(backend);
668 let info_hash = self.info_hash;
669 let job = crate::hash_pool::HashJob::Streaming {
670 piece,
671 expected,
672 generation,
673 info_hash,
674 backend,
675 result_tx: hash_tx,
676 };
677 tokio::spawn(async move {
678 if pool.submit(job).await.is_err() {
679 tracing::warn!(piece, "hash pool shut down, treating as failed");
680 }
681 });
682 return;
683 }
684
685 let hash_tx = hash_tx.clone();
687 tokio::spawn(async move {
688 tracing::warn!(piece, "verify: no backend (hash pool path)");
689 let _ = hash_tx
690 .send(crate::hash_pool::HashResult {
691 piece,
692 passed: false,
693 generation,
694 })
695 .await;
696 });
697 return;
698 }
699
700 if let Some(backend) = &self.backend {
702 let backend = Arc::clone(backend);
703 let info_hash = self.info_hash;
704 let result_tx = result_tx.clone();
705 let spawner = self.spawner.clone().unwrap();
706 tokio::spawn(async move {
707 let passed = spawner
708 .block_in_place(move || {
709 backend
710 .hash_piece(info_hash, piece, &expected)
711 .unwrap_or_else(|e| {
712 warn!(piece, %e, "verify: hash_piece failed");
713 false
714 })
715 })
716 .await;
717 let _ = result_tx.send(VerifyResult { piece, passed }).await;
718 });
719 return;
720 }
721
722 let result_tx = result_tx.clone();
724 tokio::spawn(async move {
725 warn!(piece, "verify: no data source, treating as failed");
726 let _ = result_tx
727 .send(VerifyResult {
728 piece,
729 passed: false,
730 })
731 .await;
732 });
733 }
734
735 pub fn enqueue_verify_v2(
739 &self,
740 piece: u32,
741 expected: Id32,
742 result_tx: &mpsc::Sender<VerifyResult>,
743 ) {
744 if let Some(backend) = &self.backend {
745 let backend = Arc::clone(backend);
746 let info_hash = self.info_hash;
747 let result_tx = result_tx.clone();
748 let spawner = self.spawner.clone().unwrap();
749 tokio::spawn(async move {
750 let passed = spawner
751 .block_in_place(move || match backend.read_piece(info_hash, piece) {
752 Ok(data) => {
753 let actual = irontide_core::sha256(&data);
754 actual == expected
755 }
756 Err(e) => {
757 warn!(piece, %e, "verify v2: read_piece failed");
758 false
759 }
760 })
761 .await;
762 let _ = result_tx.send(VerifyResult { piece, passed }).await;
763 });
764 return;
765 }
766
767 let result_tx = result_tx.clone();
769 tokio::spawn(async move {
770 warn!(piece, "verify v2: no data source, treating as failed");
771 let _ = result_tx
772 .send(VerifyResult {
773 piece,
774 passed: false,
775 })
776 .await;
777 });
778 }
779
780 pub(crate) fn write_block_deferred(&self, piece: u32, begin: u32, data: Bytes) {
788 let (write_state, storage) = match (&self.write_state, &self.storage) {
789 (Some(ws), Some(s)) => (ws, s),
790 _ => return, };
792
793 {
795 let mut pending = crate::timed_lock::TimedGuard::new(
796 write_state.pending.lock(),
797 &write_state.lock_timing,
798 "disk_pending",
799 );
800 *pending.entry(piece).or_insert(0) += 1;
801 }
802
803 match write_state.tx.try_send(WriteJob {
804 piece,
805 begin,
806 data: data.clone(),
807 }) {
808 Ok(()) => {}
809 Err(mpsc::error::TrySendError::Full(_)) => {
810 let storage = Arc::clone(storage);
812 if let Some(ref spawner) = self.spawner {
813 spawner.block_in_place_sync(|| {
814 if let Err(e) = storage.write_chunk(piece, begin, &data) {
815 tracing::warn!(piece, begin, %e, "deferred write fallback failed");
816 }
817 });
818 } else {
819 if let Err(e) = storage.write_chunk(piece, begin, &data) {
821 tracing::warn!(piece, begin, %e, "deferred write fallback failed");
822 }
823 }
824 let mut pending = crate::timed_lock::TimedGuard::new(
826 write_state.pending.lock(),
827 &write_state.lock_timing,
828 "disk_pending",
829 );
830 if let Some(count) = pending.get_mut(&piece) {
831 *count = count.saturating_sub(1);
832 if *count == 0 {
833 pending.remove(&piece);
834 drop(pending);
835 write_state.notify.notify_waiters();
836 }
837 }
838 }
839 Err(mpsc::error::TrySendError::Closed(_)) => {
840 let mut pending = crate::timed_lock::TimedGuard::new(
842 write_state.pending.lock(),
843 &write_state.lock_timing,
844 "disk_pending",
845 );
846 if let Some(count) = pending.get_mut(&piece) {
847 *count = count.saturating_sub(1);
848 if *count == 0 {
849 pending.remove(&piece);
850 drop(pending);
851 write_state.notify.notify_waiters();
852 }
853 }
854 }
855 }
856 }
857
858 pub(crate) fn write_block_direct(
868 &self,
869 piece: u32,
870 begin: u32,
871 s0: &[u8],
872 s1: &[u8],
873 ) -> crate::Result<()> {
874 let backend = match &self.backend {
875 Some(b) => b,
876 None => return Ok(()), };
878 backend.write_block_direct(self.info_hash, piece, begin, s0, s1)
879 }
880
881 pub(crate) async fn flush_piece_writes(&self, piece: u32) {
886 let write_state = match &self.write_state {
887 Some(ws) => ws,
888 None => return,
889 };
890
891 loop {
892 {
893 let pending = crate::timed_lock::TimedGuard::new(
894 write_state.pending.lock(),
895 &write_state.lock_timing,
896 "disk_pending",
897 );
898 if !pending.contains_key(&piece) {
899 return;
900 }
901 }
902 write_state.notify.notified().await;
903 }
904 }
905
906 #[allow(dead_code)]
908 pub(crate) fn storage(&self) -> Option<Arc<dyn TorrentStorage>> {
909 self.storage.clone()
910 }
911}
912
913struct DiskActor {
918 rx: mpsc::Receiver<DiskJob>,
919 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
920 spawner: crate::blocking_spawner::BlockingSpawner,
921 #[allow(dead_code)]
922 config: DiskConfig,
923}
924
925impl DiskActor {
926 fn new(
927 rx: mpsc::Receiver<DiskJob>,
928 config: DiskConfig,
929 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
930 spawner: crate::blocking_spawner::BlockingSpawner,
931 ) -> Self {
932 DiskActor {
933 rx,
934 backend,
935 spawner,
936 config,
937 }
938 }
939
940 async fn run(mut self) {
941 loop {
942 let first = match self.rx.recv().await {
944 Some(job) => job,
945 None => break,
946 };
947
948 let mut batch = vec![first];
950 while let Ok(job) = self.rx.try_recv() {
951 batch.push(job);
952 }
953
954 for job in batch {
955 if let DiskJob::Shutdown { reply } = job {
956 let backend = Arc::clone(&self.backend);
958 let spawner = self.spawner.clone();
959 let flush_result = spawner.block_in_place(move || backend.flush_all()).await;
960 if let Err(e) = flush_result {
961 warn!("flush_all on shutdown failed: {e}");
962 }
963 let _ = reply.send(());
964 return;
965 }
966 self.dispatch_job(job);
967 }
968 }
969 }
970
971 fn dispatch_job(&self, job: DiskJob) {
975 match job {
976 DiskJob::Register {
978 info_hash,
979 storage,
980 reply,
981 } => {
982 self.backend.register(info_hash, storage);
983 let _ = reply.send(());
984 }
985 DiskJob::Unregister { info_hash } => {
986 self.backend.unregister(info_hash);
987 }
988 DiskJob::ClearPiece { info_hash, piece } => {
989 self.backend.clear_piece(info_hash, piece);
990 }
991 DiskJob::CachedPieces { info_hash, reply } => {
992 let pieces = self.backend.cached_pieces(info_hash);
993 let _ = reply.send(pieces);
994 }
995
996 DiskJob::Write {
998 info_hash,
999 piece,
1000 begin,
1001 data,
1002 flags,
1003 reply,
1004 } => {
1005 let flush = flags.contains(DiskJobFlags::FLUSH_PIECE);
1006 let backend = Arc::clone(&self.backend);
1007 let spawner = self.spawner.clone();
1008 tokio::spawn(async move {
1009 let result = spawner
1010 .block_in_place(move || {
1011 backend.write_chunk(info_hash, piece, begin, data, flush)
1012 })
1013 .await;
1014 let _ = reply.send(to_storage_result(result));
1015 });
1016 }
1017
1018 DiskJob::Read {
1020 info_hash,
1021 piece,
1022 begin,
1023 length,
1024 flags,
1025 reply,
1026 } => {
1027 let volatile = flags.contains(DiskJobFlags::VOLATILE_READ);
1028 let backend = Arc::clone(&self.backend);
1029 let spawner = self.spawner.clone();
1030 tokio::spawn(async move {
1031 let result = spawner
1032 .block_in_place(move || {
1033 backend.read_chunk(info_hash, piece, begin, length, volatile)
1034 })
1035 .await;
1036 let _ = reply.send(to_storage_result(result));
1037 });
1038 }
1039
1040 DiskJob::Hash {
1042 info_hash,
1043 piece,
1044 expected,
1045 reply,
1046 ..
1047 } => {
1048 let backend = Arc::clone(&self.backend);
1049 let spawner = self.spawner.clone();
1050 tokio::spawn(async move {
1051 let result = spawner
1052 .block_in_place(move || backend.hash_piece(info_hash, piece, &expected))
1053 .await;
1054 let _ = reply.send(to_storage_result(result));
1055 });
1056 }
1057
1058 DiskJob::HashV2 {
1060 info_hash,
1061 piece,
1062 expected,
1063 reply,
1064 ..
1065 } => {
1066 let backend = Arc::clone(&self.backend);
1067 let spawner = self.spawner.clone();
1068 tokio::spawn(async move {
1069 let result = spawner
1070 .block_in_place(move || backend.hash_piece_v2(info_hash, piece, &expected))
1071 .await;
1072 let _ = reply.send(to_storage_result(result));
1073 });
1074 }
1075
1076 DiskJob::BlockHash {
1078 info_hash,
1079 piece,
1080 begin,
1081 length,
1082 reply,
1083 ..
1084 } => {
1085 let backend = Arc::clone(&self.backend);
1086 let spawner = self.spawner.clone();
1087 tokio::spawn(async move {
1088 let result = spawner
1089 .block_in_place(move || backend.hash_block(info_hash, piece, begin, length))
1090 .await;
1091 let _ = reply.send(to_storage_result(result));
1092 });
1093 }
1094
1095 DiskJob::FlushWriteBuffer {
1097 info_hash,
1098 piece,
1099 reply,
1100 } => {
1101 let backend = Arc::clone(&self.backend);
1102 let spawner = self.spawner.clone();
1103 tokio::spawn(async move {
1104 let result = spawner
1105 .block_in_place(move || backend.flush_piece(info_hash, piece))
1106 .await;
1107 let _ = reply.send(to_storage_result(result));
1108 });
1109 }
1110
1111 DiskJob::FlushAll { reply } => {
1113 let backend = Arc::clone(&self.backend);
1114 let spawner = self.spawner.clone();
1115 tokio::spawn(async move {
1116 let result = spawner.block_in_place(move || backend.flush_all()).await;
1117 let _ = reply.send(to_storage_result(result));
1118 });
1119 }
1120
1121 DiskJob::Shutdown { .. } => unreachable!(),
1122 }
1123 }
1124}
1125
1126fn to_storage_result<T>(r: crate::Result<T>) -> irontide_storage::Result<T> {
1128 r.map_err(|e| match e {
1129 crate::Error::Storage(se) => se,
1130 other => irontide_storage::Error::Io(std::io::Error::other(other.to_string())),
1131 })
1132}
1133
1134#[cfg(test)]
1135mod tests {
1136 use super::*;
1137 use irontide_core::Lengths;
1138 use irontide_storage::MemoryStorage;
1139
1140 fn test_config() -> DiskConfig {
1143 DiskConfig {
1144 io_threads: 2,
1145 cache_size: 1024 * 1024,
1146 ..DiskConfig::default()
1147 }
1148 }
1149
1150 fn make_hash(n: u8) -> Id20 {
1151 let mut b = [0u8; 20];
1152 b[0] = n;
1153 Id20(b)
1154 }
1155
1156 #[tokio::test]
1157 async fn async_write_read() {
1158 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1159 let ih = make_hash(1);
1160 let lengths = Lengths::new(100, 50, 25);
1161 let storage = Arc::new(MemoryStorage::new(lengths));
1162 let disk = mgr.register_torrent(ih, storage).await;
1163
1164 let data = Bytes::from(vec![42u8; 25]);
1165 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1166 .await
1167 .unwrap();
1168 let read = disk
1169 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1170 .await
1171 .unwrap();
1172 assert_eq!(read, data);
1173
1174 mgr.shutdown().await;
1175 }
1176
1177 #[tokio::test]
1178 async fn verify_through_handle() {
1179 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1180 let ih = make_hash(2);
1181 let lengths = Lengths::new(100, 50, 25);
1182 let storage = Arc::new(MemoryStorage::new(lengths));
1183 let disk = mgr.register_torrent(ih, storage).await;
1184
1185 let piece_data = vec![9u8; 50];
1186 disk.write_chunk(
1187 0,
1188 0,
1189 Bytes::from(piece_data.clone()),
1190 DiskJobFlags::FLUSH_PIECE,
1191 )
1192 .await
1193 .unwrap();
1194 disk.write_chunk(0, 25, Bytes::from(vec![9u8; 25]), DiskJobFlags::FLUSH_PIECE)
1195 .await
1196 .unwrap();
1197
1198 let expected = irontide_core::sha1(&piece_data);
1199 assert!(
1200 disk.verify_piece(0, expected, DiskJobFlags::empty())
1201 .await
1202 .unwrap()
1203 );
1204 assert!(
1205 !disk
1206 .verify_piece(0, Id20::ZERO, DiskJobFlags::empty())
1207 .await
1208 .unwrap()
1209 );
1210
1211 mgr.shutdown().await;
1212 }
1213
1214 #[tokio::test]
1215 async fn cache_hit_avoids_io() {
1216 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1217 let ih = make_hash(3);
1218 let lengths = Lengths::new(100, 50, 25);
1219 let storage = Arc::new(MemoryStorage::new(lengths));
1220 let disk = mgr.register_torrent(ih, storage).await;
1221
1222 let data = Bytes::from(vec![7u8; 25]);
1223 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1224 .await
1225 .unwrap();
1226
1227 let r1 = disk
1229 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1230 .await
1231 .unwrap();
1232 assert_eq!(r1, data);
1233
1234 let r2 = disk
1236 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1237 .await
1238 .unwrap();
1239 assert_eq!(r2, data);
1240
1241 mgr.shutdown().await;
1242 }
1243
1244 #[tokio::test]
1245 async fn volatile_read_bypasses_cache() {
1246 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1247 let ih = make_hash(4);
1248 let lengths = Lengths::new(100, 50, 25);
1249 let storage = Arc::new(MemoryStorage::new(lengths));
1250 let disk = mgr.register_torrent(ih, storage).await;
1251
1252 let data = Bytes::from(vec![5u8; 25]);
1253 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1254 .await
1255 .unwrap();
1256
1257 let r = disk
1259 .read_chunk(0, 0, 25, DiskJobFlags::VOLATILE_READ)
1260 .await
1261 .unwrap();
1262 assert_eq!(r, data);
1263
1264 mgr.shutdown().await;
1265 }
1266
1267 #[tokio::test]
1268 async fn clear_piece_evicts_cache() {
1269 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1270 let ih = make_hash(5);
1271 let lengths = Lengths::new(100, 50, 25);
1272 let storage = Arc::new(MemoryStorage::new(lengths));
1273 let disk = mgr.register_torrent(ih, storage).await;
1274
1275 let data = Bytes::from(vec![5u8; 25]);
1276 disk.write_chunk(0, 0, data.clone(), DiskJobFlags::FLUSH_PIECE)
1277 .await
1278 .unwrap();
1279 disk.read_chunk(0, 0, 25, DiskJobFlags::empty())
1281 .await
1282 .unwrap();
1283 disk.clear_piece(0).await;
1285
1286 let r = disk
1288 .read_chunk(0, 0, 25, DiskJobFlags::empty())
1289 .await
1290 .unwrap();
1291 assert_eq!(r, data);
1292
1293 mgr.shutdown().await;
1294 }
1295
1296 #[tokio::test]
1297 async fn write_buffer_flush() {
1298 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1299 let ih = make_hash(6);
1300 let lengths = Lengths::new(100, 50, 25);
1301 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1302 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1303
1304 disk.write_chunk(0, 0, Bytes::from(vec![1u8; 25]), DiskJobFlags::empty())
1306 .await
1307 .unwrap();
1308 disk.write_chunk(0, 25, Bytes::from(vec![2u8; 25]), DiskJobFlags::empty())
1309 .await
1310 .unwrap();
1311
1312 disk.flush_piece(0).await.unwrap();
1314
1315 let piece = storage.read_piece(0).unwrap();
1317 assert_eq!(&piece[..25], &[1u8; 25]);
1318 assert_eq!(&piece[25..], &[2u8; 25]);
1319
1320 mgr.shutdown().await;
1321 }
1322
1323 #[tokio::test]
1324 async fn verify_piece_v2_via_disk_handle() {
1325 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1326 let ih = make_hash(11);
1327 let data = vec![0xABu8; 16384];
1328 let expected = irontide_core::sha256(&data);
1329 let lengths = Lengths::new(16384, 16384, 16384);
1330 let storage = Arc::new(MemoryStorage::new(lengths));
1331 storage.write_chunk(0, 0, &data).unwrap();
1332
1333 let disk = mgr.register_torrent(ih, storage).await;
1334 let result = disk
1335 .verify_piece_v2(0, expected, DiskJobFlags::empty())
1336 .await;
1337 assert!(result.unwrap());
1338 mgr.shutdown().await;
1339 }
1340
1341 #[tokio::test]
1342 async fn hash_block_via_disk_handle() {
1343 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1344 let ih = make_hash(12);
1345 let data = vec![0xCDu8; 16384];
1346 let lengths = Lengths::new(16384, 16384, 16384);
1347 let storage = Arc::new(MemoryStorage::new(lengths));
1348 storage.write_chunk(0, 0, &data).unwrap();
1349
1350 let disk = mgr.register_torrent(ih, storage).await;
1351 let hash = disk.hash_block(0, 0, 16384, DiskJobFlags::empty()).await;
1352 assert_eq!(hash.unwrap(), irontide_core::sha256(&data));
1353 mgr.shutdown().await;
1354 }
1355
1356 #[tokio::test]
1357 async fn concurrent_verify_multiple_pieces() {
1358 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1359 let ih = make_hash(10);
1360
1361 let data: Vec<u8> = (0..400).map(|i| (i % 256) as u8).collect();
1363 let piece_len = 50u64;
1364 let lengths = Lengths::new(data.len() as u64, piece_len, 25);
1365 let storage = Arc::new(MemoryStorage::new(lengths.clone()));
1366
1367 let num_pieces = lengths.num_pieces();
1369 for p in 0..num_pieces {
1370 let offset = lengths.piece_offset(p) as usize;
1371 let size = lengths.piece_size(p) as usize;
1372 storage
1373 .write_chunk(p, 0, &data[offset..offset + size])
1374 .unwrap();
1375 }
1376
1377 let disk = mgr.register_torrent(ih, storage).await;
1378
1379 let mut expected_hashes = Vec::new();
1381 for p in 0..num_pieces {
1382 let offset = lengths.piece_offset(p) as usize;
1383 let size = lengths.piece_size(p) as usize;
1384 expected_hashes.push(irontide_core::sha1(&data[offset..offset + size]));
1385 }
1386
1387 let mut js = tokio::task::JoinSet::new();
1389 for p in 0..num_pieces {
1390 let d = disk.clone();
1391 let hash = expected_hashes[p as usize];
1392 js.spawn(async move {
1393 let valid = d
1394 .verify_piece(p, hash, DiskJobFlags::empty())
1395 .await
1396 .unwrap();
1397 (p, valid)
1398 });
1399 }
1400
1401 let mut results = Vec::new();
1402 while let Some(r) = js.join_next().await {
1403 results.push(r.unwrap());
1404 }
1405 results.sort_by_key(|&(p, _)| p);
1406
1407 assert_eq!(results.len(), num_pieces as usize);
1408 for (p, valid) in &results {
1409 assert!(valid, "piece {p} should be valid");
1410 }
1411
1412 mgr.shutdown().await;
1413 }
1414
1415 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1418 async fn write_block_deferred_writes_to_storage() {
1419 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1420 let ih = make_hash(30);
1421 let lengths = Lengths::new(100, 50, 25);
1422 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1423 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1424
1425 let block0 = Bytes::from(vec![0xAAu8; 25]);
1426 let block1 = Bytes::from(vec![0xBBu8; 25]);
1427
1428 disk.write_block_deferred(0, 0, block0.clone());
1429 disk.write_block_deferred(0, 25, block1.clone());
1430
1431 disk.flush_piece_writes(0).await;
1433
1434 let read0 = storage.read_chunk(0, 0, 25).unwrap();
1436 assert_eq!(&read0[..], &block0[..], "block 0 should match");
1437 let read1 = storage.read_chunk(0, 25, 25).unwrap();
1438 assert_eq!(&read1[..], &block1[..], "block 1 should match");
1439
1440 mgr.shutdown().await;
1441 }
1442
1443 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1444 async fn flush_piece_writes_waits_for_completion() {
1445 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1446 let ih = make_hash(31);
1447 let lengths = Lengths::new(200, 100, 25);
1448 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1449 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1450
1451 for i in 0u32..4 {
1453 let data = Bytes::from(vec![(i as u8) + 1; 25]);
1454 disk.write_block_deferred(0, i * 25, data);
1455 }
1456
1457 disk.flush_piece_writes(0).await;
1459
1460 let piece = storage.read_piece(0).unwrap();
1462 assert_eq!(&piece[0..25], &[1u8; 25]);
1463 assert_eq!(&piece[25..50], &[2u8; 25]);
1464 assert_eq!(&piece[50..75], &[3u8; 25]);
1465 assert_eq!(&piece[75..100], &[4u8; 25]);
1466
1467 mgr.shutdown().await;
1468 }
1469
1470 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1473 async fn batch_writer_drains_multiple_jobs() {
1474 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1475 let ih = make_hash(50);
1476 let lengths = Lengths::new(250, 250, 25);
1478 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1479 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1480
1481 for i in 0u32..10 {
1483 let data = Bytes::from(vec![i as u8 + 1; 25]);
1484 disk.write_block_deferred(0, i * 25, data);
1485 }
1486
1487 disk.flush_piece_writes(0).await;
1489
1490 for i in 0u32..10 {
1492 let chunk = storage.read_chunk(0, i * 25, 25).unwrap();
1493 assert_eq!(
1494 &chunk[..],
1495 vec![i as u8 + 1; 25].as_slice(),
1496 "block {i} mismatch"
1497 );
1498 }
1499
1500 mgr.shutdown().await;
1501 }
1502
1503 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1504 async fn batch_writer_caps_at_64() {
1505 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1506 let ih = make_hash(51);
1507 let lengths = Lengths::new(1600, 1600, 16);
1509 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1510 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1511
1512 for i in 0u32..100 {
1515 let data = Bytes::from(vec![i as u8; 16]);
1516 disk.write_block_deferred(0, i * 16, data);
1517 }
1518
1519 disk.flush_piece_writes(0).await;
1521
1522 for i in 0u32..100 {
1524 let chunk = storage.read_chunk(0, i * 16, 16).unwrap();
1525 assert_eq!(
1526 &chunk[..],
1527 vec![i as u8; 16].as_slice(),
1528 "block {i} mismatch after overflow to next batch"
1529 );
1530 }
1531
1532 mgr.shutdown().await;
1533 }
1534
1535 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1538 async fn verify_from_disk_after_deferred_write() {
1539 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1540 let ih = make_hash(40);
1541 let chunk_size = 16384u32;
1542 let piece_size = u64::from(chunk_size) * 2;
1543 let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1544 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1545 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1546
1547 let chunk0 = vec![0xAAu8; chunk_size as usize];
1549 let chunk1 = vec![0xBBu8; chunk_size as usize];
1550 disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1551 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1552 disk.flush_piece_writes(0).await;
1553
1554 let mut full_piece = Vec::with_capacity(piece_size as usize);
1556 full_piece.extend_from_slice(&chunk0);
1557 full_piece.extend_from_slice(&chunk1);
1558 let expected_hash = irontide_core::sha1(&full_piece);
1559
1560 let (result_tx, mut result_rx) = mpsc::channel(4);
1562 disk.enqueue_verify(0, expected_hash, 0, &result_tx);
1563 let result = result_rx
1564 .recv()
1565 .await
1566 .expect("should receive verify result");
1567 assert_eq!(result.piece, 0);
1568 assert!(result.passed, "disk-based SHA-1 verify should pass");
1569
1570 disk.write_block_deferred(0, 0, Bytes::from(chunk0));
1572 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
1573 disk.flush_piece_writes(0).await;
1574 disk.enqueue_verify(0, Id20::ZERO, 0, &result_tx);
1575 let result = result_rx
1576 .recv()
1577 .await
1578 .expect("should receive verify result");
1579 assert!(!result.passed, "wrong hash should fail disk-based verify");
1580
1581 mgr.shutdown().await;
1582 }
1583
1584 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1585 async fn verify_v2_from_disk_after_deferred_write() {
1586 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1587 let ih = make_hash(41);
1588 let chunk_size = 16384u32;
1589 let piece_size = u64::from(chunk_size) * 2;
1590 let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1591 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1592 let disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1593
1594 let chunk0 = vec![0xCCu8; chunk_size as usize];
1596 let chunk1 = vec![0xDDu8; chunk_size as usize];
1597 disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1598 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1599 disk.flush_piece_writes(0).await;
1600
1601 let mut full_piece = Vec::with_capacity(piece_size as usize);
1603 full_piece.extend_from_slice(&chunk0);
1604 full_piece.extend_from_slice(&chunk1);
1605 let expected_hash = irontide_core::sha256(&full_piece);
1606
1607 let (result_tx, mut result_rx) = mpsc::channel(4);
1609 disk.enqueue_verify_v2(0, expected_hash, &result_tx);
1610 let result = result_rx
1611 .recv()
1612 .await
1613 .expect("should receive v2 verify result");
1614 assert_eq!(result.piece, 0);
1615 assert!(result.passed, "disk-based SHA-256 verify should pass");
1616
1617 disk.write_block_deferred(0, 0, Bytes::from(chunk0));
1619 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1));
1620 disk.flush_piece_writes(0).await;
1621 disk.enqueue_verify_v2(0, Id32::ZERO, &result_tx);
1622 let result = result_rx
1623 .recv()
1624 .await
1625 .expect("should receive v2 verify result");
1626 assert!(
1627 !result.passed,
1628 "wrong hash should fail disk-based v2 verify"
1629 );
1630
1631 mgr.shutdown().await;
1632 }
1633
1634 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1635 async fn verify_with_hash_pool_from_disk() {
1636 let (mgr, _actor) = DiskManagerHandle::new(test_config());
1637 let ih = make_hash(42);
1638 let chunk_size = 16384u32;
1639 let piece_size = u64::from(chunk_size) * 2;
1640 let lengths = Lengths::new(piece_size, piece_size, chunk_size);
1641 let storage: Arc<dyn TorrentStorage> = Arc::new(MemoryStorage::new(lengths));
1642 let mut disk = mgr.register_torrent(ih, Arc::clone(&storage)).await;
1643
1644 let (hash_result_tx, mut hash_result_rx) = mpsc::channel(4);
1646 disk.set_hash_result_tx(hash_result_tx);
1647 let hash_pool = std::sync::Arc::new(crate::hash_pool::HashPool::new(2, 16));
1648 disk.set_hash_pool(hash_pool);
1649
1650 let chunk0 = vec![0xEEu8; chunk_size as usize];
1652 let chunk1 = vec![0xFFu8; chunk_size as usize];
1653 disk.write_block_deferred(0, 0, Bytes::from(chunk0.clone()));
1654 disk.write_block_deferred(0, chunk_size, Bytes::from(chunk1.clone()));
1655 disk.flush_piece_writes(0).await;
1656
1657 let mut full_piece = Vec::with_capacity(piece_size as usize);
1659 full_piece.extend_from_slice(&chunk0);
1660 full_piece.extend_from_slice(&chunk1);
1661 let expected_hash = irontide_core::sha1(&full_piece);
1662
1663 let (verify_result_tx, _) = mpsc::channel(4); disk.enqueue_verify(0, expected_hash, 42, &verify_result_tx);
1666 let result = hash_result_rx
1667 .recv()
1668 .await
1669 .expect("should receive hash pool result");
1670 assert!(result.passed, "hash pool disk-based verify should pass");
1671 assert_eq!(result.piece, 0);
1672 assert_eq!(result.generation, 42);
1673
1674 mgr.shutdown().await;
1675 }
1676}