1#![allow(
2 clippy::cast_possible_truncation,
3 clippy::cast_possible_wrap,
4 clippy::cast_sign_loss,
5 clippy::unchecked_time_subtraction,
6 reason = "M175: BEP 9 ut_metadata — piece counts bounded by metadata size; remaining time-sub sites are test fixtures"
7)]
8
9use std::collections::{HashMap, HashSet};
10use std::net::SocketAddr;
11use std::time::{Duration, Instant};
12
13use bytes::Bytes;
14use irontide_core::Id20;
15
/// Size in bytes of one metadata piece (16 KiB); the piece count is the total
/// size divided by this value, rounded up.
const METADATA_PIECE_SIZE: u64 = 16 * 1024;

/// Seconds an un-retried piece request may stay unanswered before it counts as timed out.
const METADATA_BASE_TIMEOUT_SECS: u64 = 5;

/// Ceiling, in seconds, for the per-piece timeout once retries have backed it off.
const METADATA_MAX_TIMEOUT_SECS: u64 = 60;
24
25#[allow(dead_code)]
34pub struct MetadataDownloader {
35 info_hash: Id20,
36 total_size: Option<u64>,
37 pieces: HashMap<u32, Bytes>,
38 num_pieces: Option<u32>,
39 requested_peers: HashMap<SocketAddr, HashSet<u32>>,
41 rejected_peers: HashSet<SocketAddr>,
43 piece_request_times: HashMap<u32, Instant>,
45 piece_retry_count: HashMap<u32, u32>,
47}
48
49#[allow(dead_code)]
50impl MetadataDownloader {
51 #[must_use]
53 pub fn new(info_hash: Id20) -> Self {
54 Self {
55 info_hash,
56 total_size: None,
57 pieces: HashMap::new(),
58 num_pieces: None,
59 requested_peers: HashMap::new(),
60 rejected_peers: HashSet::new(),
61 piece_request_times: HashMap::new(),
62 piece_retry_count: HashMap::new(),
63 }
64 }
65
66 pub fn set_total_size(&mut self, size: u64) {
68 self.total_size = Some(size);
69 self.num_pieces = Some(size.div_ceil(METADATA_PIECE_SIZE) as u32);
70 }
71
72 pub fn piece_received(&mut self, piece: u32, data: Bytes) -> bool {
74 self.pieces.insert(piece, data);
75 match self.num_pieces {
76 Some(n) => self.pieces.len() == n as usize,
77 None => false,
78 }
79 }
80
81 pub fn assemble_and_verify(&self) -> crate::Result<Vec<u8>> {
88 let num_pieces = self
89 .num_pieces
90 .ok_or_else(|| crate::Error::Connection("metadata incomplete".to_string()))?;
91
92 if self.pieces.len() != num_pieces as usize {
93 return Err(crate::Error::Connection("metadata incomplete".to_string()));
94 }
95
96 let mut assembled = Vec::with_capacity(self.total_size.unwrap_or(0) as usize);
97 for i in 0..num_pieces {
98 let piece = self
99 .pieces
100 .get(&i)
101 .ok_or_else(|| crate::Error::Connection("metadata incomplete".to_string()))?;
102 assembled.extend_from_slice(piece);
103 }
104
105 let hash = irontide_core::sha1(&assembled);
106 if hash != self.info_hash {
107 return Err(crate::Error::MetadataHashMismatch);
108 }
109
110 Ok(assembled)
111 }
112
113 #[must_use]
117 pub fn missing_pieces(&self) -> Vec<u32> {
118 match self.num_pieces {
119 None => Vec::new(),
120 Some(n) => (0..n).filter(|i| !self.pieces.contains_key(i)).collect(),
121 }
122 }
123
124 pub fn mark_rejected(&mut self, peer: SocketAddr) {
129 self.rejected_peers.insert(peer);
130 self.requested_peers.remove(&peer);
131 }
132
133 #[must_use]
135 pub fn is_rejected(&self, peer: &SocketAddr) -> bool {
136 self.rejected_peers.contains(peer)
137 }
138
139 pub fn request_all_from_peer(&mut self, peer: SocketAddr) -> Vec<u32> {
145 if self.rejected_peers.contains(&peer) {
146 return Vec::new();
147 }
148
149 let missing = self.missing_pieces();
150 if missing.is_empty() {
151 return Vec::new();
152 }
153
154 let now = Instant::now();
155 let peer_set = self.requested_peers.entry(peer).or_default();
156 for &piece in &missing {
157 peer_set.insert(piece);
158 self.piece_request_times.insert(piece, now);
160 self.piece_retry_count.remove(&piece);
161 }
162
163 missing
164 }
165
166 #[must_use]
169 pub fn timed_out_pieces(&self) -> Vec<u32> {
170 let now = Instant::now();
171 self.piece_request_times
172 .iter()
173 .filter(|(piece, requested_at)| {
174 if self.pieces.contains_key(piece) {
175 return false;
176 }
177 let retries = self.piece_retry_count.get(piece).copied().unwrap_or(0);
178 let clamped = retries.min(12);
179 let timeout_secs = METADATA_BASE_TIMEOUT_SECS
180 .saturating_mul(1u64 << clamped)
181 .min(METADATA_MAX_TIMEOUT_SECS);
182 let timeout = Duration::from_secs(timeout_secs);
183 now.duration_since(**requested_at) >= timeout
184 })
185 .map(|(piece, _)| *piece)
186 .collect()
187 }
188
189 pub fn reset_request_time(&mut self, piece: u32) {
191 self.piece_request_times.insert(piece, Instant::now());
192 *self.piece_retry_count.entry(piece).or_insert(0) += 1;
193 }
194
195 #[must_use]
197 pub fn has_active_peers(&self) -> bool {
198 self.requested_peers
199 .keys()
200 .any(|peer| !self.rejected_peers.contains(peer))
201 }
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use irontide_core::Id20;

    /// Addresses of the two fake peers used throughout the tests.
    const PEER_A: &str = "10.0.0.1:6881";
    const PEER_B: &str = "10.0.0.2:6881";

    /// Small payload that fits in a single metadata piece.
    const SMALL_METADATA: &[u8; 32] = b"hello world metadata test data!!";

    /// Parses a literal socket address for a fake peer.
    fn addr(text: &str) -> SocketAddr {
        text.parse().expect("valid addr")
    }

    /// Returns an instant `secs` seconds in the past, for backdating requests.
    fn seconds_ago(secs: u64) -> Instant {
        Instant::now() - Duration::from_secs(secs)
    }

    /// Returns a full-size (16 KiB) piece of zero bytes.
    fn zero_piece() -> Bytes {
        Bytes::from(vec![0u8; 16384])
    }

    #[test]
    fn new_empty() {
        let downloader = MetadataDownloader::new(Id20::ZERO);
        assert!(downloader.total_size.is_none());
        assert!(downloader.num_pieces.is_none());
        assert!(downloader.pieces.is_empty());
        assert!(downloader.requested_peers.is_empty());
        assert!(downloader.rejected_peers.is_empty());
        assert!(downloader.piece_request_times.is_empty());
    }

    #[test]
    fn set_total_size_calculates_num_pieces() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);

        // Exact multiples, a single piece, and one byte over a piece boundary.
        for (size, expected_pieces) in [(32768_u64, 2_u32), (16384, 1), (16385, 2)] {
            downloader.set_total_size(size);
            assert_eq!(downloader.num_pieces, Some(expected_pieces));
        }
    }

    #[test]
    fn single_piece_metadata() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(100);
        let done = downloader.piece_received(0, Bytes::from(vec![0u8; 100]));
        assert!(done);
    }

    #[test]
    fn multi_piece_metadata() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(32768);

        let done_after_first = downloader.piece_received(0, zero_piece());
        assert!(!done_after_first);

        let done_after_second = downloader.piece_received(1, zero_piece());
        assert!(done_after_second);
    }

    #[test]
    fn piece_received_returns_false_when_incomplete() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(32768);

        let done = downloader.piece_received(0, zero_piece());
        assert!(!done);
    }

    #[test]
    fn assemble_and_verify_correct_hash() {
        let payload = SMALL_METADATA;
        let expected_hash = irontide_core::sha1(payload);

        let mut downloader = MetadataDownloader::new(expected_hash);
        downloader.set_total_size(payload.len() as u64);
        downloader.piece_received(0, Bytes::from(payload.to_vec()));

        let assembled = downloader.assemble_and_verify().unwrap();
        assert_eq!(assembled, payload);
    }

    #[test]
    fn assemble_and_verify_wrong_hash() {
        let payload = SMALL_METADATA;

        // The all-zero hash cannot match the payload.
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(payload.len() as u64);
        downloader.piece_received(0, Bytes::from(payload.to_vec()));

        let outcome = downloader.assemble_and_verify();
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, crate::Error::MetadataHashMismatch),
            "expected MetadataHashMismatch, got: {err:?}"
        );
    }

    #[test]
    fn metadata_full_redundancy_all_pieces_to_each_peer() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(32768);

        let first_peer = addr(PEER_A);
        let second_peer = addr(PEER_B);

        // Every peer is asked for the complete set of missing pieces.
        let from_first = downloader.request_all_from_peer(first_peer);
        assert_eq!(from_first, vec![0, 1]);

        let from_second = downloader.request_all_from_peer(second_peer);
        assert_eq!(from_second, vec![0, 1]);

        assert!(downloader.requested_peers.contains_key(&first_peer));
        assert!(downloader.requested_peers.contains_key(&second_peer));
        assert_eq!(downloader.requested_peers[&first_peer].len(), 2);
        assert_eq!(downloader.requested_peers[&second_peer].len(), 2);

        assert!(downloader.piece_request_times.contains_key(&0));
        assert!(downloader.piece_request_times.contains_key(&1));
    }

    #[test]
    fn metadata_reject_blacklists_peer() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(32768);

        let first_peer = addr(PEER_A);
        let second_peer = addr(PEER_B);

        let _ = downloader.request_all_from_peer(first_peer);
        let _ = downloader.request_all_from_peer(second_peer);

        downloader.mark_rejected(first_peer);

        assert!(downloader.is_rejected(&first_peer));
        assert!(!downloader.is_rejected(&second_peer));

        // Outstanding requests with the rejected peer are forgotten.
        assert!(!downloader.requested_peers.contains_key(&first_peer));

        // The rejected peer gets nothing; the other peer still gets everything.
        let from_rejected = downloader.request_all_from_peer(first_peer);
        assert!(from_rejected.is_empty());

        let from_other = downloader.request_all_from_peer(second_peer);
        assert_eq!(from_other, vec![0, 1]);
    }

    #[test]
    fn metadata_timeout_triggers_rerequest() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(32768);

        let _ = downloader.request_all_from_peer(addr(PEER_A));

        // Backdate both requests past the 5 s base timeout.
        let stale = seconds_ago(10);
        for piece in [0, 1] {
            downloader.piece_request_times.insert(piece, stale);
        }

        let expired = downloader.timed_out_pieces();
        assert_eq!(expired.len(), 2);
        assert!(expired.contains(&0));
        assert!(expired.contains(&1));

        // A received piece no longer counts as timed out.
        downloader.piece_received(0, zero_piece());
        let expired = downloader.timed_out_pieces();
        assert_eq!(expired, vec![1]);
    }

    #[test]
    fn metadata_parallel_fetch_from_multiple_peers() {
        let payload = SMALL_METADATA;
        let expected_hash = irontide_core::sha1(payload);

        let mut downloader = MetadataDownloader::new(expected_hash);
        downloader.set_total_size(payload.len() as u64);

        let from_first = downloader.request_all_from_peer(addr(PEER_A));
        let from_second = downloader.request_all_from_peer(addr(PEER_B));
        assert_eq!(from_first, vec![0]);
        assert_eq!(from_second, vec![0]);

        // Whichever peer answers first completes the download.
        let done = downloader.piece_received(0, Bytes::from(payload.to_vec()));
        assert!(done);

        let assembled = downloader.assemble_and_verify().unwrap();
        assert_eq!(assembled, payload);
    }

    #[test]
    fn metadata_parallel_multi_piece_assembly() {
        // Two full pieces plus a short trailing piece.
        let payload = vec![0xAA_u8; 16384 * 2 + 100];
        let expected_hash = irontide_core::sha1(&payload);

        let mut downloader = MetadataDownloader::new(expected_hash);
        downloader.set_total_size(payload.len() as u64);
        assert_eq!(downloader.num_pieces, Some(3));

        let _ = downloader.request_all_from_peer(addr(PEER_A));
        let _ = downloader.request_all_from_peer(addr(PEER_B));

        let first = Bytes::from(payload[..16384].to_vec());
        let second = Bytes::from(payload[16384..32768].to_vec());
        let last = Bytes::from(payload[32768..].to_vec());

        assert!(!downloader.piece_received(0, first));
        assert!(!downloader.piece_received(1, second));
        assert!(downloader.piece_received(2, last));

        let assembled = downloader.assemble_and_verify().unwrap();
        assert_eq!(assembled, payload);
    }

    #[test]
    fn has_active_peers_reflects_state() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(16384);

        // Nobody has been asked yet.
        assert!(!downloader.has_active_peers());

        let only_peer = addr(PEER_A);
        let _ = downloader.request_all_from_peer(only_peer);
        assert!(downloader.has_active_peers());

        // Rejecting the only peer leaves none active.
        downloader.mark_rejected(only_peer);
        assert!(!downloader.has_active_peers());
    }

    #[test]
    fn request_all_from_peer_skips_received_pieces() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(32768);

        downloader.piece_received(0, zero_piece());

        let requested = downloader.request_all_from_peer(addr(PEER_A));
        assert_eq!(requested, vec![1]);
    }

    #[test]
    fn reset_request_time_updates_timestamp() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(16384);

        let _ = downloader.request_all_from_peer(addr(PEER_A));

        downloader.piece_request_times.insert(0, seconds_ago(10));
        assert_eq!(downloader.timed_out_pieces().len(), 1);

        // Resetting restarts the timer, so the piece is no longer overdue.
        downloader.reset_request_time(0);
        assert!(downloader.timed_out_pieces().is_empty());
    }

    #[test]
    fn backoff_increases_timeout_per_retry() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(16384);

        let _ = downloader.request_all_from_peer(addr(PEER_A));

        // Timeout per retry count: doubles from 5 s and is capped at 60 s.
        let expected = [5, 10, 20, 40, 60];
        for (i, &expected_secs) in expected.iter().enumerate() {
            downloader
                .piece_request_times
                .insert(0, seconds_ago(expected_secs));
            assert_eq!(
                downloader.timed_out_pieces().len(),
                1,
                "retry {i}: should time out after {expected_secs}s"
            );
            downloader.reset_request_time(0);
        }
    }

    #[test]
    fn backoff_capped_at_60s() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(16384);

        let _ = downloader.request_all_from_peer(addr(PEER_A));

        // Far more retries than needed to hit the cap.
        for _ in 0..20 {
            downloader.reset_request_time(0);
        }

        downloader.piece_request_times.insert(0, seconds_ago(59));
        assert!(downloader.timed_out_pieces().is_empty());

        downloader.piece_request_times.insert(0, seconds_ago(61));
        assert_eq!(downloader.timed_out_pieces().len(), 1);
    }

    #[test]
    fn independent_retry_counts_per_piece() {
        let mut downloader = MetadataDownloader::new(Id20::ZERO);
        downloader.set_total_size(32768);

        let _ = downloader.request_all_from_peer(addr(PEER_A));

        // Only piece 0 is retried, so only its timeout backs off (to 40 s).
        for _ in 0..3 {
            downloader.reset_request_time(0);
        }

        let six_seconds_ago = seconds_ago(6);
        downloader.piece_request_times.insert(0, six_seconds_ago);
        downloader.piece_request_times.insert(1, six_seconds_ago);

        // 6 s exceeds piece 1's 5 s base timeout but not piece 0's backed-off one.
        let expired = downloader.timed_out_pieces();
        assert_eq!(expired, vec![1]);
    }
}