unix_udp_sock/
lib.rs

1//! Uniform interface to send/recv UDP packets with ECN information.
2use std::{
3    net::{IpAddr, Ipv6Addr, SocketAddr},
4    sync::atomic::{AtomicUsize, Ordering},
5};
6
7pub use crate::cmsg::{AsPtr, EcnCodepoint, Source, Transmit};
8use imp::LastSendError;
9use tracing::warn;
10
11mod cmsg;
12
13#[path = "unix.rs"]
14mod imp;
15
16pub use imp::{sync, UdpSocket};
17pub mod framed;
18
19/// Maximum number of UDP packets that can be sent by the `sendmmsg`/`recvmmsg`
20/// wrappers.  Note that, for supported platforms, the OS caps the batch size at
21/// this value, but will not return an error, so this is just a suggested
22/// maximum.
23///
24/// Presently, this is 1024 on Linux and FreeBSD, and 1 on platforms that don't
25/// support `sendmmsg`/`recvmmsg`
26pub const BATCH_SIZE_CAP: usize = imp::BATCH_SIZE_CAP;
27
28/// Default number of UDP packets to send/receive at a time.
29pub const DEFAULT_BATCH_SIZE: usize = imp::DEFAULT_BATCH_SIZE;
30
31/// The capabilities a UDP socket suppports on a certain platform
32#[derive(Debug)]
33pub struct UdpState {
34    max_gso_segments: AtomicUsize,
35    gro_segments: usize,
36}
37
38impl UdpState {
39    pub fn new() -> Self {
40        imp::udp_state()
41    }
42
43    /// The maximum amount of segments which can be transmitted if a platform
44    /// supports Generic Send Offload (GSO).
45    ///
46    /// This is 1 if the platform doesn't support GSO. Subject to change if errors are detected
47    /// while using GSO.
48    #[inline]
49    pub fn max_gso_segments(&self) -> usize {
50        self.max_gso_segments.load(Ordering::Relaxed)
51    }
52
53    /// The number of segments to read when GRO is enabled. Used as a factor to
54    /// compute the receive buffer size.
55    ///
56    /// Returns 1 if the platform doesn't support GRO.
57    #[inline]
58    pub fn gro_segments(&self) -> usize {
59        self.gro_segments
60    }
61}
62
63impl Default for UdpState {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69/// Metadata about received packet. Includes which address we
70/// recv'd from, how many bytes, ecn codepoints, what the
71/// destination IP used was and what interface index was used.
72#[derive(Debug, Copy, Clone)]
73pub struct RecvMeta {
74    /// address we received datagram on
75    pub addr: SocketAddr,
76    /// length of datagram
77    pub len: usize,
78    /// received datagram stride
79    pub stride: usize,
80    /// ECN codepoint
81    pub ecn: Option<EcnCodepoint>,
82    /// The destination IP address for this datagram (ipi_addr)
83    pub dst_ip: Option<IpAddr>,
84    /// The destination local IP address for this datagram (ipi_spec_dst)
85    pub dst_local_ip: Option<IpAddr>,
86    /// interface index that datagram was received on
87    pub ifindex: u32,
88}
89
90impl Default for RecvMeta {
91    /// Constructs a value with arbitrary fields, intended to be overwritten
92    fn default() -> Self {
93        Self {
94            addr: SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0),
95            len: 0,
96            stride: 0,
97            ecn: None,
98            dst_ip: None,
99            dst_local_ip: None,
100            ifindex: 0,
101        }
102    }
103}
104
105/// Log at most 1 IO error per minute
106const IO_ERROR_LOG_INTERVAL: u64 = 60;
107
108/// Logs a warning message when sendmsg fails
109///
110/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`]
111/// has elapsed since the last error was logged.
112fn log_sendmsg_error<B: AsPtr<u8>>(
113    last_send_error: LastSendError,
114    err: impl core::fmt::Debug,
115    transmit: &Transmit<B>,
116) {
117    if last_send_error.should_log() {
118        warn!(
119        "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}",
120            err, transmit.dst, transmit.src, transmit.ecn, transmit.contents.len(), transmit.segment_size);
121    }
122}
123
124#[cfg(test)]
125mod tests {
126    use std::net::Ipv4Addr;
127
128    use super::*;
129
130    #[test]
131    fn test_create() {
132        let s = sync::UdpSocket::bind("0.0.0.0:9909");
133        assert!(s.is_ok());
134    }
135    #[test]
136    fn test_send_recv() {
137        let saddr = "0.0.0.0:9901".parse().unwrap();
138        let a = sync::UdpSocket::bind(saddr).unwrap();
139        let b = sync::UdpSocket::bind("0.0.0.0:0").unwrap();
140        let buf = b"hello world";
141        b.send_to(&buf[..], saddr).unwrap();
142        // recv
143        let mut r = [0; 1024];
144        a.recv_from(&mut r).unwrap();
145        assert_eq!(buf[..], r[..11]);
146    }
147    #[test]
148    fn test_send_recv_msg() {
149        let saddr = "0.0.0.0:9902".parse().unwrap();
150        let a = sync::UdpSocket::bind(saddr).unwrap();
151        let b = sync::UdpSocket::bind("0.0.0.0:0").unwrap();
152        let send_port = b.local_addr().unwrap().port();
153        let send_addr = b.local_addr().unwrap().ip();
154        let buf = b"hello world";
155        let src = Source::Interface(1);
156        let tr = Transmit::new(saddr, *buf).src_ip(src);
157        b.send_msg(&UdpState::new(), tr).unwrap();
158        // recv
159        let mut r = [0; 1024];
160        let meta = a.recv_msg(&mut r).unwrap();
161        assert_eq!(buf[..], r[..11]);
162        // dst addr and b addr matches!
163        // meta.ifindex
164        assert_eq!(send_port, meta.addr.port());
165        assert_eq!(meta.ifindex, 1);
166        assert!(matches!(
167            meta.dst_local_ip,
168            // dst_local_ip might be 127.0.0.1
169            Some(addr) if addr == send_addr || addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
170        ));
171    }
172    #[test]
173    fn test_send_recv_msg_ip() {
174        let saddr = "0.0.0.0:9903".parse().unwrap();
175        let a = sync::UdpSocket::bind(saddr).unwrap();
176        let b = sync::UdpSocket::bind("0.0.0.0:0").unwrap();
177        let send_port = b.local_addr().unwrap().port();
178        let send_addr = b.local_addr().unwrap().ip();
179        let buf = b"hello world";
180        let src = Source::Ip("0.0.0.0".parse().unwrap());
181        let tr = Transmit::new(saddr, *buf).src_ip(src);
182        b.send_msg(&UdpState::new(), tr).unwrap();
183        // recv
184        let mut r = [0; 1024];
185        let meta = a.recv_msg(&mut r).unwrap();
186        assert_eq!(buf[..], r[..11]);
187        // dst addr and b addr matches!
188        // meta.ifindex
189        assert_eq!(send_port, meta.addr.port());
190        assert_eq!(meta.ifindex, 1);
191        assert!(matches!(
192            meta.dst_local_ip,
193            // dst_local_ip might be 127.0.0.1
194            Some(addr) if addr == send_addr || addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
195        ));
196    }
197}