rustybit_lib/storage/storage_manager/
mod.rs1mod file_metadata;
2
3use std::net::SocketAddrV4;
4
5use anyhow::Context;
6use bittorrent_peer_protocol::Block;
7pub use file_metadata::TorrentFileMetadata;
8use tokio::sync::mpsc;
9
10use super::piece_hash_verifier::PieceHashVerifier;
11use super::util::{find_file_offsets_for_data, write_data_to_files};
12use super::{Storage, StorageOp};
13
14pub struct StorageManager<'a> {
15 storage: &'a mut dyn Storage,
16 piece_hash_verifier: PieceHashVerifier,
17 file_metadata: TorrentFileMetadata,
18 piece_length: u64,
19 number_of_pieces: usize,
20 total_torrent_length: usize,
21}
22
23impl<'a> StorageManager<'a> {
24 pub fn new(
25 storage: &'a mut dyn Storage,
26 file_metadata: TorrentFileMetadata,
27 piece_length: u64,
28 piece_hash_verifier: PieceHashVerifier,
29 number_of_pieces: usize,
30 total_torrent_length: usize,
31 ) -> anyhow::Result<Self> {
32 Ok(StorageManager {
33 storage,
34 piece_hash_verifier,
35 file_metadata,
36 piece_length,
37 number_of_pieces,
38 total_torrent_length,
39 })
40 }
41
42 pub async fn listen_for_blocks(
43 &mut self,
44 mut rx: mpsc::Receiver<StorageOp>,
45 tx: mpsc::Sender<(SocketAddrV4, u32, bool)>,
46 ) -> anyhow::Result<()> {
47 while let Some(requested_op) = rx.recv().await {
48 match requested_op {
49 StorageOp::AddBlock(Block { index, begin, block }) => {
50 let file_offsets = find_file_offsets_for_data(
51 &self.file_metadata.file_infos,
52 index,
53 self.piece_length,
54 Some(begin),
55 )
56 .context("error while finding offsets for a block")?
57 .context("bug: failed to find a matching file for a block?")?;
58
59 write_data_to_files(self.storage, &block, file_offsets, &self.file_metadata.file_infos)
60 .context("error while writing block to files")?;
61 }
62 StorageOp::CheckPieceHash((peer_addr, piece_idx, expected_hash)) => {
63 let verification_result = self
64 .piece_hash_verifier
65 .verify_piece_hash(
66 self.storage,
67 self.file_metadata.file_infos.as_slice(),
68 piece_idx,
69 self.number_of_pieces,
70 self.total_torrent_length,
71 &expected_hash,
72 )
73 .context("error while verifying piece hash")?;
74
75 tx.send((peer_addr, piece_idx, verification_result.unwrap_or(false)))
76 .await
77 .context("error sending piece hash verification result")?;
78 }
79 };
80 }
81
82 Ok(())
83 }
84}