Skip to main content

irontide_engine/
metadata.rs

1#![allow(
2    clippy::cast_possible_truncation,
3    clippy::cast_possible_wrap,
4    clippy::cast_sign_loss,
5    clippy::unchecked_time_subtraction,
6    reason = "M175: BEP 9 ut_metadata — piece counts bounded by metadata size; remaining time-sub sites are test fixtures"
7)]
8
9use std::collections::{HashMap, HashSet};
10use std::net::SocketAddr;
11use std::time::{Duration, Instant};
12
13use bytes::Bytes;
14use irontide_core::Id20;
15
/// Size in bytes of one BEP 9 metadata piece (16 KiB).
const METADATA_PIECE_SIZE: u64 = 16 * 1024;

/// Timeout, in seconds, for a metadata piece request that has not been retried yet.
const METADATA_BASE_TIMEOUT_SECS: u64 = 5;

/// Ceiling, in seconds, for the exponentially backed-off piece request timeout.
const METADATA_MAX_TIMEOUT_SECS: u64 = 60;
24
25/// State machine for downloading torrent metadata via BEP 9 (magnet links).
26///
27/// Metadata is split into 16 KiB pieces. Once all pieces are received,
28/// they are assembled and verified against the `info_hash`.
29///
30/// Supports full-redundancy parallel fetch: every peer that advertises
31/// `ut_metadata` is sent requests for ALL missing pieces. The first
32/// complete set (SHA1-verified) wins.
33#[allow(dead_code)]
34pub struct MetadataDownloader {
35    info_hash: Id20,
36    total_size: Option<u64>,
37    pieces: HashMap<u32, Bytes>,
38    num_pieces: Option<u32>,
39    /// Peers that have been sent metadata requests (peer -> set of requested pieces).
40    requested_peers: HashMap<SocketAddr, HashSet<u32>>,
41    /// Peers that rejected our metadata requests — don't request from them again.
42    rejected_peers: HashSet<SocketAddr>,
43    /// When each piece was last requested (for timeout detection).
44    piece_request_times: HashMap<u32, Instant>,
45    /// Per-piece retry count for exponential backoff.
46    piece_retry_count: HashMap<u32, u32>,
47}
48
49#[allow(dead_code)]
50impl MetadataDownloader {
51    /// Create a new downloader with no size known yet.
52    #[must_use]
53    pub fn new(info_hash: Id20) -> Self {
54        Self {
55            info_hash,
56            total_size: None,
57            pieces: HashMap::new(),
58            num_pieces: None,
59            requested_peers: HashMap::new(),
60            rejected_peers: HashSet::new(),
61            piece_request_times: HashMap::new(),
62            piece_retry_count: HashMap::new(),
63        }
64    }
65
66    /// Set the total metadata size and calculate the number of pieces.
67    pub fn set_total_size(&mut self, size: u64) {
68        self.total_size = Some(size);
69        self.num_pieces = Some(size.div_ceil(METADATA_PIECE_SIZE) as u32);
70    }
71
72    /// Store a received piece. Returns `true` if all pieces have been received.
73    pub fn piece_received(&mut self, piece: u32, data: Bytes) -> bool {
74        self.pieces.insert(piece, data);
75        match self.num_pieces {
76            Some(n) => self.pieces.len() == n as usize,
77            None => false,
78        }
79    }
80
81    /// Concatenate all pieces in order and verify the SHA1 hash matches `info_hash`.
82    ///
83    /// Returns the assembled metadata bytes on success.
84    ///
85    /// # Errors
86    /// Returns an error if the reassembled metadata's SHA-1 does not match the info-hash.
87    pub fn assemble_and_verify(&self) -> crate::Result<Vec<u8>> {
88        let num_pieces = self
89            .num_pieces
90            .ok_or_else(|| crate::Error::Connection("metadata incomplete".to_string()))?;
91
92        if self.pieces.len() != num_pieces as usize {
93            return Err(crate::Error::Connection("metadata incomplete".to_string()));
94        }
95
96        let mut assembled = Vec::with_capacity(self.total_size.unwrap_or(0) as usize);
97        for i in 0..num_pieces {
98            let piece = self
99                .pieces
100                .get(&i)
101                .ok_or_else(|| crate::Error::Connection("metadata incomplete".to_string()))?;
102            assembled.extend_from_slice(piece);
103        }
104
105        let hash = irontide_core::sha1(&assembled);
106        if hash != self.info_hash {
107            return Err(crate::Error::MetadataHashMismatch);
108        }
109
110        Ok(assembled)
111    }
112
113    /// Return sorted list of piece indices we don't have yet.
114    ///
115    /// Returns an empty vec if `num_pieces` is not yet known.
116    #[must_use]
117    pub fn missing_pieces(&self) -> Vec<u32> {
118        match self.num_pieces {
119            None => Vec::new(),
120            Some(n) => (0..n).filter(|i| !self.pieces.contains_key(i)).collect(),
121        }
122    }
123
124    /// Mark a peer as having rejected metadata requests.
125    ///
126    /// The peer is added to the rejected set and removed from the active
127    /// requested set. No further requests will be sent to this peer.
128    pub fn mark_rejected(&mut self, peer: SocketAddr) {
129        self.rejected_peers.insert(peer);
130        self.requested_peers.remove(&peer);
131    }
132
133    /// Check whether a peer has been rejected.
134    #[must_use]
135    pub fn is_rejected(&self, peer: &SocketAddr) -> bool {
136        self.rejected_peers.contains(peer)
137    }
138
139    /// Request all missing pieces from a peer (full redundancy).
140    ///
141    /// Returns the list of piece indices to request from this peer.
142    /// Skips pieces we already have and rejects requests to blacklisted peers.
143    /// Records the peer in `requested_peers` and updates `piece_request_times`.
144    pub fn request_all_from_peer(&mut self, peer: SocketAddr) -> Vec<u32> {
145        if self.rejected_peers.contains(&peer) {
146            return Vec::new();
147        }
148
149        let missing = self.missing_pieces();
150        if missing.is_empty() {
151            return Vec::new();
152        }
153
154        let now = Instant::now();
155        let peer_set = self.requested_peers.entry(peer).or_default();
156        for &piece in &missing {
157            peer_set.insert(piece);
158            // D3: fresh peer gets fresh timer, not inherited stale backoff
159            self.piece_request_times.insert(piece, now);
160            self.piece_retry_count.remove(&piece);
161        }
162
163        missing
164    }
165
166    /// Return piece indices whose last request time exceeds their per-piece
167    /// backoff timeout. Backoff is exponential: `min(60s, 5s * 2^retry_count)`.
168    #[must_use]
169    pub fn timed_out_pieces(&self) -> Vec<u32> {
170        let now = Instant::now();
171        self.piece_request_times
172            .iter()
173            .filter(|(piece, requested_at)| {
174                if self.pieces.contains_key(piece) {
175                    return false;
176                }
177                let retries = self.piece_retry_count.get(piece).copied().unwrap_or(0);
178                let clamped = retries.min(12);
179                let timeout_secs = METADATA_BASE_TIMEOUT_SECS
180                    .saturating_mul(1u64 << clamped)
181                    .min(METADATA_MAX_TIMEOUT_SECS);
182                let timeout = Duration::from_secs(timeout_secs);
183                now.duration_since(**requested_at) >= timeout
184            })
185            .map(|(piece, _)| *piece)
186            .collect()
187    }
188
189    /// Reset the request time for a piece and increment its retry count.
190    pub fn reset_request_time(&mut self, piece: u32) {
191        self.piece_request_times.insert(piece, Instant::now());
192        *self.piece_retry_count.entry(piece).or_insert(0) += 1;
193    }
194
195    /// Whether any non-rejected peers have outstanding requests.
196    #[must_use]
197    pub fn has_active_peers(&self) -> bool {
198        self.requested_peers
199            .keys()
200            .any(|peer| !self.rejected_peers.contains(peer))
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use irontide_core::Id20;

    /// Payload small enough to fit in a single metadata piece.
    const SAMPLE: &[u8; 32] = b"hello world metadata test data!!";

    /// Parse a literal socket address to stand in for a peer.
    fn peer(addr: &str) -> SocketAddr {
        addr.parse().expect("valid addr")
    }

    /// Downloader with an all-zero info-hash whose metadata size is already set.
    fn sized_downloader(total_size: u64) -> MetadataDownloader {
        let mut dl = MetadataDownloader::new(Id20::ZERO);
        dl.set_total_size(total_size);
        dl
    }

    /// One full 16 KiB piece of zero bytes.
    fn zero_piece() -> Bytes {
        Bytes::from(vec![0u8; 16384])
    }

    /// An instant `secs` seconds in the past, used to fake elapsed time.
    fn secs_ago(secs: u64) -> Instant {
        Instant::now() - Duration::from_secs(secs)
    }

    #[test]
    fn new_empty() {
        let dl = MetadataDownloader::new(Id20::ZERO);

        assert!(dl.total_size.is_none());
        assert!(dl.num_pieces.is_none());
        assert!(dl.pieces.is_empty());
        assert!(dl.requested_peers.is_empty());
        assert!(dl.rejected_peers.is_empty());
        assert!(dl.piece_request_times.is_empty());
    }

    #[test]
    fn set_total_size_calculates_num_pieces() {
        let mut dl = MetadataDownloader::new(Id20::ZERO);

        // (total size in bytes, expected piece count)
        for (size, pieces) in [(32768, 2), (16384, 1), (16385, 2)] {
            dl.set_total_size(size);
            assert_eq!(dl.num_pieces, Some(pieces));
        }
    }

    #[test]
    fn single_piece_metadata() {
        let mut dl = sized_downloader(100);

        assert!(dl.piece_received(0, Bytes::from(vec![0u8; 100])));
    }

    #[test]
    fn multi_piece_metadata() {
        let mut dl = sized_downloader(32768); // 2 pieces

        assert!(!dl.piece_received(0, zero_piece()));
        assert!(dl.piece_received(1, zero_piece()));
    }

    #[test]
    fn piece_received_returns_false_when_incomplete() {
        let mut dl = sized_downloader(32768); // 2 pieces

        assert!(!dl.piece_received(0, zero_piece()));
    }

    #[test]
    fn assemble_and_verify_correct_hash() {
        // The info-hash is the real SHA-1 of the single-piece payload.
        let mut dl = MetadataDownloader::new(irontide_core::sha1(SAMPLE));
        dl.set_total_size(SAMPLE.len() as u64);
        dl.piece_received(0, Bytes::from(SAMPLE.to_vec()));

        let result = dl.assemble_and_verify().unwrap();
        assert_eq!(result, SAMPLE);
    }

    #[test]
    fn assemble_and_verify_wrong_hash() {
        // An all-zero info-hash cannot match the payload's SHA-1.
        let mut dl = MetadataDownloader::new(Id20::ZERO);
        dl.set_total_size(SAMPLE.len() as u64);
        dl.piece_received(0, Bytes::from(SAMPLE.to_vec()));

        let result = dl.assemble_and_verify();
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, crate::Error::MetadataHashMismatch),
            "expected MetadataHashMismatch, got: {err:?}"
        );
    }

    // --- M107: parallel metadata fetch ---

    #[test]
    fn metadata_full_redundancy_all_pieces_to_each_peer() {
        let mut dl = sized_downloader(32768); // 2 pieces
        let peers = [peer("10.0.0.1:6881"), peer("10.0.0.2:6881")];

        // Full redundancy: each peer is asked for every missing piece.
        for &addr in &peers {
            assert_eq!(dl.request_all_from_peer(addr), vec![0, 1]);
        }

        // Each peer is tracked together with both requested pieces.
        for addr in &peers {
            assert!(dl.requested_peers.contains_key(addr));
            assert_eq!(dl.requested_peers[addr].len(), 2);
        }

        // Both pieces carry a request timestamp.
        assert!(dl.piece_request_times.contains_key(&0));
        assert!(dl.piece_request_times.contains_key(&1));
    }

    #[test]
    fn metadata_reject_blacklists_peer() {
        let mut dl = sized_downloader(32768); // 2 pieces
        let rejected = peer("10.0.0.1:6881");
        let healthy = peer("10.0.0.2:6881");

        let _ = dl.request_all_from_peer(rejected);
        let _ = dl.request_all_from_peer(healthy);

        dl.mark_rejected(rejected);

        assert!(dl.is_rejected(&rejected));
        assert!(!dl.is_rejected(&healthy));

        // Rejection also drops the peer from the active request map.
        assert!(!dl.requested_peers.contains_key(&rejected));

        // A rejected peer is never asked again ...
        assert!(dl.request_all_from_peer(rejected).is_empty());

        // ... while the other peer is still asked for everything.
        assert_eq!(dl.request_all_from_peer(healthy), vec![0, 1]);
    }

    #[test]
    fn metadata_timeout_triggers_rerequest() {
        let mut dl = sized_downloader(32768); // 2 pieces
        let _ = dl.request_all_from_peer(peer("10.0.0.1:6881"));

        // Fake 10s of waiting by backdating both request timestamps.
        let stale = secs_ago(10);
        for piece in 0..2 {
            dl.piece_request_times.insert(piece, stale);
        }

        // With no retries the timeout is the 5s base, so both are overdue.
        let timed_out = dl.timed_out_pieces();
        assert_eq!(timed_out.len(), 2);
        assert!(timed_out.contains(&0));
        assert!(timed_out.contains(&1));

        // Once piece 0 has arrived only piece 1 is still reported.
        dl.piece_received(0, zero_piece());
        assert_eq!(dl.timed_out_pieces(), vec![1]);
    }

    #[test]
    fn metadata_parallel_fetch_from_multiple_peers() {
        // Both peers are asked for the only piece; a single delivery is enough.
        let mut dl = MetadataDownloader::new(irontide_core::sha1(SAMPLE));
        dl.set_total_size(SAMPLE.len() as u64); // 1 piece (< 16 KiB)

        let pieces_a = dl.request_all_from_peer(peer("10.0.0.1:6881"));
        let pieces_b = dl.request_all_from_peer(peer("10.0.0.2:6881"));
        assert_eq!(pieces_a, vec![0]);
        assert_eq!(pieces_b, vec![0]);

        // The first delivery completes the download.
        assert!(dl.piece_received(0, Bytes::from(SAMPLE.to_vec())));

        // Assembly succeeds even though the second peer never answered.
        let result = dl.assemble_and_verify().unwrap();
        assert_eq!(result, SAMPLE);
    }

    #[test]
    fn metadata_parallel_multi_piece_assembly() {
        // Two full pieces plus a 100-byte tail: three pieces in total.
        let data = vec![0xAA_u8; 16384 * 2 + 100];

        let mut dl = MetadataDownloader::new(irontide_core::sha1(&data));
        dl.set_total_size(data.len() as u64);
        assert_eq!(dl.num_pieces, Some(3));

        let _ = dl.request_all_from_peer(peer("10.0.0.1:6881"));
        let _ = dl.request_all_from_peer(peer("10.0.0.2:6881"));

        // Deliver the pieces in order; only the last one completes the set.
        let last = 2;
        for (index, chunk) in data.chunks(16384).enumerate() {
            let complete = dl.piece_received(index as u32, Bytes::from(chunk.to_vec()));
            assert_eq!(complete, index == last);
        }

        let result = dl.assemble_and_verify().unwrap();
        assert_eq!(result, data);
    }

    #[test]
    fn has_active_peers_reflects_state() {
        let mut dl = sized_downloader(16384);
        let only_peer = peer("10.0.0.1:6881");

        // Nothing requested yet.
        assert!(!dl.has_active_peers());

        let _ = dl.request_all_from_peer(only_peer);
        assert!(dl.has_active_peers());

        // Rejecting the sole peer leaves nobody active.
        dl.mark_rejected(only_peer);
        assert!(!dl.has_active_peers());
    }

    #[test]
    fn request_all_from_peer_skips_received_pieces() {
        let mut dl = sized_downloader(32768); // 2 pieces

        // Piece 0 is already in hand before any request goes out.
        dl.piece_received(0, zero_piece());

        // So only piece 1 is requested.
        assert_eq!(dl.request_all_from_peer(peer("10.0.0.1:6881")), vec![1]);
    }

    #[test]
    fn reset_request_time_updates_timestamp() {
        let mut dl = sized_downloader(16384);
        let _ = dl.request_all_from_peer(peer("10.0.0.1:6881"));

        // Backdate beyond the 5s base timeout: the piece is overdue.
        dl.piece_request_times.insert(0, secs_ago(10));
        assert_eq!(dl.timed_out_pieces().len(), 1);

        // Resetting stamps "now" and bumps the retry count to 1 (10s backoff),
        // so the piece is no longer overdue.
        dl.reset_request_time(0);
        assert!(dl.timed_out_pieces().is_empty());
    }

    #[test]
    fn backoff_increases_timeout_per_retry() {
        let mut dl = sized_downloader(16384);
        let _ = dl.request_all_from_peer(peer("10.0.0.1:6881"));

        // Timeout per retry count: 0 -> 5s, 1 -> 10s, 2 -> 20s, 3 -> 40s, 4 -> 60s.
        for (i, expected_secs) in [5_u64, 10, 20, 40, 60].into_iter().enumerate() {
            dl.piece_request_times.insert(0, secs_ago(expected_secs));
            assert_eq!(
                dl.timed_out_pieces().len(),
                1,
                "retry {i}: should time out after {expected_secs}s"
            );
            dl.reset_request_time(0);
        }
    }

    #[test]
    fn backoff_capped_at_60s() {
        let mut dl = sized_downloader(16384);
        let _ = dl.request_all_from_peer(peer("10.0.0.1:6881"));

        // Pile up far more retries than are needed to hit the cap.
        (0..20).for_each(|_| dl.reset_request_time(0));

        // 59s old: still inside the 60s cap.
        dl.piece_request_times.insert(0, secs_ago(59));
        assert!(dl.timed_out_pieces().is_empty());

        // 61s old: past the cap.
        dl.piece_request_times.insert(0, secs_ago(61));
        assert_eq!(dl.timed_out_pieces().len(), 1);
    }

    #[test]
    fn independent_retry_counts_per_piece() {
        let mut dl = sized_downloader(32768); // 2 pieces
        let _ = dl.request_all_from_peer(peer("10.0.0.1:6881"));

        // Three retries for piece 0 (next timeout 40s); piece 1 stays at the 5s base.
        for _ in 0..3 {
            dl.reset_request_time(0);
        }

        // Both requests are 6s old.
        let stale = secs_ago(6);
        dl.piece_request_times.insert(0, stale);
        dl.piece_request_times.insert(1, stale);

        // Only piece 1 has crossed its threshold.
        assert_eq!(dl.timed_out_pieces(), vec![1]);
    }
}