rustybit_lib/storage/storage_manager/
mod.rs

1mod file_metadata;
2
3use std::net::SocketAddrV4;
4
5use anyhow::Context;
6use bittorrent_peer_protocol::Block;
7pub use file_metadata::TorrentFileMetadata;
8use tokio::sync::mpsc;
9
10use super::piece_hash_verifier::PieceHashVerifier;
11use super::util::{find_file_offsets_for_data, write_data_to_files};
12use super::{Storage, StorageOp};
13
14pub struct StorageManager<'a> {
15    storage: &'a mut dyn Storage,
16    piece_hash_verifier: PieceHashVerifier,
17    file_metadata: TorrentFileMetadata,
18    piece_length: u64,
19    number_of_pieces: usize,
20    total_torrent_length: usize,
21}
22
23impl<'a> StorageManager<'a> {
24    pub fn new(
25        storage: &'a mut dyn Storage,
26        file_metadata: TorrentFileMetadata,
27        piece_length: u64,
28        piece_hash_verifier: PieceHashVerifier,
29        number_of_pieces: usize,
30        total_torrent_length: usize,
31    ) -> anyhow::Result<Self> {
32        Ok(StorageManager {
33            storage,
34            piece_hash_verifier,
35            file_metadata,
36            piece_length,
37            number_of_pieces,
38            total_torrent_length,
39        })
40    }
41
42    pub async fn listen_for_blocks(
43        &mut self,
44        mut rx: mpsc::Receiver<StorageOp>,
45        tx: mpsc::Sender<(SocketAddrV4, u32, bool)>,
46    ) -> anyhow::Result<()> {
47        while let Some(requested_op) = rx.recv().await {
48            match requested_op {
49                StorageOp::AddBlock(Block { index, begin, block }) => {
50                    let file_offsets = find_file_offsets_for_data(
51                        &self.file_metadata.file_infos,
52                        index,
53                        self.piece_length,
54                        Some(begin),
55                    )
56                    .context("error while finding offsets for a block")?
57                    .context("bug: failed to find a matching file for a block?")?;
58
59                    write_data_to_files(self.storage, &block, file_offsets, &self.file_metadata.file_infos)
60                        .context("error while writing block to files")?;
61                }
62                StorageOp::CheckPieceHash((peer_addr, piece_idx, expected_hash)) => {
63                    let verification_result = self
64                        .piece_hash_verifier
65                        .verify_piece_hash(
66                            self.storage,
67                            self.file_metadata.file_infos.as_slice(),
68                            piece_idx,
69                            self.number_of_pieces,
70                            self.total_torrent_length,
71                            &expected_hash,
72                        )
73                        .context("error while verifying piece hash")?;
74
75                    tx.send((peer_addr, piece_idx, verification_result.unwrap_or(false)))
76                        .await
77                        .context("error sending piece hash verification result")?;
78                }
79            };
80        }
81
82        Ok(())
83    }
84}