Skip to main content

mctx_core/
publication.rs

1#[cfg(feature = "metrics")]
2use crate::metrics::PublicationMetricsSnapshot;
3use crate::platform::resolve_ipv6_interface_index;
4use crate::{
5    MctxError, SendReport,
6    config::{Ipv6MulticastScope, OutgoingInterface, PublicationConfig, ipv6_destination_scope_id},
7};
8use socket2::{Domain, Protocol, SockAddr, Socket, Type};
9use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket};
10#[cfg(unix)]
11use std::os::fd::{AsRawFd, RawFd};
12#[cfg(windows)]
13use std::os::windows::io::{AsRawSocket, RawSocket};
14use std::sync::OnceLock;
15#[cfg(feature = "metrics")]
16use std::time::SystemTime;
17
/// Stable ID for one configured publication socket.
///
/// A thin wrapper around a `u64`. It is `Copy` and hashable, so it can be
/// passed by value and used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PublicationId(pub u64);
21
/// Extracted publication parts.
///
/// Produced by `Publication::into_parts` when a publication is taken apart.
#[derive(Debug)]
pub struct PublicationParts {
    /// ID the publication was created with.
    pub id: PublicationId,
    /// Configuration the publication was created with.
    pub config: PublicationConfig,
    /// The live socket that the publication owned.
    pub socket: Socket,
}
29
/// One ready-to-send multicast publication.
#[derive(Debug)]
pub struct Publication {
    /// Caller-chosen identifier, echoed back in every send report.
    id: PublicationId,
    /// Configuration the socket was set up from.
    config: PublicationConfig,
    /// Socket that has been configured and connected to `destination`.
    socket: Socket,
    /// Destination address returned by socket configuration.
    destination: SocketAddr,
    /// Local address, filled on first lookup and reset by `socket_mut`.
    cached_local_addr: OnceLock<SocketAddr>,
    /// Send counters; only present with the `metrics` feature.
    #[cfg(feature = "metrics")]
    metrics: PublicationMetricsState,
}
41
42impl Publication {
43    /// Creates and configures a new multicast publication socket.
44    pub fn new(id: PublicationId, config: PublicationConfig) -> Result<Self, MctxError> {
45        config.validate()?;
46
47        let domain = match config.family() {
48            crate::PublicationAddressFamily::Ipv4 => Domain::IPV4,
49            crate::PublicationAddressFamily::Ipv6 => Domain::IPV6,
50        };
51        let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
52            .map_err(MctxError::SocketCreateFailed)?;
53
54        let destination = Self::configure_socket(&socket, &config, false)?;
55
56        Ok(Self {
57            id,
58            config,
59            socket,
60            destination,
61            cached_local_addr: OnceLock::new(),
62            #[cfg(feature = "metrics")]
63            metrics: PublicationMetricsState::default(),
64        })
65    }
66
67    /// Wraps and configures an existing multicast socket.
68    pub fn new_with_socket(
69        id: PublicationId,
70        config: PublicationConfig,
71        socket: Socket,
72    ) -> Result<Self, MctxError> {
73        config.validate()?;
74        let destination = Self::configure_socket(&socket, &config, true)?;
75
76        Ok(Self {
77            id,
78            config,
79            socket,
80            destination,
81            cached_local_addr: OnceLock::new(),
82            #[cfg(feature = "metrics")]
83            metrics: PublicationMetricsState::default(),
84        })
85    }
86
87    /// Wraps and configures an existing standard-library UDP socket.
88    pub fn new_with_udp_socket(
89        id: PublicationId,
90        config: PublicationConfig,
91        socket: UdpSocket,
92    ) -> Result<Self, MctxError> {
93        Self::new_with_socket(id, config, Socket::from(socket))
94    }
95
    /// Returns the publication ID supplied at construction.
    pub fn id(&self) -> PublicationId {
        self.id
    }
100
    /// Returns the configured publication parameters supplied at construction.
    pub fn config(&self) -> &PublicationConfig {
        &self.config
    }
105
    /// Returns the resolved destination address, i.e. the address the socket
    /// was connected to during configuration.
    pub fn destination(&self) -> SocketAddr {
        self.destination
    }
110
111    /// Returns the resolved destination IPv4 address.
112    pub fn destination_v4(&self) -> Result<SocketAddrV4, MctxError> {
113        match self.destination {
114            SocketAddr::V4(destination) => Ok(destination),
115            SocketAddr::V6(_) => Err(MctxError::ExistingSocketAddressFamilyMismatch),
116        }
117    }
118
119    /// Returns the resolved destination IPv6 address.
120    pub fn destination_v6(&self) -> Result<SocketAddrV6, MctxError> {
121        match self.destination {
122            SocketAddr::V4(_) => Err(MctxError::ExistingSocketAddressFamilyMismatch),
123            SocketAddr::V6(destination) => Ok(destination),
124        }
125    }
126
    /// Returns a shared reference to the live socket.
    ///
    /// Avoid rebinding or reconnecting the socket through this handle. The
    /// publication caches its resolved local address after the first lookup for
    /// send reporting, and a change made through a shared reference does not
    /// reset that cache. Use `socket_mut` when the socket must be changed: it
    /// clears the cache before handing the socket out.
    pub fn socket(&self) -> &Socket {
        &self.socket
    }
136
137    /// Returns a mutable reference to the live socket.
138    ///
139    /// This clears any cached local-address metadata before handing the socket
140    /// out so the next send or `local_addr()` call refreshes it.
141    pub fn socket_mut(&mut self) -> &mut Socket {
142        self.cached_local_addr = OnceLock::new();
143        &mut self.socket
144    }
145
146    /// Sends one payload.
147    pub fn send(&self, payload: &[u8]) -> Result<SendReport, MctxError> {
148        match self.socket.send(payload) {
149            Ok(bytes_sent) => {
150                #[cfg(feature = "metrics")]
151                self.metrics.record_success(bytes_sent);
152
153                let local_addr = self.cached_local_addr().ok();
154
155                Ok(SendReport {
156                    publication_id: self.id,
157                    destination: self.destination,
158                    local_addr,
159                    source_addr: local_addr.map(|addr| addr.ip()),
160                    bytes_sent,
161                })
162            }
163            Err(error) => {
164                #[cfg(feature = "metrics")]
165                self.metrics.record_error();
166
167                Err(MctxError::SendFailed(error))
168            }
169        }
170    }
171
    /// Returns the publication socket local address.
    ///
    /// The address is read from the socket on first use and served from the
    /// cache afterwards.
    pub fn local_addr(&self) -> Result<SocketAddr, MctxError> {
        self.cached_local_addr()
    }
176
177    fn cached_local_addr(&self) -> Result<SocketAddr, MctxError> {
178        if let Some(local_addr) = self.cached_local_addr.get() {
179            return Ok(*local_addr);
180        }
181
182        let local_addr = Self::read_local_addr(&self.socket)?;
183        let _ = self.cached_local_addr.set(local_addr);
184        Ok(local_addr)
185    }
186
187    fn read_local_addr(socket: &Socket) -> Result<SocketAddr, MctxError> {
188        socket
189            .local_addr()
190            .map_err(MctxError::SocketLocalAddrFailed)?
191            .as_socket()
192            .ok_or_else(|| {
193                MctxError::SocketLocalAddrFailed(std::io::Error::other(
194                    "socket local address was not an IP address",
195                ))
196            })
197    }
198
199    /// Returns the publication socket local IPv4 address.
200    pub fn local_addr_v4(&self) -> Result<SocketAddrV4, MctxError> {
201        match self.local_addr()? {
202            SocketAddr::V4(local_addr) => Ok(local_addr),
203            SocketAddr::V6(_) => Err(MctxError::ExistingSocketAddressFamilyMismatch),
204        }
205    }
206
207    /// Returns the publication socket local IPv6 address.
208    pub fn local_addr_v6(&self) -> Result<SocketAddrV6, MctxError> {
209        match self.local_addr()? {
210            SocketAddr::V4(_) => Err(MctxError::ExistingSocketAddressFamilyMismatch),
211            SocketAddr::V6(local_addr) => Ok(local_addr),
212        }
213    }
214
215    /// Returns the effective local source address.
216    pub fn source_addr(&self) -> Result<IpAddr, MctxError> {
217        Ok(self.local_addr()?.ip())
218    }
219
220    /// Returns the `(source, group, udp_port)` tuple needed for announce frames.
221    pub fn announce_tuple(&self) -> Result<(IpAddr, IpAddr, u16), MctxError> {
222        Ok((self.source_addr()?, self.config.group, self.config.dst_port))
223    }
224
    /// Consumes the publication and returns the live socket.
    ///
    /// The ID and configuration are dropped; use `into_parts` to keep them.
    pub fn into_socket(self) -> Socket {
        self.socket
    }
229
230    /// Consumes the publication and returns all parts.
231    pub fn into_parts(self) -> PublicationParts {
232        PublicationParts {
233            id: self.id,
234            config: self.config,
235            socket: self.socket,
236        }
237    }
238
    /// Returns a metrics snapshot for one publication.
    ///
    /// Only available when the `metrics` feature is enabled.
    #[cfg(feature = "metrics")]
    pub fn metrics_snapshot(&self) -> PublicationMetricsSnapshot {
        self.metrics.snapshot()
    }
244
245    fn configure_socket(
246        socket: &Socket,
247        config: &PublicationConfig,
248        existing_socket: bool,
249    ) -> Result<SocketAddr, MctxError> {
250        if existing_socket {
251            Self::validate_existing_socket(socket, config)?;
252        }
253
254        socket
255            .set_nonblocking(true)
256            .map_err(MctxError::SocketOptionFailed)?;
257
258        match config.family() {
259            crate::PublicationAddressFamily::Ipv4 => Self::configure_socket_v4(socket, config),
260            crate::PublicationAddressFamily::Ipv6 => {
261                if !existing_socket {
262                    socket
263                        .set_only_v6(true)
264                        .map_err(MctxError::SocketOptionFailed)?;
265                }
266                Self::configure_socket_v6(socket, config)
267            }
268        }
269    }
270
271    fn configure_socket_v4(
272        socket: &Socket,
273        config: &PublicationConfig,
274    ) -> Result<SocketAddr, MctxError> {
275        if let Some(bind_addr) = Self::bind_addr_v4(config) {
276            Self::bind_if_needed(socket, SocketAddr::V4(bind_addr))?;
277        }
278
279        if let Some(OutgoingInterface::Ipv4Addr(interface)) = config.outgoing_interface {
280            socket
281                .set_multicast_if_v4(&interface)
282                .map_err(MctxError::SocketOptionFailed)?;
283        }
284
285        socket
286            .set_multicast_loop_v4(config.loopback)
287            .map_err(MctxError::SocketOptionFailed)?;
288        socket
289            .set_multicast_ttl_v4(config.ttl)
290            .map_err(MctxError::SocketOptionFailed)?;
291
292        let destination = SocketAddrV4::new(Self::group_v4(config), config.dst_port);
293        socket
294            .connect(&SockAddr::from(destination))
295            .map_err(MctxError::SocketConnectFailed)?;
296
297        Ok(SocketAddr::V4(destination))
298    }
299
300    fn configure_socket_v6(
301        socket: &Socket,
302        config: &PublicationConfig,
303    ) -> Result<SocketAddr, MctxError> {
304        let group = Self::group_v6(config);
305        let explicit_source = Self::source_addr_v6(config);
306        let interface_addr = Self::interface_addr_v6(config);
307        let explicit_interface_index = Self::explicit_interface_index_v6(config, interface_addr)?;
308        let source_interface_index = match explicit_source {
309            Some(source) if source.is_unicast_link_local() => match explicit_interface_index {
310                Some(interface_index) => Some(interface_index),
311                None => Some(resolve_ipv6_interface_index(source)?),
312            },
313            Some(source) => Some(resolve_ipv6_interface_index(source)?),
314            None => None,
315        };
316
317        if let (Some(source), Some(source_interface_index), Some(outgoing_interface_index)) = (
318            explicit_source,
319            source_interface_index,
320            explicit_interface_index,
321        ) && source_interface_index != outgoing_interface_index
322        {
323            return Err(MctxError::Ipv6SourceInterfaceMismatch {
324                source_addr: IpAddr::V6(source),
325                source_interface_index,
326                outgoing_interface_index,
327            });
328        }
329
330        let effective_interface_index = explicit_interface_index.or(source_interface_index);
331        let bind_source = explicit_source.or(interface_addr);
332        let bind_scope_id = match bind_source {
333            Some(source) if source.is_unicast_link_local() => effective_interface_index
334                .ok_or_else(|| {
335                    MctxError::InterfaceDiscoveryFailed(format!(
336                        "failed to determine interface index for link-local IPv6 address {source}"
337                    ))
338                })?,
339            _ => 0,
340        };
341
342        if let Some(bind_addr) = Self::bind_addr_v6(config, bind_source, bind_scope_id) {
343            Self::bind_if_needed(socket, SocketAddr::V6(bind_addr))?;
344        }
345
346        if let Some(interface_index) = effective_interface_index {
347            socket
348                .set_multicast_if_v6(interface_index)
349                .map_err(MctxError::SocketOptionFailed)?;
350        }
351
352        socket
353            .set_multicast_loop_v6(config.loopback)
354            .map_err(MctxError::SocketOptionFailed)?;
355        socket
356            .set_multicast_hops_v6(config.ttl)
357            .map_err(MctxError::SocketOptionFailed)?;
358
359        let destination_scope_id = match config.ipv6_scope() {
360            Some(Ipv6MulticastScope::InterfaceLocal | Ipv6MulticastScope::LinkLocal) => {
361                effective_interface_index.ok_or(MctxError::Ipv6ScopedMulticastRequiresInterface)?
362            }
363            _ => ipv6_destination_scope_id(group, effective_interface_index.unwrap_or(0)),
364        };
365        let destination = SocketAddrV6::new(group, config.dst_port, 0, destination_scope_id);
366        socket
367            .connect(&SockAddr::from(destination))
368            .map_err(MctxError::SocketConnectFailed)?;
369
370        Ok(SocketAddr::V6(destination))
371    }
372
373    fn validate_existing_socket(
374        socket: &Socket,
375        config: &PublicationConfig,
376    ) -> Result<(), MctxError> {
377        let Ok(local_addr) = socket.local_addr() else {
378            return Ok(());
379        };
380
381        match local_addr.as_socket() {
382            Some(SocketAddr::V4(local_v4)) => {
383                if config.is_ipv6() {
384                    return Err(MctxError::ExistingSocketAddressFamilyMismatch);
385                }
386
387                if let Some(IpAddr::V4(expected)) = config.source_addr
388                    && local_v4.ip() != &Ipv4Addr::UNSPECIFIED
389                    && local_v4.ip() != &expected
390                {
391                    return Err(MctxError::ExistingSocketAddressMismatch {
392                        expected: IpAddr::V4(expected),
393                        actual: IpAddr::V4(*local_v4.ip()),
394                    });
395                }
396
397                if let Some(expected) = config.source_port
398                    && local_v4.port() != 0
399                    && local_v4.port() != expected
400                {
401                    return Err(MctxError::ExistingSocketPortMismatch {
402                        expected,
403                        actual: local_v4.port(),
404                    });
405                }
406
407                Ok(())
408            }
409            Some(SocketAddr::V6(local_v6)) => {
410                if config.is_ipv4() {
411                    return Err(MctxError::ExistingSocketAddressFamilyMismatch);
412                }
413
414                if let Some(IpAddr::V6(expected)) = config.source_addr
415                    && local_v6.ip() != &Ipv6Addr::UNSPECIFIED
416                    && local_v6.ip() != &expected
417                {
418                    return Err(MctxError::ExistingSocketAddressMismatch {
419                        expected: IpAddr::V6(expected),
420                        actual: IpAddr::V6(*local_v6.ip()),
421                    });
422                }
423
424                if let Some(expected) = config.source_port
425                    && local_v6.port() != 0
426                    && local_v6.port() != expected
427                {
428                    return Err(MctxError::ExistingSocketPortMismatch {
429                        expected,
430                        actual: local_v6.port(),
431                    });
432                }
433
434                Ok(())
435            }
436            None => Err(MctxError::ExistingSocketAddressFamilyMismatch),
437        }
438    }
439
440    fn bind_if_needed(socket: &Socket, bind_addr: SocketAddr) -> Result<(), MctxError> {
441        let needs_bind = match socket.local_addr() {
442            Ok(local_addr) => match local_addr.as_socket() {
443                Some(local_addr) => local_addr != bind_addr,
444                None => return Err(MctxError::ExistingSocketAddressFamilyMismatch),
445            },
446            Err(_) => true,
447        };
448
449        if needs_bind {
450            socket
451                .bind(&SockAddr::from(bind_addr))
452                .map_err(MctxError::SocketBindFailed)?;
453        }
454
455        Ok(())
456    }
457
458    fn bind_addr_v4(config: &PublicationConfig) -> Option<SocketAddrV4> {
459        if config.source_addr.is_none() && config.source_port.is_none() {
460            return None;
461        }
462
463        Some(SocketAddrV4::new(
464            match config.source_addr {
465                Some(IpAddr::V4(source_addr)) => source_addr,
466                Some(IpAddr::V6(_)) => unreachable!("validated as IPv4"),
467                None => Ipv4Addr::UNSPECIFIED,
468            },
469            config.source_port.unwrap_or(0),
470        ))
471    }
472
473    fn bind_addr_v6(
474        config: &PublicationConfig,
475        bind_source: Option<Ipv6Addr>,
476        bind_scope_id: u32,
477    ) -> Option<SocketAddrV6> {
478        if bind_source.is_none() && config.source_port.is_none() {
479            return None;
480        }
481
482        Some(SocketAddrV6::new(
483            bind_source.unwrap_or(Ipv6Addr::UNSPECIFIED),
484            config.source_port.unwrap_or(0),
485            0,
486            bind_scope_id,
487        ))
488    }
489
490    fn group_v4(config: &PublicationConfig) -> Ipv4Addr {
491        match config.group {
492            IpAddr::V4(group) => group,
493            IpAddr::V6(_) => unreachable!("validated as IPv4"),
494        }
495    }
496
497    fn group_v6(config: &PublicationConfig) -> Ipv6Addr {
498        match config.group {
499            IpAddr::V4(_) => unreachable!("validated as IPv6"),
500            IpAddr::V6(group) => group,
501        }
502    }
503
504    fn source_addr_v6(config: &PublicationConfig) -> Option<Ipv6Addr> {
505        match config.source_addr {
506            Some(IpAddr::V4(_)) => unreachable!("validated as IPv6"),
507            Some(IpAddr::V6(source)) => Some(source),
508            None => None,
509        }
510    }
511
512    fn interface_addr_v6(config: &PublicationConfig) -> Option<Ipv6Addr> {
513        match config.outgoing_interface {
514            Some(OutgoingInterface::Ipv4Addr(_)) => unreachable!("validated as IPv6"),
515            Some(OutgoingInterface::Ipv6Addr(interface)) => Some(interface),
516            Some(OutgoingInterface::Ipv6Index(_)) | None => None,
517        }
518    }
519
520    fn explicit_interface_index_v6(
521        config: &PublicationConfig,
522        interface_addr: Option<Ipv6Addr>,
523    ) -> Result<Option<u32>, MctxError> {
524        match config.outgoing_interface {
525            Some(OutgoingInterface::Ipv4Addr(_)) => unreachable!("validated as IPv6"),
526            Some(OutgoingInterface::Ipv6Index(index)) => Ok(Some(index)),
527            Some(OutgoingInterface::Ipv6Addr(_)) => {
528                interface_addr.map(resolve_ipv6_interface_index).transpose()
529            }
530            None => Ok(None),
531        }
532    }
533}
534
/// Exposes the raw file descriptor of the underlying socket on Unix.
#[cfg(unix)]
impl AsRawFd for Publication {
    /// Returns the raw file descriptor of the publication socket.
    fn as_raw_fd(&self) -> RawFd {
        self.socket.as_raw_fd()
    }
}
541
/// Exposes the raw socket handle of the underlying socket on Windows.
#[cfg(windows)]
impl AsRawSocket for Publication {
    /// Returns the raw socket handle of the publication socket.
    fn as_raw_socket(&self) -> RawSocket {
        self.socket.as_raw_socket()
    }
}
548
/// Atomic send counters for one publication, updated through `&self`.
#[cfg(feature = "metrics")]
#[derive(Debug, Default)]
struct PublicationMetricsState {
    /// Number of recorded send attempts, successful or failed.
    send_calls: std::sync::atomic::AtomicU64,
    /// Number of recorded successful sends.
    packets_sent: std::sync::atomic::AtomicU64,
    /// Total bytes reported by successful sends.
    bytes_sent: std::sync::atomic::AtomicU64,
    /// Number of recorded failed sends.
    send_errors: std::sync::atomic::AtomicU64,
}
557
#[cfg(feature = "metrics")]
impl PublicationMetricsState {
    /// Records one successful send of `bytes_sent` bytes.
    fn record_success(&self, bytes_sent: usize) {
        use std::sync::atomic::Ordering;

        for counter in [&self.send_calls, &self.packets_sent] {
            counter.fetch_add(1, Ordering::Relaxed);
        }
        self.bytes_sent
            .fetch_add(bytes_sent as u64, Ordering::Relaxed);
    }

    /// Records one failed send.
    fn record_error(&self) {
        use std::sync::atomic::Ordering;

        for counter in [&self.send_calls, &self.send_errors] {
            counter.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Reads all counters and stamps the result with the current system time.
    ///
    /// Each counter is loaded separately with relaxed ordering, so the values
    /// are not read as one atomic unit.
    fn snapshot(&self) -> PublicationMetricsSnapshot {
        use std::sync::atomic::{AtomicU64, Ordering};

        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        PublicationMetricsSnapshot {
            send_calls: read(&self.send_calls),
            packets_sent: read(&self.packets_sent),
            bytes_sent: read(&self.bytes_sent),
            send_errors: read(&self.send_errors),
            captured_at: SystemTime::now(),
        }
    }
}
587
588#[cfg(test)]
589mod tests {
590    use super::*;
591    #[cfg(feature = "metrics")]
592    use crate::metrics::PublicationMetricsSampler;
593    use crate::test_support::{
594        TEST_GROUP, TEST_GROUP_V6_GLOBAL, TEST_GROUP_V6_SAME_HOST, recv_payload,
595        recv_payload_with_source, test_multicast_receiver, test_multicast_receiver_v6,
596    };
597    use socket2::{Domain, Protocol, SockAddr, Type};
598
599    #[test]
600    fn publication_send_reaches_a_local_receiver() {
601        let (receiver, port) = test_multicast_receiver();
602        let publication =
603            Publication::new(PublicationId(1), PublicationConfig::new(TEST_GROUP, port)).unwrap();
604
605        let report = publication.send(b"hello multicast").unwrap();
606        let payload = recv_payload(&receiver);
607        let announce = publication.announce_tuple().unwrap();
608
609        assert_eq!(
610            report.destination,
611            SocketAddr::V4(SocketAddrV4::new(TEST_GROUP, port))
612        );
613        assert!(report.local_addr.is_some());
614        assert_eq!(report.source_addr, report.local_addr.map(|addr| addr.ip()));
615        assert_eq!(announce.1, IpAddr::V4(TEST_GROUP));
616        assert_eq!(announce.2, port);
617        assert_eq!(payload, b"hello multicast");
618    }
619
620    #[test]
621    fn publication_send_reaches_a_local_ipv6_receiver_with_configured_source() {
622        let interface = Ipv6Addr::LOCALHOST;
623        let source = Ipv6Addr::LOCALHOST;
624        let (receiver, port) = test_multicast_receiver_v6(TEST_GROUP_V6_SAME_HOST, interface);
625        let publication = Publication::new(
626            PublicationId(1),
627            PublicationConfig::new(TEST_GROUP_V6_SAME_HOST, port)
628                .with_source_addr(source)
629                .with_outgoing_interface(interface),
630        )
631        .unwrap();
632
633        let report = publication.send(b"hello multicast v6").unwrap();
634        let (payload, sender) = recv_payload_with_source(&receiver);
635
636        assert_eq!(
637            report.destination,
638            SocketAddr::V6(SocketAddrV6::new(
639                TEST_GROUP_V6_SAME_HOST,
640                port,
641                0,
642                publication.destination_v6().unwrap().scope_id()
643            ))
644        );
645        assert_eq!(report.source_addr, Some(IpAddr::V6(source)));
646        assert_eq!(sender.ip(), IpAddr::V6(source));
647        assert_eq!(payload, b"hello multicast v6");
648    }
649
650    #[test]
651    fn ipv6_interface_address_auto_binds_the_sender_source() {
652        let interface = Ipv6Addr::LOCALHOST;
653        let (receiver, port) = test_multicast_receiver_v6(TEST_GROUP_V6_SAME_HOST, interface);
654        let publication = Publication::new(
655            PublicationId(1),
656            PublicationConfig::new(TEST_GROUP_V6_SAME_HOST, port)
657                .with_outgoing_interface(interface),
658        )
659        .unwrap();
660
661        let report = publication.send(b"auto-bind v6").unwrap();
662        let (_payload, sender) = recv_payload_with_source(&receiver);
663
664        assert_eq!(report.source_addr, Some(IpAddr::V6(interface)));
665        assert_eq!(sender.ip(), IpAddr::V6(interface));
666    }
667
668    #[test]
669    fn wider_scope_ipv6_group_clears_destination_scope_id() {
670        let publication = Publication::new(
671            PublicationId(1),
672            PublicationConfig::new(TEST_GROUP_V6_GLOBAL, 5000)
673                .with_source_addr(Ipv6Addr::LOCALHOST),
674        )
675        .unwrap();
676
677        assert_eq!(publication.destination_v6().unwrap().scope_id(), 0);
678    }
679
680    #[cfg(feature = "metrics")]
681    #[test]
682    fn publication_metrics_track_successful_sends() {
683        let (_receiver, port) = test_multicast_receiver();
684        let publication =
685            Publication::new(PublicationId(1), PublicationConfig::new(TEST_GROUP, port)).unwrap();
686        let mut sampler = PublicationMetricsSampler::new(&publication);
687
688        assert!(sampler.sample().is_none());
689        publication.send(b"metrics packet").unwrap();
690
691        let snapshot = publication.metrics_snapshot();
692        let delta = sampler.sample().unwrap();
693
694        assert_eq!(snapshot.send_calls, 1);
695        assert_eq!(snapshot.packets_sent, 1);
696        assert_eq!(snapshot.bytes_sent, b"metrics packet".len() as u64);
697        assert_eq!(snapshot.send_errors, 0);
698        assert_eq!(delta.send_calls, 1);
699        assert_eq!(delta.packets_sent, 1);
700        assert_eq!(delta.bytes_sent, b"metrics packet".len() as u64);
701        assert_eq!(delta.send_errors, 0);
702    }
703
704    #[test]
705    fn existing_socket_source_addr_mismatch_is_rejected() {
706        let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap();
707        socket
708            .bind(&SockAddr::from(SocketAddrV4::new(
709                Ipv4Addr::new(127, 0, 0, 1),
710                0,
711            )))
712            .unwrap();
713
714        let result = Publication::new_with_socket(
715            PublicationId(1),
716            PublicationConfig::new(TEST_GROUP, 5000).with_source_addr(Ipv4Addr::new(127, 0, 0, 2)),
717            socket,
718        );
719
720        assert!(matches!(
721            result,
722            Err(MctxError::ExistingSocketAddressMismatch { expected, actual })
723                if expected == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2))
724                    && actual == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
725        ));
726    }
727}