rustybit_lib/
torrent.rs

1use std::net::SocketAddrV4;
2use std::path::PathBuf;
3use std::sync::atomic::Ordering;
4use std::sync::Arc;
5
6use anyhow::Context;
7use leechy_dht::DhtRequester;
8use tokio::sync::mpsc::{self, unbounded_channel};
9use tokio::sync::{oneshot, RwLock};
10use tokio::task::JoinHandle;
11use tracing::instrument;
12use url::Url;
13
14use crate::parser::MetaInfo;
15use crate::peer_connection_manager::PeerConnectionManager;
16use crate::state::event::{PeerEvent, TorrentManagerReq};
17use crate::state::torrent::PieceState;
18use crate::stats::{Stats, DOWNLOADED_BYTES, DOWNLOADED_PIECES};
19use crate::storage::{FileInfo, StorageOp};
20use crate::torrent_meta::TorrentMeta;
21use crate::util::generate_peer_id;
22use crate::{
23    tracker, FileStorage, PieceHashVerifier, Storage, StorageManager, Torrent, TorrentFileMetadata, TorrentSharedState,
24};
25
/// Entry point for adding torrents: owns the settings shared by every torrent
/// started through it (HTTP client, download directory and peer id).
#[derive(Debug)]
pub struct TorrentManager {
    /// Index recorded as the `torrent_id` tracing field of `add_new_torrent`.
    next_torrent_idx: usize,
    /// HTTP client used for the tracker announce requests.
    http_client: reqwest::Client,
    /// Directory under which torrent contents are placed.
    base_download_path: PathBuf,
    /// Peer id produced by `generate_peer_id`; sent to the tracker and, converted
    /// to a 20-byte array, handed to the peer connection manager.
    peer_id: String,
}
33
34impl TorrentManager {
35    pub fn new(download_path: PathBuf) -> anyhow::Result<Self> {
36        let client = reqwest::Client::builder()
37            .gzip(true)
38            .build()
39            .context("building reqwest client")?;
40        Ok(TorrentManager {
41            next_torrent_idx: 0,
42            http_client: client,
43            base_download_path: download_path,
44            peer_id: generate_peer_id(),
45        })
46    }
47
48    #[instrument(err(level = "debug"), skip_all, fields(torrent_id = self.next_torrent_idx))]
49    pub async fn add_new_torrent<'a>(
50        &mut self,
51        mut meta_info: MetaInfo<'a>,
52    ) -> anyhow::Result<Option<JoinHandle<anyhow::Result<()>>>> {
53        let splitted_piece_hashes = meta_info
54            .info
55            .pieces
56            .chunks(20)
57            .map(|item| try_into!(item, [u8; 20]))
58            .collect::<anyhow::Result<Vec<[u8; 20]>>>()?;
59
60        let file_metadata = {
61            let mut base_path = self.base_download_path.clone();
62            // Multi-file mode: add directory name
63            if meta_info.info.files.is_some() {
64                base_path.push(&*meta_info.info.name);
65            }
66            TorrentFileMetadata::new(&mut meta_info.info, &base_path).context("bug: bad metadata")?
67        };
68
69        let length = meta_info
70            .info
71            .files
72            .as_ref()
73            .map_or_else(
74                || {
75                    meta_info
76                        .info
77                        .length
78                        .map(|len| try_into!(len, usize).ok())
79                        .ok_or(anyhow::anyhow!(
80                            "Malformed torrent file: both 'files' and 'length' fields are missing",
81                        ))
82                },
83                |files| {
84                    Ok(try_into!(
85                        files.iter().fold(0, |mut acc, file| {
86                            acc += file.length;
87                            acc
88                        }),
89                        usize
90                    )
91                    .ok())
92                },
93            )?
94            .context("converting total torrent length to usize")?;
95
96        let mut storage =
97            FileStorage::new(&file_metadata.file_infos).context("error while creating a file-based storage")?;
98        let mut piece_hash_verifier = PieceHashVerifier::new(meta_info.info.piece_length);
99
100        let (verified_pieces, downloaded, left, piece_states) = match self.do_initial_checksum_verification(
101            &mut piece_hash_verifier,
102            &mut storage,
103            &file_metadata.file_infos,
104            &splitted_piece_hashes,
105            length,
106            meta_info.info.piece_length,
107        )? {
108            Some((verified_pieces, downloaded, left, piece_states)) => {
109                (verified_pieces, downloaded, left, piece_states)
110            }
111            None => return Ok(None),
112        };
113        DOWNLOADED_BYTES.store(downloaded, Ordering::Relaxed);
114        DOWNLOADED_PIECES.store(verified_pieces, Ordering::Relaxed);
115
116        let info_hash = meta_info.info.hash()?;
117        let n_of_pieces = splitted_piece_hashes.len();
118
119        let torrent_meta = TorrentMeta::new(info_hash, meta_info.info.piece_length, length, n_of_pieces);
120
121        let piece_length = try_into!(meta_info.info.piece_length, u64)?;
122        let (storage_task, storage_tx, hash_check_rx) = spawn_storage_task(
123            storage,
124            file_metadata,
125            piece_length,
126            piece_hash_verifier,
127            n_of_pieces,
128            length,
129        )
130        .context("spawning a storage manager")?;
131
132        let torrent_state = Arc::new(RwLock::new(TorrentSharedState::new(piece_states, n_of_pieces)));
133        let (torrent_task, peer_event_tx, new_peer_tx) = spawn_torrent_task(
134            torrent_meta.clone(),
135            torrent_state.clone(),
136            splitted_piece_hashes,
137            storage_tx,
138            hash_check_rx,
139        );
140
141        let (stats_task, stats_cancel_tx) = spawn_stats_task(length, downloaded, left);
142
143        let peers = self
144            .get_peers_from_tracker(
145                try_into!(length, u64)?,
146                try_into!(downloaded, u64)?,
147                &meta_info.announce,
148                info_hash,
149            )
150            .await?;
151        let peer_id: [u8; 20] = self.peer_id.as_bytes().try_into()?;
152
153        let root_handle = tokio::spawn(async move {
154            let (peer_manager_cancel_tx, peer_manager_cancel_rx) = oneshot::channel::<()>();
155            let (peer_queue_tx, peer_queue_rx) = mpsc::channel(10);
156            let mut peer_manager = PeerConnectionManager::new(
157                peer_id,
158                torrent_state,
159                torrent_meta,
160                peer_queue_rx,
161                peer_event_tx,
162                peer_manager_cancel_rx,
163                new_peer_tx,
164            );
165            let peer_manager_task = tokio::spawn(async move { peer_manager.handle(peers).await });
166
167            let (dht_cancel_tx, dht_cancel_rx) = oneshot::channel();
168
169            let mut dht_requester = DhtRequester::new(None, info_hash).context("creating DhtRequester")?;
170            let dht_requester_task =
171                tokio::spawn(async move { dht_requester.process_dht_nodes(dht_cancel_rx, peer_queue_tx).await });
172
173            torrent_task.await.context("torrent task")??;
174
175            // Finish all other tasks
176            let _ = dht_cancel_tx.send(());
177            let _ = peer_manager_cancel_tx.send(());
178            peer_manager_task
179                .await
180                .context("peer manager task panicked?")?
181                .context("peer manager")?;
182            dht_requester_task.await.context("dht requester task")??;
183            storage_task.await.context("storage task")??;
184            stats_cancel_tx
185                .send(())
186                .map_err(|_| anyhow::anyhow!("bug: stats collector exited before cancellation?"))?;
187            stats_task.await.context("stats task")?;
188
189            Ok::<(), anyhow::Error>(())
190        });
191
192        Ok(Some(root_handle))
193    }
194
195    async fn get_peers_from_tracker(
196        &self,
197        total_length: u64,
198        downloaded: u64,
199        annouce_url: &str,
200        info_hash: [u8; 20],
201    ) -> anyhow::Result<Vec<SocketAddrV4>> {
202        let request = tracker::TrackerRequest::new(
203            &self.peer_id,
204            info_hash,
205            &[None, Some(downloaded), Some(total_length)],
206            Some(tracker::EventType::Started),
207        );
208
209        let tracker_announce_url = params(annouce_url, request)?;
210
211        let response = self
212            .http_client
213            .request(reqwest::Method::GET, tracker_announce_url)
214            .header("User-Agent", "RustyBitTorrent")
215            .send()
216            .await
217            .context("sending request to the tracker")?;
218
219        let resp: tracker::TrackerResponse = serde_bencode::from_bytes(
220            &response
221                .bytes()
222                .await
223                .context("unable to get the tracker's response body")?,
224        )
225        .context("error while parsing the tracker's response")?;
226
227        match resp.get_peers() {
228            Some(peers) => Ok(peers?),
229            None => anyhow::bail!("peers are missing in the tracker response"),
230        }
231    }
232
233    fn do_initial_checksum_verification(
234        &self,
235        verifier: &mut PieceHashVerifier,
236        storage: &mut dyn Storage,
237        file_infos: &[FileInfo],
238        piece_hashes: &[[u8; 20]],
239        total_length: usize,
240        piece_length: usize,
241    ) -> anyhow::Result<Option<(usize, usize, usize, Vec<PieceState>)>> {
242        tracing::info!("Starting initial checksum verification");
243        let (verified_pieces, piece_states) = verifier
244            .check_all_pieces(storage, file_infos, piece_hashes, total_length)
245            .context("error while doing initial checksums verification")?;
246
247        if verified_pieces == piece_hashes.len() {
248            tracing::info!(%verified_pieces, "Finished initial checksum verification: already downloaded all pieces, exiting...");
249            return Ok(None);
250        } else {
251            tracing::info!(%verified_pieces, "Finished initial checksum verification: already downloaded and verified some of the pieces");
252        }
253
254        let (downloaded, left) = {
255            let downloaded_bytes = verified_pieces * piece_length;
256            let bytes_left = total_length - downloaded_bytes;
257            (downloaded_bytes, bytes_left)
258        };
259
260        Ok(Some((verified_pieces, downloaded, left, piece_states)))
261    }
262}
263
264fn spawn_storage_task(
265    mut storage: FileStorage,
266    metadata: TorrentFileMetadata,
267    piece_length: u64,
268    hash_verifier: PieceHashVerifier,
269    n_of_pieces: usize,
270    total_length: usize,
271) -> anyhow::Result<(
272    JoinHandle<anyhow::Result<()>>,
273    mpsc::Sender<StorageOp>,
274    mpsc::Receiver<(SocketAddrV4, u32, bool)>,
275)> {
276    let (storage_tx, storage_rx) = mpsc::channel(200);
277    let (hash_check_tx, hash_check_rx) = mpsc::channel(200);
278    let storage_mgr_task = tokio::task::spawn(async move {
279        let mut storage_manager = StorageManager::new(
280            &mut storage as &mut dyn Storage,
281            metadata,
282            piece_length,
283            hash_verifier,
284            n_of_pieces,
285            total_length,
286        )
287        .context("error while creating a storage manager")?;
288        storage_manager.listen_for_blocks(storage_rx, hash_check_tx).await
289    });
290    Ok((storage_mgr_task, storage_tx, hash_check_rx))
291}
292
293fn spawn_torrent_task(
294    meta: TorrentMeta,
295    state: Arc<RwLock<TorrentSharedState>>,
296    piece_hashes: Vec<[u8; 20]>,
297    storage_tx: mpsc::Sender<StorageOp>,
298    hash_check_rx: mpsc::Receiver<(SocketAddrV4, u32, bool)>,
299) -> (
300    JoinHandle<anyhow::Result<()>>,
301    mpsc::UnboundedSender<(SocketAddrV4, PeerEvent)>,
302    mpsc::UnboundedSender<(SocketAddrV4, mpsc::Sender<TorrentManagerReq>)>,
303) {
304    let (peer_event_tx, peer_event_rx) = unbounded_channel();
305    let (new_peer_tx, new_peer_rx) = unbounded_channel();
306    let torrent_task = tokio::spawn(async move {
307        let mut torrent = Torrent::new(
308            meta,
309            state,
310            piece_hashes,
311            peer_event_rx,
312            new_peer_rx,
313            storage_tx,
314            hash_check_rx,
315        );
316        torrent.handle().await?;
317        Ok::<(), anyhow::Error>(())
318    });
319    (torrent_task, peer_event_tx, new_peer_tx)
320}
321
322fn spawn_stats_task(length: usize, downloaded: usize, left: usize) -> (JoinHandle<()>, oneshot::Sender<()>) {
323    let (stats_cancel_tx, stats_cancel_rx) = oneshot::channel();
324    let stats_task = {
325        let mut stats = Stats::new(downloaded, left, length);
326        tokio::spawn(async move { stats.collect_stats(stats_cancel_rx).await })
327    };
328    (stats_task, stats_cancel_tx)
329}
330
331fn params(url: &str, request: tracker::TrackerRequest<'_>) -> anyhow::Result<Url> {
332    let mut url = Url::parse(url).context("tracker announce URL parsing")?;
333
334    let mut query = request.into_query_params()?;
335
336    // NOTE: Some trackers include additional query params in the announce URL.
337    // Some even require them to be the first ones in the query, so we have
338    // to reorder things a bit in order to be sure that this supports as much
339    // torrent trackers as possible
340    if let Some(existing_query) = url.query() {
341        query.insert_str(0, existing_query);
342        query.insert(existing_query.len(), '&');
343    }
344
345    url.set_query(Some(&query));
346
347    Ok(url)
348}