1#![allow(
2 clippy::cast_possible_truncation,
3 clippy::cast_precision_loss,
4 clippy::cast_possible_wrap,
5 clippy::cast_sign_loss,
6 reason = "M175: disk I/O sizes bounded by piece_length (u32 by construction in Lengths::new); offset arithmetic stays within file lengths"
7)]
8
9use std::collections::HashMap;
16use std::sync::Arc;
17
18use parking_lot::{Mutex, RwLock};
19
20use bytes::Bytes;
21use irontide_core::{Id20, Id32};
22use irontide_storage::TorrentStorage;
23
24use crate::buffer_pool::BufferPool;
25use crate::disk::DiskConfig;
26
/// Snapshot of the counters and gauges a [`DiskIoBackend`] reports from `stats()`.
///
/// Every field is zero in the [`Default`] value. The type is comparable
/// (`PartialEq`/`Eq`) so two snapshots can be checked against each other directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DiskIoStats {
    /// Bytes obtained from storage reads (the backends in this file add the length
    /// of each successful storage read).
    pub read_bytes: u64,
    /// Bytes accepted by write calls (the backends in this file add the chunk
    /// length after each successful write).
    pub write_bytes: u64,
    /// Chunk reads that were answered from the buffer pool.
    pub cache_hits: u64,
    /// Chunk reads that had to go to storage.
    pub cache_misses: u64,
    /// Copied from the buffer pool's `write_buffer_bytes` statistic by `PosixDiskIo`.
    pub write_buffer_bytes: usize,
    /// Copied from the buffer pool's `read_cache_bytes` statistic by `PosixDiskIo`.
    pub read_cache_bytes: usize,
    /// Copied from the buffer pool's `total_entries` statistic by `PosixDiskIo`.
    pub pool_entries: usize,
    /// Copied from the buffer pool's `prefetch_count` statistic by `PosixDiskIo`.
    pub prefetch_count: u64,
    /// Copied from the buffer pool's `eviction_count` statistic by `PosixDiskIo`.
    pub eviction_count: u64,
    /// Copied from the buffer pool's `skeleton_count` statistic by `PosixDiskIo`.
    pub skeleton_count: u64,
}
51
52pub trait DiskIoBackend: Send + Sync {
57 fn name(&self) -> &str;
59
60 fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>);
62
63 fn unregister(&self, info_hash: Id20);
65
66 fn write_chunk(
72 &self,
73 info_hash: Id20,
74 piece: u32,
75 begin: u32,
76 data: Bytes,
77 flush: bool,
78 ) -> crate::Result<()>;
79
80 fn read_chunk(
86 &self,
87 info_hash: Id20,
88 piece: u32,
89 begin: u32,
90 length: u32,
91 volatile: bool,
92 ) -> crate::Result<Bytes>;
93
94 fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>>;
102
103 fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool>;
109
110 fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool>;
116
117 fn hash_block(
123 &self,
124 info_hash: Id20,
125 piece: u32,
126 begin: u32,
127 length: u32,
128 ) -> crate::Result<Id32>;
129
130 fn clear_piece(&self, info_hash: Id20, piece: u32);
132
133 fn flush_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<()>;
139
140 fn flush_all(&self) -> crate::Result<()>;
146
147 fn cached_pieces(&self, info_hash: Id20) -> Vec<u32>;
149
150 fn stats(&self) -> DiskIoStats;
152
153 fn write_block_direct(
165 &self,
166 info_hash: Id20,
167 piece: u32,
168 begin: u32,
169 s0: &[u8],
170 s1: &[u8],
171 ) -> crate::Result<()>;
172}
173
174pub struct DisabledDiskIo;
179
180impl DiskIoBackend for DisabledDiskIo {
181 fn name(&self) -> &'static str {
182 "disabled"
183 }
184
185 fn register(&self, _info_hash: Id20, _storage: Arc<dyn TorrentStorage>) {}
186
187 fn unregister(&self, _info_hash: Id20) {}
188
189 fn write_chunk(
190 &self,
191 _info_hash: Id20,
192 _piece: u32,
193 _begin: u32,
194 _data: Bytes,
195 _flush: bool,
196 ) -> crate::Result<()> {
197 Ok(())
198 }
199
200 fn read_chunk(
201 &self,
202 _info_hash: Id20,
203 _piece: u32,
204 _begin: u32,
205 length: u32,
206 _volatile: bool,
207 ) -> crate::Result<Bytes> {
208 Ok(Bytes::from(vec![0u8; length as usize]))
209 }
210
211 fn read_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<Vec<u8>> {
212 Ok(Vec::new())
213 }
214
215 fn hash_piece(&self, _info_hash: Id20, _piece: u32, _expected: &Id20) -> crate::Result<bool> {
216 Ok(true)
217 }
218
219 fn hash_piece_v2(
220 &self,
221 _info_hash: Id20,
222 _piece: u32,
223 _expected: &Id32,
224 ) -> crate::Result<bool> {
225 Ok(true)
226 }
227
228 fn hash_block(
229 &self,
230 _info_hash: Id20,
231 _piece: u32,
232 _begin: u32,
233 _length: u32,
234 ) -> crate::Result<Id32> {
235 Ok(Id32([0u8; 32]))
236 }
237
238 fn clear_piece(&self, _info_hash: Id20, _piece: u32) {}
239
240 fn flush_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<()> {
241 Ok(())
242 }
243
244 fn flush_all(&self) -> crate::Result<()> {
245 Ok(())
246 }
247
248 fn cached_pieces(&self, _info_hash: Id20) -> Vec<u32> {
249 vec![]
250 }
251
252 fn stats(&self) -> DiskIoStats {
253 DiskIoStats::default()
254 }
255
256 fn write_block_direct(
257 &self,
258 _info_hash: Id20,
259 _piece: u32,
260 _begin: u32,
261 _s0: &[u8],
262 _s1: &[u8],
263 ) -> crate::Result<()> {
264 Ok(())
265 }
266}
267
268pub struct PosixDiskIo {
279 storages: RwLock<HashMap<Id20, Arc<dyn TorrentStorage>>>,
280 pool: Mutex<BufferPool>,
281 stats: Mutex<DiskIoStats>,
282}
283
284impl PosixDiskIo {
285 #[must_use]
287 pub fn new(config: &DiskConfig) -> Self {
288 let mut pool = BufferPool::with_capacity(config.buffer_pool_capacity);
289 pool.set_mlock(config.enable_mlock);
290 Self {
291 storages: RwLock::new(HashMap::new()),
292 pool: Mutex::new(pool),
293 stats: Mutex::new(DiskIoStats::default()),
294 }
295 }
296
297 fn get_storage(&self, info_hash: Id20) -> crate::Result<Arc<dyn TorrentStorage>> {
298 self.storages
299 .read()
300 .get(&info_hash)
301 .cloned()
302 .ok_or_else(|| {
303 crate::Error::Storage(irontide_storage::Error::Io(std::io::Error::new(
304 std::io::ErrorKind::NotFound,
305 "torrent not registered",
306 )))
307 })
308 }
309}
310
311impl DiskIoBackend for PosixDiskIo {
312 fn name(&self) -> &'static str {
313 "posix"
314 }
315
316 fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>) {
317 self.storages.write().insert(info_hash, storage);
318 }
319
320 fn unregister(&self, info_hash: Id20) {
321 self.storages.write().remove(&info_hash);
322 self.pool.lock().clear_torrent(info_hash);
323 }
324
325 fn write_chunk(
326 &self,
327 info_hash: Id20,
328 piece: u32,
329 begin: u32,
330 data: Bytes,
331 flush: bool,
332 ) -> crate::Result<()> {
333 let len = data.len();
334
335 if flush {
336 let storage = self.get_storage(info_hash)?;
337 storage.write_chunk(piece, begin, &data)?;
338 self.stats.lock().write_bytes += len as u64;
339 return Ok(());
340 }
341
342 let key = (info_hash, piece);
346 self.pool.lock().write_block(key, begin, data, 0);
347 self.stats.lock().write_bytes += len as u64;
348 Ok(())
349 }
350
351 fn read_chunk(
352 &self,
353 info_hash: Id20,
354 piece: u32,
355 begin: u32,
356 length: u32,
357 volatile: bool,
358 ) -> crate::Result<Bytes> {
359 let key = (info_hash, piece);
360
361 if !volatile {
363 let mut pool = self.pool.lock();
364 if let Some(data) = pool.read_block(key, begin, length as usize) {
365 drop(pool);
366 self.stats.lock().cache_hits += 1;
367 return Ok(data);
368 }
369 }
370 self.stats.lock().cache_misses += 1;
371
372 let storage = self.get_storage(info_hash)?;
374 if !volatile {
375 if let Ok(piece_data) = storage.read_piece(piece) {
376 let piece_bytes = Bytes::from(piece_data);
377 self.stats.lock().read_bytes += piece_bytes.len() as u64;
378
379 let mut pool = self.pool.lock();
380 pool.prefetch_piece(key, piece_bytes.clone());
381 drop(pool);
382
383 let end = (begin as usize).saturating_add(length as usize);
385 if end <= piece_bytes.len() {
386 return Ok(piece_bytes.slice(begin as usize..end));
387 }
388 } else {
391 }
393 }
394
395 let data = storage.read_chunk(piece, begin, length)?;
397 let bytes = Bytes::from(data);
398 self.stats.lock().read_bytes += bytes.len() as u64;
399 Ok(bytes)
400 }
401
402 fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>> {
403 self.flush_piece(info_hash, piece)?;
404 let storage = self.get_storage(info_hash)?;
405 let data = storage.read_piece(piece)?;
406 self.stats.lock().read_bytes += data.len() as u64;
407 Ok(data)
408 }
409
410 fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool> {
411 let key = (info_hash, piece);
412
413 let cached_data = {
418 let mut pool = self.pool.lock();
419 pool.take_all_blocks(key)
420 };
421
422 if let Some(data) = cached_data {
423 let hash = irontide_core::sha1(&data);
424 if hash == *expected {
425 let storage = self.get_storage(info_hash)?;
427 storage.write_chunk(piece, 0, &data)?;
428 self.stats.lock().write_bytes += data.len() as u64;
429
430 let mut pool = self.pool.lock();
431 pool.promote_to_cached(key, Bytes::from(data));
432 return Ok(true);
433 }
434 return Ok(false);
436 }
437
438 self.flush_piece(info_hash, piece)?;
440 let storage = self.get_storage(info_hash)?;
441 Ok(storage.verify_piece(piece, expected)?)
442 }
443
444 fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool> {
445 let key = (info_hash, piece);
446
447 let cached_data = {
449 let mut pool = self.pool.lock();
450 pool.take_all_blocks(key)
451 };
452
453 if let Some(data) = cached_data {
454 let hash = irontide_core::sha256(&data);
455 if hash == *expected {
456 let storage = self.get_storage(info_hash)?;
457 storage.write_chunk(piece, 0, &data)?;
458 self.stats.lock().write_bytes += data.len() as u64;
459
460 let mut pool = self.pool.lock();
461 pool.promote_to_cached(key, Bytes::from(data));
462 return Ok(true);
463 }
464 return Ok(false);
465 }
466
467 self.flush_piece(info_hash, piece)?;
468 let storage = self.get_storage(info_hash)?;
469 Ok(storage.verify_piece_v2(piece, expected)?)
470 }
471
472 fn hash_block(
473 &self,
474 info_hash: Id20,
475 piece: u32,
476 begin: u32,
477 length: u32,
478 ) -> crate::Result<Id32> {
479 self.flush_piece(info_hash, piece)?;
480 let storage = self.get_storage(info_hash)?;
481 Ok(storage.hash_block(piece, begin, length)?)
482 }
483
484 fn clear_piece(&self, info_hash: Id20, piece: u32) {
485 self.pool.lock().clear_piece((info_hash, piece));
486 }
487
488 fn flush_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<()> {
489 let blocks = {
490 let mut pool = self.pool.lock();
491 match pool.flush_piece((info_hash, piece)) {
492 Some(b) => b,
493 None => return Ok(()),
494 }
495 };
496 let storage = self.get_storage(info_hash)?;
497 for (begin, data) in blocks {
498 storage.write_chunk(piece, begin, &data)?;
499 }
500 Ok(())
501 }
502
503 fn flush_all(&self) -> crate::Result<()> {
504 let all_blocks = {
505 let mut pool = self.pool.lock();
506 pool.flush_all()
507 };
508 for ((info_hash, piece), blocks) in all_blocks {
509 let storage = self.get_storage(info_hash)?;
510 for (begin, data) in blocks {
511 storage.write_chunk(piece, begin, &data)?;
512 }
513 }
514 Ok(())
515 }
516
517 fn cached_pieces(&self, info_hash: Id20) -> Vec<u32> {
518 let pool = self.pool.lock();
519 pool.hot_pieces(info_hash)
520 }
521
522 fn stats(&self) -> DiskIoStats {
523 let mut s = self.stats.lock().clone();
524 let pool = self.pool.lock();
525 let ps = pool.stats();
526 s.write_buffer_bytes = ps.write_buffer_bytes;
527 s.read_cache_bytes = ps.read_cache_bytes;
528 s.pool_entries = ps.total_entries;
529 s.prefetch_count = ps.prefetch_count;
530 s.eviction_count = ps.eviction_count;
531 s.skeleton_count = ps.skeleton_count;
532 s
533 }
534
535 fn write_block_direct(
536 &self,
537 info_hash: Id20,
538 piece: u32,
539 begin: u32,
540 s0: &[u8],
541 s1: &[u8],
542 ) -> crate::Result<()> {
543 let storage = self.get_storage(info_hash)?;
544 storage.write_chunk_vectored(piece, begin, s0, s1)?;
545 let total = (s0.len() + s1.len()) as u64;
546 self.stats.lock().write_bytes += total;
547 Ok(())
548 }
549}
550
551pub struct MmapDiskIo {
560 storages: RwLock<HashMap<Id20, Arc<dyn TorrentStorage>>>,
561 stats: Mutex<DiskIoStats>,
562}
563
564impl MmapDiskIo {
565 #[must_use]
567 pub fn new() -> Self {
568 Self {
569 storages: RwLock::new(HashMap::new()),
570 stats: Mutex::new(DiskIoStats::default()),
571 }
572 }
573
574 fn get_storage(&self, info_hash: Id20) -> crate::Result<Arc<dyn TorrentStorage>> {
575 self.storages
576 .read()
577 .get(&info_hash)
578 .cloned()
579 .ok_or_else(|| {
580 crate::Error::Storage(irontide_storage::Error::Io(std::io::Error::new(
581 std::io::ErrorKind::NotFound,
582 "torrent not registered",
583 )))
584 })
585 }
586}
587
588impl Default for MmapDiskIo {
589 fn default() -> Self {
590 Self::new()
591 }
592}
593
594impl DiskIoBackend for MmapDiskIo {
595 fn name(&self) -> &'static str {
596 "mmap"
597 }
598
599 fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>) {
600 self.storages.write().insert(info_hash, storage);
601 }
602
603 fn unregister(&self, info_hash: Id20) {
604 self.storages.write().remove(&info_hash);
605 }
606
607 fn write_chunk(
608 &self,
609 info_hash: Id20,
610 piece: u32,
611 begin: u32,
612 data: Bytes,
613 _flush: bool,
614 ) -> crate::Result<()> {
615 let len = data.len();
616 let storage = self.get_storage(info_hash)?;
617 storage.write_chunk(piece, begin, &data)?;
618 self.stats.lock().write_bytes += len as u64;
619 Ok(())
620 }
621
622 fn read_chunk(
623 &self,
624 info_hash: Id20,
625 piece: u32,
626 begin: u32,
627 length: u32,
628 _volatile: bool,
629 ) -> crate::Result<Bytes> {
630 let storage = self.get_storage(info_hash)?;
631 let data = storage.read_chunk(piece, begin, length)?;
632 let bytes = Bytes::from(data);
633 self.stats.lock().read_bytes += bytes.len() as u64;
634 Ok(bytes)
635 }
636
637 fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>> {
638 let storage = self.get_storage(info_hash)?;
639 let data = storage.read_piece(piece)?;
640 self.stats.lock().read_bytes += data.len() as u64;
641 Ok(data)
642 }
643
644 fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool> {
645 let storage = self.get_storage(info_hash)?;
646 Ok(storage.verify_piece(piece, expected)?)
647 }
648
649 fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool> {
650 let storage = self.get_storage(info_hash)?;
651 Ok(storage.verify_piece_v2(piece, expected)?)
652 }
653
654 fn hash_block(
655 &self,
656 info_hash: Id20,
657 piece: u32,
658 begin: u32,
659 length: u32,
660 ) -> crate::Result<Id32> {
661 let storage = self.get_storage(info_hash)?;
662 Ok(storage.hash_block(piece, begin, length)?)
663 }
664
665 fn clear_piece(&self, _info_hash: Id20, _piece: u32) {
666 }
668
669 fn flush_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<()> {
670 Ok(())
672 }
673
674 fn flush_all(&self) -> crate::Result<()> {
675 Ok(())
677 }
678
679 fn cached_pieces(&self, _info_hash: Id20) -> Vec<u32> {
680 vec![]
681 }
682
683 fn stats(&self) -> DiskIoStats {
684 self.stats.lock().clone()
685 }
686
687 fn write_block_direct(
688 &self,
689 info_hash: Id20,
690 piece: u32,
691 begin: u32,
692 s0: &[u8],
693 s1: &[u8],
694 ) -> crate::Result<()> {
695 let storage = self.get_storage(info_hash)?;
696 storage.write_chunk_vectored(piece, begin, s0, s1)?;
697 let total = (s0.len() + s1.len()) as u64;
698 self.stats.lock().write_bytes += total;
699 Ok(())
700 }
701}
702
703#[must_use]
713pub fn create_backend_from_config(config: &DiskConfig) -> Arc<dyn DiskIoBackend> {
714 #[cfg(all(target_os = "linux", feature = "io-uring"))]
715 if config.storage_mode == irontide_core::StorageMode::IoUring {
716 let uring_config = irontide_storage::IoUringConfig {
717 sq_depth: config.io_uring_sq_depth,
718 enable_direct_io: config.io_uring_direct_io,
719 batch_threshold: config.io_uring_batch_threshold,
720 };
721 match crate::io_uring_backend::IoUringDiskIo::new(config, uring_config) {
722 Ok(backend) => return Arc::new(backend),
723 Err(e) => {
724 tracing::warn!("io_uring init failed, falling back to posix: {e}");
725 }
726 }
727 }
728
729 #[cfg(all(target_os = "windows", feature = "iocp"))]
730 if config.storage_mode == irontide_core::StorageMode::Iocp {
731 let iocp_config = irontide_storage::IocpConfig {
732 concurrent_threads: config.iocp_concurrent_threads,
733 enable_direct_io: config.iocp_direct_io,
734 };
735 match crate::iocp_backend::IocpDiskIo::new(config, iocp_config) {
736 Ok(backend) => return Arc::new(backend),
737 Err(e) => {
738 tracing::warn!("IOCP init failed, falling back to posix: {e}");
739 }
740 }
741 }
742
743 if config.storage_mode == irontide_core::StorageMode::Mmap {
744 Arc::new(MmapDiskIo::new())
745 } else {
746 Arc::new(PosixDiskIo::new(config))
747 }
748}
749
750#[cfg(test)]
751mod tests {
752 use super::*;
753 use irontide_core::Lengths;
754 use irontide_storage::MemoryStorage;
755
756 fn make_hash() -> Id20 {
757 Id20([0xAB; 20])
758 }
759
760 fn make_hash_n(n: u8) -> Id20 {
761 let mut b = [0u8; 20];
762 b[0] = n;
763 Id20(b)
764 }
765
766 fn test_config() -> DiskConfig {
767 DiskConfig {
768 io_threads: 2,
769 cache_size: 1024 * 1024, ..DiskConfig::default()
771 }
772 }
773
774 fn make_storage(piece_size: u64) -> Arc<dyn TorrentStorage> {
776 let chunk = piece_size.min(16384) as u32;
777 let lengths = Lengths::new(piece_size, piece_size, chunk);
778 Arc::new(MemoryStorage::new(lengths))
779 }
780
781 fn make_storage_full(total: u64, piece_len: u64, chunk_size: u32) -> Arc<dyn TorrentStorage> {
783 let lengths = Lengths::new(total, piece_len, chunk_size);
784 Arc::new(MemoryStorage::new(lengths))
785 }
786
787 #[test]
792 fn disabled_backend_name() {
793 let backend = DisabledDiskIo;
794 assert_eq!(backend.name(), "disabled");
795 }
796
797 #[test]
798 fn disabled_backend_write_succeeds() {
799 let backend = DisabledDiskIo;
800 let result =
801 backend.write_chunk(make_hash(), 0, 0, Bytes::from_static(&[1, 2, 3, 4]), false);
802 assert!(result.is_ok());
803 }
804
805 #[test]
806 fn disabled_backend_read_returns_zeroed() {
807 let backend = DisabledDiskIo;
808 let length = 16384u32;
809 let data = backend
810 .read_chunk(make_hash(), 0, 0, length, false)
811 .unwrap();
812 assert_eq!(data.len(), length as usize);
813 assert!(data.iter().all(|&b| b == 0));
814 }
815
816 #[test]
817 fn disabled_backend_hash_always_passes() {
818 let backend = DisabledDiskIo;
819 let expected = Id20([0xFF; 20]);
820 let result = backend.hash_piece(make_hash(), 42, &expected).unwrap();
821 assert!(result);
822 }
823
824 #[test]
825 fn disabled_backend_hash_v2_always_passes() {
826 let backend = DisabledDiskIo;
827 let expected = Id32([0xFF; 32]);
828 let result = backend.hash_piece_v2(make_hash(), 42, &expected).unwrap();
829 assert!(result);
830 }
831
832 #[test]
833 fn disabled_backend_stats_default() {
834 let backend = DisabledDiskIo;
835 let stats = backend.stats();
836 assert_eq!(stats.read_bytes, 0);
837 assert_eq!(stats.write_bytes, 0);
838 assert_eq!(stats.cache_hits, 0);
839 assert_eq!(stats.cache_misses, 0);
840 assert_eq!(stats.write_buffer_bytes, 0);
841 }
842
843 #[test]
844 fn disabled_backend_cached_pieces_empty() {
845 let backend = DisabledDiskIo;
846 let pieces = backend.cached_pieces(make_hash());
847 assert!(pieces.is_empty());
848 }
849
850 #[test]
855 fn posix_backend_name() {
856 let backend = PosixDiskIo::new(&test_config());
857 assert_eq!(backend.name(), "posix");
858 }
859
860 #[test]
861 fn posix_register_unregister() {
862 let backend = PosixDiskIo::new(&test_config());
863 let ih = make_hash_n(1);
864 let storage = make_storage(100);
865 backend.register(ih, storage);
866
867 assert!(backend.read_chunk(ih, 0, 0, 10, false).is_ok());
869
870 backend.unregister(ih);
871
872 assert!(backend.read_chunk(ih, 0, 0, 10, false).is_err());
874 }
875
876 #[test]
877 fn posix_write_and_read_flush() {
878 let backend = PosixDiskIo::new(&test_config());
879 let ih = make_hash_n(2);
880 let storage = make_storage(100);
881 backend.register(ih, storage);
882
883 let data = vec![42u8; 50];
884 backend
885 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
886 .unwrap();
887 let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
888 assert_eq!(&read[..], &data[..]);
889 }
890
891 #[test]
892 fn posix_write_buffered_then_read() {
893 let backend = PosixDiskIo::new(&test_config());
894 let ih = make_hash_n(3);
895 let storage = make_storage(100);
896 backend.register(ih, storage);
897
898 let data = vec![99u8; 50];
899 backend
901 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), false)
902 .unwrap();
903
904 let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
906 assert_eq!(&read[..], &data[..]);
907
908 let stats = backend.stats();
910 assert!(stats.cache_hits >= 1);
911 }
912
913 #[test]
914 fn posix_read_cache_hit() {
915 let backend = PosixDiskIo::new(&test_config());
916 let ih = make_hash_n(4);
917 let storage = make_storage(100);
918 backend.register(ih, storage);
919
920 let data = vec![7u8; 50];
921 backend
922 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
923 .unwrap();
924
925 let r1 = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
927 assert_eq!(&r1[..], &data[..]);
928 let s1 = backend.stats();
929 assert_eq!(s1.cache_misses, 1);
930
931 let r2 = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
933 assert_eq!(&r2[..], &data[..]);
934 let s2 = backend.stats();
935 assert_eq!(s2.cache_hits, 1);
936 }
937
938 #[test]
939 fn posix_hash_piece_correct() {
940 let backend = PosixDiskIo::new(&test_config());
941 let ih = make_hash_n(5);
942 let storage = make_storage(50);
943 backend.register(ih, storage);
944
945 let data = vec![9u8; 50];
946 backend
947 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
948 .unwrap();
949
950 let expected = irontide_core::sha1(&data);
951 assert!(backend.hash_piece(ih, 0, &expected).unwrap());
952 }
953
954 #[test]
955 fn posix_hash_piece_wrong() {
956 let backend = PosixDiskIo::new(&test_config());
957 let ih = make_hash_n(6);
958 let storage = make_storage(50);
959 backend.register(ih, storage);
960
961 let data = vec![9u8; 50];
962 backend
963 .write_chunk(ih, 0, 0, Bytes::from(data), true)
964 .unwrap();
965
966 let wrong = Id20([0xFF; 20]);
967 assert!(!backend.hash_piece(ih, 0, &wrong).unwrap());
968 }
969
970 #[test]
971 fn posix_hash_piece_v2() {
972 let backend = PosixDiskIo::new(&test_config());
973 let ih = make_hash_n(7);
974 let storage = make_storage(16384);
975 backend.register(ih, storage);
976
977 let data = vec![0xABu8; 16384];
978 backend
979 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
980 .unwrap();
981
982 let expected = irontide_core::sha256(&data);
983 assert!(backend.hash_piece_v2(ih, 0, &expected).unwrap());
984 }
985
986 #[test]
987 fn posix_clear_piece_drops_buffer() {
988 let backend = PosixDiskIo::new(&test_config());
989 let ih = make_hash_n(8);
990 let storage = make_storage(100);
991 backend.register(ih, storage);
992
993 let data = vec![55u8; 50];
994 backend
996 .write_chunk(ih, 0, 0, Bytes::from(data), false)
997 .unwrap();
998 assert!(backend.stats().write_buffer_bytes > 0);
999
1000 backend.clear_piece(ih, 0);
1001 assert_eq!(backend.stats().write_buffer_bytes, 0);
1002 }
1003
1004 #[test]
1005 fn posix_cached_pieces() {
1006 let backend = PosixDiskIo::new(&test_config());
1007 let ih = make_hash_n(9);
1008 let storage = make_storage_full(100, 50, 25);
1010 backend.register(ih, storage);
1011
1012 let data = vec![1u8; 25];
1013 backend
1014 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
1015 .unwrap();
1016 backend
1017 .write_chunk(ih, 1, 0, Bytes::from(data), true)
1018 .unwrap();
1019
1020 backend.read_chunk(ih, 0, 0, 25, false).unwrap();
1022 backend.read_chunk(ih, 0, 0, 25, false).unwrap();
1023
1024 let cached = backend.cached_pieces(ih);
1025 assert!(cached.contains(&0));
1026 assert!(!cached.contains(&1));
1028 }
1029
1030 #[test]
1035 fn mmap_backend_name() {
1036 let backend = MmapDiskIo::new();
1037 assert_eq!(backend.name(), "mmap");
1038 }
1039
1040 #[test]
1041 fn mmap_write_and_read() {
1042 let backend = MmapDiskIo::new();
1043 let ih = make_hash_n(10);
1044 let storage = make_storage(100);
1045 backend.register(ih, storage);
1046
1047 let data = vec![42u8; 50];
1048 backend
1049 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), false)
1050 .unwrap();
1051 let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
1052 assert_eq!(&read[..], &data[..]);
1053 }
1054
1055 #[test]
1056 fn mmap_cached_pieces_always_empty() {
1057 let backend = MmapDiskIo::new();
1058 let ih = make_hash_n(11);
1059 let storage = make_storage(100);
1060 backend.register(ih, storage);
1061
1062 backend
1063 .write_chunk(ih, 0, 0, Bytes::from(vec![1u8; 50]), false)
1064 .unwrap();
1065 backend.read_chunk(ih, 0, 0, 50, false).unwrap();
1066
1067 assert!(backend.cached_pieces(ih).is_empty());
1068 }
1069
1070 #[test]
1071 fn mmap_stats_track_io() {
1072 let backend = MmapDiskIo::new();
1073 let ih = make_hash_n(12);
1074 let storage = make_storage(100);
1075 backend.register(ih, storage);
1076
1077 backend
1078 .write_chunk(ih, 0, 0, Bytes::from(vec![1u8; 50]), false)
1079 .unwrap();
1080 let stats = backend.stats();
1081 assert_eq!(stats.write_bytes, 50);
1082
1083 backend.read_chunk(ih, 0, 0, 50, false).unwrap();
1084 let stats = backend.stats();
1085 assert_eq!(stats.read_bytes, 50);
1086 }
1087
1088 #[test]
1093 fn read_piece_returns_full_piece() {
1094 let backend = PosixDiskIo::new(&test_config());
1095 let ih = make_hash_n(20);
1096 let chunk_size = 16384u32;
1097 let piece_size = u64::from(chunk_size) * 2; let storage = make_storage_full(piece_size, piece_size, chunk_size);
1099 backend.register(ih, storage);
1100
1101 let chunk0 = vec![0xAAu8; chunk_size as usize];
1103 let chunk1 = vec![0xBBu8; chunk_size as usize];
1104 backend
1105 .write_chunk(ih, 0, 0, Bytes::from(chunk0.clone()), false)
1106 .expect("write chunk 0");
1107 backend
1108 .write_chunk(ih, 0, chunk_size, Bytes::from(chunk1.clone()), false)
1109 .expect("write chunk 1");
1110
1111 let piece_data = backend.read_piece(ih, 0).expect("read_piece");
1113 assert_eq!(piece_data.len(), piece_size as usize);
1114 assert_eq!(&piece_data[..chunk_size as usize], &chunk0[..]);
1115 assert_eq!(&piece_data[chunk_size as usize..], &chunk1[..]);
1116
1117 let stats = backend.stats();
1119 assert!(stats.read_bytes >= piece_size);
1120 }
1121
1122 #[test]
1127 fn posix_hash_from_cache_pass() {
1128 let backend = PosixDiskIo::new(&test_config());
1129 let ih = make_hash_n(30);
1130 let storage = make_storage(100);
1131 backend.register(ih, storage);
1132
1133 let d1 = vec![0xAA_u8; 50];
1135 let d2 = vec![0xBB_u8; 50];
1136 backend
1137 .write_chunk(ih, 0, 0, Bytes::from(d1.clone()), false)
1138 .unwrap();
1139 backend
1140 .write_chunk(ih, 0, 50, Bytes::from(d2.clone()), false)
1141 .unwrap();
1142
1143 let mut full = d1;
1145 full.extend_from_slice(&d2);
1146 let expected = irontide_core::sha1(&full);
1147
1148 assert!(backend.hash_piece(ih, 0, &expected).unwrap());
1150
1151 let piece_data = backend.read_piece(ih, 0).unwrap();
1153 assert_eq!(piece_data.len(), 100);
1154
1155 backend.read_chunk(ih, 0, 0, 50, false).unwrap();
1157
1158 let cached = backend.cached_pieces(ih);
1160 assert!(cached.contains(&0));
1161 }
1162
1163 #[test]
1164 fn posix_hash_from_cache_fail() {
1165 let backend = PosixDiskIo::new(&test_config());
1166 let ih = make_hash_n(31);
1167 let storage = make_storage(50);
1168 backend.register(ih, storage);
1169
1170 backend
1172 .write_chunk(ih, 0, 0, Bytes::from(vec![0xCC_u8; 50]), false)
1173 .unwrap();
1174
1175 let wrong = Id20([0xFF; 20]);
1177 assert!(!backend.hash_piece(ih, 0, &wrong).unwrap());
1178
1179 assert_eq!(backend.stats().write_buffer_bytes, 0);
1181 }
1182
1183 #[test]
1188 fn factory_creates_mmap_for_mmap_mode() {
1189 let config = DiskConfig {
1190 storage_mode: irontide_core::StorageMode::Mmap,
1191 ..DiskConfig::default()
1192 };
1193 let backend = create_backend_from_config(&config);
1194 assert_eq!(backend.name(), "mmap");
1195 }
1196
1197 #[test]
1198 fn factory_creates_posix_for_auto_mode() {
1199 let config = DiskConfig {
1200 storage_mode: irontide_core::StorageMode::Auto,
1201 ..DiskConfig::default()
1202 };
1203 let backend = create_backend_from_config(&config);
1204 assert_eq!(backend.name(), "posix");
1205 }
1206
1207 #[test]
1212 fn piece_completion_flows_to_hash_pool() {
1213 let backend = PosixDiskIo::new(&test_config());
1214 let ih = make_hash_n(40);
1215 let storage = make_storage(100);
1216 backend.register(ih, storage);
1217
1218 let d1 = vec![0xAA; 50];
1220 let d2 = vec![0xBB; 50];
1221 backend
1222 .write_chunk(ih, 0, 0, Bytes::from(d1.clone()), false)
1223 .expect("write block 0");
1224 backend
1225 .write_chunk(ih, 0, 50, Bytes::from(d2.clone()), false)
1226 .expect("write block 1");
1227
1228 let mut full = d1;
1229 full.extend_from_slice(&d2);
1230 let expected_hash = irontide_core::sha1(&full);
1231
1232 assert!(
1234 backend
1235 .hash_piece(ih, 0, &expected_hash)
1236 .expect("hash_piece"),
1237 "hash should match"
1238 );
1239
1240 let piece = backend.read_piece(ih, 0).expect("read_piece after hash");
1242 assert_eq!(piece, full);
1243
1244 let _ = backend
1247 .read_chunk(ih, 0, 0, 100, false)
1248 .expect("second read");
1249 let cached = backend.cached_pieces(ih);
1250 assert!(
1251 cached.contains(&0),
1252 "piece 0 should be in hot pieces after two accesses"
1253 );
1254 }
1255
1256 #[test]
1257 fn back_pressure_flush_reaches_disk() {
1258 let config = DiskConfig {
1260 buffer_pool_capacity: 64 * 1024,
1261 ..test_config()
1262 };
1263 let backend = PosixDiskIo::new(&config);
1264 let ih = make_hash_n(41);
1265 let piece_size = 16384_u64;
1266 let storage = make_storage_full(piece_size * 10, piece_size, 16384);
1267 backend.register(ih, storage);
1268
1269 for p in 0..8_u32 {
1273 let data = vec![p as u8; piece_size as usize];
1274 backend
1275 .write_chunk(ih, p, 0, Bytes::from(data), false)
1276 .unwrap_or_else(|e| panic!("write piece {p}: {e}"));
1277 backend
1278 .flush_piece(ih, p)
1279 .unwrap_or_else(|e| panic!("flush piece {p}: {e}"));
1280 }
1281
1282 for p in 0..8_u32 {
1284 let piece = backend
1285 .read_piece(ih, p)
1286 .unwrap_or_else(|e| panic!("read piece {p}: {e}"));
1287 assert_eq!(piece.len(), piece_size as usize);
1288 assert!(
1289 piece.iter().all(|&b| b == p as u8),
1290 "piece {p} data mismatch"
1291 );
1292 }
1293
1294 for p in 0..6_u32 {
1297 let data = vec![(p + 10) as u8; piece_size as usize];
1298 backend
1299 .write_chunk(ih, p, 0, Bytes::from(data), false)
1300 .unwrap_or_else(|e| panic!("overwrite piece {p}: {e}"));
1301 }
1302 let stats = backend.stats();
1303 assert!(
1304 stats.skeleton_count > 0 || stats.eviction_count > 0,
1305 "should see evictions when exceeding budget: skeleton={}, eviction={}",
1306 stats.skeleton_count,
1307 stats.eviction_count
1308 );
1309 }
1310
1311 #[test]
1312 fn prefetch_then_suggest_then_serve() {
1313 let backend = PosixDiskIo::new(&test_config());
1314 let ih = make_hash_n(42);
1315 let storage = make_storage(100);
1316 backend.register(ih, storage);
1317
1318 let data = vec![0xCC; 100];
1320 backend
1321 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
1322 .expect("write flushed");
1323
1324 let r1 = backend
1326 .read_chunk(ih, 0, 0, 100, false)
1327 .expect("first read");
1328 assert_eq!(&r1[..], &data[..]);
1329
1330 let r2 = backend
1332 .read_chunk(ih, 0, 0, 100, false)
1333 .expect("second read");
1334 assert_eq!(&r2[..], &data[..]);
1335
1336 let cached = backend.cached_pieces(ih);
1338 assert!(cached.contains(&0), "piece 0 should be hot after two reads");
1339
1340 let s1 = backend.stats();
1342 let _ = backend
1343 .read_chunk(ih, 0, 0, 100, false)
1344 .expect("third read");
1345 let s2 = backend.stats();
1346 assert!(
1347 s2.cache_hits > s1.cache_hits,
1348 "third read should be a cache hit: before={}, after={}",
1349 s1.cache_hits,
1350 s2.cache_hits
1351 );
1352 }
1353
/// End-to-end cycle over three pieces of four 16 KiB blocks each: every block
/// is written with `flush = false`, each assembled piece is checked against
/// its SHA-1, and finally every piece is read back and compared block by block
/// against the fill byte that was written.
#[test]
fn full_download_verify_cycle() {
    const PIECES: u32 = 3;
    const BLOCKS_PER_PIECE: u32 = 4;

    let chunk = 16384_u32;
    let block_bytes = chunk as usize;
    let piece_len = u64::from(chunk) * u64::from(BLOCKS_PER_PIECE);
    let total = piece_len * u64::from(PIECES);

    let disk = PosixDiskIo::new(&test_config());
    let ih = make_hash_n(43);
    disk.register(ih, make_storage_full(total, piece_len, chunk));

    // Write phase: each block gets a unique fill byte (`p * 4 + b`), and the
    // same bytes are accumulated locally to compute the expected piece hash.
    for p in 0..PIECES {
        let mut assembled = Vec::new();
        for b in 0..BLOCKS_PER_PIECE {
            let fill = (p * BLOCKS_PER_PIECE + b) as u8;
            let block = vec![fill; block_bytes];
            assembled.extend_from_slice(&block);
            if let Err(e) = disk.write_chunk(ih, p, b * chunk, Bytes::from(block), false) {
                panic!("write piece {p} block {b}: {e}");
            }
        }

        let expected = irontide_core::sha1(&assembled);
        let verified = match disk.hash_piece(ih, p, &expected) {
            Ok(outcome) => outcome,
            Err(e) => panic!("hash piece {p}: {e}"),
        };
        assert!(verified, "piece {p} hash should pass");
    }

    // Read-back phase: the length assertion guarantees that `chunks_exact`
    // yields exactly one slice per written block.
    for p in 0..PIECES {
        let data = match disk.read_piece(ih, p) {
            Ok(bytes) => bytes,
            Err(e) => panic!("read piece {p}: {e}"),
        };
        assert_eq!(data.len(), piece_len as usize);

        for (b, block) in (0..BLOCKS_PER_PIECE).zip(data.chunks_exact(block_bytes)) {
            let expected_byte = (p * BLOCKS_PER_PIECE + b) as u8;
            assert!(
                block.iter().all(|&x| x == expected_byte),
                "piece {p} block {b}: expected 0x{expected_byte:02X}"
            );
        }
    }
}
1400
/// Buffers four 16 KiB single-chunk pieces (`flush = false`) into a backend
/// whose `buffer_pool_capacity` is 32 * 1024 (presumably bytes, i.e. half of
/// the data written), then flushes and reads back every piece. The test
/// passes when each piece has the full length and the stats report at least
/// one skeleton or eviction.
#[test]
fn skeleton_eviction_then_completion() {
    const PIECE_LEN: u32 = 16384;
    const PIECES: u32 = 4;

    let backend = PosixDiskIo::new(&DiskConfig {
        buffer_pool_capacity: 32 * 1024,
        ..test_config()
    });
    let ih = make_hash_n(44);
    let storage = make_storage_full(
        u64::from(PIECE_LEN) * u64::from(PIECES),
        u64::from(PIECE_LEN),
        PIECE_LEN,
    );
    backend.register(ih, storage);

    // Fill phase: one buffered chunk per piece, filled with the piece index.
    for p in 0..PIECES {
        let payload = Bytes::from(vec![p as u8; PIECE_LEN as usize]);
        if let Err(e) = backend.write_chunk(ih, p, 0, payload, false) {
            panic!("write piece {p}: {e}");
        }
    }

    // Completion phase: every piece must still flush and read back in full.
    for p in 0..PIECES {
        if let Err(e) = backend.flush_piece(ih, p) {
            panic!("flush piece {p}: {e}");
        }
        let piece = match backend.read_piece(ih, p) {
            Ok(bytes) => bytes,
            Err(e) => panic!("read piece {p}: {e}"),
        };
        assert_eq!(piece.len(), PIECE_LEN as usize);
    }

    let stats = backend.stats();
    let saw_pressure = stats.skeleton_count > 0 || stats.eviction_count > 0;
    assert!(
        saw_pressure,
        "should have evictions under pressure: skeleton={}, eviction={}",
        stats.skeleton_count,
        stats.eviction_count
    );
}
1441
/// Writes one 50-byte chunk with `flush = false`, hashes the piece against a
/// deliberately wrong digest, and then checks that `read_piece` returns only
/// zero bytes, i.e. the rejected data did not end up in the piece contents.
#[test]
fn hash_from_cache_fail_no_disk_write() {
    let disk = PosixDiskIo::new(&test_config());
    let ih = make_hash_n(46);
    disk.register(ih, make_storage(50));

    let payload = Bytes::from(vec![0xDD_u8; 50]);
    disk.write_chunk(ih, 0, 0, payload, false)
        .expect("buffered write");

    // An all-0xFF digest cannot match the data, so verification must report
    // `false` without returning an error.
    let bogus = Id20([0xFF; 20]);
    let matched = disk
        .hash_piece(ih, 0, &bogus)
        .expect("hash_piece should not error");
    assert!(!matched, "hash should fail with wrong expected value");

    let contents = disk.read_piece(ih, 0).expect("read_piece");
    let has_nonzero = contents.iter().any(|&byte| byte != 0);
    assert!(!has_nonzero, "failed piece should not be written to disk");
}
1471
/// Seeding-style read pattern: 64 single-chunk pieces are written with
/// `flush = true`, each is read once (presumably warming the read cache), and
/// then each is read a second time. The increase in `cache_hits` across the
/// second pass must exceed 80% of the pieces read.
#[test]
fn seeding_throughput_with_cache() {
    let backend = PosixDiskIo::new(&test_config());
    let ih = make_hash_n(47);
    let chunk = 16384_u32;
    let piece_len = u64::from(chunk);
    let num_pieces = 64_u32;
    let storage = make_storage_full(piece_len * u64::from(num_pieces), piece_len, chunk);
    backend.register(ih, storage);

    // Populate every piece with a fill byte equal to its index.
    for p in 0..num_pieces {
        let data = vec![p as u8; chunk as usize];
        backend
            .write_chunk(ih, p, 0, Bytes::from(data), true)
            .unwrap_or_else(|e| panic!("write piece {p}: {e}"));
    }

    // First pass over all pieces; results are not inspected.
    for p in 0..num_pieces {
        backend
            .read_chunk(ih, p, 0, chunk, false)
            .unwrap_or_else(|e| panic!("first read piece {p}: {e}"));
    }

    // Second pass: only the delta in `cache_hits` across this loop is measured.
    let stats_before = backend.stats();
    for p in 0..num_pieces {
        backend
            .read_chunk(ih, p, 0, chunk, false)
            .unwrap_or_else(|e| panic!("second read piece {p}: {e}"));
    }
    let stats_after = backend.stats();

    let hits = stats_after
        .cache_hits
        .saturating_sub(stats_before.cache_hits);
    let total = u64::from(num_pieces);
    let hit_rate = hits as f64 / total as f64;
    // Report an actual percentage so the failure message is directly
    // comparable with the ">80%" threshold it names.
    let hit_pct = hit_rate * 100.0;

    assert!(
        hit_rate > 0.8,
        "cache hit rate {hit_pct:.1}% should be >80% (hits={hits}, total={total})"
    );
}
1518
/// `write_block_direct` with a single contiguous slice (the second slice is
/// empty): the 50 bytes must be readable straight from the storage handle at
/// piece 0, offset 0, and `write_bytes` in the stats must equal 50.
#[test]
fn write_block_direct_contiguous() {
    let backend = PosixDiskIo::new(&test_config());
    let ih = make_hash_n(60);
    let storage = make_storage(100);
    // Keep our own handle so the result can be checked without going through
    // the backend's read path.
    backend.register(ih, Arc::clone(&storage));

    let data = vec![0xCDu8; 50];
    backend
        .write_block_direct(ih, 0, 0, &data, &[])
        .expect("write_block_direct with contiguous slice");

    let read = storage
        .read_chunk(0, 0, 50)
        .expect("read back from storage");
    assert_eq!(read, data);

    let stats = backend.stats();
    assert_eq!(stats.write_bytes, 50);
}
1541
/// `write_block_direct` with the payload split across two slices (30 + 20
/// bytes) at offset 10: reading 50 bytes from the storage handle at that
/// offset must return the two slices concatenated in order, and `write_bytes`
/// in the stats must equal the combined length of 50.
#[test]
fn write_block_direct_split() {
    let backend = PosixDiskIo::new(&test_config());
    let ih = make_hash_n(61);
    let storage = make_storage(100);
    // Keep our own handle so the result can be checked without going through
    // the backend's read path.
    backend.register(ih, Arc::clone(&storage));

    // Consecutive byte values make any reordering or gap between the two
    // slices visible in the read-back comparison.
    let s0: Vec<u8> = (0..30).collect();
    let s1: Vec<u8> = (30..50).collect();
    backend
        .write_block_direct(ih, 0, 10, &s0, &s1)
        .expect("write_block_direct with split slices");

    let read = storage
        .read_chunk(0, 10, 50)
        .expect("read back from storage");
    let expected: Vec<u8> = (0..50).collect();
    assert_eq!(read, expected);

    let stats = backend.stats();
    assert_eq!(stats.write_bytes, 50);
}
1561}