Skip to main content

irontide_session_core/
lsd.rs

1//! BEP 14 Local Peer Discovery (LSD).
2//!
3//! UDP multicast announcer for discovering peers on the local network.
4
5use std::collections::HashMap;
6use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
7use std::time::Instant;
8
9use tokio::net::UdpSocket;
10use tokio::sync::{mpsc, oneshot};
11use tracing::{debug, warn};
12
13use irontide_core::Id20;
14
/// IPv4 multicast group that LSD announces are sent to (BEP 14).
const LSD_MULTICAST: Ipv4Addr = Ipv4Addr::new(239, 192, 152, 143);
/// IPv6 multicast group that LSD announces are sent to (BEP 14).
const LSD_MULTICAST_V6: Ipv6Addr = Ipv6Addr::new(0xff15, 0, 0, 0, 0, 0, 0xefc0, 0x988f);
/// UDP port used for both LSD multicast groups.
const LSD_PORT: u16 = 6771;
/// Size of the receive buffers and the byte budget used when batching announces.
const MAX_PACKET_SIZE: usize = 1400;
/// Minimum time between two announces of the same torrent (five minutes).
const ANNOUNCE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5 * 60);

/// `Host` header value placed in IPv4 announces (group address and port).
const LSD_HOST_V4: &str = "239.192.152.143:6771";
/// `Host` header value placed in IPv6 announces (bracketed group address and port).
const LSD_HOST_V6: &str = "[ff15::efc0:988f]:6771";
27
28/// Format an LSD announce message for the given info hashes and listen port.
29///
30/// Returns empty Vec if no info hashes are provided.
31/// Batches multiple info hashes per message, respecting the MTU limit.
32pub(crate) fn format_announce(
33    info_hashes: &[Id20],
34    listen_port: u16,
35    host: &str,
36    cookie: [u8; 4],
37) -> Vec<Vec<u8>> {
38    if info_hashes.is_empty() {
39        return Vec::new();
40    }
41
42    let mut messages = Vec::new();
43    let mut current_hashes: Vec<&Id20> = Vec::new();
44
45    for ih in info_hashes {
46        let header_size = format!(
47            "BT-SEARCH * HTTP/1.1\r\nHost: {host}\r\nPort: {listen_port}\r\ncookie: 00000000\r\n"
48        )
49        .len();
50        let per_hash = 52; // "Infohash: " + 40 hex + "\r\n"
51        let footer = 2; // final "\r\n"
52        let estimated = header_size + (current_hashes.len() + 1) * per_hash + footer;
53
54        if estimated > MAX_PACKET_SIZE && !current_hashes.is_empty() {
55            messages.push(build_message(&current_hashes, listen_port, host, cookie));
56            current_hashes.clear();
57        }
58        current_hashes.push(ih);
59    }
60
61    if !current_hashes.is_empty() {
62        messages.push(build_message(&current_hashes, listen_port, host, cookie));
63    }
64
65    messages
66}
67
68fn build_message(info_hashes: &[&Id20], listen_port: u16, host: &str, cookie: [u8; 4]) -> Vec<u8> {
69    use std::fmt::Write;
70    let mut msg = format!(
71        "BT-SEARCH * HTTP/1.1\r\nHost: {host}\r\nPort: {listen_port}\r\ncookie: {}\r\n",
72        hex::encode(cookie),
73    );
74    for ih in info_hashes {
75        let _ = write!(msg, "Infohash: {}\r\n", ih.to_hex());
76    }
77    msg.push_str("\r\n");
78    msg.into_bytes()
79}
80
81/// Parsed LSD announce from a remote peer.
82#[derive(Debug, Clone)]
83pub(crate) struct LsdAnnounce {
84    pub port: u16,
85    pub info_hashes: Vec<Id20>,
86    pub cookie: Option<[u8; 4]>,
87}
88
89/// Parse an incoming LSD message.
90///
91/// Returns `None` if the message is malformed.
92pub(crate) fn parse_announce(data: &[u8]) -> Option<LsdAnnounce> {
93    let text = std::str::from_utf8(data).ok()?;
94
95    if !text.starts_with("BT-SEARCH * HTTP/1.1\r\n") {
96        return None;
97    }
98
99    let mut port: Option<u16> = None;
100    let mut info_hashes = Vec::new();
101    let mut cookie: Option<[u8; 4]> = None;
102
103    for line in text.split("\r\n") {
104        if let Some(value) = line.strip_prefix("Port: ") {
105            port = value.trim().parse().ok();
106        } else if let Some(value) = line.strip_prefix("Infohash: ")
107            && let Ok(ih) = Id20::from_hex(value.trim())
108        {
109            info_hashes.push(ih);
110        } else if let Some(value) = line.strip_prefix("cookie: ") {
111            let trimmed = value.trim();
112            if trimmed.len() == 8
113                && let Ok(bytes) = hex::decode(trimmed)
114                && bytes.len() == 4
115            {
116                cookie = Some([bytes[0], bytes[1], bytes[2], bytes[3]]);
117            }
118        }
119    }
120
121    let port = port?;
122    if info_hashes.is_empty() {
123        return None;
124    }
125
126    Some(LsdAnnounce {
127        port,
128        info_hashes,
129        cookie,
130    })
131}
132
133/// Tracks LSD announce rate limiting per `info_hash`.
134pub(crate) struct LsdRateLimiter {
135    last_announce: HashMap<Id20, Instant>,
136}
137
138impl LsdRateLimiter {
139    pub fn new() -> Self {
140        Self {
141            last_announce: HashMap::new(),
142        }
143    }
144
145    /// Filter info hashes to only those eligible for announce (respecting rate limit).
146    pub fn filter_eligible(&mut self, info_hashes: &[Id20]) -> Vec<Id20> {
147        let now = Instant::now();
148        let mut eligible = Vec::new();
149        for ih in info_hashes {
150            let can_announce = self
151                .last_announce
152                .get(ih)
153                .is_none_or(|t| now.duration_since(*t) >= ANNOUNCE_INTERVAL);
154            if can_announce {
155                eligible.push(*ih);
156                self.last_announce.insert(*ih, now);
157            }
158        }
159        eligible
160    }
161
162    /// Get the multicast address for sending.
163    pub fn multicast_addr() -> SocketAddr {
164        SocketAddr::V4(SocketAddrV4::new(LSD_MULTICAST, LSD_PORT))
165    }
166
167    pub fn multicast_addr_v6() -> SocketAddr {
168        SocketAddr::V6(SocketAddrV6::new(LSD_MULTICAST_V6, LSD_PORT, 0, 0))
169    }
170}
171
172// ---------------------------------------------------------------------------
173// LSD Actor (BEP 14 UDP multicast)
174// ---------------------------------------------------------------------------
175
176pub(crate) enum LsdCommand {
177    Announce {
178        info_hashes: Vec<Id20>,
179    },
180    /// M173 Lane B (B9): optional reply lets the `apply_settings`
181    /// LSD-stop phase block until the multicast socket has been
182    /// fully dropped. Multicast group memberships do NOT release
183    /// on process exit — they rely on the `socket` field being
184    /// dropped, which happens only when the actor's `run` returns.
185    Shutdown {
186        reply: Option<oneshot::Sender<()>>,
187    },
188}
189
190/// Handle to the running Local Service Discovery (BEP 14) actor.
191#[derive(Clone)]
192pub struct LsdHandle {
193    cmd_tx: mpsc::Sender<LsdCommand>,
194}
195
196impl LsdHandle {
197    /// Start the LSD actor, binding to the multicast port.
198    ///
199    /// # Errors
200    /// Returns an [`std::io::Error`] if the IPv4 multicast socket cannot be
201    /// created, bound, or joined to the LSD multicast group.
202    #[allow(clippy::unused_async)]
203    pub async fn start(
204        listen_port: u16,
205        enable_ipv6: bool,
206    ) -> std::io::Result<(Self, mpsc::Receiver<(Id20, SocketAddr)>)> {
207        use socket2::{Domain, Protocol, Socket, Type};
208
209        // IPv4 socket with SO_REUSEADDR
210        let sock4 = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
211        sock4.set_reuse_address(true)?;
212        sock4.set_broadcast(true)?;
213        sock4.set_nonblocking(true)?;
214        sock4.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, LSD_PORT).into())?;
215        let std_sock: std::net::UdpSocket = sock4.into();
216        let socket = UdpSocket::from_std(std_sock)?;
217        socket.join_multicast_v4(LSD_MULTICAST, Ipv4Addr::UNSPECIFIED)?;
218
219        // IPv6 socket (optional)
220        let socket_v6 = if enable_ipv6 {
221            match Self::bind_ipv6_socket() {
222                Ok(s) => Some(s),
223                Err(e) => {
224                    warn!("LSD IPv6 unavailable: {e}");
225                    None
226                }
227            }
228        } else {
229            None
230        };
231
232        let cookie = generate_cookie();
233
234        let (cmd_tx, cmd_rx) = mpsc::channel(64);
235        let (peer_tx, peer_rx) = mpsc::channel(256);
236
237        let actor = LsdActor {
238            socket,
239            socket_v6,
240            listen_port,
241            cookie,
242            rate_limiter: LsdRateLimiter::new(),
243            cmd_rx,
244            peer_tx,
245        };
246        tokio::spawn(actor.run());
247
248        Ok((Self { cmd_tx }, peer_rx))
249    }
250
251    fn bind_ipv6_socket() -> std::io::Result<UdpSocket> {
252        use socket2::{Domain, Protocol, Socket, Type};
253
254        let sock6 = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
255        sock6.set_only_v6(true)?;
256        sock6.set_reuse_address(true)?;
257        sock6.set_nonblocking(true)?;
258        sock6.bind(&SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, LSD_PORT, 0, 0).into())?;
259        sock6.join_multicast_v6(&LSD_MULTICAST_V6, 0)?;
260        let std_sock: std::net::UdpSocket = sock6.into();
261        UdpSocket::from_std(std_sock)
262    }
263
264    /// Announce the given info hashes to the local network via LSD multicast.
265    pub async fn announce(&self, info_hashes: Vec<Id20>) {
266        let _ = self.cmd_tx.send(LsdCommand::Announce { info_hashes }).await;
267    }
268
269    /// Fire-and-forget shutdown.
270    pub async fn shutdown(&self) {
271        let _ = self.cmd_tx.send(LsdCommand::Shutdown { reply: None }).await;
272    }
273
274    /// M173 Lane B (B9): shut down the LSD actor and wait for the
275    /// multicast UDP socket to be released.
276    #[allow(dead_code)]
277    pub async fn shutdown_and_wait(&self) {
278        let (reply_tx, reply_rx) = oneshot::channel();
279        if self
280            .cmd_tx
281            .send(LsdCommand::Shutdown {
282                reply: Some(reply_tx),
283            })
284            .await
285            .is_err()
286        {
287            return;
288        }
289        let _ = reply_rx.await;
290    }
291}
292
/// Derive a 4-byte cookie from the current wall-clock time.
///
/// The nanoseconds since the Unix epoch (zero if the clock reads earlier than
/// the epoch) are truncated to 64 bits, scrambled with three xorshift steps,
/// and the four least-significant bytes of the result are returned.
fn generate_cookie() -> [u8; 4] {
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos());

    #[allow(clippy::cast_possible_truncation)]
    let mut state = nanos as u64;
    state ^= state << 13;
    state ^= state >> 7;
    state ^= state << 17;

    let [b0, b1, b2, b3, ..] = state.to_le_bytes();
    [b0, b1, b2, b3]
}
305
306struct LsdActor {
307    socket: UdpSocket,
308    socket_v6: Option<UdpSocket>,
309    listen_port: u16,
310    cookie: [u8; 4],
311    rate_limiter: LsdRateLimiter,
312    cmd_rx: mpsc::Receiver<LsdCommand>,
313    peer_tx: mpsc::Sender<(Id20, SocketAddr)>,
314}
315
316impl LsdActor {
317    async fn run(mut self) {
318        let mut buf = [0u8; MAX_PACKET_SIZE];
319        let mut buf_v6 = [0u8; MAX_PACKET_SIZE];
320        let mut shutdown_reply: Option<oneshot::Sender<()>> = None;
321
322        loop {
323            tokio::select! {
324                cmd = self.cmd_rx.recv() => {
325                    match cmd {
326                        Some(LsdCommand::Announce { info_hashes }) => {
327                            self.do_announce(&info_hashes).await;
328                        }
329                        Some(LsdCommand::Shutdown { reply }) => {
330                            shutdown_reply = reply;
331                            break;
332                        }
333                        None => break,
334                    }
335                }
336                result = self.socket.recv_from(&mut buf) => {
337                    if let Ok((len, src)) = result {
338                        self.handle_incoming(&buf[..len], src).await;
339                    }
340                }
341                result = async {
342                    match &self.socket_v6 {
343                        Some(s) => s.recv_from(&mut buf_v6).await,
344                        None => std::future::pending().await,
345                    }
346                } => {
347                    if let Ok((len, src)) = result {
348                        self.handle_incoming(&buf_v6[..len], src).await;
349                    }
350                }
351            }
352        }
353
354        std::mem::drop(self);
355        if let Some(tx) = shutdown_reply {
356            let _ = tx.send(());
357        }
358    }
359
360    async fn do_announce(&mut self, info_hashes: &[Id20]) {
361        let eligible = self.rate_limiter.filter_eligible(info_hashes);
362        if eligible.is_empty() {
363            return;
364        }
365
366        let messages = format_announce(&eligible, self.listen_port, LSD_HOST_V4, self.cookie);
367        let dest = LsdRateLimiter::multicast_addr();
368        for msg in &messages {
369            if let Err(e) = self.socket.send_to(msg, dest).await {
370                warn!("LSD IPv4 send failed: {e}");
371                break;
372            }
373        }
374
375        if let Some(ref sock_v6) = self.socket_v6 {
376            let messages_v6 =
377                format_announce(&eligible, self.listen_port, LSD_HOST_V6, self.cookie);
378            let dest_v6 = LsdRateLimiter::multicast_addr_v6();
379            for msg in &messages_v6 {
380                if let Err(e) = sock_v6.send_to(msg, dest_v6).await {
381                    warn!("LSD IPv6 send failed: {e}");
382                    break;
383                }
384            }
385        }
386
387        debug!(count = eligible.len(), "LSD announce sent");
388    }
389
390    async fn handle_incoming(&self, data: &[u8], src: SocketAddr) {
391        let Some(announce) = parse_announce(data) else {
392            return;
393        };
394        if announce.cookie == Some(self.cookie) {
395            return;
396        }
397        let peer_addr = SocketAddr::new(src.ip(), announce.port);
398        for ih in announce.info_hashes {
399            if self.peer_tx.send((ih, peer_addr)).await.is_err() {
400                return;
401            }
402        }
403    }
404}
405
#[cfg(test)]
mod tests {
    use super::*;

    /// First fixture info hash.
    fn test_hash() -> Id20 {
        Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap()
    }

    /// Second fixture info hash, distinct from [`test_hash`].
    fn test_hash2() -> Id20 {
        Id20::from_hex("0102030405060708091011121314151617181920").unwrap()
    }

    /// Fixture cookie; hex-encodes to `deadbeef`.
    fn test_cookie() -> [u8; 4] {
        [0xDE, 0xAD, 0xBE, 0xEF]
    }

    /// Decode a formatted datagram as UTF-8 text.
    fn as_text(datagram: &[u8]) -> String {
        String::from_utf8(datagram.to_vec()).unwrap()
    }

    // --- formatting -------------------------------------------------------

    #[test]
    fn format_single_announce() {
        let msgs = format_announce(&[test_hash()], 6881, LSD_HOST_V4, test_cookie());
        assert_eq!(msgs.len(), 1);

        let text = as_text(&msgs[0]);
        assert!(text.starts_with("BT-SEARCH * HTTP/1.1\r\n"));
        for header in [
            "Host: 239.192.152.143:6771\r\n",
            "Port: 6881\r\n",
            "cookie: deadbeef\r\n",
        ] {
            assert!(text.contains(header));
        }
        assert!(text.contains(&format!("Infohash: {}\r\n", test_hash().to_hex())));
        assert!(text.ends_with("\r\n\r\n"));
    }

    #[test]
    fn format_batch_announce() {
        let msgs = format_announce(
            &[test_hash(), test_hash2()],
            6881,
            LSD_HOST_V4,
            test_cookie(),
        );
        assert_eq!(msgs.len(), 1);

        let text = as_text(&msgs[0]);
        for hash in [test_hash(), test_hash2()] {
            assert!(text.contains(&format!("Infohash: {}\r\n", hash.to_hex())));
        }
    }

    #[test]
    fn format_empty() {
        assert!(format_announce(&[], 6881, LSD_HOST_V4, test_cookie()).is_empty());
    }

    // --- parsing ----------------------------------------------------------

    #[test]
    fn parse_valid_announce() {
        let hash = test_hash().to_hex();
        let msg = format!(
            "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\nInfohash: {hash}\r\n\r\n"
        );
        let announce = parse_announce(msg.as_bytes()).unwrap();
        assert_eq!(announce.port, 6881);
        assert_eq!(announce.info_hashes.len(), 1);
        assert_eq!(announce.info_hashes[0], test_hash());
        assert_eq!(announce.cookie, None);
    }

    #[test]
    fn parse_multiple_infohashes() {
        let first = test_hash().to_hex();
        let second = test_hash2().to_hex();
        let msg = format!(
            "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 9999\r\nInfohash: {first}\r\nInfohash: {second}\r\n\r\n"
        );
        let announce = parse_announce(msg.as_bytes()).unwrap();
        assert_eq!(announce.port, 9999);
        assert_eq!(announce.info_hashes.len(), 2);
    }

    #[test]
    fn parse_invalid_no_port() {
        let hash = test_hash().to_hex();
        let msg = format!(
            "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nInfohash: {hash}\r\n\r\n"
        );
        assert!(parse_announce(msg.as_bytes()).is_none());
    }

    #[test]
    fn parse_invalid_not_bt_search() {
        assert!(parse_announce(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n").is_none());
    }

    // --- rate limiting ----------------------------------------------------

    #[test]
    fn rate_limiter_first_announce_allowed() {
        let mut limiter = LsdRateLimiter::new();
        assert_eq!(limiter.filter_eligible(&[test_hash()]).len(), 1);
    }

    #[test]
    fn rate_limiter_immediate_reannounce_blocked() {
        let mut limiter = LsdRateLimiter::new();
        let _ = limiter.filter_eligible(&[test_hash()]);
        assert!(limiter.filter_eligible(&[test_hash()]).is_empty());
    }

    // --- cookie and host handling ------------------------------------------

    #[test]
    fn build_message_ipv4_with_cookie() {
        let hash = test_hash();
        let text = as_text(&build_message(&[&hash], 6881, LSD_HOST_V4, test_cookie()));
        assert!(text.contains("Host: 239.192.152.143:6771\r\n"));
        assert!(text.contains("Port: 6881\r\n"));
        assert!(text.contains("cookie: deadbeef\r\n"));
    }

    #[test]
    fn build_message_ipv6_with_cookie() {
        let hash = test_hash();
        let text = as_text(&build_message(&[&hash], 6881, LSD_HOST_V6, test_cookie()));
        assert!(text.contains("Host: [ff15::efc0:988f]:6771\r\n"));
        assert!(text.contains("cookie: deadbeef\r\n"));
    }

    #[test]
    fn format_announce_includes_cookie() {
        let msgs = format_announce(&[test_hash()], 6881, LSD_HOST_V4, test_cookie());
        assert!(as_text(&msgs[0]).contains("cookie: deadbeef\r\n"));
    }

    #[test]
    fn parse_announce_extracts_cookie() {
        let hash = test_hash().to_hex();
        let msg = format!(
            "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\ncookie: deadbeef\r\nInfohash: {hash}\r\n\r\n"
        );
        let announce = parse_announce(msg.as_bytes()).unwrap();
        assert_eq!(announce.cookie, Some([0xDE, 0xAD, 0xBE, 0xEF]));
    }

    #[test]
    fn parse_announce_no_cookie_backward_compat() {
        let hash = test_hash().to_hex();
        let msg = format!(
            "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\nInfohash: {hash}\r\n\r\n"
        );
        let announce = parse_announce(msg.as_bytes()).unwrap();
        assert_eq!(announce.cookie, None);
        assert_eq!(announce.port, 6881);
    }

    /// Checks the comparison the actor relies on to recognise its own
    /// announces: the parsed cookie equals the cookie that was sent.
    #[test]
    fn actor_drops_own_cookie() {
        let hash = test_hash().to_hex();
        let msg = format!(
            "BT-SEARCH * HTTP/1.1\r\nHost: 239.192.152.143:6771\r\nPort: 6881\r\ncookie: deadbeef\r\nInfohash: {hash}\r\n\r\n"
        );
        let announce = parse_announce(msg.as_bytes()).unwrap();
        assert_eq!(announce.cookie, Some(test_cookie()));
    }

    #[test]
    fn format_announce_ipv6_host() {
        let msgs = format_announce(&[test_hash()], 6881, LSD_HOST_V6, test_cookie());
        assert_eq!(msgs.len(), 1);
        assert!(as_text(&msgs[0]).contains("Host: [ff15::efc0:988f]:6771\r\n"));
    }

    #[test]
    fn parse_announce_ipv6_host() {
        let hash = test_hash().to_hex();
        let msg = format!(
            "BT-SEARCH * HTTP/1.1\r\nHost: [ff15::efc0:988f]:6771\r\nPort: 7000\r\ncookie: 01020304\r\nInfohash: {hash}\r\n\r\n"
        );
        let announce = parse_announce(msg.as_bytes()).unwrap();
        assert_eq!(announce.port, 7000);
        assert_eq!(announce.cookie, Some([0x01, 0x02, 0x03, 0x04]));
    }

    // --- actor lifecycle (skipped when the LSD port cannot be bound) --------

    #[tokio::test]
    async fn lsd_actor_start_and_shutdown() {
        match LsdHandle::start(6881, false).await {
            Err(e) => eprintln!("LSD actor test skipped (port unavailable): {e}"),
            Ok((handle, _peer_rx)) => {
                handle.announce(vec![test_hash()]).await;
                handle.shutdown().await;
            }
        }
    }

    #[tokio::test]
    async fn lsd_actor_starts_with_ipv6_enabled() {
        match LsdHandle::start(6881, true).await {
            Err(e) => eprintln!("LSD actor IPv6 test skipped (port unavailable): {e}"),
            Ok((handle, _peer_rx)) => {
                handle.announce(vec![test_hash()]).await;
                handle.shutdown().await;
            }
        }
    }

    #[tokio::test]
    async fn lsd_actor_starts_without_ipv6() {
        match LsdHandle::start(6881, false).await {
            Err(e) => eprintln!("LSD actor test skipped (port unavailable): {e}"),
            Ok((handle, _peer_rx)) => handle.shutdown().await,
        }
    }

    #[tokio::test]
    async fn lsd_shutdown_and_wait_returns_promptly() {
        let Ok((handle, _peer_rx)) = LsdHandle::start(6881, false).await else {
            eprintln!("LSD test skipped (port 6771 unavailable)");
            return;
        };

        let limit = std::time::Duration::from_secs(5);
        let started = std::time::Instant::now();
        tokio::time::timeout(limit, handle.shutdown_and_wait())
            .await
            .expect("shutdown_and_wait must complete within 5s");

        let took = started.elapsed();
        assert!(took < limit, "shutdown_and_wait took {took:?}");
    }

    #[tokio::test]
    async fn lsd_shutdown_and_wait_releases_port_for_rebind() {
        let Ok((handle, _peer_rx)) = LsdHandle::start(6881, false).await else {
            eprintln!("LSD test skipped (port 6771 unavailable)");
            return;
        };

        handle.shutdown_and_wait().await;

        // A second actor must be able to bind the LSD port straight away.
        match LsdHandle::start(6882, false).await {
            Ok((new_handle, _new_peer_rx)) => new_handle.shutdown_and_wait().await,
            Err(e) => panic!(
                "rebind on LSD port failed after shutdown_and_wait: {e}. \
                 This means the multicast socket was not dropped before \
                 the shutdown reply was acked."
            ),
        }
    }
}