use std::net::SocketAddr;
use std::time::Instant;
use crate::bencode::{decode as bencode_decode, encode as bencode_encode};
use crate::error::{Error, ErrorKind};
use crate::peer::{ExtensionNegotiation, PeerMessage, PexMessage};
use super::DownloadLoop;
use super::types::{PIPELINE_SIZE, UT_PEX};
impl DownloadLoop {
pub(super) async fn handle_ltep_handshake(
&mut self, addr: SocketAddr, data: &[u8],
) -> Result<(), Error> {
let peer = match self.peers.get_mut(&addr) {
Some(p) => p,
None => return Ok(()),
};
let (val, _) = bencode_decode(data).map_err(|e| {
tracing::warn!("invalid LTEP bencode from {}: {}", addr, e);
Error::new(ErrorKind::PeerInvalidExtendedMessage)
})?;
let neg = ExtensionNegotiation::from_bencode(&val).map_err(|e| {
tracing::warn!("invalid LTEP dict from {}: {}", addr, e);
Error::new(ErrorKind::PeerInvalidExtendedMessage)
})?;
peer.remote_extension_ids = neg.m;
peer.client_version = neg.v;
peer.metadata_size = neg.metadata_size;
if let Some(reqq) = neg.reqq {
let limit = usize::try_from(reqq).unwrap_or(PIPELINE_SIZE);
peer.max_requests = limit.min(PIPELINE_SIZE);
}
tracing::debug!(
"LTEP handshake from {}: {:?}",
addr,
peer.remote_extension_ids
);
if self.pex_enabled {
if let Err(e) = self.send_pex_message(addr, &[]).await {
tracing::warn!("failed to send initial PEX to {}: {}", addr, e);
}
}
Ok(())
}
pub(super) async fn handle_extended_message(
&mut self, addr: SocketAddr, ext_id: u8, data: Vec<u8>,
) -> Result<(), Error> {
let peer = match self.peers.get_mut(&addr) {
Some(p) => p,
None => return Ok(()),
};
let is_pex = peer.our_extension_ids.get(UT_PEX) == Some(&ext_id);
if is_pex {
let (val, _) =
bencode_decode(&data).map_err(|_| Error::new(ErrorKind::PeerInvalidPexMessage))?;
let pex_msg = PexMessage::from_bencode(&val)?;
let added_count = pex_msg.added.len();
let dropped_count = pex_msg.dropped.len();
let added6_count = pex_msg.added6.len();
let dropped6_count = pex_msg.dropped6.len();
let mut pm = self.peer_mgr.write().await;
if !pex_msg.added.is_empty() {
pm.add_peers(pex_msg.added);
}
if !pex_msg.added6.is_empty() {
pm.add_peers(pex_msg.added6);
}
peer.last_pex_received = Some(Instant::now());
tracing::debug!(
"received PEX from {}: +{}/-{} (IPv4), +{}/-{} (IPv6)",
addr,
added_count,
dropped_count,
added6_count,
dropped6_count,
);
}
Ok(())
}
pub(super) async fn send_pex_message(
&mut self, addr: SocketAddr, dropped: &[SocketAddr],
) -> Result<(), Error> {
let peer = match self.peers.get(&addr) {
Some(p) => p,
None => return Ok(()),
};
let pex_id = match peer.remote_extension_ids.get(UT_PEX) {
Some(&id) => id,
None => return Ok(()), };
if peer
.last_pex_sent
.is_some_and(|t| t.elapsed() < self.pex_interval)
{
return Ok(());
}
let connected = self.peer_mgr.read().await.connection_addrs();
let (added, added6): (Vec<_>, Vec<_>) = connected
.into_iter()
.filter(|a| *a != addr)
.partition(|a| a.is_ipv4());
let added: Vec<SocketAddr> = added.into_iter().take(50).collect();
let added6: Vec<SocketAddr> = added6.into_iter().take(50).collect();
let (dropped_v4, dropped_v6): (Vec<_>, Vec<_>) = dropped.iter().partition(|a| a.is_ipv4());
let dropped: Vec<SocketAddr> = dropped_v4.into_iter().copied().collect();
let dropped6: Vec<SocketAddr> = dropped_v6.into_iter().copied().collect();
let mut pex_msg = PexMessage::new();
pex_msg.added = added;
pex_msg.added6 = added6;
pex_msg.dropped = dropped;
pex_msg.dropped6 = dropped6;
let payload = bencode_encode(&pex_msg.to_bencode());
self.peer_mgr
.read()
.await
.send_to(
&addr,
&PeerMessage::Extended {
ext_id: pex_id,
data: payload,
},
)
.await?;
if let Some(peer) = self.peers.get_mut(&addr) {
peer.last_pex_sent = Some(Instant::now());
}
Ok(())
}
pub(super) async fn broadcast_pex(&mut self) -> Result<(), Error> {
let dropped_snapshot: Vec<SocketAddr> = self.recently_dropped.drain(..).collect();
let addresses: Vec<SocketAddr> = self
.peers
.iter()
.filter(|(_, info)| info.remote_extension_ids.contains_key(UT_PEX))
.map(|(addr, _)| *addr)
.collect();
for addr in addresses {
let dropped = dropped_snapshot.iter().filter(|a| **a != addr).copied();
let dropped: Vec<SocketAddr> = dropped.collect();
if let Err(e) = self.send_pex_message(addr, &dropped).await {
tracing::warn!("failed to send PEX to {}: {}", addr, e);
}
}
Ok(())
}
}