use std::net::SocketAddrV4;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use anyhow::Context;
use leechy_dht::DhtRequester;
use tokio::sync::mpsc::{self, unbounded_channel};
use tokio::sync::{oneshot, RwLock};
use tokio::task::JoinHandle;
use tracing::instrument;
use url::Url;

use crate::parser::MetaInfo;
use crate::peer_connection_manager::PeerConnectionManager;
use crate::state::event::{PeerEvent, TorrentManagerReq};
use crate::state::torrent::PieceState;
use crate::stats::{Stats, DOWNLOADED_BYTES, DOWNLOADED_PIECES};
use crate::storage::{FileInfo, StorageOp};
use crate::torrent_meta::TorrentMeta;
use crate::util::generate_peer_id;
use crate::{
    tracker, FileStorage, PieceHashVerifier, Storage, StorageManager, Torrent, TorrentFileMetadata, TorrentSharedState,
};

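/// Coordinates a single torrent download end to end: it announces to the
/// tracker, spawns the storage, torrent, peer-manager, DHT and stats tasks,
/// and hands back a root [`JoinHandle`] that resolves once the download
/// finishes.
///
/// A minimal usage sketch (it assumes a `MetaInfo` value has already been
/// parsed elsewhere; the parsing step is deliberately left out):
///
/// ```ignore
/// let mut manager = TorrentManager::new(PathBuf::from("downloads"))?;
/// if let Some(handle) = manager.add_new_torrent(meta_info).await? {
///     handle.await??;
/// }
/// ```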
#[derive(Debug)]
pub struct TorrentManager {
    next_torrent_idx: usize,
    http_client: reqwest::Client,
    base_download_path: PathBuf,
    peer_id: String,
}

impl TorrentManager {
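    /// Creates a manager that stores downloads under `download_path` and
    /// reuses a single gzip-enabled `reqwest` client for all tracker requests.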
    pub fn new(download_path: PathBuf) -> anyhow::Result<Self> {
        let client = reqwest::Client::builder()
            .gzip(true)
            .build()
            .context("building reqwest client")?;
        Ok(TorrentManager {
            next_torrent_idx: 0,
            http_client: client,
            base_download_path: download_path,
            peer_id: generate_peer_id(),
        })
    }

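    /// Registers a new torrent and spawns every task needed to download it.
    ///
    /// Returns `Ok(None)` if the initial checksum verification shows that all
    /// pieces are already on disk; otherwise returns a root [`JoinHandle`]
    /// that drives the torrent, storage, peer-manager, DHT and stats tasks
    /// and resolves once the torrent task completes.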
    #[instrument(err(level = "debug"), skip_all, fields(torrent_id = self.next_torrent_idx))]
    pub async fn add_new_torrent<'a>(
        &mut self,
        mut meta_info: MetaInfo<'a>,
    ) -> anyhow::Result<Option<JoinHandle<anyhow::Result<()>>>> {
        let piece_hashes = meta_info
            .info
            .pieces
            .chunks(20)
            .map(|item| try_into!(item, [u8; 20]))
            .collect::<anyhow::Result<Vec<[u8; 20]>>>()?;

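        // Multi-file torrents are downloaded into a subdirectory named after
        // the torrent; single-file torrents go directly into the base path.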
        let file_metadata = {
            let mut base_path = self.base_download_path.clone();
            if meta_info.info.files.is_some() {
                base_path.push(&*meta_info.info.name);
            }
            TorrentFileMetadata::new(&mut meta_info.info, &base_path).context("bug: bad metadata")?
        };

        let length = meta_info
            .info
            .files
            .as_ref()
            .map_or_else(
                || {
                    meta_info
                        .info
                        .length
                        .map(|len| try_into!(len, usize).ok())
                        .ok_or(anyhow::anyhow!(
                            "Malformed torrent file: both 'files' and 'length' fields are missing",
                        ))
                },
                |files| {
                    Ok(try_into!(
                        files.iter().fold(0, |mut acc, file| {
                            acc += file.length;
                            acc
                        }),
                        usize
                    )
                    .ok())
                },
            )?
            .context("converting total torrent length to usize")?;

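        // Create the file-backed storage and check what is already on disk so
        // previously downloaded pieces are not requested again.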
        let mut storage =
            FileStorage::new(&file_metadata.file_infos).context("error while creating a file-based storage")?;
        let mut piece_hash_verifier = PieceHashVerifier::new(meta_info.info.piece_length);

        let Some((verified_pieces, downloaded, left, piece_states)) = self.do_initial_checksum_verification(
            &mut piece_hash_verifier,
            &mut storage,
            &file_metadata.file_infos,
            &piece_hashes,
            length,
            meta_info.info.piece_length,
        )?
        else {
            return Ok(None);
        };
        DOWNLOADED_BYTES.store(downloaded, Ordering::Relaxed);
        DOWNLOADED_PIECES.store(verified_pieces, Ordering::Relaxed);

        let info_hash = meta_info.info.hash()?;
        let n_of_pieces = piece_hashes.len();

        let torrent_meta = TorrentMeta::new(info_hash, meta_info.info.piece_length, length, n_of_pieces);

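        // Spawn the long-running helpers: disk I/O plus hash checking, the
        // torrent state machine, and the periodic stats reporter.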
        let piece_length = try_into!(meta_info.info.piece_length, u64)?;
        let (storage_task, storage_tx, hash_check_rx) = spawn_storage_task(
            storage,
            file_metadata,
            piece_length,
            piece_hash_verifier,
            n_of_pieces,
            length,
        )
        .context("spawning a storage manager")?;

        let torrent_state = Arc::new(RwLock::new(TorrentSharedState::new(piece_states, n_of_pieces)));
        let (torrent_task, peer_event_tx, new_peer_tx) = spawn_torrent_task(
            torrent_meta.clone(),
            torrent_state.clone(),
            piece_hashes,
            storage_tx,
            hash_check_rx,
        );

        let (stats_task, stats_cancel_tx) = spawn_stats_task(length, downloaded, left);

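        // Announce to the tracker to get an initial peer list; additional
        // peers are discovered later through the DHT.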
        let peers = self
            .get_peers_from_tracker(
                try_into!(length, u64)?,
                try_into!(downloaded, u64)?,
                &meta_info.announce,
                info_hash,
            )
            .await?;
        let peer_id: [u8; 20] = self.peer_id.as_bytes().try_into()?;

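        // The root task owns the peer connection manager and the DHT
        // requester and tears everything down once the torrent task finishes.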
        let root_handle = tokio::spawn(async move {
            let (peer_manager_cancel_tx, peer_manager_cancel_rx) = oneshot::channel::<()>();
            let (peer_queue_tx, peer_queue_rx) = mpsc::channel(10);
            let mut peer_manager = PeerConnectionManager::new(
                peer_id,
                torrent_state,
                torrent_meta,
                peer_queue_rx,
                peer_event_tx,
                peer_manager_cancel_rx,
                new_peer_tx,
            );
            let peer_manager_task = tokio::spawn(async move { peer_manager.handle(peers).await });

            let (dht_cancel_tx, dht_cancel_rx) = oneshot::channel();

            let mut dht_requester = DhtRequester::new(None, info_hash).context("creating DhtRequester")?;
            let dht_requester_task =
                tokio::spawn(async move { dht_requester.process_dht_nodes(dht_cancel_rx, peer_queue_tx).await });

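            // The torrent task returning means the download finished (or
            // failed); cancel the helpers and wait for them to wind down.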
            torrent_task.await.context("torrent task")??;

            let _ = dht_cancel_tx.send(());
            let _ = peer_manager_cancel_tx.send(());
            peer_manager_task
                .await
                .context("peer manager task panicked?")?
                .context("peer manager")?;
            dht_requester_task.await.context("dht requester task")??;
            storage_task.await.context("storage task")??;
            stats_cancel_tx
                .send(())
                .map_err(|_| anyhow::anyhow!("bug: stats collector exited before cancellation?"))?;
            stats_task.await.context("stats task")?;

            Ok::<(), anyhow::Error>(())
        });

        Ok(Some(root_handle))
    }

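    /// Sends an announce request to the tracker and returns the peer list
    /// from its bencoded response.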
    async fn get_peers_from_tracker(
        &self,
        total_length: u64,
        downloaded: u64,
        announce_url: &str,
        info_hash: [u8; 20],
    ) -> anyhow::Result<Vec<SocketAddrV4>> {
        let request = tracker::TrackerRequest::new(
            &self.peer_id,
            info_hash,
            &[None, Some(downloaded), Some(total_length)],
            Some(tracker::EventType::Started),
        );

        let tracker_announce_url = params(announce_url, request)?;

        let response = self
            .http_client
            .request(reqwest::Method::GET, tracker_announce_url)
            .header("User-Agent", "RustyBitTorrent")
            .send()
            .await
            .context("sending request to the tracker")?;

        let resp: tracker::TrackerResponse = serde_bencode::from_bytes(
            &response
                .bytes()
                .await
                .context("unable to get the tracker's response body")?,
        )
        .context("error while parsing the tracker's response")?;

        match resp.get_peers() {
            Some(peers) => Ok(peers?),
            None => anyhow::bail!("peers are missing in the tracker response"),
        }
    }

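    /// Verifies every piece that is already on disk against the expected
    /// hashes. Returns `None` when all pieces are present, otherwise the
    /// number of verified pieces, the downloaded/remaining byte counts, and
    /// the per-piece states.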
    fn do_initial_checksum_verification(
        &self,
        verifier: &mut PieceHashVerifier,
        storage: &mut dyn Storage,
        file_infos: &[FileInfo],
        piece_hashes: &[[u8; 20]],
        total_length: usize,
        piece_length: usize,
    ) -> anyhow::Result<Option<(usize, usize, usize, Vec<PieceState>)>> {
        tracing::info!("Starting initial checksum verification");
        let (verified_pieces, piece_states) = verifier
            .check_all_pieces(storage, file_infos, piece_hashes, total_length)
            .context("error while doing initial checksum verification")?;

        if verified_pieces == piece_hashes.len() {
            tracing::info!(%verified_pieces, "Finished initial checksum verification: already downloaded all pieces, exiting...");
            return Ok(None);
        } else {
            tracing::info!(%verified_pieces, "Finished initial checksum verification: already downloaded and verified some of the pieces");
        }

        let (downloaded, left) = {
            let downloaded_bytes = verified_pieces * piece_length;
            let bytes_left = total_length - downloaded_bytes;
            (downloaded_bytes, bytes_left)
        };

        Ok(Some((verified_pieces, downloaded, left, piece_states)))
    }
}

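/// Spawns the storage manager task. Returns its [`JoinHandle`], the sender
/// used to submit [`StorageOp`]s, and the receiver on which per-piece hash
/// check results arrive (peer address, piece index, verification outcome).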
fn spawn_storage_task(
    mut storage: FileStorage,
    metadata: TorrentFileMetadata,
    piece_length: u64,
    hash_verifier: PieceHashVerifier,
    n_of_pieces: usize,
    total_length: usize,
) -> anyhow::Result<(
    JoinHandle<anyhow::Result<()>>,
    mpsc::Sender<StorageOp>,
    mpsc::Receiver<(SocketAddrV4, u32, bool)>,
)> {
    let (storage_tx, storage_rx) = mpsc::channel(200);
    let (hash_check_tx, hash_check_rx) = mpsc::channel(200);
    let storage_mgr_task = tokio::task::spawn(async move {
        let mut storage_manager = StorageManager::new(
            &mut storage as &mut dyn Storage,
            metadata,
            piece_length,
            hash_verifier,
            n_of_pieces,
            total_length,
        )
        .context("error while creating a storage manager")?;
        storage_manager.listen_for_blocks(storage_rx, hash_check_tx).await
    });
    Ok((storage_mgr_task, storage_tx, hash_check_rx))
}

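/// Spawns the torrent state-machine task. Returns its [`JoinHandle`] plus the
/// unbounded senders used to forward peer events and to register newly
/// connected peers with the torrent.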
fn spawn_torrent_task(
    meta: TorrentMeta,
    state: Arc<RwLock<TorrentSharedState>>,
    piece_hashes: Vec<[u8; 20]>,
    storage_tx: mpsc::Sender<StorageOp>,
    hash_check_rx: mpsc::Receiver<(SocketAddrV4, u32, bool)>,
) -> (
    JoinHandle<anyhow::Result<()>>,
    mpsc::UnboundedSender<(SocketAddrV4, PeerEvent)>,
    mpsc::UnboundedSender<(SocketAddrV4, mpsc::Sender<TorrentManagerReq>)>,
) {
    let (peer_event_tx, peer_event_rx) = unbounded_channel();
    let (new_peer_tx, new_peer_rx) = unbounded_channel();
    let torrent_task = tokio::spawn(async move {
        let mut torrent = Torrent::new(
            meta,
            state,
            piece_hashes,
            peer_event_rx,
            new_peer_rx,
            storage_tx,
            hash_check_rx,
        );
        torrent.handle().await?;
        Ok::<(), anyhow::Error>(())
    });
    (torrent_task, peer_event_tx, new_peer_tx)
}

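/// Spawns the stats collector. Returns its [`JoinHandle`] and a oneshot
/// sender used to stop it.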
fn spawn_stats_task(length: usize, downloaded: usize, left: usize) -> (JoinHandle<()>, oneshot::Sender<()>) {
    let (stats_cancel_tx, stats_cancel_rx) = oneshot::channel();
    let stats_task = {
        let mut stats = Stats::new(downloaded, left, length);
        tokio::spawn(async move { stats.collect_stats(stats_cancel_rx).await })
    };
    (stats_task, stats_cancel_tx)
}

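/// Builds the final tracker announce URL by appending the request's query
/// parameters to `url`, keeping any query string that is already present.
/// For example (a sketch; the exact parameter set comes from
/// `TrackerRequest::into_query_params`), an existing `?key=1` ends up as
/// `?key=1&info_hash=...`.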
fn params(url: &str, request: tracker::TrackerRequest<'_>) -> anyhow::Result<Url> {
    let mut url = Url::parse(url).context("tracker announce URL parsing")?;

    let mut query = request.into_query_params()?;

    if let Some(existing_query) = url.query() {
        query.insert_str(0, existing_query);
        query.insert(existing_query.len(), '&');
    }

    url.set_query(Some(&query));

    Ok(url)
}