1#[cfg(feature = "metrics")]
2use crate::metrics::PublicationMetricsSnapshot;
3use crate::platform::resolve_ipv6_interface_index;
4use crate::{
5 MctxError, SendReport,
6 config::{Ipv6MulticastScope, OutgoingInterface, PublicationConfig, ipv6_destination_scope_id},
7};
8use socket2::{Domain, Protocol, SockAddr, Socket, Type};
9use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket};
10#[cfg(unix)]
11use std::os::fd::{AsRawFd, RawFd};
12#[cfg(windows)]
13use std::os::windows::io::{AsRawSocket, RawSocket};
14use std::sync::OnceLock;
15#[cfg(feature = "metrics")]
16use std::time::SystemTime;
17
/// Identifier attached to a [`Publication`].
///
/// A thin newtype over `u64`; the wrapped value is publicly accessible.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct PublicationId(pub u64);
21
22#[derive(Debug)]
24pub struct PublicationParts {
25 pub id: PublicationId,
26 pub config: PublicationConfig,
27 pub socket: Socket,
28}
29
30#[derive(Debug)]
32pub struct Publication {
33 id: PublicationId,
34 config: PublicationConfig,
35 socket: Socket,
36 destination: SocketAddr,
37 cached_local_addr: OnceLock<SocketAddr>,
38 #[cfg(feature = "metrics")]
39 metrics: PublicationMetricsState,
40}
41
42impl Publication {
43 pub fn new(id: PublicationId, config: PublicationConfig) -> Result<Self, MctxError> {
45 config.validate()?;
46
47 let domain = match config.family() {
48 crate::PublicationAddressFamily::Ipv4 => Domain::IPV4,
49 crate::PublicationAddressFamily::Ipv6 => Domain::IPV6,
50 };
51 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
52 .map_err(MctxError::SocketCreateFailed)?;
53
54 let destination = Self::configure_socket(&socket, &config, false)?;
55
56 Ok(Self {
57 id,
58 config,
59 socket,
60 destination,
61 cached_local_addr: OnceLock::new(),
62 #[cfg(feature = "metrics")]
63 metrics: PublicationMetricsState::default(),
64 })
65 }
66
67 pub fn new_with_socket(
69 id: PublicationId,
70 config: PublicationConfig,
71 socket: Socket,
72 ) -> Result<Self, MctxError> {
73 config.validate()?;
74 let destination = Self::configure_socket(&socket, &config, true)?;
75
76 Ok(Self {
77 id,
78 config,
79 socket,
80 destination,
81 cached_local_addr: OnceLock::new(),
82 #[cfg(feature = "metrics")]
83 metrics: PublicationMetricsState::default(),
84 })
85 }
86
87 pub fn new_with_udp_socket(
89 id: PublicationId,
90 config: PublicationConfig,
91 socket: UdpSocket,
92 ) -> Result<Self, MctxError> {
93 Self::new_with_socket(id, config, Socket::from(socket))
94 }
95
96 pub fn id(&self) -> PublicationId {
98 self.id
99 }
100
101 pub fn config(&self) -> &PublicationConfig {
103 &self.config
104 }
105
106 pub fn destination(&self) -> SocketAddr {
108 self.destination
109 }
110
111 pub fn destination_v4(&self) -> Result<SocketAddrV4, MctxError> {
113 match self.destination {
114 SocketAddr::V4(destination) => Ok(destination),
115 SocketAddr::V6(_) => Err(MctxError::ExistingSocketAddressFamilyMismatch),
116 }
117 }
118
119 pub fn destination_v6(&self) -> Result<SocketAddrV6, MctxError> {
121 match self.destination {
122 SocketAddr::V4(_) => Err(MctxError::ExistingSocketAddressFamilyMismatch),
123 SocketAddr::V6(destination) => Ok(destination),
124 }
125 }
126
127 pub fn socket(&self) -> &Socket {
134 &self.socket
135 }
136
137 pub fn socket_mut(&mut self) -> &mut Socket {
142 self.cached_local_addr = OnceLock::new();
143 &mut self.socket
144 }
145
146 pub fn send(&self, payload: &[u8]) -> Result<SendReport, MctxError> {
148 match self.socket.send(payload) {
149 Ok(bytes_sent) => {
150 #[cfg(feature = "metrics")]
151 self.metrics.record_success(bytes_sent);
152
153 let local_addr = self.cached_local_addr().ok();
154
155 Ok(SendReport {
156 publication_id: self.id,
157 destination: self.destination,
158 local_addr,
159 source_addr: local_addr.map(|addr| addr.ip()),
160 bytes_sent,
161 })
162 }
163 Err(error) => {
164 #[cfg(feature = "metrics")]
165 self.metrics.record_error();
166
167 Err(MctxError::SendFailed(error))
168 }
169 }
170 }
171
172 pub fn local_addr(&self) -> Result<SocketAddr, MctxError> {
174 self.cached_local_addr()
175 }
176
177 fn cached_local_addr(&self) -> Result<SocketAddr, MctxError> {
178 if let Some(local_addr) = self.cached_local_addr.get() {
179 return Ok(*local_addr);
180 }
181
182 let local_addr = Self::read_local_addr(&self.socket)?;
183 let _ = self.cached_local_addr.set(local_addr);
184 Ok(local_addr)
185 }
186
187 fn read_local_addr(socket: &Socket) -> Result<SocketAddr, MctxError> {
188 socket
189 .local_addr()
190 .map_err(MctxError::SocketLocalAddrFailed)?
191 .as_socket()
192 .ok_or_else(|| {
193 MctxError::SocketLocalAddrFailed(std::io::Error::other(
194 "socket local address was not an IP address",
195 ))
196 })
197 }
198
199 pub fn local_addr_v4(&self) -> Result<SocketAddrV4, MctxError> {
201 match self.local_addr()? {
202 SocketAddr::V4(local_addr) => Ok(local_addr),
203 SocketAddr::V6(_) => Err(MctxError::ExistingSocketAddressFamilyMismatch),
204 }
205 }
206
207 pub fn local_addr_v6(&self) -> Result<SocketAddrV6, MctxError> {
209 match self.local_addr()? {
210 SocketAddr::V4(_) => Err(MctxError::ExistingSocketAddressFamilyMismatch),
211 SocketAddr::V6(local_addr) => Ok(local_addr),
212 }
213 }
214
215 pub fn source_addr(&self) -> Result<IpAddr, MctxError> {
217 Ok(self.local_addr()?.ip())
218 }
219
220 pub fn announce_tuple(&self) -> Result<(IpAddr, IpAddr, u16), MctxError> {
222 Ok((self.source_addr()?, self.config.group, self.config.dst_port))
223 }
224
225 pub fn into_socket(self) -> Socket {
227 self.socket
228 }
229
230 pub fn into_parts(self) -> PublicationParts {
232 PublicationParts {
233 id: self.id,
234 config: self.config,
235 socket: self.socket,
236 }
237 }
238
/// Returns a snapshot of this publication's send counters.
///
/// Only available with the `metrics` feature.
#[cfg(feature = "metrics")]
pub fn metrics_snapshot(&self) -> PublicationMetricsSnapshot {
    let state = &self.metrics;
    state.snapshot()
}
244
245 fn configure_socket(
246 socket: &Socket,
247 config: &PublicationConfig,
248 existing_socket: bool,
249 ) -> Result<SocketAddr, MctxError> {
250 if existing_socket {
251 Self::validate_existing_socket(socket, config)?;
252 }
253
254 socket
255 .set_nonblocking(true)
256 .map_err(MctxError::SocketOptionFailed)?;
257
258 match config.family() {
259 crate::PublicationAddressFamily::Ipv4 => Self::configure_socket_v4(socket, config),
260 crate::PublicationAddressFamily::Ipv6 => {
261 if !existing_socket {
262 socket
263 .set_only_v6(true)
264 .map_err(MctxError::SocketOptionFailed)?;
265 }
266 Self::configure_socket_v6(socket, config)
267 }
268 }
269 }
270
271 fn configure_socket_v4(
272 socket: &Socket,
273 config: &PublicationConfig,
274 ) -> Result<SocketAddr, MctxError> {
275 if let Some(bind_addr) = Self::bind_addr_v4(config) {
276 Self::bind_if_needed(socket, SocketAddr::V4(bind_addr))?;
277 }
278
279 if let Some(OutgoingInterface::Ipv4Addr(interface)) = config.outgoing_interface {
280 socket
281 .set_multicast_if_v4(&interface)
282 .map_err(MctxError::SocketOptionFailed)?;
283 }
284
285 socket
286 .set_multicast_loop_v4(config.loopback)
287 .map_err(MctxError::SocketOptionFailed)?;
288 socket
289 .set_multicast_ttl_v4(config.ttl)
290 .map_err(MctxError::SocketOptionFailed)?;
291
292 let destination = SocketAddrV4::new(Self::group_v4(config), config.dst_port);
293 socket
294 .connect(&SockAddr::from(destination))
295 .map_err(MctxError::SocketConnectFailed)?;
296
297 Ok(SocketAddr::V4(destination))
298 }
299
300 fn configure_socket_v6(
301 socket: &Socket,
302 config: &PublicationConfig,
303 ) -> Result<SocketAddr, MctxError> {
304 let group = Self::group_v6(config);
305 let explicit_source = Self::source_addr_v6(config);
306 let interface_addr = Self::interface_addr_v6(config);
307 let explicit_interface_index = Self::explicit_interface_index_v6(config, interface_addr)?;
308 let source_interface_index = match explicit_source {
309 Some(source) if source.is_unicast_link_local() => match explicit_interface_index {
310 Some(interface_index) => Some(interface_index),
311 None => Some(resolve_ipv6_interface_index(source)?),
312 },
313 Some(source) => Some(resolve_ipv6_interface_index(source)?),
314 None => None,
315 };
316
317 if let (Some(source), Some(source_interface_index), Some(outgoing_interface_index)) = (
318 explicit_source,
319 source_interface_index,
320 explicit_interface_index,
321 ) && source_interface_index != outgoing_interface_index
322 {
323 return Err(MctxError::Ipv6SourceInterfaceMismatch {
324 source_addr: IpAddr::V6(source),
325 source_interface_index,
326 outgoing_interface_index,
327 });
328 }
329
330 let effective_interface_index = explicit_interface_index.or(source_interface_index);
331 let bind_source = explicit_source.or(interface_addr);
332 let bind_scope_id = match bind_source {
333 Some(source) if source.is_unicast_link_local() => effective_interface_index
334 .ok_or_else(|| {
335 MctxError::InterfaceDiscoveryFailed(format!(
336 "failed to determine interface index for link-local IPv6 address {source}"
337 ))
338 })?,
339 _ => 0,
340 };
341
342 if let Some(bind_addr) = Self::bind_addr_v6(config, bind_source, bind_scope_id) {
343 Self::bind_if_needed(socket, SocketAddr::V6(bind_addr))?;
344 }
345
346 if let Some(interface_index) = effective_interface_index {
347 socket
348 .set_multicast_if_v6(interface_index)
349 .map_err(MctxError::SocketOptionFailed)?;
350 }
351
352 socket
353 .set_multicast_loop_v6(config.loopback)
354 .map_err(MctxError::SocketOptionFailed)?;
355 socket
356 .set_multicast_hops_v6(config.ttl)
357 .map_err(MctxError::SocketOptionFailed)?;
358
359 let destination_scope_id = match config.ipv6_scope() {
360 Some(Ipv6MulticastScope::InterfaceLocal | Ipv6MulticastScope::LinkLocal) => {
361 effective_interface_index.ok_or(MctxError::Ipv6ScopedMulticastRequiresInterface)?
362 }
363 _ => ipv6_destination_scope_id(group, effective_interface_index.unwrap_or(0)),
364 };
365 let destination = SocketAddrV6::new(group, config.dst_port, 0, destination_scope_id);
366 socket
367 .connect(&SockAddr::from(destination))
368 .map_err(MctxError::SocketConnectFailed)?;
369
370 Ok(SocketAddr::V6(destination))
371 }
372
373 fn validate_existing_socket(
374 socket: &Socket,
375 config: &PublicationConfig,
376 ) -> Result<(), MctxError> {
377 let Ok(local_addr) = socket.local_addr() else {
378 return Ok(());
379 };
380
381 match local_addr.as_socket() {
382 Some(SocketAddr::V4(local_v4)) => {
383 if config.is_ipv6() {
384 return Err(MctxError::ExistingSocketAddressFamilyMismatch);
385 }
386
387 if let Some(IpAddr::V4(expected)) = config.source_addr
388 && local_v4.ip() != &Ipv4Addr::UNSPECIFIED
389 && local_v4.ip() != &expected
390 {
391 return Err(MctxError::ExistingSocketAddressMismatch {
392 expected: IpAddr::V4(expected),
393 actual: IpAddr::V4(*local_v4.ip()),
394 });
395 }
396
397 if let Some(expected) = config.source_port
398 && local_v4.port() != 0
399 && local_v4.port() != expected
400 {
401 return Err(MctxError::ExistingSocketPortMismatch {
402 expected,
403 actual: local_v4.port(),
404 });
405 }
406
407 Ok(())
408 }
409 Some(SocketAddr::V6(local_v6)) => {
410 if config.is_ipv4() {
411 return Err(MctxError::ExistingSocketAddressFamilyMismatch);
412 }
413
414 if let Some(IpAddr::V6(expected)) = config.source_addr
415 && local_v6.ip() != &Ipv6Addr::UNSPECIFIED
416 && local_v6.ip() != &expected
417 {
418 return Err(MctxError::ExistingSocketAddressMismatch {
419 expected: IpAddr::V6(expected),
420 actual: IpAddr::V6(*local_v6.ip()),
421 });
422 }
423
424 if let Some(expected) = config.source_port
425 && local_v6.port() != 0
426 && local_v6.port() != expected
427 {
428 return Err(MctxError::ExistingSocketPortMismatch {
429 expected,
430 actual: local_v6.port(),
431 });
432 }
433
434 Ok(())
435 }
436 None => Err(MctxError::ExistingSocketAddressFamilyMismatch),
437 }
438 }
439
440 fn bind_if_needed(socket: &Socket, bind_addr: SocketAddr) -> Result<(), MctxError> {
441 let needs_bind = match socket.local_addr() {
442 Ok(local_addr) => match local_addr.as_socket() {
443 Some(local_addr) => local_addr != bind_addr,
444 None => return Err(MctxError::ExistingSocketAddressFamilyMismatch),
445 },
446 Err(_) => true,
447 };
448
449 if needs_bind {
450 socket
451 .bind(&SockAddr::from(bind_addr))
452 .map_err(MctxError::SocketBindFailed)?;
453 }
454
455 Ok(())
456 }
457
458 fn bind_addr_v4(config: &PublicationConfig) -> Option<SocketAddrV4> {
459 if config.source_addr.is_none() && config.source_port.is_none() {
460 return None;
461 }
462
463 Some(SocketAddrV4::new(
464 match config.source_addr {
465 Some(IpAddr::V4(source_addr)) => source_addr,
466 Some(IpAddr::V6(_)) => unreachable!("validated as IPv4"),
467 None => Ipv4Addr::UNSPECIFIED,
468 },
469 config.source_port.unwrap_or(0),
470 ))
471 }
472
473 fn bind_addr_v6(
474 config: &PublicationConfig,
475 bind_source: Option<Ipv6Addr>,
476 bind_scope_id: u32,
477 ) -> Option<SocketAddrV6> {
478 if bind_source.is_none() && config.source_port.is_none() {
479 return None;
480 }
481
482 Some(SocketAddrV6::new(
483 bind_source.unwrap_or(Ipv6Addr::UNSPECIFIED),
484 config.source_port.unwrap_or(0),
485 0,
486 bind_scope_id,
487 ))
488 }
489
490 fn group_v4(config: &PublicationConfig) -> Ipv4Addr {
491 match config.group {
492 IpAddr::V4(group) => group,
493 IpAddr::V6(_) => unreachable!("validated as IPv4"),
494 }
495 }
496
497 fn group_v6(config: &PublicationConfig) -> Ipv6Addr {
498 match config.group {
499 IpAddr::V4(_) => unreachable!("validated as IPv6"),
500 IpAddr::V6(group) => group,
501 }
502 }
503
504 fn source_addr_v6(config: &PublicationConfig) -> Option<Ipv6Addr> {
505 match config.source_addr {
506 Some(IpAddr::V4(_)) => unreachable!("validated as IPv6"),
507 Some(IpAddr::V6(source)) => Some(source),
508 None => None,
509 }
510 }
511
512 fn interface_addr_v6(config: &PublicationConfig) -> Option<Ipv6Addr> {
513 match config.outgoing_interface {
514 Some(OutgoingInterface::Ipv4Addr(_)) => unreachable!("validated as IPv6"),
515 Some(OutgoingInterface::Ipv6Addr(interface)) => Some(interface),
516 Some(OutgoingInterface::Ipv6Index(_)) | None => None,
517 }
518 }
519
520 fn explicit_interface_index_v6(
521 config: &PublicationConfig,
522 interface_addr: Option<Ipv6Addr>,
523 ) -> Result<Option<u32>, MctxError> {
524 match config.outgoing_interface {
525 Some(OutgoingInterface::Ipv4Addr(_)) => unreachable!("validated as IPv6"),
526 Some(OutgoingInterface::Ipv6Index(index)) => Ok(Some(index)),
527 Some(OutgoingInterface::Ipv6Addr(_)) => {
528 interface_addr.map(resolve_ipv6_interface_index).transpose()
529 }
530 None => Ok(None),
531 }
532 }
533}
534
535#[cfg(unix)]
536impl AsRawFd for Publication {
537 fn as_raw_fd(&self) -> RawFd {
538 self.socket.as_raw_fd()
539 }
540}
541
/// Exposes the raw socket handle of the underlying socket (Windows only).
#[cfg(windows)]
impl AsRawSocket for Publication {
    fn as_raw_socket(&self) -> RawSocket {
        let socket = &self.socket;
        socket.as_raw_socket()
    }
}
548
/// Atomic send counters kept per publication (`metrics` feature only).
#[cfg(feature = "metrics")]
#[derive(Debug, Default)]
struct PublicationMetricsState {
    /// Number of recorded send attempts, successful or not.
    send_calls: std::sync::atomic::AtomicU64,
    /// Number of recorded successful sends.
    packets_sent: std::sync::atomic::AtomicU64,
    /// Sum of the byte counts of recorded successful sends.
    bytes_sent: std::sync::atomic::AtomicU64,
    /// Number of recorded failed sends.
    send_errors: std::sync::atomic::AtomicU64,
}
557
#[cfg(feature = "metrics")]
impl PublicationMetricsState {
    /// Records one successful send of `bytes_sent` bytes.
    fn record_success(&self, bytes_sent: usize) {
        let relaxed = std::sync::atomic::Ordering::Relaxed;

        self.send_calls.fetch_add(1, relaxed);
        self.packets_sent.fetch_add(1, relaxed);
        self.bytes_sent.fetch_add(bytes_sent as u64, relaxed);
    }

    /// Records one failed send.
    fn record_error(&self) {
        let relaxed = std::sync::atomic::Ordering::Relaxed;

        self.send_calls.fetch_add(1, relaxed);
        self.send_errors.fetch_add(1, relaxed);
    }

    /// Reads all counters and stamps the result with the current system time.
    ///
    /// Each counter is loaded separately with relaxed ordering.
    fn snapshot(&self) -> PublicationMetricsSnapshot {
        let relaxed = std::sync::atomic::Ordering::Relaxed;

        let send_calls = self.send_calls.load(relaxed);
        let packets_sent = self.packets_sent.load(relaxed);
        let bytes_sent = self.bytes_sent.load(relaxed);
        let send_errors = self.send_errors.load(relaxed);

        PublicationMetricsSnapshot {
            send_calls,
            packets_sent,
            bytes_sent,
            send_errors,
            captured_at: SystemTime::now(),
        }
    }
}
587
588#[cfg(test)]
589mod tests {
590 use super::*;
591 #[cfg(feature = "metrics")]
592 use crate::metrics::PublicationMetricsSampler;
593 use crate::test_support::{
594 TEST_GROUP, TEST_GROUP_V6_GLOBAL, TEST_GROUP_V6_SAME_HOST, recv_payload,
595 recv_payload_with_source, test_multicast_receiver, test_multicast_receiver_v6,
596 };
597 use socket2::{Domain, Protocol, SockAddr, Type};
598
/// An IPv4 publication delivers its payload to a local receiver and reports
/// the destination and local address it used.
#[test]
fn publication_send_reaches_a_local_receiver() {
    let (receiver, port) = test_multicast_receiver();
    let config = PublicationConfig::new(TEST_GROUP, port);
    let publication = Publication::new(PublicationId(1), config).unwrap();

    let report = publication.send(b"hello multicast").unwrap();
    let payload = recv_payload(&receiver);
    let (_source, announced_group, announced_port) = publication.announce_tuple().unwrap();

    let expected_destination = SocketAddr::V4(SocketAddrV4::new(TEST_GROUP, port));
    assert_eq!(report.destination, expected_destination);
    assert!(report.local_addr.is_some());
    assert_eq!(report.source_addr, report.local_addr.map(|addr| addr.ip()));
    assert_eq!(announced_group, IpAddr::V4(TEST_GROUP));
    assert_eq!(announced_port, port);
    assert_eq!(payload, b"hello multicast");
}
619
/// An IPv6 publication with an explicit source and outgoing interface sends
/// from that source and reaches a local receiver.
#[test]
fn publication_send_reaches_a_local_ipv6_receiver_with_configured_source() {
    let interface = Ipv6Addr::LOCALHOST;
    let source = Ipv6Addr::LOCALHOST;
    let (receiver, port) = test_multicast_receiver_v6(TEST_GROUP_V6_SAME_HOST, interface);

    let config = PublicationConfig::new(TEST_GROUP_V6_SAME_HOST, port)
        .with_source_addr(source)
        .with_outgoing_interface(interface);
    let publication = Publication::new(PublicationId(1), config).unwrap();

    let report = publication.send(b"hello multicast v6").unwrap();
    let (payload, sender) = recv_payload_with_source(&receiver);

    // The scope id is taken from the publication itself, so only group, port
    // and flow info are pinned here.
    let scope_id = publication.destination_v6().unwrap().scope_id();
    let expected_destination =
        SocketAddr::V6(SocketAddrV6::new(TEST_GROUP_V6_SAME_HOST, port, 0, scope_id));

    assert_eq!(report.destination, expected_destination);
    assert_eq!(report.source_addr, Some(IpAddr::V6(source)));
    assert_eq!(sender.ip(), IpAddr::V6(source));
    assert_eq!(payload, b"hello multicast v6");
}
649
/// Without an explicit source, an outgoing interface given by address is
/// used as the sender's source address.
#[test]
fn ipv6_interface_address_auto_binds_the_sender_source() {
    let interface = Ipv6Addr::LOCALHOST;
    let (receiver, port) = test_multicast_receiver_v6(TEST_GROUP_V6_SAME_HOST, interface);

    let config =
        PublicationConfig::new(TEST_GROUP_V6_SAME_HOST, port).with_outgoing_interface(interface);
    let publication = Publication::new(PublicationId(1), config).unwrap();

    let report = publication.send(b"auto-bind v6").unwrap();
    let (_payload, sender) = recv_payload_with_source(&receiver);

    let expected_source = IpAddr::V6(interface);
    assert_eq!(report.source_addr, Some(expected_source));
    assert_eq!(sender.ip(), expected_source);
}
667
/// A global-scope IPv6 group is connected with a zero destination scope id.
#[test]
fn wider_scope_ipv6_group_clears_destination_scope_id() {
    let config =
        PublicationConfig::new(TEST_GROUP_V6_GLOBAL, 5000).with_source_addr(Ipv6Addr::LOCALHOST);
    let publication = Publication::new(PublicationId(1), config).unwrap();

    let destination = publication.destination_v6().unwrap();
    assert_eq!(destination.scope_id(), 0);
}
679
/// One successful send shows up in both the absolute snapshot and the
/// sampler's delta.
#[cfg(feature = "metrics")]
#[test]
fn publication_metrics_track_successful_sends() {
    let (_receiver, port) = test_multicast_receiver();
    let config = PublicationConfig::new(TEST_GROUP, port);
    let publication = Publication::new(PublicationId(1), config).unwrap();
    let mut sampler = PublicationMetricsSampler::new(&publication);

    // The first sample has nothing to diff against.
    assert!(sampler.sample().is_none());
    publication.send(b"metrics packet").unwrap();

    let snapshot = publication.metrics_snapshot();
    let delta = sampler.sample().unwrap();
    let expected_bytes = b"metrics packet".len() as u64;

    assert_eq!(snapshot.send_calls, 1);
    assert_eq!(snapshot.packets_sent, 1);
    assert_eq!(snapshot.bytes_sent, expected_bytes);
    assert_eq!(snapshot.send_errors, 0);
    assert_eq!(delta.send_calls, 1);
    assert_eq!(delta.packets_sent, 1);
    assert_eq!(delta.bytes_sent, expected_bytes);
    assert_eq!(delta.send_errors, 0);
}
703
/// A caller-provided socket bound to one address is rejected when the
/// configuration asks for a different source address.
#[test]
fn existing_socket_source_addr_mismatch_is_rejected() {
    let bound_ip = Ipv4Addr::new(127, 0, 0, 1);
    let requested_ip = Ipv4Addr::new(127, 0, 0, 2);

    let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap();
    let bound_addr = SockAddr::from(SocketAddrV4::new(bound_ip, 0));
    socket.bind(&bound_addr).unwrap();

    let config = PublicationConfig::new(TEST_GROUP, 5000).with_source_addr(requested_ip);
    let result = Publication::new_with_socket(PublicationId(1), config, socket);

    assert!(matches!(
        result,
        Err(MctxError::ExistingSocketAddressMismatch { expected, actual })
            if expected == IpAddr::V4(requested_ip) && actual == IpAddr::V4(bound_ip)
    ));
}
727}