1use crate::config::SubscriptionConfig;
2use crate::error::McrxError;
3#[cfg(feature = "metrics")]
4use crate::metrics::SubscriptionMetricsSnapshot;
5use crate::packet::{Packet, PacketWithMetadata};
6use crate::platform::{ReceiveSocket, recv_packet, recv_packet_with_metadata, socket_local_addr};
7use socket2::Socket;
8use std::net::SocketAddr;
9#[cfg(feature = "metrics")]
10use std::sync::Mutex;
11#[cfg(feature = "metrics")]
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13#[cfg(feature = "metrics")]
14use std::time::SystemTime;
15
/// Identifier for a [`Subscription`], wrapping a plain `u64`.
///
/// The value is stored in the subscription at construction time and handed
/// back by [`Subscription::id`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(pub u64);
19
/// Lifecycle marker tracked by a [`Subscription`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionState {
    /// State every newly constructed subscription starts in. The `try_recv*`
    /// methods return `McrxError::SubscriptionNotJoined` while in this state.
    Bound,
    /// State entered through [`Subscription::mark_joined`]; the only state in
    /// which the `try_recv*` methods attempt a receive.
    Joined,
}
31
/// Owned pieces of a [`Subscription`], as produced by
/// [`Subscription::into_parts`].
#[derive(Debug)]
pub struct SubscriptionParts {
    /// Identifier the subscription was created with.
    pub id: SubscriptionId,
    /// Configuration the subscription was created with.
    pub config: SubscriptionConfig,
    /// Underlying socket, extracted from the subscription's receive socket.
    pub socket: Socket,
    /// State the subscription was in when it was taken apart.
    pub state: SubscriptionState,
}
45
/// Metric storage for one subscription (only compiled with the `metrics`
/// feature).
///
/// Counters are atomics and the "last seen" values sit behind mutexes, so
/// everything can be updated through a shared reference.
#[cfg(feature = "metrics")]
#[derive(Debug)]
struct SubscriptionMetricsInner {
    /// Incremented once per packet returned by a receive call.
    packets_received: AtomicU64,
    /// Running sum of the payload lengths of received packets.
    bytes_received: AtomicU64,
    /// Incremented when a receive call yields `Ok(None)`.
    would_block_count: AtomicU64,
    /// Incremented when the underlying receive call yields an error.
    receive_errors: AtomicU64,
    /// Incremented on each successful `mark_joined`.
    join_count: AtomicU64,
    /// Incremented on each successful `mark_bound`.
    leave_count: AtomicU64,
    /// Payload length of the latest packet; `usize::MAX` means "none yet".
    last_payload_len: AtomicUsize,
    /// Source address of the latest packet, if any.
    last_source: Mutex<Option<SocketAddr>>,
    /// Wall-clock time at which the latest packet was recorded, if any.
    last_receive_at: Mutex<Option<SystemTime>>,
}
59
#[cfg(feature = "metrics")]
impl Default for SubscriptionMetricsInner {
    /// Creates an empty metric set: all counters at zero and no "last seen"
    /// values recorded.
    fn default() -> Self {
        let zeroed = || AtomicU64::new(0);

        Self {
            packets_received: zeroed(),
            bytes_received: zeroed(),
            would_block_count: zeroed(),
            receive_errors: zeroed(),
            join_count: zeroed(),
            leave_count: zeroed(),
            // `usize::MAX` is the "no packet yet" sentinel that
            // `metrics_snapshot` decodes into `None`.
            last_payload_len: AtomicUsize::new(usize::MAX),
            last_source: Mutex::default(),
            last_receive_at: Mutex::default(),
        }
    }
}
76
/// A single receive subscription: an id, its configuration, the receive
/// socket, and a [`SubscriptionState`] marker.
///
/// With the `metrics` feature enabled it additionally carries receive and
/// join/leave counters.
#[derive(Debug)]
pub struct Subscription {
    /// Identifier passed in at construction.
    id: SubscriptionId,
    /// Configuration passed in at construction; forwarded to the platform
    /// receive calls.
    config: SubscriptionConfig,
    /// Platform receive-socket wrapper around the underlying `Socket`.
    socket: ReceiveSocket,
    /// Current lifecycle marker; starts as `Bound`.
    state: SubscriptionState,
    /// Metric counters, updated through `&self`.
    #[cfg(feature = "metrics")]
    metrics: SubscriptionMetricsInner,
}
87
88impl Subscription {
89 #[cfg(feature = "metrics")]
90 fn lock_unpoisoned<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
91 match mutex.lock() {
92 Ok(guard) => guard,
93 Err(poisoned) => poisoned.into_inner(),
94 }
95 }
96
97 fn with_socket(id: SubscriptionId, config: SubscriptionConfig, socket: ReceiveSocket) -> Self {
98 Self {
99 id,
100 config,
101 socket,
102 state: SubscriptionState::Bound,
103 #[cfg(feature = "metrics")]
104 metrics: SubscriptionMetricsInner::default(),
105 }
106 }
107
108 #[cfg(feature = "metrics")]
109 fn record_received_packet(&self, packet: &Packet) {
110 self.metrics
111 .packets_received
112 .fetch_add(1, Ordering::Relaxed);
113 self.metrics
114 .bytes_received
115 .fetch_add(packet.payload.len() as u64, Ordering::Relaxed);
116 self.metrics
117 .last_payload_len
118 .store(packet.payload.len(), Ordering::Relaxed);
119 *Self::lock_unpoisoned(&self.metrics.last_source) = Some(packet.source);
120 *Self::lock_unpoisoned(&self.metrics.last_receive_at) = Some(SystemTime::now());
121 }
122
123 #[cfg(feature = "metrics")]
124 fn record_would_block(&self) {
125 self.metrics
126 .would_block_count
127 .fetch_add(1, Ordering::Relaxed);
128 }
129
130 #[cfg(feature = "metrics")]
131 fn record_receive_error(&self) {
132 self.metrics.receive_errors.fetch_add(1, Ordering::Relaxed);
133 }
134
135 pub fn new(id: SubscriptionId, config: SubscriptionConfig, socket: Socket) -> Self {
141 Self::with_socket(id, config, ReceiveSocket::adopt(socket))
142 }
143
    /// Crate-internal constructor for callers that already hold a
    /// `ReceiveSocket`; the subscription starts in the `Bound` state.
    pub(crate) fn from_receive_socket(
        id: SubscriptionId,
        config: SubscriptionConfig,
        socket: ReceiveSocket,
    ) -> Self {
        Self::with_socket(id, config, socket)
    }
151
    /// Returns the identifier this subscription was created with.
    pub fn id(&self) -> SubscriptionId {
        self.id
    }
156
    /// Returns the configuration this subscription was created with.
    pub fn config(&self) -> &SubscriptionConfig {
        &self.config
    }
161
    /// Borrows the underlying `Socket` held by the receive-socket wrapper.
    pub fn socket(&self) -> &Socket {
        self.socket.socket()
    }
166
    /// Mutably borrows the underlying `Socket` held by the receive-socket
    /// wrapper.
    pub fn socket_mut(&mut self) -> &mut Socket {
        self.socket.socket_mut()
    }
174
175 pub fn try_recv(&self) -> Result<Option<Packet>, McrxError> {
182 if !self.is_joined() {
183 return Err(McrxError::SubscriptionNotJoined);
184 }
185 match recv_packet(&self.socket, self.id, &self.config) {
186 Ok(Some(packet)) => {
187 #[cfg(feature = "metrics")]
188 self.record_received_packet(&packet);
189
190 Ok(Some(packet))
191 }
192 Ok(None) => {
193 #[cfg(feature = "metrics")]
194 self.record_would_block();
195
196 Ok(None)
197 }
198 Err(err) => {
199 #[cfg(feature = "metrics")]
200 self.record_receive_error();
201
202 Err(err)
203 }
204 }
205 }
206
207 pub fn try_recv_with_metadata(&self) -> Result<Option<PacketWithMetadata>, McrxError> {
210 if !self.is_joined() {
211 return Err(McrxError::SubscriptionNotJoined);
212 }
213
214 match recv_packet_with_metadata(&self.socket, self.id, &self.config) {
215 Ok(Some(packet)) => {
216 #[cfg(feature = "metrics")]
217 self.record_received_packet(&packet.packet);
218
219 Ok(Some(packet))
220 }
221 Ok(None) => {
222 #[cfg(feature = "metrics")]
223 self.record_would_block();
224
225 Ok(None)
226 }
227 Err(err) => {
228 #[cfg(feature = "metrics")]
229 self.record_receive_error();
230
231 Err(err)
232 }
233 }
234 }
235
236 #[cfg(unix)]
240 pub fn as_raw_fd(&self) -> std::os::fd::RawFd {
241 use std::os::fd::AsRawFd;
242 self.socket().as_raw_fd()
243 }
244
245 #[cfg(windows)]
249 pub fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
250 use std::os::windows::io::AsRawSocket;
251 self.socket().as_raw_socket()
252 }
253
254 pub fn local_addr(&self) -> Result<SocketAddr, McrxError> {
256 self.socket
257 .local_addr()
258 .or_else(|_| socket_local_addr(self.socket()))
259 }
260
    /// Consumes the subscription and returns the underlying `Socket`; the id,
    /// configuration and state are dropped.
    pub fn into_socket(self) -> Socket {
        self.socket.into_socket()
    }
268
269 pub fn into_parts(self) -> SubscriptionParts {
271 SubscriptionParts {
272 id: self.id,
273 config: self.config,
274 socket: self.socket.into_socket(),
275 state: self.state,
276 }
277 }
278
    /// Returns the current lifecycle marker of this subscription.
    pub fn state(&self) -> SubscriptionState {
        self.state
    }
286
287 #[cfg(feature = "metrics")]
292 pub fn metrics_snapshot(&self) -> SubscriptionMetricsSnapshot {
293 let last_payload_len = match self.metrics.last_payload_len.load(Ordering::Relaxed) {
294 usize::MAX => None,
295 payload_len => Some(payload_len),
296 };
297
298 SubscriptionMetricsSnapshot {
299 packets_received: self.metrics.packets_received.load(Ordering::Relaxed),
300 bytes_received: self.metrics.bytes_received.load(Ordering::Relaxed),
301 would_block_count: self.metrics.would_block_count.load(Ordering::Relaxed),
302 receive_errors: self.metrics.receive_errors.load(Ordering::Relaxed),
303 join_count: self.metrics.join_count.load(Ordering::Relaxed),
304 leave_count: self.metrics.leave_count.load(Ordering::Relaxed),
305 last_payload_len,
306 last_source: *Self::lock_unpoisoned(&self.metrics.last_source),
307 last_receive_at: *Self::lock_unpoisoned(&self.metrics.last_receive_at),
308 captured_at: SystemTime::now(),
309 }
310 }
311
312 pub fn is_joined(&self) -> bool {
317 matches!(self.state, SubscriptionState::Joined)
318 }
319
320 pub fn mark_joined(&mut self) -> Result<(), McrxError> {
327 if self.state == SubscriptionState::Joined {
328 return Err(McrxError::SubscriptionAlreadyJoined);
329 }
330
331 self.state = SubscriptionState::Joined;
332
333 #[cfg(feature = "metrics")]
334 self.metrics.join_count.fetch_add(1, Ordering::Relaxed);
335
336 Ok(())
337 }
338
339 pub fn mark_bound(&mut self) -> Result<(), McrxError> {
346 if self.state == SubscriptionState::Bound {
347 return Err(McrxError::SubscriptionNotJoined);
348 }
349
350 self.state = SubscriptionState::Bound;
351
352 #[cfg(feature = "metrics")]
353 self.metrics.leave_count.fetch_add(1, Ordering::Relaxed);
354
355 Ok(())
356 }
357}
358
359#[cfg(unix)]
360impl std::os::fd::AsFd for Subscription {
361 fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
362 self.socket().as_fd()
363 }
364}
365
366#[cfg(unix)]
367impl std::os::fd::AsRawFd for Subscription {
368 fn as_raw_fd(&self) -> std::os::fd::RawFd {
369 self.socket().as_raw_fd()
370 }
371}
372
#[cfg(windows)]
impl std::os::windows::io::AsSocket for Subscription {
    /// Borrows the socket handle of the underlying socket.
    fn as_socket(&self) -> std::os::windows::io::BorrowedSocket<'_> {
        std::os::windows::io::AsSocket::as_socket(self.socket())
    }
}
379
#[cfg(windows)]
impl std::os::windows::io::AsRawSocket for Subscription {
    /// Returns the raw socket handle of the underlying socket.
    fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
        std::os::windows::io::AsRawSocket::as_raw_socket(self.socket())
    }
}
386
387#[cfg(test)]
388mod tests {
389 use super::*;
390 use crate::config::{SourceFilter, SubscriptionConfig};
391 use crate::platform;
392 use crate::test_support::{
393 ipv6_group_socket_addr, make_multicast_sender, make_multicast_sender_v6,
394 make_multicast_sender_v6_for_source, sample_config_on_unused_port,
395 sample_config_v6_on_unused_port, sample_ssm_receive_config_v6_on_unused_port,
396 unused_udp_port_v4,
397 };
398 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket};
399 use std::time::{Duration, Instant};
400
401 fn test_ssm_config(port: u16, interface: Ipv4Addr) -> SubscriptionConfig {
402 SubscriptionConfig {
403 group: IpAddr::V4(Ipv4Addr::new(232, 1, 2, 3)),
404 source: SourceFilter::Source(IpAddr::V4(interface)),
405 dst_port: port,
406 interface: Some(IpAddr::V4(interface)),
407 interface_index: None,
408 }
409 }
410
411 fn ipv4_group(config: &SubscriptionConfig) -> Ipv4Addr {
412 config.ipv4_membership().unwrap().group
413 }
414
415 fn ipv4_group_socket_addr(config: &SubscriptionConfig) -> SocketAddrV4 {
416 SocketAddrV4::new(ipv4_group(config), config.dst_port)
417 }
418
419 fn primary_ipv4() -> Ipv4Addr {
420 let probe = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)).unwrap();
421 probe
422 .connect(SocketAddrV4::new(Ipv4Addr::new(8, 8, 8, 8), 9))
423 .unwrap();
424
425 match probe.local_addr().unwrap() {
426 SocketAddr::V4(addr) => *addr.ip(),
427 SocketAddr::V6(_) => panic!("expected an IPv4 local address for SSM test"),
428 }
429 }
430
431 fn recv_next_subscription_packet(subscription: &Subscription, deadline: Instant) -> Packet {
432 loop {
433 match subscription.try_recv().unwrap() {
434 Some(packet) => return packet,
435 None if Instant::now() < deadline => {
436 std::thread::sleep(Duration::from_millis(10));
437 }
438 None => panic!("timed out waiting for packet"),
439 }
440 }
441 }
442
    /// Checks the per-packet destination metadata in a platform-aware way.
    ///
    /// On the listed targets the destination IP must equal
    /// `expected_destination` and an ingress interface index must be present;
    /// on every other target both fields must be `None`. The two `cfg` lists
    /// below must stay exact mirrors of each other.
    fn assert_pktinfo_metadata(packet: &PacketWithMetadata, expected_destination: IpAddr) {
        #[cfg(any(
            target_os = "linux",
            target_os = "android",
            windows,
            target_vendor = "apple",
            target_os = "freebsd",
            target_os = "dragonfly",
            target_os = "netbsd",
            target_os = "openbsd"
        ))]
        {
            assert_eq!(
                packet.metadata.destination_local_ip,
                Some(expected_destination)
            );
            assert!(packet.metadata.ingress_interface_index.is_some());
        }

        #[cfg(not(any(
            target_os = "linux",
            target_os = "android",
            windows,
            target_vendor = "apple",
            target_os = "freebsd",
            target_os = "dragonfly",
            target_os = "netbsd",
            target_os = "openbsd"
        )))]
        {
            // Keep the parameter "used" on targets without this metadata.
            let _ = expected_destination;
            assert_eq!(packet.metadata.destination_local_ip, None);
            assert_eq!(packet.metadata.ingress_interface_index, None);
        }
    }
478
479 #[test]
480 fn try_recv_returns_none_when_no_packet_is_available() {
481 let config = sample_config_on_unused_port();
482 let socket = platform::open_bound_socket(&config).unwrap();
483 let mut subscription = Subscription::from_receive_socket(SubscriptionId(1), config, socket);
484 platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
485 subscription.mark_joined().unwrap();
486
487 let result = subscription.try_recv().unwrap();
488
489 assert!(result.is_none());
490 }
491
    /// A datagram sent over loopback to the bound port is returned by
    /// `try_recv`, tagged with the subscription id, group and port.
    #[test]
    fn try_recv_receives_packet_sent_to_bound_port() {
        let config = sample_config_on_unused_port();
        let socket = platform::open_bound_socket(&config).unwrap();
        let mut subscription =
            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
        subscription.mark_joined().unwrap();

        // The sender targets 127.0.0.1:<dst_port>, not the group address.
        let sender = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).unwrap();
        let payload = b"hello multicast core";

        sender
            .send_to(
                payload,
                SocketAddrV4::new(Ipv4Addr::LOCALHOST, config.dst_port),
            )
            .unwrap();

        let deadline = Instant::now() + Duration::from_secs(1);
        let packet = recv_next_subscription_packet(&subscription, deadline);

        // `group` reflects the configured group even though the datagram was
        // addressed to loopback.
        assert_eq!(packet.subscription_id, SubscriptionId(1));
        assert_eq!(packet.group, IpAddr::V4(ipv4_group(&config)));
        assert_eq!(packet.dst_port, config.dst_port);
        assert_eq!(&packet.payload[..], payload);
        assert_eq!(packet.source.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
    }
520
    /// `try_recv_with_metadata` returns the packet plus metadata describing
    /// the receiving socket for an IPv4 multicast datagram.
    #[test]
    fn try_recv_with_metadata_exposes_current_socket_context() {
        let config = sample_config_on_unused_port();
        let socket = platform::open_bound_socket(&config).unwrap();
        let mut subscription =
            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
        subscription.mark_joined().unwrap();

        let sender = make_multicast_sender();
        let payload = b"hello detailed receive";

        sender
            .send_to(payload, ipv4_group_socket_addr(&config))
            .unwrap();

        // Poll for up to one second; the shared helper only covers `try_recv`.
        let deadline = Instant::now() + Duration::from_secs(1);
        let packet = loop {
            match subscription.try_recv_with_metadata().unwrap() {
                Some(packet) => break packet,
                None if Instant::now() < deadline => {
                    std::thread::sleep(Duration::from_millis(10));
                }
                None => panic!("timed out waiting for packet with metadata"),
            }
        };

        assert_eq!(packet.packet.subscription_id, SubscriptionId(1));
        assert_eq!(packet.packet.group, IpAddr::V4(ipv4_group(&config)));
        assert_eq!(packet.packet.dst_port, config.dst_port);
        assert_eq!(&packet.packet.payload[..], payload);
        assert_pktinfo_metadata(&packet, IpAddr::V4(ipv4_group(&config)));
        // The socket is expected to be bound to the wildcard address.
        assert_eq!(
            packet.metadata.socket_local_addr,
            Some(SocketAddr::V4(SocketAddrV4::new(
                Ipv4Addr::UNSPECIFIED,
                config.dst_port,
            )))
        );
        assert_eq!(packet.metadata.configured_interface, None);
    }
562
    /// IPv6 counterpart of the metadata test: the packet and the socket
    /// metadata must match the IPv6 sample configuration.
    #[test]
    fn try_recv_with_metadata_exposes_current_ipv6_socket_context() {
        let config = sample_config_v6_on_unused_port();
        let socket = platform::open_bound_socket(&config).unwrap();
        let mut subscription =
            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
        subscription.mark_joined().unwrap();

        let sender = make_multicast_sender_v6(Ipv6Addr::LOCALHOST);
        let payload = b"hello detailed receive ipv6";

        sender
            .send_to(payload, ipv6_group_socket_addr(&config))
            .unwrap();

        // Poll for up to one second; the shared helper only covers `try_recv`.
        let deadline = Instant::now() + Duration::from_secs(1);
        let packet = loop {
            match subscription.try_recv_with_metadata().unwrap() {
                Some(packet) => break packet,
                None if Instant::now() < deadline => {
                    std::thread::sleep(Duration::from_millis(10));
                }
                None => panic!("timed out waiting for IPv6 packet with metadata"),
            }
        };

        assert_eq!(packet.packet.subscription_id, SubscriptionId(1));
        assert_eq!(packet.packet.group, config.group);
        assert_eq!(packet.packet.dst_port, config.dst_port);
        assert_eq!(&packet.packet.payload[..], payload);
        assert_pktinfo_metadata(&packet, config.group);
        // The socket is expected to be bound to the IPv6 wildcard address
        // with zero flow info and scope id.
        assert_eq!(
            packet.metadata.socket_local_addr,
            Some(SocketAddr::V6(SocketAddrV6::new(
                Ipv6Addr::UNSPECIFIED,
                config.dst_port,
                0,
                0,
            )))
        );
        // The configured interface selection is echoed back unchanged.
        assert_eq!(packet.metadata.configured_interface, config.interface);
        assert_eq!(
            packet.metadata.configured_interface_index,
            config.interface_index
        );
    }
610
    /// A datagram sent to the group address is delivered to the joined
    /// subscription and reports the sender's source port.
    #[test]
    fn try_recv_receives_multicast_packet_from_joined_group() {
        let config = sample_config_on_unused_port();
        let socket = platform::open_bound_socket(&config).unwrap();
        let mut subscription =
            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
        subscription.mark_joined().unwrap();

        let sender = make_multicast_sender();

        // Remember the sender's ephemeral port to match against `packet.source`.
        let sender_port = sender.local_addr().unwrap().port();
        let payload = b"hello real asm multicast";

        sender
            .send_to(payload, ipv4_group_socket_addr(&config))
            .unwrap();

        let deadline = Instant::now() + Duration::from_secs(1);
        let packet = recv_next_subscription_packet(&subscription, deadline);

        assert_eq!(packet.subscription_id, SubscriptionId(1));
        assert_eq!(packet.group, IpAddr::V4(ipv4_group(&config)));
        assert_eq!(packet.dst_port, config.dst_port);
        assert_eq!(&packet.payload[..], payload);
        assert_eq!(packet.source.port(), sender_port);
    }
638
    /// With a source filter set to the host's own IPv4 address, a datagram
    /// sent from that address to the group is delivered.
    #[test]
    fn try_recv_receives_ssm_packet_from_allowed_source() {
        // The same address serves as allowed source, join interface and
        // sender bind address.
        let interface = primary_ipv4();
        let config = test_ssm_config(unused_udp_port_v4(), interface);
        let socket = platform::open_bound_socket(&config).unwrap();
        let mut subscription =
            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
        subscription.mark_joined().unwrap();

        let sender = UdpSocket::bind(SocketAddrV4::new(interface, 0)).unwrap();
        // Loop multicast back to this host and keep it on the local link.
        sender.set_multicast_loop_v4(true).unwrap();
        sender.set_multicast_ttl_v4(1).unwrap();

        let sender_port = sender.local_addr().unwrap().port();
        let payload = b"hello real ssm multicast";

        sender
            .send_to(payload, ipv4_group_socket_addr(&config))
            .unwrap();

        let deadline = Instant::now() + Duration::from_secs(1);
        let packet = recv_next_subscription_packet(&subscription, deadline);

        assert_eq!(packet.subscription_id, SubscriptionId(1));
        assert_eq!(packet.group, IpAddr::V4(ipv4_group(&config)));
        assert_eq!(packet.dst_port, config.dst_port);
        assert_eq!(&packet.payload[..], payload);
        assert_eq!(packet.source.port(), sender_port);
        assert_eq!(packet.source.ip(), IpAddr::V4(interface));
    }
670
    /// IPv6 counterpart of the source-filtered test. The test returns early
    /// (passing) when the helper cannot provide a usable IPv6 config.
    #[test]
    fn try_recv_receives_ipv6_ssm_packet_from_allowed_source() {
        let Some(config) = sample_ssm_receive_config_v6_on_unused_port() else {
            return;
        };
        // The configured source address doubles as the sender's address.
        let interface = match config.source_addr().unwrap() {
            IpAddr::V6(source) => source,
            IpAddr::V4(_) => panic!("expected an IPv6 source for IPv6 SSM test"),
        };
        let socket = platform::open_bound_socket(&config).unwrap();
        let mut subscription =
            Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
        platform::join_multicast_group(subscription.socket(), subscription.config()).unwrap();
        subscription.mark_joined().unwrap();

        let sender = make_multicast_sender_v6_for_source(interface);
        let sender_port = sender.local_addr().unwrap().port();
        let payload = b"hello real ipv6 ssm multicast";

        sender
            .send_to(payload, ipv6_group_socket_addr(&config))
            .unwrap();

        let deadline = Instant::now() + Duration::from_secs(1);
        let packet = recv_next_subscription_packet(&subscription, deadline);

        assert_eq!(packet.subscription_id, SubscriptionId(1));
        assert_eq!(packet.group, config.group);
        assert_eq!(packet.dst_port, config.dst_port);
        assert_eq!(&packet.payload[..], payload);
        assert_eq!(packet.source.port(), sender_port);
        assert_eq!(packet.source.ip(), IpAddr::V6(interface));
    }
704
705 #[test]
706 fn mark_joined_transitions_bound_to_joined_state() {
707 let config = sample_config_on_unused_port();
708 let socket = platform::open_bound_socket(&config).unwrap();
709 let mut subscription = Subscription::from_receive_socket(SubscriptionId(1), config, socket);
710
711 subscription.mark_joined().unwrap();
712
713 assert_eq!(subscription.state(), SubscriptionState::Joined);
714 }
715
716 #[test]
717 fn mark_joined_rejects_already_joined_subscription() {
718 let config = sample_config_on_unused_port();
719 let socket = platform::open_bound_socket(&config).unwrap();
720 let mut subscription = Subscription::from_receive_socket(SubscriptionId(1), config, socket);
721
722 subscription.mark_joined().unwrap();
723 let result = subscription.mark_joined();
724
725 assert!(matches!(result, Err(McrxError::SubscriptionAlreadyJoined)));
726 }
727
728 #[test]
729 fn mark_bound_transitions_joined_to_bound_state() {
730 let config = sample_config_on_unused_port();
731 let socket = platform::open_bound_socket(&config).unwrap();
732 let mut subscription = Subscription::from_receive_socket(SubscriptionId(1), config, socket);
733
734 subscription.mark_joined().unwrap();
735 subscription.mark_bound().unwrap();
736
737 assert_eq!(subscription.state(), SubscriptionState::Bound);
738 }
739
740 #[test]
741 fn mark_bound_rejects_already_bound_subscription() {
742 let config = sample_config_on_unused_port();
743 let socket = platform::open_bound_socket(&config).unwrap();
744 let mut subscription = Subscription::from_receive_socket(SubscriptionId(1), config, socket);
745
746 let result = subscription.mark_bound();
747
748 assert!(matches!(result, Err(McrxError::SubscriptionNotJoined)));
749 }
750
751 #[test]
752 fn local_addr_returns_bound_socket_address() {
753 let config = sample_config_on_unused_port();
754 let socket = platform::open_bound_socket(&config).unwrap();
755 let subscription =
756 Subscription::from_receive_socket(SubscriptionId(1), config.clone(), socket);
757
758 let local_addr = subscription.local_addr().unwrap();
759
760 assert_eq!(
761 local_addr,
762 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.dst_port))
763 );
764 }
765}