#![allow(
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_sign_loss,
clippy::unchecked_time_subtraction,
reason = "M175: BEP 9 ut_metadata — piece counts bounded by metadata size; remaining time-sub sites are test fixtures"
)]
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use bytes::Bytes;
use irontide_core::Id20;
/// Size in bytes of one metadata piece; the piece count is the total size
/// divided by this value, rounded up.
const METADATA_PIECE_SIZE: u64 = 16 * 1024;
/// Timeout in seconds applied to a piece request that has not been retried yet.
/// Each retry doubles it, up to `METADATA_MAX_TIMEOUT_SECS`.
const METADATA_BASE_TIMEOUT_SECS: u64 = 5;
/// Upper bound in seconds on the per-piece request timeout after backoff.
const METADATA_MAX_TIMEOUT_SECS: u64 = 60;
/// Bookkeeping for downloading a torrent's metadata in fixed-size pieces
/// from several peers and verifying the assembled result against `info_hash`.
#[allow(dead_code)]
pub(crate) struct MetadataDownloader {
/// Expected SHA-1 of the fully assembled metadata.
info_hash: Id20,
/// Total metadata size in bytes as passed to `set_total_size`; `None` until then.
total_size: Option<u64>,
/// Received piece payloads, keyed by piece index.
pieces: HashMap<u32, Bytes>,
/// Number of pieces, derived from `total_size` (rounded up); `None` until the size is known.
num_pieces: Option<u32>,
/// For each peer that was asked, the set of piece indices requested from it.
requested_peers: HashMap<SocketAddr, HashSet<u32>>,
/// Peers marked as rejected; they are not asked for pieces again.
rejected_peers: HashSet<SocketAddr>,
/// Instant of the most recent request (or timeout reset) for each piece.
piece_request_times: HashMap<u32, Instant>,
/// Number of timeout resets per piece; drives the exponential timeout backoff.
piece_retry_count: HashMap<u32, u32>,
}
#[allow(dead_code)]
impl MetadataDownloader {
pub fn new(info_hash: Id20) -> Self {
Self {
info_hash,
total_size: None,
pieces: HashMap::new(),
num_pieces: None,
requested_peers: HashMap::new(),
rejected_peers: HashSet::new(),
piece_request_times: HashMap::new(),
piece_retry_count: HashMap::new(),
}
}
pub fn set_total_size(&mut self, size: u64) {
self.total_size = Some(size);
self.num_pieces = Some(size.div_ceil(METADATA_PIECE_SIZE) as u32);
}
pub fn piece_received(&mut self, piece: u32, data: Bytes) -> bool {
self.pieces.insert(piece, data);
match self.num_pieces {
Some(n) => self.pieces.len() == n as usize,
None => false,
}
}
pub fn assemble_and_verify(&self) -> crate::Result<Vec<u8>> {
let num_pieces = self
.num_pieces
.ok_or_else(|| crate::Error::Connection("metadata incomplete".to_string()))?;
if self.pieces.len() != num_pieces as usize {
return Err(crate::Error::Connection("metadata incomplete".to_string()));
}
let mut assembled = Vec::with_capacity(self.total_size.unwrap_or(0) as usize);
for i in 0..num_pieces {
let piece = self
.pieces
.get(&i)
.ok_or_else(|| crate::Error::Connection("metadata incomplete".to_string()))?;
assembled.extend_from_slice(piece);
}
let hash = irontide_core::sha1(&assembled);
if hash != self.info_hash {
return Err(crate::Error::MetadataHashMismatch);
}
Ok(assembled)
}
pub fn missing_pieces(&self) -> Vec<u32> {
match self.num_pieces {
None => Vec::new(),
Some(n) => (0..n).filter(|i| !self.pieces.contains_key(i)).collect(),
}
}
pub fn mark_rejected(&mut self, peer: SocketAddr) {
self.rejected_peers.insert(peer);
self.requested_peers.remove(&peer);
}
pub fn is_rejected(&self, peer: &SocketAddr) -> bool {
self.rejected_peers.contains(peer)
}
pub fn request_all_from_peer(&mut self, peer: SocketAddr) -> Vec<u32> {
if self.rejected_peers.contains(&peer) {
return Vec::new();
}
let missing = self.missing_pieces();
if missing.is_empty() {
return Vec::new();
}
let now = Instant::now();
let peer_set = self.requested_peers.entry(peer).or_default();
for &piece in &missing {
peer_set.insert(piece);
self.piece_request_times.insert(piece, now);
self.piece_retry_count.remove(&piece);
}
missing
}
pub fn timed_out_pieces(&self) -> Vec<u32> {
let now = Instant::now();
self.piece_request_times
.iter()
.filter(|(piece, requested_at)| {
if self.pieces.contains_key(piece) {
return false;
}
let retries = self.piece_retry_count.get(piece).copied().unwrap_or(0);
let clamped = retries.min(12);
let timeout_secs = METADATA_BASE_TIMEOUT_SECS
.saturating_mul(1u64 << clamped)
.min(METADATA_MAX_TIMEOUT_SECS);
let timeout = Duration::from_secs(timeout_secs);
now.duration_since(**requested_at) >= timeout
})
.map(|(piece, _)| *piece)
.collect()
}
pub fn reset_request_time(&mut self, piece: u32) {
self.piece_request_times.insert(piece, Instant::now());
*self.piece_retry_count.entry(piece).or_insert(0) += 1;
}
pub fn has_active_peers(&self) -> bool {
self.requested_peers
.keys()
.any(|peer| !self.rejected_peers.contains(peer))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use irontide_core::Id20;

    /// Payload used by the hash-verification tests.
    const SAMPLE: &[u8] = b"hello world metadata test data!!";

    /// Builds a downloader with an all-zero info hash and the given total size.
    fn sized_downloader(total: u64) -> MetadataDownloader {
        let mut dl = MetadataDownloader::new(Id20::ZERO);
        dl.set_total_size(total);
        dl
    }

    /// Parses a socket address literal used as a fake peer.
    fn addr(text: &str) -> SocketAddr {
        text.parse().expect("valid addr")
    }

    /// Returns a zero-filled payload of `len` bytes.
    fn zeroed(len: usize) -> Bytes {
        Bytes::from(vec![0u8; len])
    }

    /// Pretends `piece` was requested `secs` seconds ago.
    fn backdate(dl: &mut MetadataDownloader, piece: u32, secs: u64) {
        dl.piece_request_times
            .insert(piece, Instant::now() - Duration::from_secs(secs));
    }

    #[test]
    fn new_empty() {
        let fresh = MetadataDownloader::new(Id20::ZERO);
        assert!(fresh.total_size.is_none());
        assert!(fresh.num_pieces.is_none());
        assert!(fresh.pieces.is_empty());
        assert!(fresh.requested_peers.is_empty());
        assert!(fresh.rejected_peers.is_empty());
        assert!(fresh.piece_request_times.is_empty());
    }

    #[test]
    fn set_total_size_calculates_num_pieces() {
        let mut dl = MetadataDownloader::new(Id20::ZERO);
        let cases: [(u64, u32); 3] = [(32768, 2), (16384, 1), (16385, 2)];
        for (size, pieces) in cases {
            dl.set_total_size(size);
            assert_eq!(dl.num_pieces, Some(pieces));
        }
    }

    #[test]
    fn single_piece_metadata() {
        let mut dl = sized_downloader(100);
        assert!(dl.piece_received(0, zeroed(100)));
    }

    #[test]
    fn multi_piece_metadata() {
        let mut dl = sized_downloader(32768);
        let after_first = dl.piece_received(0, zeroed(16384));
        assert!(!after_first);
        let after_second = dl.piece_received(1, zeroed(16384));
        assert!(after_second);
    }

    #[test]
    fn piece_received_returns_false_when_incomplete() {
        let mut dl = sized_downloader(32768);
        assert!(!dl.piece_received(0, zeroed(16384)));
    }

    #[test]
    fn assemble_and_verify_correct_hash() {
        let mut dl = MetadataDownloader::new(irontide_core::sha1(SAMPLE));
        dl.set_total_size(SAMPLE.len() as u64);
        dl.piece_received(0, Bytes::from(SAMPLE.to_vec()));
        let assembled = dl.assemble_and_verify().unwrap();
        assert_eq!(assembled, SAMPLE);
    }

    #[test]
    fn assemble_and_verify_wrong_hash() {
        // The all-zero hash cannot match the SHA-1 of the sample payload.
        let mut dl = sized_downloader(SAMPLE.len() as u64);
        dl.piece_received(0, Bytes::from(SAMPLE.to_vec()));
        let outcome = dl.assemble_and_verify();
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, crate::Error::MetadataHashMismatch),
            "expected MetadataHashMismatch, got: {err:?}"
        );
    }

    #[test]
    fn metadata_full_redundancy_all_pieces_to_each_peer() {
        let mut dl = sized_downloader(32768);
        let peers = [addr("10.0.0.1:6881"), addr("10.0.0.2:6881")];
        // Every peer is asked for the full set of missing pieces.
        for peer in peers {
            assert_eq!(dl.request_all_from_peer(peer), vec![0, 1]);
        }
        for peer in &peers {
            assert!(dl.requested_peers.contains_key(peer));
            assert_eq!(dl.requested_peers[peer].len(), 2);
        }
        for piece in [0u32, 1] {
            assert!(dl.piece_request_times.contains_key(&piece));
        }
    }

    #[test]
    fn metadata_reject_blacklists_peer() {
        let mut dl = sized_downloader(32768);
        let rejected = addr("10.0.0.1:6881");
        let healthy = addr("10.0.0.2:6881");
        let _ = dl.request_all_from_peer(rejected);
        let _ = dl.request_all_from_peer(healthy);
        dl.mark_rejected(rejected);
        assert!(dl.is_rejected(&rejected));
        assert!(!dl.is_rejected(&healthy));
        assert!(!dl.requested_peers.contains_key(&rejected));
        assert!(dl.request_all_from_peer(rejected).is_empty());
        assert_eq!(dl.request_all_from_peer(healthy), vec![0, 1]);
    }

    #[test]
    fn metadata_timeout_triggers_rerequest() {
        let mut dl = sized_downloader(32768);
        let _ = dl.request_all_from_peer(addr("10.0.0.1:6881"));
        let stale = Instant::now() - Duration::from_secs(10);
        for piece in [0u32, 1] {
            dl.piece_request_times.insert(piece, stale);
        }
        let expired = dl.timed_out_pieces();
        assert_eq!(expired.len(), 2);
        assert!(expired.contains(&0));
        assert!(expired.contains(&1));
        // A received piece no longer counts as timed out.
        dl.piece_received(0, zeroed(16384));
        assert_eq!(dl.timed_out_pieces(), vec![1]);
    }

    #[test]
    fn metadata_parallel_fetch_from_multiple_peers() {
        let mut dl = MetadataDownloader::new(irontide_core::sha1(SAMPLE));
        dl.set_total_size(SAMPLE.len() as u64);
        let from_a = dl.request_all_from_peer(addr("10.0.0.1:6881"));
        let from_b = dl.request_all_from_peer(addr("10.0.0.2:6881"));
        assert_eq!(from_a, vec![0]);
        assert_eq!(from_b, vec![0]);
        assert!(dl.piece_received(0, Bytes::from(SAMPLE.to_vec())));
        let assembled = dl.assemble_and_verify().unwrap();
        assert_eq!(assembled, SAMPLE);
    }

    #[test]
    fn metadata_parallel_multi_piece_assembly() {
        let data = vec![0xAA_u8; 16384 * 2 + 100];
        let mut dl = MetadataDownloader::new(irontide_core::sha1(&data));
        dl.set_total_size(data.len() as u64);
        assert_eq!(dl.num_pieces, Some(3));
        let _ = dl.request_all_from_peer(addr("10.0.0.1:6881"));
        let _ = dl.request_all_from_peer(addr("10.0.0.2:6881"));
        // Two full pieces plus a 100-byte tail; only the tail completes the set.
        let last = 2;
        for (index, chunk) in data.chunks(16384).enumerate() {
            let complete = dl.piece_received(index as u32, Bytes::from(chunk.to_vec()));
            assert_eq!(complete, index == last);
        }
        let assembled = dl.assemble_and_verify().unwrap();
        assert_eq!(assembled, data);
    }

    #[test]
    fn has_active_peers_reflects_state() {
        let mut dl = sized_downloader(16384);
        assert!(!dl.has_active_peers());
        let only_peer = addr("10.0.0.1:6881");
        let _ = dl.request_all_from_peer(only_peer);
        assert!(dl.has_active_peers());
        dl.mark_rejected(only_peer);
        assert!(!dl.has_active_peers());
    }

    #[test]
    fn request_all_from_peer_skips_received_pieces() {
        let mut dl = sized_downloader(32768);
        dl.piece_received(0, zeroed(16384));
        let requested = dl.request_all_from_peer(addr("10.0.0.1:6881"));
        assert_eq!(requested, vec![1]);
    }

    #[test]
    fn reset_request_time_updates_timestamp() {
        let mut dl = sized_downloader(16384);
        let _ = dl.request_all_from_peer(addr("10.0.0.1:6881"));
        backdate(&mut dl, 0, 10);
        assert_eq!(dl.timed_out_pieces().len(), 1);
        dl.reset_request_time(0);
        assert!(dl.timed_out_pieces().is_empty());
    }

    #[test]
    fn backoff_increases_timeout_per_retry() {
        let mut dl = sized_downloader(16384);
        let _ = dl.request_all_from_peer(addr("10.0.0.1:6881"));
        // Expected timeout in seconds after 0, 1, 2, 3 and 4 retries.
        let schedule: [u64; 5] = [5, 10, 20, 40, 60];
        for (i, &expected_secs) in schedule.iter().enumerate() {
            backdate(&mut dl, 0, expected_secs);
            assert_eq!(
                dl.timed_out_pieces().len(),
                1,
                "retry {i}: should time out after {expected_secs}s"
            );
            dl.reset_request_time(0);
        }
    }

    #[test]
    fn backoff_capped_at_60s() {
        let mut dl = sized_downloader(16384);
        let _ = dl.request_all_from_peer(addr("10.0.0.1:6881"));
        (0..20).for_each(|_| dl.reset_request_time(0));
        backdate(&mut dl, 0, 59);
        assert!(dl.timed_out_pieces().is_empty());
        backdate(&mut dl, 0, 61);
        assert_eq!(dl.timed_out_pieces().len(), 1);
    }

    #[test]
    fn independent_retry_counts_per_piece() {
        let mut dl = sized_downloader(32768);
        let _ = dl.request_all_from_peer(addr("10.0.0.1:6881"));
        // Three retries push piece 0's timeout well past six seconds.
        for _ in 0..3 {
            dl.reset_request_time(0);
        }
        let six_seconds_ago = Instant::now() - Duration::from_secs(6);
        for piece in [0u32, 1] {
            dl.piece_request_times.insert(piece, six_seconds_ago);
        }
        assert_eq!(dl.timed_out_pieces(), vec![1]);
    }
}