#![allow(
clippy::cast_possible_truncation,
reason = "M175: BEP 11 PEX — flag/glyph bytes bounded by 8-bit field width per spec"
)]
use std::net::SocketAddr;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use crate::torrent::is_i2p_synthetic_addr;
const COMPACT_PEER_SIZE: usize = 6;
const COMPACT_PEER6_SIZE: usize = 18;
const MAX_PEX_ADDED: usize = 50;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub(crate) struct PexMessage {
#[serde(with = "serde_bytes", default)]
pub added: Vec<u8>,
#[serde(rename = "added.f", with = "serde_bytes", default)]
pub added_flags: Vec<u8>,
#[serde(with = "serde_bytes", default)]
pub dropped: Vec<u8>,
#[serde(with = "serde_bytes", default)]
pub added6: Vec<u8>,
#[serde(rename = "added6.f", with = "serde_bytes", default)]
pub added6_flags: Vec<u8>,
#[serde(with = "serde_bytes", default)]
pub dropped6: Vec<u8>,
}
impl PexMessage {
pub fn from_bytes(data: &[u8]) -> crate::Result<Self> {
irontide_bencode::from_bytes_lenient(data)
.map_err(|e| crate::Error::Core(irontide_core::Error::from(e)))
}
pub fn to_bytes(&self) -> crate::Result<Bytes> {
let bytes = irontide_bencode::to_bytes(self)
.map_err(|e| crate::Error::Core(irontide_core::Error::from(e)))?;
Ok(Bytes::from(bytes))
}
pub fn added_peers(&self) -> Vec<SocketAddr> {
if !self.added.len().is_multiple_of(COMPACT_PEER_SIZE) {
return Vec::new();
}
irontide_tracker::parse_compact_peers(&self.added).unwrap_or_default()
}
pub fn dropped_peers(&self) -> Vec<SocketAddr> {
if !self.dropped.len().is_multiple_of(COMPACT_PEER_SIZE) {
return Vec::new();
}
irontide_tracker::parse_compact_peers(&self.dropped).unwrap_or_default()
}
pub fn added_peers6(&self) -> Vec<SocketAddr> {
if !self.added6.len().is_multiple_of(COMPACT_PEER6_SIZE) {
return Vec::new();
}
irontide_tracker::parse_compact_peers6(&self.added6).unwrap_or_default()
}
pub fn dropped_peers6(&self) -> Vec<SocketAddr> {
if !self.dropped6.len().is_multiple_of(COMPACT_PEER6_SIZE) {
return Vec::new();
}
irontide_tracker::parse_compact_peers6(&self.dropped6).unwrap_or_default()
}
}
pub(crate) fn filter_peers_for_transport(
peers: &[SocketAddr],
allow_mixed: bool,
recipient_is_i2p: bool,
) -> Vec<SocketAddr> {
if allow_mixed {
return peers.to_vec();
}
peers
.iter()
.copied()
.filter(|addr| is_i2p_synthetic_addr(addr) == recipient_is_i2p)
.collect()
}
fn is_private_to_public(recipient: SocketAddr, peer: SocketAddr) -> bool {
let peer_ip = peer.ip();
let recipient_ip = recipient.ip();
match (peer_ip, recipient_ip) {
(std::net::IpAddr::V4(p), std::net::IpAddr::V4(r)) => {
(p.is_private() || p.is_loopback()) && !(r.is_private() || r.is_loopback())
}
_ => false, }
}
#[allow(
clippy::fn_params_excessive_bools,
reason = "PEX flags mirror wire protocol fields"
)]
pub(crate) fn peer_flags(encrypted: bool, seed: bool, utp: bool, holepunch: bool) -> u8 {
let mut f = 0u8;
if encrypted {
f |= 0x01;
}
if seed {
f |= 0x02;
}
if utp {
f |= 0x04;
}
if holepunch {
f |= 0x08;
}
f
}
pub(crate) fn build_pex_message(added: &[(SocketAddr, u8)], dropped: &[SocketAddr]) -> PexMessage {
let (v4_added, v6_added): (Vec<_>, Vec<_>) = added.iter().partition(|(a, _)| a.is_ipv4());
let (v4_dropped, v6_dropped): (Vec<_>, Vec<_>) = dropped.iter().partition(|a| a.is_ipv4());
let v4_added_addrs: Vec<SocketAddr> = v4_added.iter().map(|(a, _)| *a).collect();
let v4_added_flags: Vec<u8> = v4_added.iter().map(|(_, f)| *f).collect();
let v6_added_addrs: Vec<SocketAddr> = v6_added.iter().map(|(a, _)| *a).collect();
let v6_added_flags: Vec<u8> = v6_added.iter().map(|(_, f)| *f).collect();
let v4_dropped_addrs: Vec<SocketAddr> = v4_dropped.into_iter().copied().collect();
let v6_dropped_addrs: Vec<SocketAddr> = v6_dropped.into_iter().copied().collect();
PexMessage {
added_flags: v4_added_flags,
added: irontide_tracker::encode_compact_peers(&v4_added_addrs),
dropped: irontide_tracker::encode_compact_peers(&v4_dropped_addrs),
added6_flags: v6_added_flags,
added6: irontide_tracker::encode_compact_peers6(&v6_added_addrs),
dropped6: irontide_tracker::encode_compact_peers6(&v6_dropped_addrs),
}
}
pub(crate) async fn pex_send_task(
peer_addr: SocketAddr,
cmd_tx: tokio::sync::mpsc::Sender<crate::types::PeerCommand>,
live_peers: std::sync::Arc<parking_lot::RwLock<std::collections::HashMap<SocketAddr, u8>>>,
allow_mixed_i2p: bool,
recipient_is_i2p: bool,
) {
use std::collections::HashSet;
use tokio::time::{Duration, sleep};
let mut peer_view: HashSet<SocketAddr> = HashSet::new();
sleep(Duration::from_secs(3)).await;
loop {
sleep(Duration::from_mins(1)).await;
let live_snapshot = live_peers.read().clone();
let live_addrs: HashSet<SocketAddr> = live_snapshot.keys().copied().collect();
let mut added: Vec<(SocketAddr, u8)> = live_addrs
.difference(&peer_view)
.copied()
.filter(|a| *a != peer_addr) .filter(|a| !is_private_to_public(peer_addr, *a)) .map(|a| (a, live_snapshot.get(&a).copied().unwrap_or(0)))
.collect();
if !allow_mixed_i2p {
let addrs: Vec<SocketAddr> = added.iter().map(|(a, _)| *a).collect();
let filtered = filter_peers_for_transport(&addrs, false, recipient_is_i2p);
let filtered_set: HashSet<SocketAddr> = filtered.into_iter().collect();
added.retain(|(a, _)| filtered_set.contains(a));
}
added.truncate(MAX_PEX_ADDED);
let mut dropped: Vec<SocketAddr> = peer_view.difference(&live_addrs).copied().collect();
dropped.truncate(MAX_PEX_ADDED);
if added.is_empty() && dropped.is_empty() {
continue;
}
let msg = build_pex_message(&added, &dropped);
if cmd_tx
.send(crate::types::PeerCommand::SendPex { message: msg })
.await
.is_err()
{
return; }
for (a, _) in &added {
peer_view.insert(*a);
}
for d in &dropped {
peer_view.remove(d);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn peer_flags_all_combinations() {
for bits in 0u8..16 {
let encrypted = bits & 0x01 != 0;
let seed = bits & 0x02 != 0;
let utp = bits & 0x04 != 0;
let holepunch = bits & 0x08 != 0;
let flags = peer_flags(encrypted, seed, utp, holepunch);
assert_eq!(
flags, bits,
"flags({encrypted}, {seed}, {utp}, {holepunch}) should be {bits:#04x}"
);
}
}
#[test]
fn encode_decode_round_trip() {
let msg = PexMessage {
added: vec![192, 168, 1, 1, 0x1A, 0xE1],
added_flags: vec![0x01],
dropped: vec![10, 0, 0, 1, 0x1F, 0x90],
..Default::default()
};
let encoded = msg.to_bytes().expect("encode failed");
let decoded = PexMessage::from_bytes(&encoded).expect("decode failed");
assert_eq!(msg.added, decoded.added);
assert_eq!(msg.added_flags, decoded.added_flags);
assert_eq!(msg.dropped, decoded.dropped);
}
#[test]
fn parse_added_peers() {
let msg = PexMessage {
added: vec![192, 168, 1, 1, 0x1A, 0xE1],
added_flags: vec![0x01],
..Default::default()
};
let peers = msg.added_peers();
assert_eq!(peers.len(), 1);
assert_eq!(peers[0].to_string(), "192.168.1.1:6881");
}
#[test]
fn parse_dropped_peers() {
let msg = PexMessage {
dropped: vec![10, 0, 0, 1, 0x1F, 0x90],
..Default::default()
};
let peers = msg.dropped_peers();
assert_eq!(peers.len(), 1);
assert_eq!(peers[0].to_string(), "10.0.0.1:8080");
}
#[test]
fn empty_message() {
let msg = PexMessage::default();
let encoded = msg.to_bytes().expect("encode failed");
let decoded = PexMessage::from_bytes(&encoded).expect("decode failed");
assert!(decoded.added.is_empty());
assert!(decoded.added_flags.is_empty());
assert!(decoded.dropped.is_empty());
assert!(decoded.added_peers().is_empty());
assert!(decoded.dropped_peers().is_empty());
}
#[test]
fn malformed_added_ignored() {
let msg = PexMessage {
added: vec![192, 168, 1, 1, 0x1A],
added_flags: Vec::new(),
dropped: Vec::new(),
..Default::default()
};
let peers = msg.added_peers();
assert!(peers.is_empty());
}
#[test]
fn ipv6_round_trip() {
use std::net::Ipv6Addr;
let ip: Ipv6Addr = "2001:db8::1".parse().unwrap();
let mut added6 = Vec::new();
added6.extend_from_slice(&ip.octets());
added6.extend_from_slice(&6881u16.to_be_bytes());
let msg = PexMessage {
added6: added6.clone(),
added6_flags: vec![0x01],
..Default::default()
};
let encoded = msg.to_bytes().expect("encode failed");
let decoded = PexMessage::from_bytes(&encoded).expect("decode failed");
assert_eq!(decoded.added6, added6);
assert_eq!(decoded.added6_flags, vec![0x01]);
}
#[test]
fn parse_added_peers6() {
use std::net::Ipv6Addr;
let ip: Ipv6Addr = "2001:db8::1".parse().unwrap();
let mut added6 = Vec::new();
added6.extend_from_slice(&ip.octets());
added6.extend_from_slice(&8080u16.to_be_bytes());
let msg = PexMessage {
added6,
..Default::default()
};
let peers = msg.added_peers6();
assert_eq!(peers.len(), 1);
assert_eq!(
peers[0],
"[2001:db8::1]:8080".parse::<SocketAddr>().unwrap()
);
}
#[test]
fn parse_dropped_peers6() {
use std::net::Ipv6Addr;
let ip: Ipv6Addr = "::1".parse().unwrap();
let mut dropped6 = Vec::new();
dropped6.extend_from_slice(&ip.octets());
dropped6.extend_from_slice(&6881u16.to_be_bytes());
let msg = PexMessage {
dropped6,
..Default::default()
};
let peers = msg.dropped_peers6();
assert_eq!(peers.len(), 1);
assert_eq!(peers[0], "[::1]:6881".parse::<SocketAddr>().unwrap());
}
#[test]
fn malformed_added6_ignored() {
let msg = PexMessage {
added6: vec![0u8; 17], ..Default::default()
};
assert!(msg.added_peers6().is_empty());
}
#[test]
fn filter_removes_i2p_for_clearnet_recipient() {
let peers = vec![
"1.2.3.4:6881".parse().unwrap(),
"240.0.0.1:1".parse().unwrap(),
];
let filtered = filter_peers_for_transport(&peers, false, false);
assert_eq!(filtered.len(), 1);
assert!(!is_i2p_synthetic_addr(&filtered[0]));
}
#[test]
fn filter_removes_clearnet_for_i2p_recipient() {
let peers = vec![
"1.2.3.4:6881".parse().unwrap(),
"240.0.0.1:1".parse().unwrap(),
];
let filtered = filter_peers_for_transport(&peers, false, true);
assert_eq!(filtered.len(), 1);
assert!(is_i2p_synthetic_addr(&filtered[0]));
}
#[test]
fn filter_keeps_all_when_mixed_allowed() {
let peers = vec![
"1.2.3.4:6881".parse().unwrap(),
"240.0.0.1:1".parse().unwrap(),
];
assert_eq!(filter_peers_for_transport(&peers, true, false).len(), 2);
}
#[test]
fn pex_build_message_added() {
let added: Vec<(SocketAddr, u8)> = vec![
("1.2.3.4:6881".parse().unwrap(), 0x01),
("5.6.7.8:8080".parse().unwrap(), 0x04),
("9.10.11.12:9999".parse().unwrap(), 0x00),
];
let msg = build_pex_message(&added, &[]);
assert_eq!(msg.added.len(), 18);
assert_eq!(msg.added_flags.len(), 3);
assert_eq!(msg.added_flags, vec![0x01, 0x04, 0x00]);
assert!(msg.dropped.is_empty());
assert!(msg.added6.is_empty());
assert!(msg.dropped6.is_empty());
let parsed = msg.added_peers();
assert_eq!(parsed.len(), 3);
assert_eq!(parsed[0], "1.2.3.4:6881".parse::<SocketAddr>().unwrap());
assert_eq!(parsed[1], "5.6.7.8:8080".parse::<SocketAddr>().unwrap());
assert_eq!(parsed[2], "9.10.11.12:9999".parse::<SocketAddr>().unwrap());
}
#[test]
fn pex_build_message_dropped() {
let dropped: Vec<SocketAddr> = vec![
"10.0.0.1:6881".parse().unwrap(),
"10.0.0.2:7777".parse().unwrap(),
];
let msg = build_pex_message(&[], &dropped);
assert!(msg.added.is_empty());
assert!(msg.added_flags.is_empty());
assert_eq!(msg.dropped.len(), 12);
assert!(msg.added6.is_empty());
assert!(msg.dropped6.is_empty());
let parsed = msg.dropped_peers();
assert_eq!(parsed.len(), 2);
}
#[test]
fn pex_rate_limit_50_added() {
let mut added: Vec<(SocketAddr, u8)> = (0..60u16)
.map(|i| {
let a = (i / 256) as u8;
let b = (i % 256) as u8;
(SocketAddr::from(([100, 0, a, b], 6881)), 0x00)
})
.collect();
added.truncate(MAX_PEX_ADDED);
let msg = build_pex_message(&added, &[]);
assert_eq!(msg.added.len(), 300);
assert_eq!(msg.added_flags.len(), 50);
assert_eq!(msg.added_peers().len(), 50);
}
#[test]
fn pex_excludes_recipient() {
let recipient: SocketAddr = "1.2.3.4:6881".parse().unwrap();
let peers: Vec<SocketAddr> = vec![
"1.2.3.4:6881".parse().unwrap(), "5.6.7.8:6881".parse().unwrap(),
];
let filtered: Vec<SocketAddr> = peers.into_iter().filter(|a| *a != recipient).collect();
assert_eq!(filtered.len(), 1);
assert_eq!(filtered[0], "5.6.7.8:6881".parse::<SocketAddr>().unwrap());
}
#[test]
fn pex_ipv6_separate_fields() {
let added: Vec<(SocketAddr, u8)> = vec![
("1.2.3.4:6881".parse().unwrap(), 0x01),
("[2001:db8::1]:8080".parse().unwrap(), 0x05),
("5.6.7.8:9999".parse().unwrap(), 0x00),
("[::1]:6881".parse().unwrap(), 0x02),
];
let msg = build_pex_message(&added, &[]);
assert_eq!(msg.added.len(), 12);
assert_eq!(msg.added_flags.len(), 2);
assert_eq!(msg.added_flags, vec![0x01, 0x00]);
assert_eq!(msg.added6.len(), 36);
assert_eq!(msg.added6_flags.len(), 2);
assert_eq!(msg.added6_flags, vec![0x05, 0x02]);
let v4 = msg.added_peers();
assert_eq!(v4.len(), 2);
let v6 = msg.added_peers6();
assert_eq!(v6.len(), 2);
}
#[test]
fn pex_privacy_filtering() {
let public_recipient: SocketAddr = "8.8.8.8:6881".parse().unwrap();
let private_peer: SocketAddr = "192.168.1.100:6881".parse().unwrap();
let loopback_peer: SocketAddr = "127.0.0.1:6881".parse().unwrap();
let public_peer: SocketAddr = "1.2.3.4:6881".parse().unwrap();
assert!(is_private_to_public(public_recipient, private_peer));
assert!(is_private_to_public(public_recipient, loopback_peer));
assert!(!is_private_to_public(public_recipient, public_peer));
let private_recipient: SocketAddr = "10.0.0.1:6881".parse().unwrap();
assert!(!is_private_to_public(private_recipient, private_peer));
}
#[tokio::test]
async fn pex_send_task_exits_on_disconnect() {
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
let live_peers: Arc<RwLock<HashMap<SocketAddr, u8>>> =
Arc::new(RwLock::new(HashMap::new()));
live_peers
.write()
.insert("1.2.3.4:6881".parse().unwrap(), 0x00);
let (tx, rx) = tokio::sync::mpsc::channel(1);
drop(rx);
let peer_addr: SocketAddr = "5.6.7.8:6881".parse().unwrap();
let handle = tokio::spawn(pex_send_task(peer_addr, tx, live_peers, false, false));
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
handle.abort();
}
#[test]
fn pex_build_empty_no_message() {
let msg = build_pex_message(&[], &[]);
assert!(msg.added.is_empty());
assert!(msg.added_flags.is_empty());
assert!(msg.dropped.is_empty());
assert!(msg.added6.is_empty());
assert!(msg.added6_flags.is_empty());
assert!(msg.dropped6.is_empty());
}
}