1use std::collections::HashMap;
6use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
7use std::time::Instant;
8
9use tokio::net::UdpSocket;
10use tokio::sync::{mpsc, oneshot};
11use tracing::{debug, warn};
12
13use irontide_core::Id20;
14
/// IPv4 multicast group that LSD sockets join and announcements are sent to.
const LSD_MULTICAST: Ipv4Addr = Ipv4Addr::new(239, 192, 152, 143);
/// IPv6 multicast group that LSD sockets join and announcements are sent to.
const LSD_MULTICAST_V6: Ipv6Addr = Ipv6Addr::new(0xff15, 0, 0, 0, 0, 0, 0xefc0, 0x988f);
/// UDP port the LSD sockets bind to; also the destination port of announcements.
const LSD_PORT: u16 = 6771;
/// Size of the receive buffers and the size budget used when batching info
/// hashes into a single announce datagram.
const MAX_PACKET_SIZE: usize = 1400;
/// Minimum time between two announcements of the same info hash.
const ANNOUNCE_INTERVAL: std::time::Duration = std::time::Duration::from_mins(5);

/// `Host:` header value used in IPv4 announcements (group address and port in text form).
const LSD_HOST_V4: &str = "239.192.152.143:6771";
/// `Host:` header value used in IPv6 announcements (group address and port in text form).
const LSD_HOST_V6: &str = "[ff15::efc0:988f]:6771";
27
28pub(crate) fn format_announce(
33 info_hashes: &[Id20],
34 listen_port: u16,
35 host: &str,
36 cookie: [u8; 4],
37) -> Vec<Vec<u8>> {
38 if info_hashes.is_empty() {
39 return Vec::new();
40 }
41
42 let mut messages = Vec::new();
43 let mut current_hashes: Vec<&Id20> = Vec::new();
44
45 for ih in info_hashes {
46 let header_size = format!(
47 "BT-SEARCH * HTTP/1.1\r\nHost: {host}\r\nPort: {listen_port}\r\ncookie: 00000000\r\n"
48 )
49 .len();
50 let per_hash = 52; let footer = 2; let estimated = header_size + (current_hashes.len() + 1) * per_hash + footer;
53
54 if estimated > MAX_PACKET_SIZE && !current_hashes.is_empty() {
55 messages.push(build_message(¤t_hashes, listen_port, host, cookie));
56 current_hashes.clear();
57 }
58 current_hashes.push(ih);
59 }
60
61 if !current_hashes.is_empty() {
62 messages.push(build_message(¤t_hashes, listen_port, host, cookie));
63 }
64
65 messages
66}
67
68fn build_message(info_hashes: &[&Id20], listen_port: u16, host: &str, cookie: [u8; 4]) -> Vec<u8> {
69 use std::fmt::Write;
70 let mut msg = format!(
71 "BT-SEARCH * HTTP/1.1\r\nHost: {host}\r\nPort: {listen_port}\r\ncookie: {}\r\n",
72 hex::encode(cookie),
73 );
74 for ih in info_hashes {
75 let _ = write!(msg, "Infohash: {}\r\n", ih.to_hex());
76 }
77 msg.push_str("\r\n");
78 msg.into_bytes()
79}
80
/// A decoded `BT-SEARCH` announcement, as produced by `parse_announce`.
#[derive(Debug, Clone)]
pub(crate) struct LsdAnnounce {
    /// Port taken from the message's `Port` header.
    pub port: u16,
    /// Info hashes taken from the `Infohash` headers that decoded successfully.
    pub info_hashes: Vec<Id20>,
    /// Four-byte cookie decoded from an 8-hex-digit `cookie` header; `None`
    /// when the header is missing or not in that form.
    pub cookie: Option<[u8; 4]>,
}
88
89pub(crate) fn parse_announce(data: &[u8]) -> Option<LsdAnnounce> {
93 let text = std::str::from_utf8(data).ok()?;
94
95 if !text.starts_with("BT-SEARCH * HTTP/1.1\r\n") {
96 return None;
97 }
98
99 let mut port: Option<u16> = None;
100 let mut info_hashes = Vec::new();
101 let mut cookie: Option<[u8; 4]> = None;
102
103 for line in text.split("\r\n") {
104 if let Some(value) = line.strip_prefix("Port: ") {
105 port = value.trim().parse().ok();
106 } else if let Some(value) = line.strip_prefix("Infohash: ")
107 && let Ok(ih) = Id20::from_hex(value.trim())
108 {
109 info_hashes.push(ih);
110 } else if let Some(value) = line.strip_prefix("cookie: ") {
111 let trimmed = value.trim();
112 if trimmed.len() == 8
113 && let Ok(bytes) = hex::decode(trimmed)
114 && bytes.len() == 4
115 {
116 cookie = Some([bytes[0], bytes[1], bytes[2], bytes[3]]);
117 }
118 }
119 }
120
121 let port = port?;
122 if info_hashes.is_empty() {
123 return None;
124 }
125
126 Some(LsdAnnounce {
127 port,
128 info_hashes,
129 cookie,
130 })
131}
132
133pub(crate) struct LsdRateLimiter {
135 last_announce: HashMap<Id20, Instant>,
136}
137
138impl LsdRateLimiter {
139 pub fn new() -> Self {
140 Self {
141 last_announce: HashMap::new(),
142 }
143 }
144
145 pub fn filter_eligible(&mut self, info_hashes: &[Id20]) -> Vec<Id20> {
147 let now = Instant::now();
148 let mut eligible = Vec::new();
149 for ih in info_hashes {
150 let can_announce = self
151 .last_announce
152 .get(ih)
153 .is_none_or(|t| now.duration_since(*t) >= ANNOUNCE_INTERVAL);
154 if can_announce {
155 eligible.push(*ih);
156 self.last_announce.insert(*ih, now);
157 }
158 }
159 eligible
160 }
161
162 pub fn multicast_addr() -> SocketAddr {
164 SocketAddr::V4(SocketAddrV4::new(LSD_MULTICAST, LSD_PORT))
165 }
166
167 pub fn multicast_addr_v6() -> SocketAddr {
168 SocketAddr::V6(SocketAddrV6::new(LSD_MULTICAST_V6, LSD_PORT, 0, 0))
169 }
170}
171
/// Commands sent from an `LsdHandle` to the background LSD actor task.
pub(crate) enum LsdCommand {
    /// Announce these info hashes; the actor applies its rate limiter first.
    Announce {
        info_hashes: Vec<Id20>,
    },
    /// Stop the actor loop.
    Shutdown {
        /// When present, the actor sends `()` on it after it has dropped
        /// itself (and with it its sockets).
        reply: Option<oneshot::Sender<()>>,
    },
}
189
/// Cloneable handle used to control the background LSD actor task.
#[derive(Clone)]
pub struct LsdHandle {
    /// Sending half of the actor's command channel.
    cmd_tx: mpsc::Sender<LsdCommand>,
}
195
196impl LsdHandle {
197 #[allow(clippy::unused_async)]
203 pub async fn start(
204 listen_port: u16,
205 enable_ipv6: bool,
206 ) -> std::io::Result<(Self, mpsc::Receiver<(Id20, SocketAddr)>)> {
207 use socket2::{Domain, Protocol, Socket, Type};
208
209 let sock4 = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
211 sock4.set_reuse_address(true)?;
212 sock4.set_broadcast(true)?;
213 sock4.set_nonblocking(true)?;
214 sock4.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, LSD_PORT).into())?;
215 let std_sock: std::net::UdpSocket = sock4.into();
216 let socket = UdpSocket::from_std(std_sock)?;
217 socket.join_multicast_v4(LSD_MULTICAST, Ipv4Addr::UNSPECIFIED)?;
218
219 let socket_v6 = if enable_ipv6 {
221 match Self::bind_ipv6_socket() {
222 Ok(s) => Some(s),
223 Err(e) => {
224 warn!("LSD IPv6 unavailable: {e}");
225 None
226 }
227 }
228 } else {
229 None
230 };
231
232 let cookie = generate_cookie();
233
234 let (cmd_tx, cmd_rx) = mpsc::channel(64);
235 let (peer_tx, peer_rx) = mpsc::channel(256);
236
237 let actor = LsdActor {
238 socket,
239 socket_v6,
240 listen_port,
241 cookie,
242 rate_limiter: LsdRateLimiter::new(),
243 cmd_rx,
244 peer_tx,
245 };
246 tokio::spawn(actor.run());
247
248 Ok((Self { cmd_tx }, peer_rx))
249 }
250
251 fn bind_ipv6_socket() -> std::io::Result<UdpSocket> {
252 use socket2::{Domain, Protocol, Socket, Type};
253
254 let sock6 = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
255 sock6.set_only_v6(true)?;
256 sock6.set_reuse_address(true)?;
257 sock6.set_nonblocking(true)?;
258 sock6.bind(&SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, LSD_PORT, 0, 0).into())?;
259 sock6.join_multicast_v6(&LSD_MULTICAST_V6, 0)?;
260 let std_sock: std::net::UdpSocket = sock6.into();
261 UdpSocket::from_std(std_sock)
262 }
263
264 pub async fn announce(&self, info_hashes: Vec<Id20>) {
266 let _ = self.cmd_tx.send(LsdCommand::Announce { info_hashes }).await;
267 }
268
269 pub async fn shutdown(&self) {
271 let _ = self.cmd_tx.send(LsdCommand::Shutdown { reply: None }).await;
272 }
273
274 #[allow(dead_code)]
277 pub async fn shutdown_and_wait(&self) {
278 let (reply_tx, reply_rx) = oneshot::channel();
279 if self
280 .cmd_tx
281 .send(LsdCommand::Shutdown {
282 reply: Some(reply_tx),
283 })
284 .await
285 .is_err()
286 {
287 return;
288 }
289 let _ = reply_rx.await;
290 }
291}
292
/// Generates the 4-byte cookie that lets this instance recognise and ignore
/// its own multicast announcements.
///
/// The cookie is the low four bytes of a hash keyed by the standard
/// library's randomly seeded `RandomState`, mixed with the wall-clock time
/// and the process id. Relying on the clock alone would give a fixed
/// all-zero cookie whenever the clock reads before the Unix epoch, and could
/// give identical cookies to instances started within one clock tick;
/// instances sharing a cookie silently ignore each other's announcements.
fn generate_cookie() -> [u8; 4] {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();

    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u64(since_epoch.as_secs());
    hasher.write_u32(since_epoch.subsec_nanos());
    hasher.write_u32(std::process::id());

    let bytes = hasher.finish().to_le_bytes();
    [bytes[0], bytes[1], bytes[2], bytes[3]]
}
305
/// State owned by the background LSD task.
struct LsdActor {
    /// IPv4 socket used for both receiving and sending announcements.
    socket: UdpSocket,
    /// IPv6 socket, when one is available.
    socket_v6: Option<UdpSocket>,
    /// Port written into the `Port` header of outgoing announcements.
    listen_port: u16,
    /// Cookie sent with our announcements; incoming messages carrying the
    /// same cookie are ignored.
    cookie: [u8; 4],
    /// Filters the info hashes of each announce command.
    rate_limiter: LsdRateLimiter,
    /// Receiving half of the command channel.
    cmd_rx: mpsc::Receiver<LsdCommand>,
    /// Channel on which discovered `(info hash, peer address)` pairs are sent.
    peer_tx: mpsc::Sender<(Id20, SocketAddr)>,
}
315
316impl LsdActor {
317 async fn run(mut self) {
318 let mut buf = [0u8; MAX_PACKET_SIZE];
319 let mut buf_v6 = [0u8; MAX_PACKET_SIZE];
320 let mut shutdown_reply: Option<oneshot::Sender<()>> = None;
321
322 loop {
323 tokio::select! {
324 cmd = self.cmd_rx.recv() => {
325 match cmd {
326 Some(LsdCommand::Announce { info_hashes }) => {
327 self.do_announce(&info_hashes).await;
328 }
329 Some(LsdCommand::Shutdown { reply }) => {
330 shutdown_reply = reply;
331 break;
332 }
333 None => break,
334 }
335 }
336 result = self.socket.recv_from(&mut buf) => {
337 if let Ok((len, src)) = result {
338 self.handle_incoming(&buf[..len], src).await;
339 }
340 }
341 result = async {
342 match &self.socket_v6 {
343 Some(s) => s.recv_from(&mut buf_v6).await,
344 None => std::future::pending().await,
345 }
346 } => {
347 if let Ok((len, src)) = result {
348 self.handle_incoming(&buf_v6[..len], src).await;
349 }
350 }
351 }
352 }
353
354 std::mem::drop(self);
355 if let Some(tx) = shutdown_reply {
356 let _ = tx.send(());
357 }
358 }
359
360 async fn do_announce(&mut self, info_hashes: &[Id20]) {
361 let eligible = self.rate_limiter.filter_eligible(info_hashes);
362 if eligible.is_empty() {
363 return;
364 }
365
366 let messages = format_announce(&eligible, self.listen_port, LSD_HOST_V4, self.cookie);
367 let dest = LsdRateLimiter::multicast_addr();
368 for msg in &messages {
369 if let Err(e) = self.socket.send_to(msg, dest).await {
370 warn!("LSD IPv4 send failed: {e}");
371 break;
372 }
373 }
374
375 if let Some(ref sock_v6) = self.socket_v6 {
376 let messages_v6 =
377 format_announce(&eligible, self.listen_port, LSD_HOST_V6, self.cookie);
378 let dest_v6 = LsdRateLimiter::multicast_addr_v6();
379 for msg in &messages_v6 {
380 if let Err(e) = sock_v6.send_to(msg, dest_v6).await {
381 warn!("LSD IPv6 send failed: {e}");
382 break;
383 }
384 }
385 }
386
387 debug!(count = eligible.len(), "LSD announce sent");
388 }
389
390 async fn handle_incoming(&self, data: &[u8], src: SocketAddr) {
391 let Some(announce) = parse_announce(data) else {
392 return;
393 };
394 if announce.cookie == Some(self.cookie) {
395 return;
396 }
397 let peer_addr = SocketAddr::new(src.ip(), announce.port);
398 for ih in announce.info_hashes {
399 if self.peer_tx.send((ih, peer_addr)).await.is_err() {
400 return;
401 }
402 }
403 }
404}
405
406#[cfg(test)]
407mod tests {
408 use super::*;
409
410 fn test_hash() -> Id20 {
411 Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap()
412 }
413
414 fn test_hash2() -> Id20 {
415 Id20::from_hex("0102030405060708091011121314151617181920").unwrap()
416 }
417
418 fn test_cookie() -> [u8; 4] {
419 [0xDE, 0xAD, 0xBE, 0xEF]
420 }
421
422 #[test]
423 fn format_single_announce() {
424 let msgs = format_announce(&[test_hash()], 6881, LSD_HOST_V4, test_cookie());
425 assert_eq!(msgs.len(), 1);
426 let text = String::from_utf8(msgs[0].clone()).unwrap();
427 assert!(text.starts_with("BT-SEARCH * HTTP/1.1\r\n"));
428 assert!(text.contains("Host: 239.192.152.143:6771\r\n"));
429 assert!(text.contains("Port: 6881\r\n"));
430 assert!(text.contains("cookie: deadbeef\r\n"));
431 assert!(text.contains(&format!("Infohash: {}\r\n", test_hash().to_hex())));
432 assert!(text.ends_with("\r\n\r\n"));
433 }
434
435 #[test]
436 fn format_batch_announce() {
437 let hashes = vec![test_hash(), test_hash2()];
438 let msgs = format_announce(&hashes, 6881, LSD_HOST_V4, test_cookie());
439 assert_eq!(msgs.len(), 1);
440 let text = String::from_utf8(msgs[0].clone()).unwrap();
441 assert!(text.contains(&format!("Infohash: {}\r\n", test_hash().to_hex())));
442 assert!(text.contains(&format!("Infohash: {}\r\n", test_hash2().to_hex())));
443 }
444
445 #[test]
446 fn format_empty() {
447 let msgs = format_announce(&[], 6881, LSD_HOST_V4, test_cookie());
448 assert!(msgs.is_empty());
449 }
450
451 #[test]
452 fn parse_valid_announce() {
453 let msg = format!(
454 "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\nInfohash: {}\r\n\r\n",
455 test_hash().to_hex()
456 );
457 let parsed = parse_announce(msg.as_bytes()).unwrap();
458 assert_eq!(parsed.port, 6881);
459 assert_eq!(parsed.info_hashes.len(), 1);
460 assert_eq!(parsed.info_hashes[0], test_hash());
461 assert_eq!(parsed.cookie, None);
462 }
463
464 #[test]
465 fn parse_multiple_infohashes() {
466 let msg = format!(
467 "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 9999\r\nInfohash: {}\r\nInfohash: {}\r\n\r\n",
468 test_hash().to_hex(),
469 test_hash2().to_hex()
470 );
471 let parsed = parse_announce(msg.as_bytes()).unwrap();
472 assert_eq!(parsed.port, 9999);
473 assert_eq!(parsed.info_hashes.len(), 2);
474 }
475
476 #[test]
477 fn parse_invalid_no_port() {
478 let msg = format!(
479 "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nInfohash: {}\r\n\r\n",
480 test_hash().to_hex()
481 );
482 assert!(parse_announce(msg.as_bytes()).is_none());
483 }
484
485 #[test]
486 fn parse_invalid_not_bt_search() {
487 let msg = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n";
488 assert!(parse_announce(msg).is_none());
489 }
490
491 #[test]
492 fn rate_limiter_first_announce_allowed() {
493 let mut limiter = LsdRateLimiter::new();
494 let eligible = limiter.filter_eligible(&[test_hash()]);
495 assert_eq!(eligible.len(), 1);
496 }
497
498 #[test]
499 fn rate_limiter_immediate_reannounce_blocked() {
500 let mut limiter = LsdRateLimiter::new();
501 let _ = limiter.filter_eligible(&[test_hash()]);
502 let eligible = limiter.filter_eligible(&[test_hash()]);
503 assert!(eligible.is_empty());
504 }
505
506 #[test]
507 fn build_message_ipv4_with_cookie() {
508 let ih = test_hash();
509 let msg = build_message(&[&ih], 6881, LSD_HOST_V4, test_cookie());
510 let text = String::from_utf8(msg).unwrap();
511 assert!(text.contains("Host: 239.192.152.143:6771\r\n"));
512 assert!(text.contains("Port: 6881\r\n"));
513 assert!(text.contains("cookie: deadbeef\r\n"));
514 }
515
516 #[test]
517 fn build_message_ipv6_with_cookie() {
518 let ih = test_hash();
519 let msg = build_message(&[&ih], 6881, LSD_HOST_V6, test_cookie());
520 let text = String::from_utf8(msg).unwrap();
521 assert!(text.contains("Host: [ff15::efc0:988f]:6771\r\n"));
522 assert!(text.contains("cookie: deadbeef\r\n"));
523 }
524
525 #[test]
526 fn format_announce_includes_cookie() {
527 let msgs = format_announce(&[test_hash()], 6881, LSD_HOST_V4, test_cookie());
528 let text = String::from_utf8(msgs[0].clone()).unwrap();
529 assert!(text.contains("cookie: deadbeef\r\n"));
530 }
531
532 #[test]
533 fn parse_announce_extracts_cookie() {
534 let msg = format!(
535 "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\ncookie: deadbeef\r\nInfohash: {}\r\n\r\n",
536 test_hash().to_hex()
537 );
538 let parsed = parse_announce(msg.as_bytes()).unwrap();
539 assert_eq!(parsed.cookie, Some([0xDE, 0xAD, 0xBE, 0xEF]));
540 }
541
542 #[test]
543 fn parse_announce_no_cookie_backward_compat() {
544 let msg = format!(
545 "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\nInfohash: {}\r\n\r\n",
546 test_hash().to_hex()
547 );
548 let parsed = parse_announce(msg.as_bytes()).unwrap();
549 assert_eq!(parsed.cookie, None);
550 assert_eq!(parsed.port, 6881);
551 }
552
553 #[test]
554 fn actor_drops_own_cookie() {
555 let cookie = test_cookie();
556 let msg = format!(
557 "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\ncookie: deadbeef\r\nInfohash: {}\r\n\r\n",
558 test_hash().to_hex()
559 );
560 let parsed = parse_announce(msg.as_bytes()).unwrap();
561 assert!(parsed.cookie == Some(cookie));
562 }
563
564 #[test]
565 fn format_announce_ipv6_host() {
566 let msgs = format_announce(&[test_hash()], 6881, LSD_HOST_V6, test_cookie());
567 assert_eq!(msgs.len(), 1);
568 let text = String::from_utf8(msgs[0].clone()).unwrap();
569 assert!(text.contains("Host: [ff15::efc0:988f]:6771\r\n"));
570 }
571
572 #[test]
573 fn parse_announce_ipv6_host() {
574 let msg = format!(
575 "BT-SEARCH * HTTP/1.1\r\nHost: [ff15::efc0:988f]:6771\r\nPort: 7000\r\ncookie: 01020304\r\nInfohash: {}\r\n\r\n",
576 test_hash().to_hex()
577 );
578 let parsed = parse_announce(msg.as_bytes()).unwrap();
579 assert_eq!(parsed.port, 7000);
580 assert_eq!(parsed.cookie, Some([0x01, 0x02, 0x03, 0x04]));
581 }
582
583 #[tokio::test]
584 async fn lsd_actor_start_and_shutdown() {
585 let result = LsdHandle::start(6881, false).await;
586 match result {
587 Ok((handle, _peer_rx)) => {
588 handle.announce(vec![test_hash()]).await;
589 handle.shutdown().await;
590 }
591 Err(e) => {
592 eprintln!("LSD actor test skipped (port unavailable): {e}");
593 }
594 }
595 }
596
597 #[tokio::test]
598 async fn lsd_actor_starts_with_ipv6_enabled() {
599 let result = LsdHandle::start(6881, true).await;
600 match result {
601 Ok((handle, _peer_rx)) => {
602 handle.announce(vec![test_hash()]).await;
603 handle.shutdown().await;
604 }
605 Err(e) => {
606 eprintln!("LSD actor IPv6 test skipped (port unavailable): {e}");
607 }
608 }
609 }
610
611 #[tokio::test]
612 async fn lsd_actor_starts_without_ipv6() {
613 let result = LsdHandle::start(6881, false).await;
614 match result {
615 Ok((handle, _peer_rx)) => {
616 handle.shutdown().await;
617 }
618 Err(e) => {
619 eprintln!("LSD actor test skipped (port unavailable): {e}");
620 }
621 }
622 }
623
624 #[tokio::test]
625 async fn lsd_shutdown_and_wait_returns_promptly() {
626 let Ok((handle, _peer_rx)) = LsdHandle::start(6881, false).await else {
627 eprintln!("LSD test skipped (port 6771 unavailable)");
628 return;
629 };
630
631 let start = std::time::Instant::now();
632 tokio::time::timeout(
633 std::time::Duration::from_secs(5),
634 handle.shutdown_and_wait(),
635 )
636 .await
637 .expect("shutdown_and_wait must complete within 5s");
638 assert!(
639 start.elapsed() < std::time::Duration::from_secs(5),
640 "shutdown_and_wait took {:?}",
641 start.elapsed()
642 );
643 }
644
645 #[tokio::test]
646 async fn lsd_shutdown_and_wait_releases_port_for_rebind() {
647 let Ok((handle, _peer_rx)) = LsdHandle::start(6881, false).await else {
648 eprintln!("LSD test skipped (port 6771 unavailable)");
649 return;
650 };
651
652 handle.shutdown_and_wait().await;
653
654 let rebind = LsdHandle::start(6882, false).await;
655 match rebind {
656 Ok((new_handle, _new_peer_rx)) => {
657 new_handle.shutdown_and_wait().await;
658 }
659 Err(e) => {
660 panic!(
661 "rebind on LSD port failed after shutdown_and_wait: {e}. \
662 This means the multicast socket was not dropped before \
663 the shutdown reply was acked."
664 );
665 }
666 }
667 }
668}