//! BEP 9 (`ut_metadata`) metadata resolver.
//!
//! Connects to candidate peers, performs the BitTorrent handshake and the BEP 10 extended
//! handshake, requests the metadata pieces and assembles them into a parsed torrent.
// The integer-cast lints are silenced for the whole file; see `reason` below.
#![allow(
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_sign_loss,
reason = "M175: BEP 9 metadata resolver — piece counts bounded by metadata size"
)]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use parking_lot::Mutex;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{Semaphore, mpsc};
use tracing::{debug, trace};
use irontide_core::{Id20, TorrentMetaV1, torrent_from_bytes};
use irontide_wire::{ExtHandshake, Handshake, Message, MetadataMessage, MetadataMessageType};
use crate::metadata::MetadataDownloader;
use crate::transport::NetworkFactory;
/// Concurrency cap for `resolve_metadata`'s `max_concurrent` argument (used by the tests in
/// this file; presumably also the default chosen by callers elsewhere — confirm).
pub(crate) const DEFAULT_MAX_CONCURRENT: usize = 128;
/// Deadline for one whole `resolve_metadata` call, across all peers.
const RESOLVE_TIMEOUT: Duration = Duration::from_mins(1);
/// Deadline for a single peer's full exchange (connect, handshakes and piece transfer).
const PER_PEER_TIMEOUT: Duration = Duration::from_secs(15);
/// Largest message body (256 KiB) `read_message` will allocate for; larger length prefixes
/// are rejected with an error before any payload is read.
const MAX_MESSAGE_SIZE: usize = 256 * 1024;
/// Size in bytes of the buffer used to read the peer's fixed-length BitTorrent handshake.
const HANDSHAKE_SIZE: usize = 68;
/// Outcome of an exchange with one peer that ended without an error.
struct PeerResult {
// `true` when a piece received from this peer made the shared downloader report the
// metadata as complete; `false` when the peer had nothing to be asked for.
complete: bool,
}
pub(crate) async fn resolve_metadata(
info_hash: Id20,
peer_id: Id20,
mut peer_rx: mpsc::UnboundedReceiver<Vec<SocketAddr>>,
factory: Arc<NetworkFactory>,
connect_timeout: Duration,
max_concurrent: usize,
) -> crate::Result<(TorrentMetaV1, Vec<SocketAddr>)> {
let semaphore = Arc::new(Semaphore::new(max_concurrent));
let downloader = Arc::new(Mutex::new(MetadataDownloader::new(info_hash)));
let connected_peers = Arc::new(Mutex::new(Vec::<SocketAddr>::new()));
let mut futures = FuturesUnordered::new();
let mut seen = std::collections::HashSet::<SocketAddr>::new();
let result = tokio::time::timeout(RESOLVE_TIMEOUT, async {
loop {
tokio::select! {
Some(batch) = peer_rx.recv() => {
for addr in batch {
if !seen.insert(addr) {
continue;
}
let Ok(permit) = semaphore.clone().try_acquire_owned() else {
trace!(%addr, "metadata resolver: semaphore full, skipping peer");
continue;
};
let factory = Arc::clone(&factory);
let dl = Arc::clone(&downloader);
let peers = Arc::clone(&connected_peers);
futures.push(tokio::spawn(async move {
let result = tokio::time::timeout(
PER_PEER_TIMEOUT,
resolve_from_peer(
addr,
info_hash,
peer_id,
&factory,
connect_timeout,
&dl,
),
)
.await;
drop(permit);
match result {
Ok(Ok(peer_result)) => {
peers.lock().push(addr);
Some(peer_result)
}
Ok(Err(e)) => {
trace!(%addr, "metadata peer failed: {e}");
None
}
Err(_) => {
trace!(%addr, "metadata peer timed out");
None
}
}
}));
}
}
Some(result) = futures.next() => {
match result {
Ok(Some(peer_result)) if peer_result.complete => {
let assembled = downloader.lock().assemble_and_verify()?;
return assemble_torrent(info_hash, assembled, &connected_peers);
}
_ => {
}
}
}
else => {
break;
}
}
}
while let Some(result) = futures.next().await {
if let Ok(Some(peer_result)) = result
&& peer_result.complete
{
let assembled = downloader.lock().assemble_and_verify()?;
return assemble_torrent(info_hash, assembled, &connected_peers);
}
}
Err(crate::Error::Connection(
"metadata resolution exhausted all peers".into(),
))
})
.await;
match result {
Ok(inner) => inner,
Err(_) => Err(crate::Error::Connection(
"metadata resolution timed out".into(),
)),
}
}
fn assemble_torrent(
info_hash: Id20,
info_bytes: Vec<u8>,
connected_peers: &Arc<Mutex<Vec<SocketAddr>>>,
) -> crate::Result<(TorrentMetaV1, Vec<SocketAddr>)> {
let mut torrent_bytes = b"d4:info".to_vec();
torrent_bytes.extend_from_slice(&info_bytes);
torrent_bytes.push(b'e');
let mut meta = torrent_from_bytes(&torrent_bytes)
.map_err(|e| crate::Error::Connection(format!("failed to parse resolved metadata: {e}")))?;
meta.info_bytes = Some(Bytes::from(info_bytes));
let peers = connected_peers.lock().clone();
debug!(
%info_hash,
num_peers = peers.len(),
"metadata resolved via pre-phase"
);
Ok((meta, peers))
}
async fn resolve_from_peer(
addr: SocketAddr,
info_hash: Id20,
peer_id: Id20,
factory: &NetworkFactory,
connect_timeout: Duration,
downloader: &Mutex<MetadataDownloader>,
) -> crate::Result<PeerResult> {
let mut stream = tokio::time::timeout(connect_timeout, factory.connect_tcp(addr))
.await
.map_err(|_| crate::Error::Connection("connect timed out".into()))??;
let our_hs = Handshake::new(info_hash, peer_id);
let hs_bytes = our_hs.to_bytes();
stream.write_all(&hs_bytes).await?;
stream.flush().await?;
let mut hs_buf = [0u8; HANDSHAKE_SIZE];
stream.read_exact(&mut hs_buf).await?;
let their_hs = Handshake::from_bytes(&hs_buf)?;
if their_hs.info_hash != info_hash {
return Err(crate::Error::Connection("info_hash mismatch".into()));
}
if !their_hs.supports_extensions() {
return Err(crate::Error::Connection(
"peer does not support BEP 10 extensions".into(),
));
}
let our_ext = ExtHandshake::new();
let ext_payload = our_ext.to_bytes().map_err(crate::Error::Wire)?;
let ext_msg = Message::Extended {
ext_id: 0,
payload: ext_payload,
};
write_message(&mut stream, &ext_msg).await?;
let their_ext = loop {
let msg = read_message(&mut stream).await?;
match msg {
Message::Extended { ext_id: 0, payload } => {
break ExtHandshake::from_bytes(&payload).map_err(crate::Error::Wire)?;
}
Message::KeepAlive
| Message::Bitfield(_)
| Message::Have { .. }
| Message::Unchoke
| Message::Choke
| Message::HaveAll
| Message::HaveNone => {}
other => {
trace!(%addr, ?other, "unexpected message before ext handshake");
}
}
};
let their_ut_metadata_id = their_ext
.ext_id("ut_metadata")
.ok_or_else(|| crate::Error::Connection("peer does not support ut_metadata".into()))?;
let metadata_size = their_ext
.metadata_size
.ok_or_else(|| crate::Error::Connection("peer did not advertise metadata_size".into()))?;
if metadata_size == 0 || metadata_size > 16 * 1024 * 1024 {
return Err(crate::Error::Connection(format!(
"invalid metadata_size: {metadata_size}"
)));
}
let our_ut_metadata_id = our_ext
.ext_id("ut_metadata")
.expect("we always advertise ut_metadata");
let pieces_to_request = {
let mut dl = downloader.lock();
dl.set_total_size(metadata_size);
dl.request_all_from_peer(addr)
};
if pieces_to_request.is_empty() {
return Ok(PeerResult { complete: false });
}
for &piece_idx in &pieces_to_request {
let req = MetadataMessage::request(piece_idx);
let req_bytes = req.to_bytes().map_err(crate::Error::Wire)?;
let msg = Message::Extended {
ext_id: their_ut_metadata_id,
payload: req_bytes,
};
write_message(&mut stream, &msg).await?;
}
loop {
let msg = read_message(&mut stream).await?;
match msg {
Message::Extended { ext_id, payload } if ext_id == our_ut_metadata_id => {
let meta_msg = MetadataMessage::from_bytes(&payload).map_err(crate::Error::Wire)?;
match meta_msg.msg_type {
MetadataMessageType::Data => {
let piece_data = meta_msg.data.ok_or_else(|| {
crate::Error::Connection("metadata data message missing payload".into())
})?;
let complete = {
let mut dl = downloader.lock();
dl.piece_received(meta_msg.piece, piece_data)
};
if complete {
return Ok(PeerResult { complete: true });
}
}
MetadataMessageType::Reject => {
downloader.lock().mark_rejected(addr);
return Err(crate::Error::Connection(
"peer rejected metadata request".into(),
));
}
MetadataMessageType::Request => {
}
}
}
_ => {}
}
}
}
/// Encodes `msg` with `Message::to_bytes`, writes the result to `stream` and flushes it.
///
/// # Errors
/// Any I/O error from the write or the flush, converted into `crate::Error`.
async fn write_message(
    stream: &mut (impl AsyncWriteExt + Unpin),
    msg: &Message,
) -> crate::Result<()> {
    let frame = msg.to_bytes();
    stream.write_all(&frame).await?;
    // Flush right away so the message is not left sitting in a buffer.
    Ok(stream.flush().await?)
}
async fn read_message(stream: &mut (impl AsyncReadExt + Unpin)) -> crate::Result<Message<Bytes>> {
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf).await?;
let length = u32::from_be_bytes(len_buf) as usize;
if length == 0 {
return Ok(Message::KeepAlive);
}
if length > MAX_MESSAGE_SIZE {
return Err(crate::Error::Connection(format!(
"message too large during metadata resolution: {length} bytes"
)));
}
let mut payload = vec![0u8; length];
stream.read_exact(&mut payload).await?;
let payload = Bytes::from(payload);
Message::from_payload(payload).map_err(crate::Error::Wire)
}
#[cfg(test)]
mod tests {
    use super::*;
    use irontide_core::PeerId;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    /// Builds a minimal single-file `info` dictionary and returns it with its SHA-1 hash.
    fn test_metadata() -> (Vec<u8>, Id20) {
        let pieces = [0u8; 20];
        let mut info_bytes = Vec::new();
        info_bytes.extend_from_slice(b"d");
        info_bytes.extend_from_slice(b"6:lengthi1024e");
        info_bytes.extend_from_slice(b"4:name4:test");
        info_bytes.extend_from_slice(b"12:piece lengthi16384e");
        info_bytes.extend_from_slice(b"6:pieces20:");
        info_bytes.extend_from_slice(&pieces);
        info_bytes.extend_from_slice(b"e");
        let hash = irontide_core::sha1(&info_bytes);
        (info_bytes, hash)
    }

    /// Plays the remote side of the exchange: answers the handshake, advertises
    /// `metadata_size`, and serves every `ut_metadata` request from `metadata`.
    ///
    /// The loop ends when no message arrives for five seconds; any I/O failure panics,
    /// which only ends the task this mock runs in.
    async fn run_mock_peer(
        mut stream: impl AsyncReadExt + AsyncWriteExt + Unpin,
        info_hash: Id20,
        metadata: Vec<u8>,
    ) {
        let remote_id = PeerId::generate().0;

        // BitTorrent handshake: read theirs, reply with ours.
        let mut hs_buf = [0u8; HANDSHAKE_SIZE];
        stream.read_exact(&mut hs_buf).await.unwrap();
        let _their_hs = Handshake::from_bytes(&hs_buf).unwrap();
        let our_hs = Handshake::new(info_hash, remote_id);
        stream.write_all(&our_hs.to_bytes()).await.unwrap();
        stream.flush().await.unwrap();

        // Extended handshake: learn the id the resolver wants replies tagged with.
        let msg = read_message_raw(&mut stream).await;
        let their_ut_metadata_id = match msg {
            Message::Extended { ext_id: 0, payload } => {
                let hs = ExtHandshake::from_bytes(&payload).unwrap();
                hs.ext_id("ut_metadata").unwrap_or(1)
            }
            _ => 1,
        };

        let mut ext_hs = ExtHandshake::new();
        ext_hs.metadata_size = Some(metadata.len() as u64);
        // The resolver addresses its requests to the id advertised in THIS handshake, so
        // take it from the handshake instead of assuming a fixed number.
        let our_ut_metadata_id = ext_hs
            .ext_id("ut_metadata")
            .expect("ExtHandshake::new advertises ut_metadata");
        let payload = ext_hs.to_bytes().unwrap();
        let msg = Message::Extended { ext_id: 0, payload };
        write_message_raw(&mut stream, &msg).await;

        loop {
            let Ok(msg) =
                tokio::time::timeout(Duration::from_secs(5), read_message_raw(&mut stream)).await
            else {
                break;
            };
            if let Message::Extended { ext_id, payload } = msg
                && ext_id == our_ut_metadata_id
            {
                if let Ok(meta_msg) = MetadataMessage::from_bytes(&payload)
                    && meta_msg.msg_type == MetadataMessageType::Request
                {
                    // Slice out the requested 16 KiB piece (the last one may be shorter).
                    let piece_idx = meta_msg.piece;
                    let piece_size: u64 = 16384;
                    let start = u64::from(piece_idx) * piece_size;
                    let end = ((start + piece_size) as usize).min(metadata.len());
                    let data = Bytes::copy_from_slice(&metadata[start as usize..end]);
                    let resp = MetadataMessage::data(piece_idx, metadata.len() as u64, data);
                    let resp_bytes = resp.to_bytes().unwrap();
                    let resp_msg = Message::Extended {
                        ext_id: their_ut_metadata_id,
                        payload: resp_bytes,
                    };
                    write_message_raw(&mut stream, &resp_msg).await;
                }
            }
        }
    }

    /// Test-side counterpart of `write_message` that panics on failure.
    async fn write_message_raw(stream: &mut (impl AsyncWriteExt + Unpin), msg: &Message) {
        let bytes = msg.to_bytes();
        stream.write_all(&bytes).await.unwrap();
        stream.flush().await.unwrap();
    }

    /// Test-side counterpart of `read_message`: no size limit, panics on failure.
    async fn read_message_raw(stream: &mut (impl AsyncReadExt + Unpin)) -> Message<Bytes> {
        let mut len_buf = [0u8; 4];
        stream.read_exact(&mut len_buf).await.unwrap();
        let length = u32::from_be_bytes(len_buf) as usize;
        if length == 0 {
            return Message::KeepAlive;
        }
        let mut payload = vec![0u8; length];
        stream.read_exact(&mut payload).await.unwrap();
        Message::from_payload(Bytes::from(payload)).unwrap()
    }

    /// One cooperative peer: the metadata resolves and the peer is reported as connected.
    #[tokio::test]
    async fn resolve_metadata_happy_path() {
        let (info_bytes, info_hash) = test_metadata();
        let peer_id = PeerId::generate().0;
        let factory = Arc::new(NetworkFactory::tokio());
        let listener_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let mut listener = factory.bind_tcp(listener_addr).await.unwrap();
        let actual_addr = listener.local_addr().unwrap();
        let metadata_clone = info_bytes.clone();
        tokio::spawn(async move {
            let (stream, _addr) = listener.accept().await.unwrap();
            run_mock_peer(stream, info_hash, metadata_clone).await;
        });

        let (peer_tx, peer_rx) = mpsc::unbounded_channel();
        peer_tx.send(vec![actual_addr]).unwrap();
        drop(peer_tx);

        let result = resolve_metadata(
            info_hash,
            peer_id,
            peer_rx,
            factory,
            Duration::from_secs(5),
            DEFAULT_MAX_CONCURRENT,
        )
        .await;

        let (meta, peers) = result.expect("metadata resolution should succeed");
        assert_eq!(meta.info_hash, info_hash);
        assert!(!peers.is_empty());
        assert!(peers.contains(&actual_addr));
    }

    /// With the channel closed and no peers at all, resolution fails. Despite the name
    /// this returns immediately via the "exhausted all peers" path, not the timeout.
    #[tokio::test]
    async fn resolve_metadata_timeout_with_no_peers() {
        let info_hash = Id20::ZERO;
        let peer_id = PeerId::generate().0;
        let factory = Arc::new(NetworkFactory::tokio());
        let (peer_tx, peer_rx) = mpsc::unbounded_channel();
        drop(peer_tx);

        let result = resolve_metadata(
            info_hash,
            peer_id,
            peer_rx,
            factory,
            Duration::from_secs(1),
            DEFAULT_MAX_CONCURRENT,
        )
        .await;

        assert!(result.is_err());
    }

    /// The same address in two batches is contacted once (the mock accepts only once).
    #[tokio::test]
    async fn resolve_metadata_skips_duplicate_peers() {
        let (info_bytes, info_hash) = test_metadata();
        let peer_id = PeerId::generate().0;
        let factory = Arc::new(NetworkFactory::tokio());
        let listener_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let mut listener = factory.bind_tcp(listener_addr).await.unwrap();
        let actual_addr = listener.local_addr().unwrap();
        let metadata_clone = info_bytes.clone();
        tokio::spawn(async move {
            let (stream, _addr) = listener.accept().await.unwrap();
            run_mock_peer(stream, info_hash, metadata_clone).await;
        });

        let (peer_tx, peer_rx) = mpsc::unbounded_channel();
        peer_tx.send(vec![actual_addr]).unwrap();
        peer_tx.send(vec![actual_addr]).unwrap();
        drop(peer_tx);

        let result = resolve_metadata(
            info_hash,
            peer_id,
            peer_rx,
            factory,
            Duration::from_secs(5),
            DEFAULT_MAX_CONCURRENT,
        )
        .await;

        let (meta, peers) = result.expect("metadata resolution should succeed");
        assert_eq!(meta.info_hash, info_hash);
        assert_eq!(peers.len(), 1);
    }

    /// A peer whose handshake has all reserved bits cleared (no BEP 10 support) cannot
    /// serve metadata, so resolution fails.
    #[tokio::test]
    async fn resolve_metadata_peer_without_extensions() {
        let (_info_bytes, info_hash) = test_metadata();
        let peer_id = PeerId::generate().0;
        let factory = Arc::new(NetworkFactory::tokio());
        let listener_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let mut listener = factory.bind_tcp(listener_addr).await.unwrap();
        let actual_addr = listener.local_addr().unwrap();
        tokio::spawn(async move {
            let (mut stream, _addr) = listener.accept().await.unwrap();
            let remote_id = PeerId::generate().0;
            let mut hs_buf = [0u8; HANDSHAKE_SIZE];
            stream.read_exact(&mut hs_buf).await.unwrap();
            let mut hs = Handshake::new(info_hash, remote_id);
            hs.reserved = [0u8; 8];
            stream.write_all(&hs.to_bytes()).await.unwrap();
            stream.flush().await.unwrap();
        });

        let (peer_tx, peer_rx) = mpsc::unbounded_channel();
        peer_tx.send(vec![actual_addr]).unwrap();
        drop(peer_tx);

        let result = resolve_metadata(
            info_hash,
            peer_id,
            peer_rx,
            factory,
            Duration::from_secs(5),
            DEFAULT_MAX_CONCURRENT,
        )
        .await;

        assert!(result.is_err());
    }

    /// Resolution still succeeds with a single permit. This checks that a limit of 1
    /// works; it does not exercise the skipping of peers beyond the limit.
    #[tokio::test]
    async fn resolve_metadata_concurrent_limit() {
        let (info_bytes, info_hash) = test_metadata();
        let peer_id = PeerId::generate().0;
        let factory = Arc::new(NetworkFactory::tokio());
        let listener_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let mut listener = factory.bind_tcp(listener_addr).await.unwrap();
        let actual_addr = listener.local_addr().unwrap();
        let metadata_clone = info_bytes.clone();
        tokio::spawn(async move {
            let (stream, _addr) = listener.accept().await.unwrap();
            run_mock_peer(stream, info_hash, metadata_clone).await;
        });

        let (peer_tx, peer_rx) = mpsc::unbounded_channel();
        peer_tx.send(vec![actual_addr]).unwrap();
        drop(peer_tx);

        let result = resolve_metadata(
            info_hash,
            peer_id,
            peer_rx,
            factory,
            Duration::from_secs(5),
            1,
        )
        .await;

        let (meta, _peers) = result.expect("metadata resolution should succeed with limit 1");
        assert_eq!(meta.info_hash, info_hash);
    }
}