// message_io/adapters/udp.rs

1use crate::network::adapter::{
2    Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
3    ListeningInfo, PendingStatus,
4};
5use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};
6
7use mio::net::{UdpSocket};
8use mio::event::{Source};
9
10use socket2::{Socket, Domain, Type, Protocol};
11
12#[cfg(target_os = "linux")]
13use nix::errno::{Errno};
14#[cfg(target_os = "linux")]
15use nix::sys::socket::{self, sockopt, MsgFlags, SockaddrStorage, ControlMessageOwned};
16#[cfg(target_os = "linux")]
17use nix::ifaddrs::{getifaddrs};
18
19use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
20#[cfg(target_os = "linux")]
21use std::net::{IpAddr, Ipv6Addr};
22use std::io::{self, ErrorKind};
23use std::mem::{MaybeUninit};
24#[cfg(target_os = "linux")]
25use std::os::fd::AsRawFd;
26
/// Bytes subtracted from the packet size for the IP header.
const IP_HEADER_LEN: usize = 20;

/// Bytes subtracted from the packet size for the UDP header.
const UDP_HEADER_LEN: usize = 8;

/// Maximum payload that UDP can send over the internet to be mostly compatible.
///
/// Computed as 1500 bytes minus the IP and UDP header sizes.
pub const MAX_INTERNET_PAYLOAD_LEN: usize = 1500 - IP_HEADER_LEN - UDP_HEADER_LEN;

/// Similar to [`MAX_INTERNET_PAYLOAD_LEN`] but for localhost instead of internet.
/// Localhost can handle a bigger MTU.
#[cfg(not(target_os = "macos"))]
pub const MAX_LOCAL_PAYLOAD_LEN: usize = 65535 - IP_HEADER_LEN - UDP_HEADER_LEN;

/// macOS variant of the localhost payload limit, computed from 9216 bytes instead of 65535.
#[cfg(target_os = "macos")]
pub const MAX_LOCAL_PAYLOAD_LEN: usize = 9216 - IP_HEADER_LEN - UDP_HEADER_LEN;
39
/// Socket options applied when creating a UDP socket for an outgoing connection.
///
/// Start from [`UdpConnectConfig::default`] and chain the `with_*` methods to enable options.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct UdpConnectConfig {
    source_address: SocketAddr,
    broadcast: bool,
    reuse_address: bool,
    reuse_port: bool,
}

impl UdpConnectConfig {
    /// Specify the source address and port.
    pub fn with_source_address(self, addr: SocketAddr) -> Self {
        Self { source_address: addr, ..self }
    }

    /// Enables the socket capabilities to send broadcast messages.
    pub fn with_broadcast(self) -> Self {
        Self { broadcast: true, ..self }
    }

    /// Set value for the `SO_REUSEADDR` option on this socket. This indicates that further calls
    /// to `bind` may allow reuse of local addresses. For IPv4 sockets this means that a socket may
    /// bind even when there's a socket already listening on this port.
    pub fn with_reuse_address(self) -> Self {
        Self { reuse_address: true, ..self }
    }

    /// Set value for the `SO_REUSEPORT` option on this socket. This indicates that further calls
    /// to `bind` may allow reuse of local addresses. For IPv4 sockets this means that a socket may
    /// bind even when there's a socket already listening on this port. This option is always-on on
    /// Windows and cannot be configured.
    pub fn with_reuse_port(self) -> Self {
        Self { reuse_port: true, ..self }
    }
}

impl Default for UdpConnectConfig {
    /// Unspecified IPv4 source address with port 0 and every option disabled.
    fn default() -> Self {
        let any_source = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
        Self {
            source_address: any_source,
            broadcast: false,
            reuse_address: false,
            reuse_port: false,
        }
    }
}
89
/// Socket options applied when creating a listening UDP socket.
///
/// Every option is disabled by default; chain the `with_*` methods to enable them.
#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
pub struct UdpListenConfig {
    send_broadcasts: bool,
    receive_broadcasts: bool,
    reuse_address: bool,
    reuse_port: bool,
}

impl UdpListenConfig {
    /// Enables the socket capabilities to send broadcast messages when the listening socket is
    /// also used for sending with
    /// [`Endpoint::from_listener`](crate::network::Endpoint::from_listener).
    pub fn with_send_broadcasts(self) -> Self {
        Self { send_broadcasts: true, ..self }
    }

    /// On Windows, when listening on a specific IP address, the socket also receives
    /// corresponding subnet broadcasts and global broadcasts ([`std::net::Ipv4Addr::BROADCAST`])
    /// received on the interface matching the IP. When this option is set, message-io mimics this
    /// behavior on Linux.
    pub fn with_receive_broadcasts(self) -> Self {
        Self { receive_broadcasts: true, ..self }
    }

    /// Set value for the `SO_REUSEADDR` option on this socket. This indicates that further calls
    /// to `bind` may allow reuse of local addresses.
    pub fn with_reuse_address(self) -> Self {
        Self { reuse_address: true, ..self }
    }

    /// Set value for the `SO_REUSEPORT` option on this socket. This indicates that further calls
    /// to `bind` may allow reuse of local addresses. For IPv4 sockets this means that a socket may
    /// bind even when there's a socket already listening on this port. This option is always-on
    /// on Windows and cannot be configured.
    pub fn with_reuse_port(self) -> Self {
        Self { reuse_port: true, ..self }
    }
}
132
133pub(crate) struct UdpAdapter;
134impl Adapter for UdpAdapter {
135    type Remote = RemoteResource;
136    type Local = LocalResource;
137}
138
139pub(crate) struct RemoteResource {
140    socket: UdpSocket,
141}
142
143impl Resource for RemoteResource {
144    fn source(&mut self) -> &mut dyn Source {
145        &mut self.socket
146    }
147}
148
149impl Remote for RemoteResource {
150    fn connect_with(
151        config: TransportConnect,
152        remote_addr: RemoteAddr,
153    ) -> io::Result<ConnectionInfo<Self>> {
154        let config = match config {
155            TransportConnect::Udp(config) => config,
156            _ => panic!("Internal error: Got wrong config"),
157        };
158
159        let peer_addr = *remote_addr.socket_addr();
160
161        let socket = Socket::new(
162            match peer_addr {
163                SocketAddr::V4 { .. } => Domain::IPV4,
164                SocketAddr::V6 { .. } => Domain::IPV6,
165            },
166            Type::DGRAM,
167            Some(Protocol::UDP),
168        )?;
169        socket.set_nonblocking(true)?;
170
171        socket.set_reuse_address(config.reuse_address)?;
172        #[cfg(unix)]
173        socket.set_reuse_port(config.reuse_port)?;
174        socket.set_broadcast(config.broadcast)?;
175
176        socket.bind(&config.source_address.into())?;
177        socket.connect(&peer_addr.into())?;
178
179        let socket = UdpSocket::from_std(socket.into());
180        let local_addr = socket.local_addr()?;
181        Ok(ConnectionInfo { remote: RemoteResource { socket }, local_addr, peer_addr })
182    }
183
184    fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus {
185        let buffer: MaybeUninit<[u8; MAX_LOCAL_PAYLOAD_LEN]> = MaybeUninit::uninit();
186        let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array
187
188        loop {
189            match self.socket.recv(&mut input_buffer) {
190                Ok(size) => process_data(&mut input_buffer[..size]),
191                Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
192                    break ReadStatus::WaitNextEvent
193                }
194                Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => {
195                    // Avoid ICMP generated error to be logged
196                    break ReadStatus::WaitNextEvent;
197                }
198                Err(err) => {
199                    log::error!("UDP receive error: {}", err);
200                    break ReadStatus::WaitNextEvent; // Should not happen
201                }
202            }
203        }
204    }
205
206    fn send(&self, data: &[u8]) -> SendStatus {
207        send_packet(data, |data| self.socket.send(data))
208    }
209
210    fn pending(&self, _readiness: Readiness) -> PendingStatus {
211        PendingStatus::Ready
212    }
213}
214
215pub(crate) struct LocalResource {
216    socket: UdpSocket,
217    #[cfg(target_os = "linux")]
218    ingress_addresses: Option<Vec<IpAddr>>,
219}
220
221impl Resource for LocalResource {
222    fn source(&mut self) -> &mut dyn Source {
223        &mut self.socket
224    }
225}
226
227#[cfg(target_os = "linux")]
228impl LocalResource {
229    fn accept_filtered(
230        &self,
231        ingress_addresses: &[IpAddr],
232        mut accept_remote: impl FnMut(AcceptedType<'_, RemoteResource>),
233    ) {
234        let buffer: MaybeUninit<[u8; MAX_LOCAL_PAYLOAD_LEN]> = MaybeUninit::uninit();
235        let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array
236        let mut control_buffer = nix::cmsg_space!(libc::sockaddr_storage);
237
238        loop {
239            let mut iov = [io::IoSliceMut::new(&mut input_buffer)];
240            let result = socket::recvmsg::<SockaddrStorage>(
241                self.socket.as_raw_fd(),
242                &mut iov,
243                Some(&mut control_buffer),
244                MsgFlags::empty(),
245            );
246
247            match result {
248                Ok(msg) => {
249                    let size = msg.bytes;
250
251                    let ingress_ip = match msg.cmsgs().find_map(|cmsg| match cmsg {
252                        ControlMessageOwned::Ipv4PacketInfo(pktinfo) => {
253                            Some(Ipv4Addr::from(pktinfo.ipi_addr.s_addr.to_be()).into())
254                        }
255                        ControlMessageOwned::Ipv6PacketInfo(pktinfo) => {
256                            Some(Ipv6Addr::from(pktinfo.ipi6_addr.s6_addr).into())
257                        }
258                        _ => None,
259                    }) {
260                        Some(ingress_ip) => ingress_ip,
261                        None => continue,
262                    };
263
264                    if !ingress_addresses.contains(&ingress_ip) {
265                        continue;
266                    }
267
268                    fn convert_sockaddr(addr: SockaddrStorage) -> Option<SocketAddr> {
269                        if let Some(addr) = addr.as_sockaddr_in() {
270                            return Some(SocketAddr::V4((*addr).into()));
271                        }
272                        if let Some(addr) = addr.as_sockaddr_in6() {
273                            return Some(SocketAddr::V6((*addr).into()));
274                        }
275                        None
276                    }
277
278                    let addr = match msg.address.and_then(convert_sockaddr) {
279                        Some(addr) => addr,
280                        None => continue,
281                    };
282
283                    let data = &mut input_buffer[..size];
284                    accept_remote(AcceptedType::Data(addr, data))
285                }
286                Err(Errno::EWOULDBLOCK) => break,
287                Err(err) => break log::error!("UDP accept error: {}", err), // Should never happen
288            }
289        }
290    }
291}
292
293impl Local for LocalResource {
294    type Remote = RemoteResource;
295
296    fn listen_with(
297        config: TransportListen,
298        #[cfg(not(target_os = "linux"))] addr: SocketAddr,
299        #[cfg(target_os = "linux")] mut addr: SocketAddr,
300    ) -> io::Result<ListeningInfo<Self>> {
301        let config = match config {
302            TransportListen::Udp(config) => config,
303            _ => panic!("Internal error: Got wrong config"),
304        };
305
306        let multicast = match addr {
307            SocketAddr::V4(addr) if addr.ip().is_multicast() => Some(addr),
308            _ => None,
309        };
310
311        let socket = Socket::new(
312            match addr {
313                SocketAddr::V4 { .. } => Domain::IPV4,
314                SocketAddr::V6 { .. } => Domain::IPV6,
315            },
316            Type::DGRAM,
317            Some(Protocol::UDP),
318        )?;
319        socket.set_nonblocking(true)?;
320
321        if config.reuse_address || multicast.is_some() {
322            socket.set_reuse_address(true)?;
323        }
324        #[cfg(unix)]
325        if config.reuse_port || multicast.is_some() {
326            socket.set_reuse_port(true)?;
327        }
328        socket.set_broadcast(config.send_broadcasts)?;
329
330        #[cfg(target_os = "linux")]
331        let ingress_addresses = if config.receive_broadcasts {
332            // enable the socket packet info option
333            match addr {
334                SocketAddr::V4 { .. } => {
335                    socket::setsockopt(socket.as_raw_fd(), sockopt::Ipv4PacketInfo, &true)?
336                }
337                SocketAddr::V6 { .. } => {
338                    socket::setsockopt(socket.as_raw_fd(), sockopt::Ipv6RecvPacketInfo, &true)?
339                }
340            }
341
342            // find ifaddr matching the listen addr
343            let ifaddr = getifaddrs()?.find_map(|ifaddr| {
344                ifaddr.address.and_then(|ss| {
345                    match (
346                        ss.as_sockaddr_in().map(|si| Ipv4Addr::from(si.ip())),
347                        ss.as_sockaddr_in6().map(|si| si.ip()),
348                    ) {
349                        (Some(ip4), _) if IpAddr::V4(ip4) == addr.ip() => Some(ifaddr),
350                        (_, Some(ip6)) if IpAddr::V6(ip6) == addr.ip() => Some(ifaddr),
351                        _ => None,
352                    }
353                })
354            });
355
356            match ifaddr {
357                None => return Err(ErrorKind::AddrNotAvailable.into()),
358                Some(ifaddr) => {
359                    // Get allowed ingress IP addresses
360                    let mut ingress_addresses = vec![addr.ip()];
361
362                    // Some interfaces like VPN adapters don't have broadcast support.
363                    if let Some(broadcast_ss) = ifaddr.broadcast {
364                        if let Some(si) = broadcast_ss.as_sockaddr_in() {
365                            ingress_addresses.push(Ipv4Addr::from(si.ip()).into());
366                            ingress_addresses.push(Ipv4Addr::BROADCAST.into());
367                        }
368                        if let Some(si) = broadcast_ss.as_sockaddr_in6() {
369                            ingress_addresses.push(si.ip().into());
370                        }
371                    }
372
373                    // Bind the socket to the specific interface
374                    socket::setsockopt(
375                        socket.as_raw_fd(),
376                        sockopt::BindToDevice,
377                        &ifaddr.interface_name.into(),
378                    )?;
379
380                    // Listen on UNSPECIFIED
381                    addr.set_ip(match addr {
382                        SocketAddr::V4 { .. } => Ipv4Addr::UNSPECIFIED.into(),
383                        SocketAddr::V6 { .. } => Ipv6Addr::UNSPECIFIED.into(),
384                    });
385
386                    Some(ingress_addresses)
387                }
388            }
389        }
390        else {
391            None
392        };
393
394        if let Some(multicast) = multicast {
395            socket.join_multicast_v4(multicast.ip(), &Ipv4Addr::UNSPECIFIED)?;
396            socket.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, addr.port()).into())?;
397        }
398        else {
399            socket.bind(&addr.into())?;
400        }
401
402        let socket = UdpSocket::from_std(socket.into());
403        let local_addr = socket.local_addr().unwrap();
404        Ok(ListeningInfo {
405            local: {
406                LocalResource {
407                    socket,
408                    #[cfg(target_os = "linux")]
409                    ingress_addresses,
410                }
411            },
412            local_addr,
413        })
414    }
415
416    fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) {
417        #[cfg(target_os = "linux")]
418        if let Some(ingress_addresses) = &self.ingress_addresses {
419            self.accept_filtered(ingress_addresses, accept_remote);
420            return;
421        }
422
423        let buffer: MaybeUninit<[u8; MAX_LOCAL_PAYLOAD_LEN]> = MaybeUninit::uninit();
424        let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array
425
426        loop {
427            match self.socket.recv_from(&mut input_buffer) {
428                Ok((size, addr)) => {
429                    let data = &mut input_buffer[..size];
430                    accept_remote(AcceptedType::Data(addr, data))
431                }
432                Err(ref err) if err.kind() == ErrorKind::WouldBlock => break,
433                Err(err) => break log::error!("UDP accept error: {}", err), // Should never happen
434            };
435        }
436    }
437
438    fn send_to(&self, addr: SocketAddr, data: &[u8]) -> SendStatus {
439        send_packet(data, |data| self.socket.send_to(data, addr))
440    }
441}
442
443impl Drop for LocalResource {
444    fn drop(&mut self) {
445        if let SocketAddr::V4(addr) = self.socket.local_addr().unwrap() {
446            if addr.ip().is_multicast() {
447                self.socket.leave_multicast_v4(addr.ip(), &Ipv4Addr::UNSPECIFIED).unwrap();
448            }
449        }
450    }
451}
452
453fn send_packet(data: &[u8], send_method: impl Fn(&[u8]) -> io::Result<usize>) -> SendStatus {
454    loop {
455        match send_method(data) {
456            Ok(_) => break SendStatus::Sent,
457            // Avoid ICMP generated error to be logged
458            Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => {
459                break SendStatus::ResourceNotFound
460            }
461            Err(ref err) if err.kind() == ErrorKind::WouldBlock => continue,
462            Err(ref err) if err.kind() == ErrorKind::Other => {
463                break SendStatus::MaxPacketSizeExceeded
464            }
465            Err(err) => {
466                log::error!("UDP send error: {}", err);
467                break SendStatus::ResourceNotFound; // should not happen
468            }
469        }
470    }
471}