Skip to main content

mcrx_core/
subscription.rs

1use crate::config::SubscriptionConfig;
2use crate::error::McrxError;
3#[cfg(feature = "metrics")]
4use crate::metrics::SubscriptionMetricsSnapshot;
5use crate::packet::{Packet, PacketWithMetadata};
6use crate::platform::{ReceiveSocket, recv_packet, recv_packet_with_metadata, socket_local_addr};
7use socket2::Socket;
8use std::net::SocketAddr;
9#[cfg(feature = "metrics")]
10use std::sync::Mutex;
11#[cfg(feature = "metrics")]
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13#[cfg(feature = "metrics")]
14use std::time::SystemTime;
15
16/// Identifies a subscription within a context.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
18pub struct SubscriptionId(pub u64);
19
20/// Represents the lifecycle state of a subscription.
21///
22/// A subscription is always associated with a bound socket, but may or may not
23/// currently be joined to a multicast group.
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum SubscriptionState {
26    /// Socket is bound but multicast group is not joined.
27    Bound,
28    /// Multicast group is joined.
29    Joined,
30}
31
32/// Owned subscription state that can be extracted from a context and moved into
33/// another event loop or runtime.
34#[derive(Debug)]
35pub struct SubscriptionParts {
36    /// The subscription's ID inside the originating context.
37    pub id: SubscriptionId,
38    /// The multicast configuration associated with the socket.
39    pub config: SubscriptionConfig,
40    /// The owned socket for external integration.
41    pub socket: Socket,
42    /// The lifecycle state at the time of extraction.
43    pub state: SubscriptionState,
44}
45
46#[cfg(feature = "metrics")]
47#[derive(Debug)]
48struct SubscriptionMetricsInner {
49    packets_received: AtomicU64,
50    bytes_received: AtomicU64,
51    would_block_count: AtomicU64,
52    receive_errors: AtomicU64,
53    join_count: AtomicU64,
54    leave_count: AtomicU64,
55    last_payload_len: AtomicUsize,
56    last_source: Mutex<Option<SocketAddr>>,
57    last_receive_at: Mutex<Option<SystemTime>>,
58}
59
60#[cfg(feature = "metrics")]
61impl Default for SubscriptionMetricsInner {
62    fn default() -> Self {
63        Self {
64            packets_received: AtomicU64::new(0),
65            bytes_received: AtomicU64::new(0),
66            would_block_count: AtomicU64::new(0),
67            receive_errors: AtomicU64::new(0),
68            join_count: AtomicU64::new(0),
69            leave_count: AtomicU64::new(0),
70            last_payload_len: AtomicUsize::new(usize::MAX),
71            last_source: Mutex::new(None),
72            last_receive_at: Mutex::new(None),
73        }
74    }
75}
76
77/// Represents a registered subscription stored inside a context.
78#[derive(Debug)]
79pub struct Subscription {
80    id: SubscriptionId,
81    config: SubscriptionConfig,
82    socket: ReceiveSocket,
83    state: SubscriptionState,
84    #[cfg(feature = "metrics")]
85    metrics: SubscriptionMetricsInner,
86}
87
88impl Subscription {
89    #[cfg(feature = "metrics")]
90    fn lock_unpoisoned<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
91        match mutex.lock() {
92            Ok(guard) => guard,
93            Err(poisoned) => poisoned.into_inner(),
94        }
95    }
96
97    fn with_socket(id: SubscriptionId, config: SubscriptionConfig, socket: ReceiveSocket) -> Self {
98        Self {
99            id,
100            config,
101            socket,
102            state: SubscriptionState::Bound,
103            #[cfg(feature = "metrics")]
104            metrics: SubscriptionMetricsInner::default(),
105        }
106    }
107
108    #[cfg(feature = "metrics")]
109    fn record_received_packet(&self, packet: &Packet) {
110        self.metrics
111            .packets_received
112            .fetch_add(1, Ordering::Relaxed);
113        self.metrics
114            .bytes_received
115            .fetch_add(packet.payload.len() as u64, Ordering::Relaxed);
116        self.metrics
117            .last_payload_len
118            .store(packet.payload.len(), Ordering::Relaxed);
119        *Self::lock_unpoisoned(&self.metrics.last_source) = Some(packet.source);
120        *Self::lock_unpoisoned(&self.metrics.last_receive_at) = Some(SystemTime::now());
121    }
122
123    #[cfg(feature = "metrics")]
124    fn record_would_block(&self) {
125        self.metrics
126            .would_block_count
127            .fetch_add(1, Ordering::Relaxed);
128    }
129
130    #[cfg(feature = "metrics")]
131    fn record_receive_error(&self) {
132        self.metrics.receive_errors.fetch_add(1, Ordering::Relaxed);
133    }
134
135    /// Creates a new subscription from an ID, configuration, and socket.
136    ///
137    /// This is a low-level constructor. Callers are responsible for providing a
138    /// socket that is compatible with the subscription configuration. For the
139    /// checked convenience path, prefer `Context::add_subscription_with_socket()`.
140    pub fn new(id: SubscriptionId, config: SubscriptionConfig, socket: Socket) -> Self {
141        Self::with_socket(id, config, ReceiveSocket::adopt(socket))
142    }
143
144    pub(crate) fn from_receive_socket(
145        id: SubscriptionId,
146        config: SubscriptionConfig,
147        socket: ReceiveSocket,
148    ) -> Self {
149        Self::with_socket(id, config, socket)
150    }
151
152    /// Returns the subscription's ID.
153    pub fn id(&self) -> SubscriptionId {
154        self.id
155    }
156
157    /// Returns a read-only reference to the subscription's configuration.
158    pub fn config(&self) -> &SubscriptionConfig {
159        &self.config
160    }
161
162    /// Returns a read-only reference to the subscription's socket.
163    pub fn socket(&self) -> &Socket {
164        self.socket.socket()
165    }
166
167    /// Returns a mutable reference to the subscription's socket.
168    ///
169    /// This is useful when an external event loop or registry needs mutable
170    /// socket access during registration.
171    pub fn socket_mut(&mut self) -> &mut Socket {
172        self.socket.socket_mut()
173    }
174
175    /// Attempts to receive a single packet without blocking.
176    ///
177    /// Returns:
178    /// - `Ok(Some(packet))` if a packet was received,
179    /// - `Ok(None)` if no packet is currently available,
180    /// - `Err(...)` on an actual receive failure.
181    pub fn try_recv(&self) -> Result<Option<Packet>, McrxError> {
182        if !self.is_joined() {
183            return Err(McrxError::SubscriptionNotJoined);
184        }
185        match recv_packet(&self.socket, self.id, &self.config) {
186            Ok(Some(packet)) => {
187                #[cfg(feature = "metrics")]
188                self.record_received_packet(&packet);
189
190                Ok(Some(packet))
191            }
192            Ok(None) => {
193                #[cfg(feature = "metrics")]
194                self.record_would_block();
195
196                Ok(None)
197            }
198            Err(err) => {
199                #[cfg(feature = "metrics")]
200                self.record_receive_error();
201
202                Err(err)
203            }
204        }
205    }
206
207    /// Attempts to receive a single packet together with richer receive metadata
208    /// without blocking.
209    pub fn try_recv_with_metadata(&self) -> Result<Option<PacketWithMetadata>, McrxError> {
210        if !self.is_joined() {
211            return Err(McrxError::SubscriptionNotJoined);
212        }
213
214        match recv_packet_with_metadata(&self.socket, self.id, &self.config) {
215            Ok(Some(packet)) => {
216                #[cfg(feature = "metrics")]
217                self.record_received_packet(&packet.packet);
218
219                Ok(Some(packet))
220            }
221            Ok(None) => {
222                #[cfg(feature = "metrics")]
223                self.record_would_block();
224
225                Ok(None)
226            }
227            Err(err) => {
228                #[cfg(feature = "metrics")]
229                self.record_receive_error();
230
231                Err(err)
232            }
233        }
234    }
235
236    /// Returns the raw Unix file descriptor of the underlying socket.
237    ///
238    /// This is useful for integrating subscriptions into external event loops.
239    #[cfg(unix)]
240    pub fn as_raw_fd(&self) -> std::os::fd::RawFd {
241        use std::os::fd::AsRawFd;
242        self.socket().as_raw_fd()
243    }
244
245    /// Returns the raw Windows socket handle of the underlying socket.
246    ///
247    /// This is useful for integrating subscriptions into external event loops.
248    #[cfg(windows)]
249    pub fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
250        use std::os::windows::io::AsRawSocket;
251        self.socket().as_raw_socket()
252    }
253
254    /// Returns the local socket address currently bound by this subscription.
255    pub fn local_addr(&self) -> Result<SocketAddr, McrxError> {
256        self.socket
257            .local_addr()
258            .or_else(|_| socket_local_addr(self.socket()))
259    }
260
261    /// Consumes the subscription and returns its owned socket.
262    ///
263    /// This is useful when handing a joined or bound socket off to an external
264    /// event loop or async runtime.
265    pub fn into_socket(self) -> Socket {
266        self.socket.into_socket()
267    }
268
269    /// Consumes the subscription and returns all owned parts.
270    pub fn into_parts(self) -> SubscriptionParts {
271        SubscriptionParts {
272            id: self.id,
273            config: self.config,
274            socket: self.socket.into_socket(),
275            state: self.state,
276        }
277    }
278
279    /// Returns the current lifecycle state of the subscription.
280    ///
281    /// This can be used by callers to inspect whether the subscription is
282    /// currently joined to its multicast group or only bound.
283    pub fn state(&self) -> SubscriptionState {
284        self.state
285    }
286
287    /// Returns a snapshot of the subscription's current metrics.
288    ///
289    /// Counter values in the returned snapshot are cumulative and can be
290    /// compared against a later snapshot using `delta_since()`.
291    #[cfg(feature = "metrics")]
292    pub fn metrics_snapshot(&self) -> SubscriptionMetricsSnapshot {
293        let last_payload_len = match self.metrics.last_payload_len.load(Ordering::Relaxed) {
294            usize::MAX => None,
295            payload_len => Some(payload_len),
296        };
297
298        SubscriptionMetricsSnapshot {
299            packets_received: self.metrics.packets_received.load(Ordering::Relaxed),
300            bytes_received: self.metrics.bytes_received.load(Ordering::Relaxed),
301            would_block_count: self.metrics.would_block_count.load(Ordering::Relaxed),
302            receive_errors: self.metrics.receive_errors.load(Ordering::Relaxed),
303            join_count: self.metrics.join_count.load(Ordering::Relaxed),
304            leave_count: self.metrics.leave_count.load(Ordering::Relaxed),
305            last_payload_len,
306            last_source: *Self::lock_unpoisoned(&self.metrics.last_source),
307            last_receive_at: *Self::lock_unpoisoned(&self.metrics.last_receive_at),
308            captured_at: SystemTime::now(),
309        }
310    }
311
312    /// Returns `true` if the subscription is currently joined to its multicast group.
313    ///
314    /// This is a convenience helper for checking whether the subscription is in
315    /// the `SubscriptionState::Joined` state.
316    pub fn is_joined(&self) -> bool {
317        matches!(self.state, SubscriptionState::Joined)
318    }
319
320    /// Marks the subscription as joined.
321    ///
322    /// This should be called after a successful multicast join operation on the
323    /// underlying socket.
324    ///
325    /// Returns an error if the subscription is already in the joined state.
326    pub fn mark_joined(&mut self) -> Result<(), McrxError> {
327        if self.state == SubscriptionState::Joined {
328            return Err(McrxError::SubscriptionAlreadyJoined);
329        }
330
331        self.state = SubscriptionState::Joined;
332
333        #[cfg(feature = "metrics")]
334        self.metrics.join_count.fetch_add(1, Ordering::Relaxed);
335
336        Ok(())
337    }
338
339    /// Marks the subscription as bound (not joined).
340    ///
341    /// This should be called after leaving a multicast group, while keeping the
342    /// underlying socket open and bound.
343    ///
344    /// Returns an error if the subscription is already in the bound state.
345    pub fn mark_bound(&mut self) -> Result<(), McrxError> {
346        if self.state == SubscriptionState::Bound {
347            return Err(McrxError::SubscriptionNotJoined);
348        }
349
350        self.state = SubscriptionState::Bound;
351
352        #[cfg(feature = "metrics")]
353        self.metrics.leave_count.fetch_add(1, Ordering::Relaxed);
354
355        Ok(())
356    }
357}
358
359#[cfg(unix)]
360impl std::os::fd::AsFd for Subscription {
361    fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
362        self.socket().as_fd()
363    }
364}
365
366#[cfg(unix)]
367impl std::os::fd::AsRawFd for Subscription {
368    fn as_raw_fd(&self) -> std::os::fd::RawFd {
369        self.socket().as_raw_fd()
370    }
371}
372
373#[cfg(windows)]
374impl std::os::windows::io::AsSocket for Subscription {
375    fn as_socket(&self) -> std::os::windows::io::BorrowedSocket<'_> {
376        self.socket().as_socket()
377    }
378}
379
380#[cfg(windows)]
381impl std::os::windows::io::AsRawSocket for Subscription {
382    fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
383        self.socket().as_raw_socket()
384    }
385}
386
387#[cfg(test)]
388mod tests {
389    use super::*;
390    use crate::config::{SourceFilter, SubscriptionConfig};
391    use crate::platform;
392    use crate::test_support::{
393        ipv6_group_socket_addr, make_multicast_sender, make_multicast_sender_v6,
394        make_multicast_sender_v6_for_source, sample_config_on_unused_port,
395        sample_config_v6_on_unused_port, sample_ssm_receive_config_v6_on_unused_port,
396        unused_udp_port_v4,
397    };
398    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket};
399    use std::time::{Duration, Instant};
400
401    fn test_ssm_config(port: u16, interface: Ipv4Addr) -> SubscriptionConfig {
402        SubscriptionConfig {
403            group: IpAddr::V4(Ipv4Addr::new(232, 1, 2, 3)),
404            source: SourceFilter::Source(IpAddr::V4(interface)),
405            dst_port: port,
406            interface: Some(IpAddr::V4(interface)),
407            interface_index: None,
408        }
409    }
410
411    fn ipv4_group(config: &SubscriptionConfig) -> Ipv4Addr {
412        config.ipv4_membership().unwrap().group
413    }
414
415    fn ipv4_group_socket_addr(config: &SubscriptionConfig) -> SocketAddrV4 {
416        SocketAddrV4::new(ipv4_group(config), config.dst_port)
417    }
418
419    fn primary_ipv4() -> Ipv4Addr {
420        let probe = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).unwrap();
421        probe
422            .connect(SocketAddrV4::new(Ipv4Addr::new(8, 8, 8, 8), 9))
423            .unwrap();
424
425        match probe.local_addr().unwrap() {
426            SocketAddr::V4(addr) => *addr.ip(),
427            SocketAddr::V6(_) => panic!("expected an IPv4 local address for SSM test"),
428        }
429    }
430
431    fn recv_next_subscription_packet(subscription: &Subscription, deadline: Instant) -> Packet {
432        loop {
433            match subscription.try_recv().unwrap() {
434                Some(packet) => return packet,
435                None if Instant::now() < deadline => {
436                    std::thread::sleep(Duration::from_millis(10));
437                }
438                None => panic!("timed out waiting for packet"),
439            }
440        }
441    }
442
443    fn assert_pktinfo_metadata(packet: &PacketWithMetadata, expected_destination: IpAddr) {
444        #[cfg(any(
445            target_os = "linux",
446            target_os = "android",
447            windows,
448            target_vendor = "apple",
449            target_os = "freebsd",
450            target_os = "dragonfly",
451            target_os = "netbsd",
452            target_os = "openbsd"
453        ))]
454        {
455            assert_eq!(
456                packet.metadata.destination_local_ip,
457                Some(expected_destination)
458            );
459            assert!(packet.metadata.ingress_interface_index.is_some());
460        }
461
462        #[cfg(not(any(
463            target_os = "linux",
464            target_os = "android",
465            windows,
466            target_vendor = "apple",
467            target_os = "freebsd",
468            target_os = "dragonfly",
469            target_os = "netbsd",
470            target_os = "openbsd"
471        )))]
472        {
473            let _ = expected_destination;
474            assert_eq!(packet.metadata.destination_local_ip, None);
475            assert_eq!(packet.metadata.ingress_interface_index, None);
476        }
477    }
478
479    #[test]
480    fn try_recv_returns_none_when_no_packet_is_available() {
481        let config = sample_config_on_unused_port();
482        let socket = platform::open_bound_socket(&config).unwrap();
483        let mut subscription = Subscription::from_receive_socket(SubscriptionId(1), config, socket);
484        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
485        subscription.mark_joined().unwrap();
486
487        let result = subscription.try_recv().unwrap();
488
489        assert!(result.is_none());
490    }
491
492    #[test]
493    fn try_recv_receives_packet_sent_to_bound_port() {
494        let config = sample_config_on_unused_port();
495        let socket = platform::open_bound_socket(&config).unwrap();
496        let mut subscription =
497            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
498        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
499        subscription.mark_joined().unwrap();
500
501        let sender = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).unwrap();
502        let payload = b"hello multicast core";
503
504        sender
505            .send_to(
506                payload,
507                SocketAddrV4::new(Ipv4Addr::LOCALHOST, config.dst_port),
508            )
509            .unwrap();
510
511        let deadline = Instant::now() + Duration::from_secs(1);
512        let packet = recv_next_subscription_packet(&subscription, deadline);
513
514        assert_eq!(packet.subscription_id, SubscriptionId(1));
515        assert_eq!(packet.group, IpAddr::V4(ipv4_group(&config)));
516        assert_eq!(packet.dst_port, config.dst_port);
517        assert_eq!(&packet.payload[..], payload);
518        assert_eq!(packet.source.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
519    }
520
521    #[test]
522    fn try_recv_with_metadata_exposes_current_socket_context() {
523        let config = sample_config_on_unused_port();
524        let socket = platform::open_bound_socket(&config).unwrap();
525        let mut subscription =
526            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
527        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
528        subscription.mark_joined().unwrap();
529
530        let sender = make_multicast_sender();
531        let payload = b"hello detailed receive";
532
533        sender
534            .send_to(payload, ipv4_group_socket_addr(&config))
535            .unwrap();
536
537        let deadline = Instant::now() + Duration::from_secs(1);
538        let packet = loop {
539            match subscription.try_recv_with_metadata().unwrap() {
540                Some(packet) => break packet,
541                None if Instant::now() < deadline => {
542                    std::thread::sleep(Duration::from_millis(10));
543                }
544                None => panic!("timed out waiting for packet with metadata"),
545            }
546        };
547
548        assert_eq!(packet.packet.subscription_id, SubscriptionId(1));
549        assert_eq!(packet.packet.group, IpAddr::V4(ipv4_group(&config)));
550        assert_eq!(packet.packet.dst_port, config.dst_port);
551        assert_eq!(&packet.packet.payload[..], payload);
552        assert_pktinfo_metadata(&packet, IpAddr::V4(ipv4_group(&config)));
553        assert_eq!(
554            packet.metadata.socket_local_addr,
555            Some(SocketAddr::V4(SocketAddrV4::new(
556                Ipv4Addr::UNSPECIFIED,
557                config.dst_port,
558            )))
559        );
560        assert_eq!(packet.metadata.configured_interface, None);
561    }
562
563    #[test]
564    fn try_recv_with_metadata_exposes_current_ipv6_socket_context() {
565        let config = sample_config_v6_on_unused_port();
566        let socket = platform::open_bound_socket(&config).unwrap();
567        let mut subscription =
568            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
569        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
570        subscription.mark_joined().unwrap();
571
572        let sender = make_multicast_sender_v6(Ipv6Addr::LOCALHOST);
573        let payload = b"hello detailed receive ipv6";
574
575        sender
576            .send_to(payload, ipv6_group_socket_addr(&config))
577            .unwrap();
578
579        let deadline = Instant::now() + Duration::from_secs(1);
580        let packet = loop {
581            match subscription.try_recv_with_metadata().unwrap() {
582                Some(packet) => break packet,
583                None if Instant::now() < deadline => {
584                    std::thread::sleep(Duration::from_millis(10));
585                }
586                None => panic!("timed out waiting for IPv6 packet with metadata"),
587            }
588        };
589
590        assert_eq!(packet.packet.subscription_id, SubscriptionId(1));
591        assert_eq!(packet.packet.group, config.group);
592        assert_eq!(packet.packet.dst_port, config.dst_port);
593        assert_eq!(&packet.packet.payload[..], payload);
594        assert_pktinfo_metadata(&packet, config.group);
595        assert_eq!(
596            packet.metadata.socket_local_addr,
597            Some(SocketAddr::V6(SocketAddrV6::new(
598                Ipv6Addr::UNSPECIFIED,
599                config.dst_port,
600                0,
601                0,
602            )))
603        );
604        assert_eq!(packet.metadata.configured_interface, config.interface);
605        assert_eq!(
606            packet.metadata.configured_interface_index,
607            config.interface_index
608        );
609    }
610
611    #[test]
612    fn try_recv_receives_multicast_packet_from_joined_group() {
613        let config = sample_config_on_unused_port();
614        let socket = platform::open_bound_socket(&config).unwrap();
615        let mut subscription =
616            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
617        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
618        subscription.mark_joined().unwrap();
619
620        let sender = make_multicast_sender();
621
622        let sender_port = sender.local_addr().unwrap().port();
623        let payload = b"hello real asm multicast";
624
625        sender
626            .send_to(payload, ipv4_group_socket_addr(&config))
627            .unwrap();
628
629        let deadline = Instant::now() + Duration::from_secs(1);
630        let packet = recv_next_subscription_packet(&subscription, deadline);
631
632        assert_eq!(packet.subscription_id, SubscriptionId(1));
633        assert_eq!(packet.group, IpAddr::V4(ipv4_group(&config)));
634        assert_eq!(packet.dst_port, config.dst_port);
635        assert_eq!(&packet.payload[..], payload);
636        assert_eq!(packet.source.port(), sender_port);
637    }
638
639    #[test]
640    fn try_recv_receives_ssm_packet_from_allowed_source() {
641        let interface = primary_ipv4();
642        let config = test_ssm_config(unused_udp_port_v4(), interface);
643        let socket = platform::open_bound_socket(&config).unwrap();
644        let mut subscription =
645            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
646        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
647        subscription.mark_joined().unwrap();
648
649        let sender = UdpSocket::bind(SocketAddrV4::new(interface, 0)).unwrap();
650        sender.set_multicast_loop_v4(true).unwrap();
651        sender.set_multicast_ttl_v4(1).unwrap();
652
653        let sender_port = sender.local_addr().unwrap().port();
654        let payload = b"hello real ssm multicast";
655
656        sender
657            .send_to(payload, ipv4_group_socket_addr(&config))
658            .unwrap();
659
660        let deadline = Instant::now() + Duration::from_secs(1);
661        let packet = recv_next_subscription_packet(&subscription, deadline);
662
663        assert_eq!(packet.subscription_id, SubscriptionId(1));
664        assert_eq!(packet.group, IpAddr::V4(ipv4_group(&config)));
665        assert_eq!(packet.dst_port, config.dst_port);
666        assert_eq!(&packet.payload[..], payload);
667        assert_eq!(packet.source.port(), sender_port);
668        assert_eq!(packet.source.ip(), IpAddr::V4(interface));
669    }
670
671    #[test]
672    fn try_recv_receives_ipv6_ssm_packet_from_allowed_source() {
673        let Some(config) = sample_ssm_receive_config_v6_on_unused_port() else {
674            return;
675        };
676        let interface = match config.source_addr().unwrap() {
677            IpAddr::V6(source) => source,
678            IpAddr::V4(_) => panic!("expected an IPv6 source for IPv6 SSM test"),
679        };
680        let socket = platform::open_bound_socket(&config).unwrap();
681        let mut subscription =
682            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
683        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
684        subscription.mark_joined().unwrap();
685
686        let sender = make_multicast_sender_v6_for_source(interface);
687        let sender_port = sender.local_addr().unwrap().port();
688        let payload = b"hello real ipv6 ssm multicast";
689
690        sender
691            .send_to(payload, ipv6_group_socket_addr(&config))
692            .unwrap();
693
694        let deadline = Instant::now() + Duration::from_secs(1);
695        let packet = recv_next_subscription_packet(&subscription, deadline);
696
697        assert_eq!(packet.subscription_id, SubscriptionId(1));
698        assert_eq!(packet.group, config.group);
699        assert_eq!(packet.dst_port, config.dst_port);
700        assert_eq!(&packet.payload[..], payload);
701        assert_eq!(packet.source.port(), sender_port);
702        assert_eq!(packet.source.ip(), IpAddr::V6(interface));
703    }
704
705    #[test]
706    fn mark_joined_transitions_bound_to_joined_state() {
707        let config = sample_config_on_unused_port();
708        let socket = platform::open_bound_socket(&config).unwrap();
709        let mut subscription = Subscription::from_receive_socket(SubscriptionId(1), config, socket);
710
711        subscription.mark_joined().unwrap();
712
713        assert_eq!(subscription.state(), SubscriptionState::Joined);
714    }
715
716    #[test]
717    fn mark_joined_rejects_already_joined_subscription() {
718        let config = sample_config_on_unused_port();
719        let socket = platform::open_bound_socket(&config).unwrap();
720        let mut subscription = Subscription::from_receive_socket(SubscriptionId(1), config, socket);
721
722        subscription.mark_joined().unwrap();
723        let result = subscription.mark_joined();
724
725        assert!(matches!(result, Err(McrxError::SubscriptionAlreadyJoined)));
726    }
727
728    #[test]
729    fn mark_bound_transitions_joined_to_bound_state() {
730        let config = sample_config_on_unused_port();
731        let socket = platform::open_bound_socket(&config).unwrap();
732        let mut subscription = Subscription::from_receive_socket(SubscriptionId(1), config, socket);
733
734        subscription.mark_joined().unwrap();
735        subscription.mark_bound().unwrap();
736
737        assert_eq!(subscription.state(), SubscriptionState::Bound);
738    }
739
740    #[test]
741    fn mark_bound_rejects_already_bound_subscription() {
742        let config = sample_config_on_unused_port();
743        let socket = platform::open_bound_socket(&config).unwrap();
744        let mut subscription = Subscription::from_receive_socket(SubscriptionId(1), config, socket);
745
746        let result = subscription.mark_bound();
747
748        assert!(matches!(result, Err(McrxError::SubscriptionNotJoined)));
749    }
750
751    #[test]
752    fn local_addr_returns_bound_socket_address() {
753        let config = sample_config_on_unused_port();
754        let socket = platform::open_bound_socket(&config).unwrap();
755        let subscription =
756            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
757
758        let local_addr = subscription.local_addr().unwrap();
759
760        assert_eq!(
761            local_addr,
762            SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.dst_port))
763        );
764    }
765}