1#![allow(
2 clippy::cast_possible_truncation,
3 clippy::cast_precision_loss,
4 clippy::cast_possible_wrap,
5 clippy::cast_sign_loss,
6 reason = "M175: disk I/O sizes bounded by piece_length (u32 by construction in Lengths::new); offset arithmetic stays within file lengths"
7)]
8
9use std::collections::HashMap;
16use std::sync::Arc;
17
18use parking_lot::{Mutex, RwLock};
19
20use bytes::Bytes;
21use irontide_core::{Id20, Id32};
22use irontide_storage::TorrentStorage;
23
24use crate::buffer_pool::BufferPool;
25use crate::disk::DiskConfig;
26
27#[derive(Debug, Clone, Default)]
29pub struct DiskIoStats {
30 pub read_bytes: u64,
32 pub write_bytes: u64,
34 pub cache_hits: u64,
36 pub cache_misses: u64,
38 pub write_buffer_bytes: usize,
40 pub read_cache_bytes: usize,
42 pub pool_entries: usize,
44 pub prefetch_count: u64,
46 pub eviction_count: u64,
48 pub skeleton_count: u64,
50}
51
52pub trait DiskIoBackend: Send + Sync {
57 fn name(&self) -> &str;
59
60 fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>);
62
63 fn unregister(&self, info_hash: Id20);
65
66 fn write_chunk(
72 &self,
73 info_hash: Id20,
74 piece: u32,
75 begin: u32,
76 data: Bytes,
77 flush: bool,
78 ) -> crate::Result<()>;
79
80 fn read_chunk(
86 &self,
87 info_hash: Id20,
88 piece: u32,
89 begin: u32,
90 length: u32,
91 volatile: bool,
92 ) -> crate::Result<Bytes>;
93
94 fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>>;
102
103 fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool>;
109
110 fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool>;
116
117 fn hash_block(
123 &self,
124 info_hash: Id20,
125 piece: u32,
126 begin: u32,
127 length: u32,
128 ) -> crate::Result<Id32>;
129
130 fn clear_piece(&self, info_hash: Id20, piece: u32);
132
133 fn flush_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<()>;
139
140 fn flush_all(&self) -> crate::Result<()>;
146
147 fn cached_pieces(&self, info_hash: Id20) -> Vec<u32>;
149
150 fn stats(&self) -> DiskIoStats;
152
153 fn write_block_direct(
165 &self,
166 info_hash: Id20,
167 piece: u32,
168 begin: u32,
169 s0: &[u8],
170 s1: &[u8],
171 ) -> crate::Result<()>;
172}
173
174pub struct DisabledDiskIo;
179
180impl DiskIoBackend for DisabledDiskIo {
181 fn name(&self) -> &'static str {
182 "disabled"
183 }
184
185 fn register(&self, _info_hash: Id20, _storage: Arc<dyn TorrentStorage>) {}
186
187 fn unregister(&self, _info_hash: Id20) {}
188
189 fn write_chunk(
190 &self,
191 _info_hash: Id20,
192 _piece: u32,
193 _begin: u32,
194 _data: Bytes,
195 _flush: bool,
196 ) -> crate::Result<()> {
197 Ok(())
198 }
199
200 fn read_chunk(
201 &self,
202 _info_hash: Id20,
203 _piece: u32,
204 _begin: u32,
205 length: u32,
206 _volatile: bool,
207 ) -> crate::Result<Bytes> {
208 Ok(Bytes::from(vec![0u8; length as usize]))
209 }
210
211 fn read_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<Vec<u8>> {
212 Ok(Vec::new())
213 }
214
215 fn hash_piece(&self, _info_hash: Id20, _piece: u32, _expected: &Id20) -> crate::Result<bool> {
216 Ok(true)
217 }
218
219 fn hash_piece_v2(
220 &self,
221 _info_hash: Id20,
222 _piece: u32,
223 _expected: &Id32,
224 ) -> crate::Result<bool> {
225 Ok(true)
226 }
227
228 fn hash_block(
229 &self,
230 _info_hash: Id20,
231 _piece: u32,
232 _begin: u32,
233 _length: u32,
234 ) -> crate::Result<Id32> {
235 Ok(Id32([0u8; 32]))
236 }
237
238 fn clear_piece(&self, _info_hash: Id20, _piece: u32) {}
239
240 fn flush_piece(&self, _info_hash: Id20, _piece: u32) -> crate::Result<()> {
241 Ok(())
242 }
243
244 fn flush_all(&self) -> crate::Result<()> {
245 Ok(())
246 }
247
248 fn cached_pieces(&self, _info_hash: Id20) -> Vec<u32> {
249 vec![]
250 }
251
252 fn stats(&self) -> DiskIoStats {
253 DiskIoStats::default()
254 }
255
256 fn write_block_direct(
257 &self,
258 _info_hash: Id20,
259 _piece: u32,
260 _begin: u32,
261 _s0: &[u8],
262 _s1: &[u8],
263 ) -> crate::Result<()> {
264 Ok(())
265 }
266}
267
268pub struct PosixDiskIo {
279 storages: RwLock<HashMap<Id20, Arc<dyn TorrentStorage>>>,
280 pool: Mutex<BufferPool>,
281 stats: Mutex<DiskIoStats>,
282}
283
284impl PosixDiskIo {
285 #[must_use]
287 pub fn new(config: &DiskConfig) -> Self {
288 let mut pool = BufferPool::with_capacity(config.buffer_pool_capacity);
289 pool.set_mlock(config.enable_mlock);
290 Self {
291 storages: RwLock::new(HashMap::new()),
292 pool: Mutex::new(pool),
293 stats: Mutex::new(DiskIoStats::default()),
294 }
295 }
296
297 fn get_storage(&self, info_hash: Id20) -> crate::Result<Arc<dyn TorrentStorage>> {
298 self.storages
299 .read()
300 .get(&info_hash)
301 .cloned()
302 .ok_or_else(|| {
303 crate::Error::Storage(irontide_storage::Error::Io(std::io::Error::new(
304 std::io::ErrorKind::NotFound,
305 "torrent not registered",
306 )))
307 })
308 }
309}
310
311impl DiskIoBackend for PosixDiskIo {
312 fn name(&self) -> &'static str {
313 "posix"
314 }
315
316 fn register(&self, info_hash: Id20, storage: Arc<dyn TorrentStorage>) {
317 self.storages.write().insert(info_hash, storage);
318 }
319
320 fn unregister(&self, info_hash: Id20) {
321 self.storages.write().remove(&info_hash);
322 self.pool.lock().clear_torrent(info_hash);
323 }
324
325 fn write_chunk(
326 &self,
327 info_hash: Id20,
328 piece: u32,
329 begin: u32,
330 data: Bytes,
331 flush: bool,
332 ) -> crate::Result<()> {
333 let len = data.len();
334
335 if flush {
336 let storage = self.get_storage(info_hash)?;
337 storage.write_chunk(piece, begin, &data)?;
338 self.stats.lock().write_bytes += len as u64;
339 return Ok(());
340 }
341
342 let key = (info_hash, piece);
346 self.pool.lock().write_block(key, begin, data, 0);
347 self.stats.lock().write_bytes += len as u64;
348 Ok(())
349 }
350
351 fn read_chunk(
352 &self,
353 info_hash: Id20,
354 piece: u32,
355 begin: u32,
356 length: u32,
357 volatile: bool,
358 ) -> crate::Result<Bytes> {
359 let key = (info_hash, piece);
360
361 if !volatile {
363 let mut pool = self.pool.lock();
364 if let Some(data) = pool.read_block(key, begin, length as usize) {
365 drop(pool);
366 self.stats.lock().cache_hits += 1;
367 return Ok(data);
368 }
369 }
370 self.stats.lock().cache_misses += 1;
371
372 let storage = self.get_storage(info_hash)?;
374 if !volatile {
375 if let Ok(piece_data) = storage.read_piece(piece) {
376 let piece_bytes = Bytes::from(piece_data);
377 self.stats.lock().read_bytes += piece_bytes.len() as u64;
378
379 let mut pool = self.pool.lock();
380 pool.prefetch_piece(key, piece_bytes.clone());
381 drop(pool);
382
383 let end = (begin as usize).saturating_add(length as usize);
385 if end <= piece_bytes.len() {
386 return Ok(piece_bytes.slice(begin as usize..end));
387 }
388 } else {
391 }
393 }
394
395 let data = storage.read_chunk(piece, begin, length)?;
397 let bytes = Bytes::from(data);
398 self.stats.lock().read_bytes += bytes.len() as u64;
399 Ok(bytes)
400 }
401
402 fn read_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<Vec<u8>> {
403 self.flush_piece(info_hash, piece)?;
404 let storage = self.get_storage(info_hash)?;
405 let data = storage.read_piece(piece)?;
406 self.stats.lock().read_bytes += data.len() as u64;
407 Ok(data)
408 }
409
410 fn hash_piece(&self, info_hash: Id20, piece: u32, expected: &Id20) -> crate::Result<bool> {
411 let key = (info_hash, piece);
412
413 let cached_data = {
418 let mut pool = self.pool.lock();
419 pool.take_all_blocks(key)
420 };
421
422 if let Some(data) = cached_data {
423 let hash = irontide_core::sha1(&data);
424 if hash == *expected {
425 let storage = self.get_storage(info_hash)?;
427 storage.write_chunk(piece, 0, &data)?;
428 self.stats.lock().write_bytes += data.len() as u64;
429
430 let mut pool = self.pool.lock();
431 pool.promote_to_cached(key, Bytes::from(data));
432 return Ok(true);
433 }
434 return Ok(false);
436 }
437
438 self.flush_piece(info_hash, piece)?;
440 let storage = self.get_storage(info_hash)?;
441 Ok(storage.verify_piece(piece, expected)?)
442 }
443
444 fn hash_piece_v2(&self, info_hash: Id20, piece: u32, expected: &Id32) -> crate::Result<bool> {
445 let key = (info_hash, piece);
446
447 let cached_data = {
449 let mut pool = self.pool.lock();
450 pool.take_all_blocks(key)
451 };
452
453 if let Some(data) = cached_data {
454 let hash = irontide_core::sha256(&data);
455 if hash == *expected {
456 let storage = self.get_storage(info_hash)?;
457 storage.write_chunk(piece, 0, &data)?;
458 self.stats.lock().write_bytes += data.len() as u64;
459
460 let mut pool = self.pool.lock();
461 pool.promote_to_cached(key, Bytes::from(data));
462 return Ok(true);
463 }
464 return Ok(false);
465 }
466
467 self.flush_piece(info_hash, piece)?;
468 let storage = self.get_storage(info_hash)?;
469 Ok(storage.verify_piece_v2(piece, expected)?)
470 }
471
472 fn hash_block(
473 &self,
474 info_hash: Id20,
475 piece: u32,
476 begin: u32,
477 length: u32,
478 ) -> crate::Result<Id32> {
479 self.flush_piece(info_hash, piece)?;
480 let storage = self.get_storage(info_hash)?;
481 Ok(storage.hash_block(piece, begin, length)?)
482 }
483
484 fn clear_piece(&self, info_hash: Id20, piece: u32) {
485 self.pool.lock().clear_piece((info_hash, piece));
486 }
487
488 fn flush_piece(&self, info_hash: Id20, piece: u32) -> crate::Result<()> {
489 let blocks = {
490 let mut pool = self.pool.lock();
491 match pool.flush_piece((info_hash, piece)) {
492 Some(b) => b,
493 None => return Ok(()),
494 }
495 };
496 let storage = self.get_storage(info_hash)?;
497 for (begin, data) in blocks {
498 storage.write_chunk(piece, begin, &data)?;
499 }
500 Ok(())
501 }
502
503 fn flush_all(&self) -> crate::Result<()> {
504 let all_blocks = {
505 let mut pool = self.pool.lock();
506 pool.flush_all()
507 };
508 for ((info_hash, piece), blocks) in all_blocks {
509 let storage = self.get_storage(info_hash)?;
510 for (begin, data) in blocks {
511 storage.write_chunk(piece, begin, &data)?;
512 }
513 }
514 Ok(())
515 }
516
517 fn cached_pieces(&self, info_hash: Id20) -> Vec<u32> {
518 let pool = self.pool.lock();
519 pool.hot_pieces(info_hash)
520 }
521
522 fn stats(&self) -> DiskIoStats {
523 let mut s = self.stats.lock().clone();
524 let pool = self.pool.lock();
525 let ps = pool.stats();
526 s.write_buffer_bytes = ps.write_buffer_bytes;
527 s.read_cache_bytes = ps.read_cache_bytes;
528 s.pool_entries = ps.total_entries;
529 s.prefetch_count = ps.prefetch_count;
530 s.eviction_count = ps.eviction_count;
531 s.skeleton_count = ps.skeleton_count;
532 s
533 }
534
535 fn write_block_direct(
536 &self,
537 info_hash: Id20,
538 piece: u32,
539 begin: u32,
540 s0: &[u8],
541 s1: &[u8],
542 ) -> crate::Result<()> {
543 let storage = self.get_storage(info_hash)?;
544 storage.write_chunk_vectored(piece, begin, s0, s1)?;
545 let total = (s0.len() + s1.len()) as u64;
546 self.stats.lock().write_bytes += total;
547 Ok(())
548 }
549}
550
551#[must_use]
563pub fn create_backend_from_config(config: &DiskConfig) -> Arc<dyn DiskIoBackend> {
564 Arc::new(PosixDiskIo::new(config))
565}
566
567#[cfg(test)]
568mod tests {
569 use super::*;
570 use irontide_core::Lengths;
571 use irontide_storage::MemoryStorage;
572
573 fn make_hash() -> Id20 {
574 Id20([0xAB; 20])
575 }
576
577 fn make_hash_n(n: u8) -> Id20 {
578 let mut b = [0u8; 20];
579 b[0] = n;
580 Id20(b)
581 }
582
583 fn test_config() -> DiskConfig {
584 DiskConfig {
585 io_threads: 2,
586 cache_size: 1024 * 1024, ..DiskConfig::default()
588 }
589 }
590
591 fn make_storage(piece_size: u64) -> Arc<dyn TorrentStorage> {
593 let chunk = piece_size.min(16384) as u32;
594 let lengths = Lengths::new(piece_size, piece_size, chunk);
595 Arc::new(MemoryStorage::new(lengths))
596 }
597
598 fn make_storage_full(total: u64, piece_len: u64, chunk_size: u32) -> Arc<dyn TorrentStorage> {
600 let lengths = Lengths::new(total, piece_len, chunk_size);
601 Arc::new(MemoryStorage::new(lengths))
602 }
603
604 #[test]
609 fn disabled_backend_name() {
610 let backend = DisabledDiskIo;
611 assert_eq!(backend.name(), "disabled");
612 }
613
614 #[test]
615 fn disabled_backend_write_succeeds() {
616 let backend = DisabledDiskIo;
617 let result =
618 backend.write_chunk(make_hash(), 0, 0, Bytes::from_static(&[1, 2, 3, 4]), false);
619 assert!(result.is_ok());
620 }
621
622 #[test]
623 fn disabled_backend_read_returns_zeroed() {
624 let backend = DisabledDiskIo;
625 let length = 16384u32;
626 let data = backend
627 .read_chunk(make_hash(), 0, 0, length, false)
628 .unwrap();
629 assert_eq!(data.len(), length as usize);
630 assert!(data.iter().all(|&b| b == 0));
631 }
632
633 #[test]
634 fn disabled_backend_hash_always_passes() {
635 let backend = DisabledDiskIo;
636 let expected = Id20([0xFF; 20]);
637 let result = backend.hash_piece(make_hash(), 42, &expected).unwrap();
638 assert!(result);
639 }
640
641 #[test]
642 fn disabled_backend_hash_v2_always_passes() {
643 let backend = DisabledDiskIo;
644 let expected = Id32([0xFF; 32]);
645 let result = backend.hash_piece_v2(make_hash(), 42, &expected).unwrap();
646 assert!(result);
647 }
648
649 #[test]
650 fn disabled_backend_stats_default() {
651 let backend = DisabledDiskIo;
652 let stats = backend.stats();
653 assert_eq!(stats.read_bytes, 0);
654 assert_eq!(stats.write_bytes, 0);
655 assert_eq!(stats.cache_hits, 0);
656 assert_eq!(stats.cache_misses, 0);
657 assert_eq!(stats.write_buffer_bytes, 0);
658 }
659
660 #[test]
661 fn disabled_backend_cached_pieces_empty() {
662 let backend = DisabledDiskIo;
663 let pieces = backend.cached_pieces(make_hash());
664 assert!(pieces.is_empty());
665 }
666
667 #[test]
672 fn posix_backend_name() {
673 let backend = PosixDiskIo::new(&test_config());
674 assert_eq!(backend.name(), "posix");
675 }
676
677 #[test]
678 fn posix_register_unregister() {
679 let backend = PosixDiskIo::new(&test_config());
680 let ih = make_hash_n(1);
681 let storage = make_storage(100);
682 backend.register(ih, storage);
683
684 assert!(backend.read_chunk(ih, 0, 0, 10, false).is_ok());
686
687 backend.unregister(ih);
688
689 assert!(backend.read_chunk(ih, 0, 0, 10, false).is_err());
691 }
692
693 #[test]
694 fn posix_write_and_read_flush() {
695 let backend = PosixDiskIo::new(&test_config());
696 let ih = make_hash_n(2);
697 let storage = make_storage(100);
698 backend.register(ih, storage);
699
700 let data = vec![42u8; 50];
701 backend
702 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
703 .unwrap();
704 let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
705 assert_eq!(&read[..], &data[..]);
706 }
707
708 #[test]
709 fn posix_write_buffered_then_read() {
710 let backend = PosixDiskIo::new(&test_config());
711 let ih = make_hash_n(3);
712 let storage = make_storage(100);
713 backend.register(ih, storage);
714
715 let data = vec![99u8; 50];
716 backend
718 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), false)
719 .unwrap();
720
721 let read = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
723 assert_eq!(&read[..], &data[..]);
724
725 let stats = backend.stats();
727 assert!(stats.cache_hits >= 1);
728 }
729
730 #[test]
731 fn posix_read_cache_hit() {
732 let backend = PosixDiskIo::new(&test_config());
733 let ih = make_hash_n(4);
734 let storage = make_storage(100);
735 backend.register(ih, storage);
736
737 let data = vec![7u8; 50];
738 backend
739 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
740 .unwrap();
741
742 let r1 = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
744 assert_eq!(&r1[..], &data[..]);
745 let s1 = backend.stats();
746 assert_eq!(s1.cache_misses, 1);
747
748 let r2 = backend.read_chunk(ih, 0, 0, 50, false).unwrap();
750 assert_eq!(&r2[..], &data[..]);
751 let s2 = backend.stats();
752 assert_eq!(s2.cache_hits, 1);
753 }
754
755 #[test]
756 fn posix_hash_piece_correct() {
757 let backend = PosixDiskIo::new(&test_config());
758 let ih = make_hash_n(5);
759 let storage = make_storage(50);
760 backend.register(ih, storage);
761
762 let data = vec![9u8; 50];
763 backend
764 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
765 .unwrap();
766
767 let expected = irontide_core::sha1(&data);
768 assert!(backend.hash_piece(ih, 0, &expected).unwrap());
769 }
770
771 #[test]
772 fn posix_hash_piece_wrong() {
773 let backend = PosixDiskIo::new(&test_config());
774 let ih = make_hash_n(6);
775 let storage = make_storage(50);
776 backend.register(ih, storage);
777
778 let data = vec![9u8; 50];
779 backend
780 .write_chunk(ih, 0, 0, Bytes::from(data), true)
781 .unwrap();
782
783 let wrong = Id20([0xFF; 20]);
784 assert!(!backend.hash_piece(ih, 0, &wrong).unwrap());
785 }
786
787 #[test]
788 fn posix_hash_piece_v2() {
789 let backend = PosixDiskIo::new(&test_config());
790 let ih = make_hash_n(7);
791 let storage = make_storage(16384);
792 backend.register(ih, storage);
793
794 let data = vec![0xABu8; 16384];
795 backend
796 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
797 .unwrap();
798
799 let expected = irontide_core::sha256(&data);
800 assert!(backend.hash_piece_v2(ih, 0, &expected).unwrap());
801 }
802
803 #[test]
804 fn posix_clear_piece_drops_buffer() {
805 let backend = PosixDiskIo::new(&test_config());
806 let ih = make_hash_n(8);
807 let storage = make_storage(100);
808 backend.register(ih, storage);
809
810 let data = vec![55u8; 50];
811 backend
813 .write_chunk(ih, 0, 0, Bytes::from(data), false)
814 .unwrap();
815 assert!(backend.stats().write_buffer_bytes > 0);
816
817 backend.clear_piece(ih, 0);
818 assert_eq!(backend.stats().write_buffer_bytes, 0);
819 }
820
821 #[test]
822 fn posix_cached_pieces() {
823 let backend = PosixDiskIo::new(&test_config());
824 let ih = make_hash_n(9);
825 let storage = make_storage_full(100, 50, 25);
827 backend.register(ih, storage);
828
829 let data = vec![1u8; 25];
830 backend
831 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
832 .unwrap();
833 backend
834 .write_chunk(ih, 1, 0, Bytes::from(data), true)
835 .unwrap();
836
837 backend.read_chunk(ih, 0, 0, 25, false).unwrap();
839 backend.read_chunk(ih, 0, 0, 25, false).unwrap();
840
841 let cached = backend.cached_pieces(ih);
842 assert!(cached.contains(&0));
843 assert!(!cached.contains(&1));
845 }
846
847 #[test]
852 fn read_piece_returns_full_piece() {
853 let backend = PosixDiskIo::new(&test_config());
854 let ih = make_hash_n(20);
855 let chunk_size = 16384u32;
856 let piece_size = u64::from(chunk_size) * 2; let storage = make_storage_full(piece_size, piece_size, chunk_size);
858 backend.register(ih, storage);
859
860 let chunk0 = vec![0xAAu8; chunk_size as usize];
862 let chunk1 = vec![0xBBu8; chunk_size as usize];
863 backend
864 .write_chunk(ih, 0, 0, Bytes::from(chunk0.clone()), false)
865 .expect("write chunk 0");
866 backend
867 .write_chunk(ih, 0, chunk_size, Bytes::from(chunk1.clone()), false)
868 .expect("write chunk 1");
869
870 let piece_data = backend.read_piece(ih, 0).expect("read_piece");
872 assert_eq!(piece_data.len(), piece_size as usize);
873 assert_eq!(&piece_data[..chunk_size as usize], &chunk0[..]);
874 assert_eq!(&piece_data[chunk_size as usize..], &chunk1[..]);
875
876 let stats = backend.stats();
878 assert!(stats.read_bytes >= piece_size);
879 }
880
881 #[test]
886 fn posix_hash_from_cache_pass() {
887 let backend = PosixDiskIo::new(&test_config());
888 let ih = make_hash_n(30);
889 let storage = make_storage(100);
890 backend.register(ih, storage);
891
892 let d1 = vec![0xAA_u8; 50];
894 let d2 = vec![0xBB_u8; 50];
895 backend
896 .write_chunk(ih, 0, 0, Bytes::from(d1.clone()), false)
897 .unwrap();
898 backend
899 .write_chunk(ih, 0, 50, Bytes::from(d2.clone()), false)
900 .unwrap();
901
902 let mut full = d1;
904 full.extend_from_slice(&d2);
905 let expected = irontide_core::sha1(&full);
906
907 assert!(backend.hash_piece(ih, 0, &expected).unwrap());
909
910 let piece_data = backend.read_piece(ih, 0).unwrap();
912 assert_eq!(piece_data.len(), 100);
913
914 backend.read_chunk(ih, 0, 0, 50, false).unwrap();
916
917 let cached = backend.cached_pieces(ih);
919 assert!(cached.contains(&0));
920 }
921
922 #[test]
923 fn posix_hash_from_cache_fail() {
924 let backend = PosixDiskIo::new(&test_config());
925 let ih = make_hash_n(31);
926 let storage = make_storage(50);
927 backend.register(ih, storage);
928
929 backend
931 .write_chunk(ih, 0, 0, Bytes::from(vec![0xCC_u8; 50]), false)
932 .unwrap();
933
934 let wrong = Id20([0xFF; 20]);
936 assert!(!backend.hash_piece(ih, 0, &wrong).unwrap());
937
938 assert_eq!(backend.stats().write_buffer_bytes, 0);
940 }
941
942 #[test]
947 fn factory_creates_posix() {
948 let config = DiskConfig::default();
949 let backend = create_backend_from_config(&config);
950 assert_eq!(backend.name(), "posix");
951 }
952
953 #[test]
958 fn piece_completion_flows_to_hash_pool() {
959 let backend = PosixDiskIo::new(&test_config());
960 let ih = make_hash_n(40);
961 let storage = make_storage(100);
962 backend.register(ih, storage);
963
964 let d1 = vec![0xAA; 50];
966 let d2 = vec![0xBB; 50];
967 backend
968 .write_chunk(ih, 0, 0, Bytes::from(d1.clone()), false)
969 .expect("write block 0");
970 backend
971 .write_chunk(ih, 0, 50, Bytes::from(d2.clone()), false)
972 .expect("write block 1");
973
974 let mut full = d1;
975 full.extend_from_slice(&d2);
976 let expected_hash = irontide_core::sha1(&full);
977
978 assert!(
980 backend
981 .hash_piece(ih, 0, &expected_hash)
982 .expect("hash_piece"),
983 "hash should match"
984 );
985
986 let piece = backend.read_piece(ih, 0).expect("read_piece after hash");
988 assert_eq!(piece, full);
989
990 let _ = backend
993 .read_chunk(ih, 0, 0, 100, false)
994 .expect("second read");
995 let cached = backend.cached_pieces(ih);
996 assert!(
997 cached.contains(&0),
998 "piece 0 should be in hot pieces after two accesses"
999 );
1000 }
1001
1002 #[test]
1003 fn back_pressure_flush_reaches_disk() {
1004 let config = DiskConfig {
1006 buffer_pool_capacity: 64 * 1024,
1007 ..test_config()
1008 };
1009 let backend = PosixDiskIo::new(&config);
1010 let ih = make_hash_n(41);
1011 let piece_size = 16384_u64;
1012 let storage = make_storage_full(piece_size * 10, piece_size, 16384);
1013 backend.register(ih, storage);
1014
1015 for p in 0..8_u32 {
1019 let data = vec![p as u8; piece_size as usize];
1020 backend
1021 .write_chunk(ih, p, 0, Bytes::from(data), false)
1022 .unwrap_or_else(|e| panic!("write piece {p}: {e}"));
1023 backend
1024 .flush_piece(ih, p)
1025 .unwrap_or_else(|e| panic!("flush piece {p}: {e}"));
1026 }
1027
1028 for p in 0..8_u32 {
1030 let piece = backend
1031 .read_piece(ih, p)
1032 .unwrap_or_else(|e| panic!("read piece {p}: {e}"));
1033 assert_eq!(piece.len(), piece_size as usize);
1034 assert!(
1035 piece.iter().all(|&b| b == p as u8),
1036 "piece {p} data mismatch"
1037 );
1038 }
1039
1040 for p in 0..6_u32 {
1043 let data = vec![(p + 10) as u8; piece_size as usize];
1044 backend
1045 .write_chunk(ih, p, 0, Bytes::from(data), false)
1046 .unwrap_or_else(|e| panic!("overwrite piece {p}: {e}"));
1047 }
1048 let stats = backend.stats();
1049 assert!(
1050 stats.skeleton_count > 0 || stats.eviction_count > 0,
1051 "should see evictions when exceeding budget: skeleton={}, eviction={}",
1052 stats.skeleton_count,
1053 stats.eviction_count
1054 );
1055 }
1056
1057 #[test]
1058 fn prefetch_then_suggest_then_serve() {
1059 let backend = PosixDiskIo::new(&test_config());
1060 let ih = make_hash_n(42);
1061 let storage = make_storage(100);
1062 backend.register(ih, storage);
1063
1064 let data = vec![0xCC; 100];
1066 backend
1067 .write_chunk(ih, 0, 0, Bytes::from(data.clone()), true)
1068 .expect("write flushed");
1069
1070 let r1 = backend
1072 .read_chunk(ih, 0, 0, 100, false)
1073 .expect("first read");
1074 assert_eq!(&r1[..], &data[..]);
1075
1076 let r2 = backend
1078 .read_chunk(ih, 0, 0, 100, false)
1079 .expect("second read");
1080 assert_eq!(&r2[..], &data[..]);
1081
1082 let cached = backend.cached_pieces(ih);
1084 assert!(cached.contains(&0), "piece 0 should be hot after two reads");
1085
1086 let s1 = backend.stats();
1088 let _ = backend
1089 .read_chunk(ih, 0, 0, 100, false)
1090 .expect("third read");
1091 let s2 = backend.stats();
1092 assert!(
1093 s2.cache_hits > s1.cache_hits,
1094 "third read should be a cache hit: before={}, after={}",
1095 s1.cache_hits,
1096 s2.cache_hits
1097 );
1098 }
1099
1100 #[test]
1101 fn full_download_verify_cycle() {
1102 let backend = PosixDiskIo::new(&test_config());
1103 let ih = make_hash_n(43);
1104 let chunk = 16384_u32;
1105 let piece_len = u64::from(chunk) * 4; let total = piece_len * 3; let storage = make_storage_full(total, piece_len, chunk);
1108 backend.register(ih, storage);
1109
1110 for p in 0..3_u32 {
1112 let mut full_piece = Vec::new();
1113 for b in 0..4_u32 {
1114 let block = vec![(p * 4 + b) as u8; chunk as usize];
1115 full_piece.extend_from_slice(&block);
1116 backend
1117 .write_chunk(ih, p, b * chunk, Bytes::from(block), false)
1118 .unwrap_or_else(|e| panic!("write piece {p} block {b}: {e}"));
1119 }
1120 let expected = irontide_core::sha1(&full_piece);
1121 assert!(
1122 backend
1123 .hash_piece(ih, p, &expected)
1124 .unwrap_or_else(|e| panic!("hash piece {p}: {e}")),
1125 "piece {p} hash should pass"
1126 );
1127 }
1128
1129 for p in 0..3_u32 {
1131 let data = backend
1132 .read_piece(ih, p)
1133 .unwrap_or_else(|e| panic!("read piece {p}: {e}"));
1134 assert_eq!(data.len(), piece_len as usize);
1135 for b in 0..4_u32 {
1136 let expected_byte = (p * 4 + b) as u8;
1137 let start = (b * chunk) as usize;
1138 let end = start + chunk as usize;
1139 assert!(
1140 data[start..end].iter().all(|&x| x == expected_byte),
1141 "piece {p} block {b}: expected 0x{expected_byte:02X}"
1142 );
1143 }
1144 }
1145 }
1146
1147 #[test]
1148 fn skeleton_eviction_then_completion() {
1149 let config = DiskConfig {
1151 buffer_pool_capacity: 32 * 1024,
1152 ..test_config()
1153 };
1154 let backend = PosixDiskIo::new(&config);
1155 let ih = make_hash_n(44);
1156 let storage = make_storage_full(16384 * 4, 16384, 16384);
1158 backend.register(ih, storage);
1159
1160 for p in 0..4_u32 {
1161 let data = vec![p as u8; 16384];
1162 backend
1163 .write_chunk(ih, p, 0, Bytes::from(data), false)
1164 .unwrap_or_else(|e| panic!("write piece {p}: {e}"));
1165 }
1166
1167 for p in 0..4_u32 {
1169 backend
1170 .flush_piece(ih, p)
1171 .unwrap_or_else(|e| panic!("flush piece {p}: {e}"));
1172 let piece = backend
1173 .read_piece(ih, p)
1174 .unwrap_or_else(|e| panic!("read piece {p}: {e}"));
1175 assert_eq!(piece.len(), 16384);
1176 }
1177
1178 let stats = backend.stats();
1180 assert!(
1181 stats.skeleton_count > 0 || stats.eviction_count > 0,
1182 "should have evictions under pressure: skeleton={}, eviction={}",
1183 stats.skeleton_count,
1184 stats.eviction_count
1185 );
1186 }
1187
1188 #[test]
1189 fn hash_from_cache_fail_no_disk_write() {
1190 let backend = PosixDiskIo::new(&test_config());
1191 let ih = make_hash_n(46);
1192 let storage = make_storage(50);
1193 backend.register(ih, storage);
1194
1195 let data = vec![0xDD; 50];
1196 backend
1197 .write_chunk(ih, 0, 0, Bytes::from(data), false)
1198 .expect("buffered write");
1199
1200 let wrong_hash = Id20([0xFF; 20]);
1202 assert!(
1203 !backend
1204 .hash_piece(ih, 0, &wrong_hash)
1205 .expect("hash_piece should not error"),
1206 "hash should fail with wrong expected value"
1207 );
1208
1209 let piece = backend.read_piece(ih, 0).expect("read_piece");
1212 assert!(
1213 piece.iter().all(|&b| b == 0),
1214 "failed piece should not be written to disk"
1215 );
1216 }
1217
1218 #[test]
1219 fn seeding_throughput_with_cache() {
1220 let backend = PosixDiskIo::new(&test_config());
1221 let ih = make_hash_n(47);
1222 let chunk = 16384_u32;
1223 let piece_len = u64::from(chunk);
1224 let num_pieces = 64_u32;
1225 let storage = make_storage_full(piece_len * u64::from(num_pieces), piece_len, chunk);
1226 backend.register(ih, storage);
1227
1228 for p in 0..num_pieces {
1230 let data = vec![p as u8; chunk as usize];
1231 backend
1232 .write_chunk(ih, p, 0, Bytes::from(data), true)
1233 .unwrap_or_else(|e| panic!("write piece {p}: {e}"));
1234 }
1235
1236 for p in 0..num_pieces {
1238 backend
1239 .read_chunk(ih, p, 0, chunk, false)
1240 .unwrap_or_else(|e| panic!("first read piece {p}: {e}"));
1241 }
1242
1243 let stats_before = backend.stats();
1245 for p in 0..num_pieces {
1246 backend
1247 .read_chunk(ih, p, 0, chunk, false)
1248 .unwrap_or_else(|e| panic!("second read piece {p}: {e}"));
1249 }
1250 let stats_after = backend.stats();
1251
1252 let hits = stats_after
1253 .cache_hits
1254 .saturating_sub(stats_before.cache_hits);
1255 let total = u64::from(num_pieces);
1256 let hit_rate = hits as f64 / total as f64;
1257
1258 assert!(
1260 hit_rate > 0.8,
1261 "cache hit rate {hit_rate:.1} should be >80% (hits={hits}, total={total})"
1262 );
1263 }
1264
1265 #[test]
1270 fn write_block_direct_contiguous() {
1271 let backend = PosixDiskIo::new(&test_config());
1272 let ih = make_hash_n(60);
1273 let storage = make_storage(100);
1274 backend.register(ih, Arc::clone(&storage));
1275
1276 let data = vec![0xCDu8; 50];
1278 backend.write_block_direct(ih, 0, 0, &data, &[]).unwrap();
1279
1280 let read = storage.read_chunk(0, 0, 50).unwrap();
1281 assert_eq!(read, data);
1282
1283 let stats = backend.stats();
1285 assert_eq!(stats.write_bytes, 50);
1286 }
1287
1288 #[test]
1289 fn write_block_direct_split() {
1290 let backend = PosixDiskIo::new(&test_config());
1291 let ih = make_hash_n(61);
1292 let storage = make_storage(100);
1293 backend.register(ih, Arc::clone(&storage));
1294
1295 let s0: Vec<u8> = (0..30).collect();
1297 let s1: Vec<u8> = (30..50).collect();
1298 backend.write_block_direct(ih, 0, 10, &s0, &s1).unwrap();
1299
1300 let read = storage.read_chunk(0, 10, 50).unwrap();
1301 let expected: Vec<u8> = (0..50).collect();
1302 assert_eq!(read, expected);
1303
1304 let stats = backend.stats();
1305 assert_eq!(stats.write_bytes, 50);
1306 }
1307}