use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::time::Instant;
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, warn};
use irontide_core::Id20;
const LSD_MULTICAST: Ipv4Addr = Ipv4Addr::new(239, 192, 152, 143);
const LSD_MULTICAST_V6: Ipv6Addr = Ipv6Addr::new(0xff15, 0, 0, 0, 0, 0, 0xefc0, 0x988f);
const LSD_PORT: u16 = 6771;
const MAX_PACKET_SIZE: usize = 1400;
const ANNOUNCE_INTERVAL: std::time::Duration = std::time::Duration::from_mins(5);
const LSD_HOST_V4: &str = "239.192.152.143:6771";
const LSD_HOST_V6: &str = "[ff15::efc0:988f]:6771";
pub(crate) fn format_announce(
info_hashes: &[Id20],
listen_port: u16,
host: &str,
cookie: [u8; 4],
) -> Vec<Vec<u8>> {
if info_hashes.is_empty() {
return Vec::new();
}
let mut messages = Vec::new();
let mut current_hashes: Vec<&Id20> = Vec::new();
for ih in info_hashes {
let header_size = format!(
"BT-SEARCH * HTTP/1.1\r\nHost: {host}\r\nPort: {listen_port}\r\ncookie: 00000000\r\n"
)
.len();
let per_hash = 52; let footer = 2; let estimated = header_size + (current_hashes.len() + 1) * per_hash + footer;
if estimated > MAX_PACKET_SIZE && !current_hashes.is_empty() {
messages.push(build_message(¤t_hashes, listen_port, host, cookie));
current_hashes.clear();
}
current_hashes.push(ih);
}
if !current_hashes.is_empty() {
messages.push(build_message(¤t_hashes, listen_port, host, cookie));
}
messages
}
fn build_message(info_hashes: &[&Id20], listen_port: u16, host: &str, cookie: [u8; 4]) -> Vec<u8> {
use std::fmt::Write;
let mut msg = format!(
"BT-SEARCH * HTTP/1.1\r\nHost: {host}\r\nPort: {listen_port}\r\ncookie: {}\r\n",
hex::encode(cookie),
);
for ih in info_hashes {
let _ = write!(msg, "Infohash: {}\r\n", ih.to_hex());
}
msg.push_str("\r\n");
msg.into_bytes()
}
#[derive(Debug, Clone)]
pub(crate) struct LsdAnnounce {
pub port: u16,
pub info_hashes: Vec<Id20>,
pub cookie: Option<[u8; 4]>,
}
pub(crate) fn parse_announce(data: &[u8]) -> Option<LsdAnnounce> {
let text = std::str::from_utf8(data).ok()?;
if !text.starts_with("BT-SEARCH * HTTP/1.1\r\n") {
return None;
}
let mut port: Option<u16> = None;
let mut info_hashes = Vec::new();
let mut cookie: Option<[u8; 4]> = None;
for line in text.split("\r\n") {
if let Some(value) = line.strip_prefix("Port: ") {
port = value.trim().parse().ok();
} else if let Some(value) = line.strip_prefix("Infohash: ")
&& let Ok(ih) = Id20::from_hex(value.trim())
{
info_hashes.push(ih);
} else if let Some(value) = line.strip_prefix("cookie: ") {
let trimmed = value.trim();
if trimmed.len() == 8
&& let Ok(bytes) = hex::decode(trimmed)
&& bytes.len() == 4
{
cookie = Some([bytes[0], bytes[1], bytes[2], bytes[3]]);
}
}
}
let port = port?;
if info_hashes.is_empty() {
return None;
}
Some(LsdAnnounce {
port,
info_hashes,
cookie,
})
}
pub(crate) struct LsdRateLimiter {
last_announce: HashMap<Id20, Instant>,
}
impl LsdRateLimiter {
pub fn new() -> Self {
Self {
last_announce: HashMap::new(),
}
}
pub fn filter_eligible(&mut self, info_hashes: &[Id20]) -> Vec<Id20> {
let now = Instant::now();
let mut eligible = Vec::new();
for ih in info_hashes {
let can_announce = self
.last_announce
.get(ih)
.is_none_or(|t| now.duration_since(*t) >= ANNOUNCE_INTERVAL);
if can_announce {
eligible.push(*ih);
self.last_announce.insert(*ih, now);
}
}
eligible
}
pub fn multicast_addr() -> SocketAddr {
SocketAddr::V4(SocketAddrV4::new(LSD_MULTICAST, LSD_PORT))
}
pub fn multicast_addr_v6() -> SocketAddr {
SocketAddr::V6(SocketAddrV6::new(LSD_MULTICAST_V6, LSD_PORT, 0, 0))
}
}
pub(crate) enum LsdCommand {
Announce {
info_hashes: Vec<Id20>,
},
Shutdown {
reply: Option<oneshot::Sender<()>>,
},
}
#[derive(Clone)]
pub(crate) struct LsdHandle {
cmd_tx: mpsc::Sender<LsdCommand>,
}
impl LsdHandle {
#[allow(clippy::unused_async)]
pub async fn start(
listen_port: u16,
enable_ipv6: bool,
) -> std::io::Result<(Self, mpsc::Receiver<(Id20, SocketAddr)>)> {
use socket2::{Domain, Protocol, Socket, Type};
let sock4 = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
sock4.set_reuse_address(true)?;
sock4.set_broadcast(true)?;
sock4.set_nonblocking(true)?;
sock4.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, LSD_PORT).into())?;
let std_sock: std::net::UdpSocket = sock4.into();
let socket = UdpSocket::from_std(std_sock)?;
socket.join_multicast_v4(LSD_MULTICAST, Ipv4Addr::UNSPECIFIED)?;
let socket_v6 = if enable_ipv6 {
match Self::bind_ipv6_socket() {
Ok(s) => Some(s),
Err(e) => {
warn!("LSD IPv6 unavailable: {e}");
None
}
}
} else {
None
};
let cookie = generate_cookie();
let (cmd_tx, cmd_rx) = mpsc::channel(64);
let (peer_tx, peer_rx) = mpsc::channel(256);
let actor = LsdActor {
socket,
socket_v6,
listen_port,
cookie,
rate_limiter: LsdRateLimiter::new(),
cmd_rx,
peer_tx,
};
tokio::spawn(actor.run());
Ok((Self { cmd_tx }, peer_rx))
}
fn bind_ipv6_socket() -> std::io::Result<UdpSocket> {
use socket2::{Domain, Protocol, Socket, Type};
let sock6 = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
sock6.set_only_v6(true)?;
sock6.set_reuse_address(true)?;
sock6.set_nonblocking(true)?;
sock6.bind(&SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, LSD_PORT, 0, 0).into())?;
sock6.join_multicast_v6(&LSD_MULTICAST_V6, 0)?;
let std_sock: std::net::UdpSocket = sock6.into();
UdpSocket::from_std(std_sock)
}
pub async fn announce(&self, info_hashes: Vec<Id20>) {
let _ = self.cmd_tx.send(LsdCommand::Announce { info_hashes }).await;
}
pub async fn shutdown(&self) {
let _ = self.cmd_tx.send(LsdCommand::Shutdown { reply: None }).await;
}
#[allow(dead_code)]
pub async fn shutdown_and_wait(&self) {
let (reply_tx, reply_rx) = oneshot::channel();
if self
.cmd_tx
.send(LsdCommand::Shutdown {
reply: Some(reply_tx),
})
.await
.is_err()
{
return;
}
let _ = reply_rx.await;
}
}
fn generate_cookie() -> [u8; 4] {
#[allow(clippy::cast_possible_truncation)]
let mut seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
seed ^= seed << 13;
seed ^= seed >> 7;
seed ^= seed << 17;
let bytes = seed.to_le_bytes();
[bytes[0], bytes[1], bytes[2], bytes[3]]
}
struct LsdActor {
socket: UdpSocket,
socket_v6: Option<UdpSocket>,
listen_port: u16,
cookie: [u8; 4],
rate_limiter: LsdRateLimiter,
cmd_rx: mpsc::Receiver<LsdCommand>,
peer_tx: mpsc::Sender<(Id20, SocketAddr)>,
}
impl LsdActor {
async fn run(mut self) {
let mut buf = [0u8; MAX_PACKET_SIZE];
let mut buf_v6 = [0u8; MAX_PACKET_SIZE];
let mut shutdown_reply: Option<oneshot::Sender<()>> = None;
loop {
tokio::select! {
cmd = self.cmd_rx.recv() => {
match cmd {
Some(LsdCommand::Announce { info_hashes }) => {
self.do_announce(&info_hashes).await;
}
Some(LsdCommand::Shutdown { reply }) => {
shutdown_reply = reply;
break;
}
None => break,
}
}
result = self.socket.recv_from(&mut buf) => {
if let Ok((len, src)) = result {
self.handle_incoming(&buf[..len], src).await;
}
}
result = async {
match &self.socket_v6 {
Some(s) => s.recv_from(&mut buf_v6).await,
None => std::future::pending().await,
}
} => {
if let Ok((len, src)) = result {
self.handle_incoming(&buf_v6[..len], src).await;
}
}
}
}
std::mem::drop(self);
if let Some(tx) = shutdown_reply {
let _ = tx.send(());
}
}
async fn do_announce(&mut self, info_hashes: &[Id20]) {
let eligible = self.rate_limiter.filter_eligible(info_hashes);
if eligible.is_empty() {
return;
}
let messages = format_announce(&eligible, self.listen_port, LSD_HOST_V4, self.cookie);
let dest = LsdRateLimiter::multicast_addr();
for msg in &messages {
if let Err(e) = self.socket.send_to(msg, dest).await {
warn!("LSD IPv4 send failed: {e}");
break;
}
}
if let Some(ref sock_v6) = self.socket_v6 {
let messages_v6 =
format_announce(&eligible, self.listen_port, LSD_HOST_V6, self.cookie);
let dest_v6 = LsdRateLimiter::multicast_addr_v6();
for msg in &messages_v6 {
if let Err(e) = sock_v6.send_to(msg, dest_v6).await {
warn!("LSD IPv6 send failed: {e}");
break;
}
}
}
debug!(count = eligible.len(), "LSD announce sent");
}
async fn handle_incoming(&self, data: &[u8], src: SocketAddr) {
let Some(announce) = parse_announce(data) else {
return;
};
if announce.cookie == Some(self.cookie) {
return;
}
let peer_addr = SocketAddr::new(src.ip(), announce.port);
for ih in announce.info_hashes {
if self.peer_tx.send((ih, peer_addr)).await.is_err() {
return;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_hash() -> Id20 {
Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap()
}
fn test_hash2() -> Id20 {
Id20::from_hex("0102030405060708091011121314151617181920").unwrap()
}
fn test_cookie() -> [u8; 4] {
[0xDE, 0xAD, 0xBE, 0xEF]
}
#[test]
fn format_single_announce() {
let msgs = format_announce(&[test_hash()], 6881, LSD_HOST_V4, test_cookie());
assert_eq!(msgs.len(), 1);
let text = String::from_utf8(msgs[0].clone()).unwrap();
assert!(text.starts_with("BT-SEARCH * HTTP/1.1\r\n"));
assert!(text.contains("Host: 239.192.152.143:6771\r\n"));
assert!(text.contains("Port: 6881\r\n"));
assert!(text.contains("cookie: deadbeef\r\n"));
assert!(text.contains(&format!("Infohash: {}\r\n", test_hash().to_hex())));
assert!(text.ends_with("\r\n\r\n"));
}
#[test]
fn format_batch_announce() {
let hashes = vec![test_hash(), test_hash2()];
let msgs = format_announce(&hashes, 6881, LSD_HOST_V4, test_cookie());
assert_eq!(msgs.len(), 1);
let text = String::from_utf8(msgs[0].clone()).unwrap();
assert!(text.contains(&format!("Infohash: {}\r\n", test_hash().to_hex())));
assert!(text.contains(&format!("Infohash: {}\r\n", test_hash2().to_hex())));
}
#[test]
fn format_empty() {
let msgs = format_announce(&[], 6881, LSD_HOST_V4, test_cookie());
assert!(msgs.is_empty());
}
#[test]
fn parse_valid_announce() {
let msg = format!(
"BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\nInfohash: {}\r\n\r\n",
test_hash().to_hex()
);
let parsed = parse_announce(msg.as_bytes()).unwrap();
assert_eq!(parsed.port, 6881);
assert_eq!(parsed.info_hashes.len(), 1);
assert_eq!(parsed.info_hashes[0], test_hash());
assert_eq!(parsed.cookie, None);
}
#[test]
fn parse_multiple_infohashes() {
let msg = format!(
"BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 9999\r\nInfohash: {}\r\nInfohash: {}\r\n\r\n",
test_hash().to_hex(),
test_hash2().to_hex()
);
let parsed = parse_announce(msg.as_bytes()).unwrap();
assert_eq!(parsed.port, 9999);
assert_eq!(parsed.info_hashes.len(), 2);
}
#[test]
fn parse_invalid_no_port() {
let msg = format!(
"BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nInfohash: {}\r\n\r\n",
test_hash().to_hex()
);
assert!(parse_announce(msg.as_bytes()).is_none());
}
#[test]
fn parse_invalid_not_bt_search() {
let msg = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n";
assert!(parse_announce(msg).is_none());
}
#[test]
fn rate_limiter_first_announce_allowed() {
let mut limiter = LsdRateLimiter::new();
let eligible = limiter.filter_eligible(&[test_hash()]);
assert_eq!(eligible.len(), 1);
}
#[test]
fn rate_limiter_immediate_reannounce_blocked() {
let mut limiter = LsdRateLimiter::new();
let _ = limiter.filter_eligible(&[test_hash()]);
let eligible = limiter.filter_eligible(&[test_hash()]);
assert!(eligible.is_empty());
}
#[test]
fn build_message_ipv4_with_cookie() {
let ih = test_hash();
let msg = build_message(&[&ih], 6881, LSD_HOST_V4, test_cookie());
let text = String::from_utf8(msg).unwrap();
assert!(text.contains("Host: 239.192.152.143:6771\r\n"));
assert!(text.contains("Port: 6881\r\n"));
assert!(text.contains("cookie: deadbeef\r\n"));
}
#[test]
fn build_message_ipv6_with_cookie() {
let ih = test_hash();
let msg = build_message(&[&ih], 6881, LSD_HOST_V6, test_cookie());
let text = String::from_utf8(msg).unwrap();
assert!(text.contains("Host: [ff15::efc0:988f]:6771\r\n"));
assert!(text.contains("cookie: deadbeef\r\n"));
}
#[test]
fn format_announce_includes_cookie() {
let msgs = format_announce(&[test_hash()], 6881, LSD_HOST_V4, test_cookie());
let text = String::from_utf8(msgs[0].clone()).unwrap();
assert!(text.contains("cookie: deadbeef\r\n"));
}
#[test]
fn parse_announce_extracts_cookie() {
let msg = format!(
"BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\ncookie: deadbeef\r\nInfohash: {}\r\n\r\n",
test_hash().to_hex()
);
let parsed = parse_announce(msg.as_bytes()).unwrap();
assert_eq!(parsed.cookie, Some([0xDE, 0xAD, 0xBE, 0xEF]));
}
#[test]
fn parse_announce_no_cookie_backward_compat() {
let msg = format!(
"BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\nInfohash: {}\r\n\r\n",
test_hash().to_hex()
);
let parsed = parse_announce(msg.as_bytes()).unwrap();
assert_eq!(parsed.cookie, None);
assert_eq!(parsed.port, 6881);
}
#[test]
fn actor_drops_own_cookie() {
let cookie = test_cookie();
let msg = format!(
"BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\ncookie: deadbeef\r\nInfohash: {}\r\n\r\n",
test_hash().to_hex()
);
let parsed = parse_announce(msg.as_bytes()).unwrap();
assert!(parsed.cookie == Some(cookie));
}
#[test]
fn format_announce_ipv6_host() {
let msgs = format_announce(&[test_hash()], 6881, LSD_HOST_V6, test_cookie());
assert_eq!(msgs.len(), 1);
let text = String::from_utf8(msgs[0].clone()).unwrap();
assert!(text.contains("Host: [ff15::efc0:988f]:6771\r\n"));
}
#[test]
fn parse_announce_ipv6_host() {
let msg = format!(
"BT-SEARCH * HTTP/1.1\r\nHost: [ff15::efc0:988f]:6771\r\nPort: 7000\r\ncookie: 01020304\r\nInfohash: {}\r\n\r\n",
test_hash().to_hex()
);
let parsed = parse_announce(msg.as_bytes()).unwrap();
assert_eq!(parsed.port, 7000);
assert_eq!(parsed.cookie, Some([0x01, 0x02, 0x03, 0x04]));
}
#[tokio::test]
async fn lsd_actor_start_and_shutdown() {
let result = LsdHandle::start(6881, false).await;
match result {
Ok((handle, _peer_rx)) => {
handle.announce(vec![test_hash()]).await;
handle.shutdown().await;
}
Err(e) => {
eprintln!("LSD actor test skipped (port unavailable): {e}");
}
}
}
#[tokio::test]
async fn lsd_actor_starts_with_ipv6_enabled() {
let result = LsdHandle::start(6881, true).await;
match result {
Ok((handle, _peer_rx)) => {
handle.announce(vec![test_hash()]).await;
handle.shutdown().await;
}
Err(e) => {
eprintln!("LSD actor IPv6 test skipped (port unavailable): {e}");
}
}
}
#[tokio::test]
async fn lsd_actor_starts_without_ipv6() {
let result = LsdHandle::start(6881, false).await;
match result {
Ok((handle, _peer_rx)) => {
handle.shutdown().await;
}
Err(e) => {
eprintln!("LSD actor test skipped (port unavailable): {e}");
}
}
}
#[tokio::test]
async fn lsd_shutdown_and_wait_returns_promptly() {
let Ok((handle, _peer_rx)) = LsdHandle::start(6881, false).await else {
eprintln!("LSD test skipped (port 6771 unavailable)");
return;
};
let start = std::time::Instant::now();
tokio::time::timeout(
std::time::Duration::from_secs(5),
handle.shutdown_and_wait(),
)
.await
.expect("shutdown_and_wait must complete within 5s");
assert!(
start.elapsed() < std::time::Duration::from_secs(5),
"shutdown_and_wait took {:?}",
start.elapsed()
);
}
#[tokio::test]
async fn lsd_shutdown_and_wait_releases_port_for_rebind() {
let Ok((handle, _peer_rx)) = LsdHandle::start(6881, false).await else {
eprintln!("LSD test skipped (port 6771 unavailable)");
return;
};
handle.shutdown_and_wait().await;
let rebind = LsdHandle::start(6882, false).await;
match rebind {
Ok((new_handle, _new_peer_rx)) => {
new_handle.shutdown_and_wait().await;
}
Err(e) => {
panic!(
"rebind on LSD port failed after shutdown_and_wait: {e}. \
This means the multicast socket was not dropped before \
the shutdown reply was acked."
);
}
}
}
}