1use crate::network::adapter::{
2 Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
3 ListeningInfo, PendingStatus,
4};
5use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};
6
7use mio::net::{UdpSocket};
8use mio::event::{Source};
9
10use socket2::{Socket, Domain, Type, Protocol};
11
12#[cfg(target_os = "linux")]
13use nix::errno::{Errno};
14#[cfg(target_os = "linux")]
15use nix::sys::socket::{self, sockopt, MsgFlags, SockaddrStorage, ControlMessageOwned};
16#[cfg(target_os = "linux")]
17use nix::ifaddrs::{getifaddrs};
18
19use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
20#[cfg(target_os = "linux")]
21use std::net::{IpAddr, Ipv6Addr};
22use std::io::{self, ErrorKind};
23use std::mem::{MaybeUninit};
24#[cfg(target_os = "linux")]
25use std::os::fd::AsRawFd;
26
/// Payload limit (in bytes) advertised for UDP packets sent over the internet.
///
/// Evaluates to 1472. The arithmetic matches a 1500-byte Ethernet MTU minus a
/// 20-byte IPv4 header and an 8-byte UDP header (presumably the intent —
/// confirm against the crate documentation).
pub const MAX_INTERNET_PAYLOAD_LEN: usize = 1500 - 20 - 8;
/// Largest UDP payload (in bytes) this adapter will read in one datagram.
///
/// This value sizes the receive buffers used by the resources in this module.
/// Evaluates to 65507: the 16-bit maximum of 65535 minus 20 and 8 bytes
/// (presumably the IPv4 and UDP headers — confirm).
#[cfg(not(target_os = "macos"))]
pub const MAX_LOCAL_PAYLOAD_LEN: usize = 65535 - 20 - 8;

/// Largest UDP payload (in bytes) this adapter will read in one datagram.
///
/// This value sizes the receive buffers used by the resources in this module.
/// On macOS the base value is 9216 instead of 65535 (presumably the platform's
/// smaller default datagram limit — confirm), giving 9188.
#[cfg(target_os = "macos")]
pub const MAX_LOCAL_PAYLOAD_LEN: usize = 9216 - 20 - 8;
39
/// Socket options applied when opening a connected UDP socket.
///
/// Start from [`Default::default`] and chain the `with_*` builders to adjust
/// individual options.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct UdpConnectConfig {
    /// Local address the socket is bound to before it is connected.
    source_address: SocketAddr,
    /// Value handed to the socket's broadcast option.
    broadcast: bool,
    /// Value handed to the socket's reuse-address option.
    reuse_address: bool,
    /// Value handed to the socket's reuse-port option (only applied on Unix targets).
    reuse_port: bool,
}

impl UdpConnectConfig {
    /// Returns the configuration with the local bind address replaced by `addr`.
    pub fn with_source_address(self, addr: SocketAddr) -> Self {
        Self { source_address: addr, ..self }
    }

    /// Returns the configuration with the broadcast option switched on.
    pub fn with_broadcast(self) -> Self {
        Self { broadcast: true, ..self }
    }

    /// Returns the configuration with the reuse-address option switched on.
    pub fn with_reuse_address(self) -> Self {
        Self { reuse_address: true, ..self }
    }

    /// Returns the configuration with the reuse-port option switched on.
    pub fn with_reuse_port(self) -> Self {
        Self { reuse_port: true, ..self }
    }
}

impl Default for UdpConnectConfig {
    /// Binds to the IPv4 wildcard address on an OS-chosen port (port `0`),
    /// with every optional flag disabled.
    fn default() -> Self {
        let any_v4 = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
        Self {
            source_address: SocketAddr::V4(any_v4),
            broadcast: false,
            reuse_address: false,
            reuse_port: false,
        }
    }
}
89
/// Socket options applied when opening a listening UDP socket.
///
/// Every flag defaults to `false`; chain the `with_*` builders to enable them.
#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
pub struct UdpListenConfig {
    /// Value handed to the socket's broadcast option when listening.
    send_broadcasts: bool,
    /// On Linux, makes the listener accept datagrams addressed to the
    /// interface's broadcast addresses as well as its own address. Not read
    /// on other targets by the code in this file.
    receive_broadcasts: bool,
    /// Requests the reuse-address option on the listening socket.
    reuse_address: bool,
    /// Requests the reuse-port option on the listening socket (Unix targets only).
    reuse_port: bool,
}

impl UdpListenConfig {
    /// Returns the configuration with `send_broadcasts` enabled.
    pub fn with_send_broadcasts(self) -> Self {
        Self { send_broadcasts: true, ..self }
    }

    /// Returns the configuration with `receive_broadcasts` enabled.
    pub fn with_receive_broadcasts(self) -> Self {
        Self { receive_broadcasts: true, ..self }
    }

    /// Returns the configuration with `reuse_address` enabled.
    pub fn with_reuse_address(self) -> Self {
        Self { reuse_address: true, ..self }
    }

    /// Returns the configuration with `reuse_port` enabled.
    pub fn with_reuse_port(self) -> Self {
        Self { reuse_port: true, ..self }
    }
}
132
/// Zero-sized marker that registers the UDP resources of this module as an
/// `Adapter`: `RemoteResource` for connected sockets and `LocalResource` for
/// listening sockets.
pub(crate) struct UdpAdapter;
impl Adapter for UdpAdapter {
    type Remote = RemoteResource;
    type Local = LocalResource;
}
138
/// Connected UDP socket: the `Remote` side of the UDP adapter.
pub(crate) struct RemoteResource {
    /// The mio socket, already bound and connected to its peer.
    socket: UdpSocket,
}

impl Resource for RemoteResource {
    /// Exposes the underlying socket as the mio event source.
    fn source(&mut self) -> &mut dyn Source {
        &mut self.socket
    }
}
148
149impl Remote for RemoteResource {
150 fn connect_with(
151 config: TransportConnect,
152 remote_addr: RemoteAddr,
153 ) -> io::Result<ConnectionInfo<Self>> {
154 let config = match config {
155 TransportConnect::Udp(config) => config,
156 _ => panic!("Internal error: Got wrong config"),
157 };
158
159 let peer_addr = *remote_addr.socket_addr();
160
161 let socket = Socket::new(
162 match peer_addr {
163 SocketAddr::V4 { .. } => Domain::IPV4,
164 SocketAddr::V6 { .. } => Domain::IPV6,
165 },
166 Type::DGRAM,
167 Some(Protocol::UDP),
168 )?;
169 socket.set_nonblocking(true)?;
170
171 socket.set_reuse_address(config.reuse_address)?;
172 #[cfg(unix)]
173 socket.set_reuse_port(config.reuse_port)?;
174 socket.set_broadcast(config.broadcast)?;
175
176 socket.bind(&config.source_address.into())?;
177 socket.connect(&peer_addr.into())?;
178
179 let socket = UdpSocket::from_std(socket.into());
180 let local_addr = socket.local_addr()?;
181 Ok(ConnectionInfo { remote: RemoteResource { socket }, local_addr, peer_addr })
182 }
183
184 fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus {
185 let buffer: MaybeUninit<[u8; MAX_LOCAL_PAYLOAD_LEN]> = MaybeUninit::uninit();
186 let mut input_buffer = unsafe { buffer.assume_init() }; loop {
189 match self.socket.recv(&mut input_buffer) {
190 Ok(size) => process_data(&mut input_buffer[..size]),
191 Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
192 break ReadStatus::WaitNextEvent
193 }
194 Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => {
195 break ReadStatus::WaitNextEvent;
197 }
198 Err(err) => {
199 log::error!("UDP receive error: {}", err);
200 break ReadStatus::WaitNextEvent; }
202 }
203 }
204 }
205
206 fn send(&self, data: &[u8]) -> SendStatus {
207 send_packet(data, |data| self.socket.send(data))
208 }
209
210 fn pending(&self, _readiness: Readiness) -> PendingStatus {
211 PendingStatus::Ready
212 }
213}
214
/// Listening UDP socket: the `Local` side of the UDP adapter.
pub(crate) struct LocalResource {
    /// The bound mio socket datagrams are received on.
    socket: UdpSocket,
    /// Linux only. When `Some`, `accept` delegates to `accept_filtered`, which
    /// forwards only datagrams whose packet-info destination address appears
    /// in this list. `listen_with` fills it when `receive_broadcasts` is set.
    #[cfg(target_os = "linux")]
    ingress_addresses: Option<Vec<IpAddr>>,
}

impl Resource for LocalResource {
    /// Exposes the underlying socket as the mio event source.
    fn source(&mut self) -> &mut dyn Source {
        &mut self.socket
    }
}
226
227#[cfg(target_os = "linux")]
228impl LocalResource {
229 fn accept_filtered(
230 &self,
231 ingress_addresses: &[IpAddr],
232 mut accept_remote: impl FnMut(AcceptedType<'_, RemoteResource>),
233 ) {
234 let buffer: MaybeUninit<[u8; MAX_LOCAL_PAYLOAD_LEN]> = MaybeUninit::uninit();
235 let mut input_buffer = unsafe { buffer.assume_init() }; let mut control_buffer = nix::cmsg_space!(libc::sockaddr_storage);
237
238 loop {
239 let mut iov = [io::IoSliceMut::new(&mut input_buffer)];
240 let result = socket::recvmsg::<SockaddrStorage>(
241 self.socket.as_raw_fd(),
242 &mut iov,
243 Some(&mut control_buffer),
244 MsgFlags::empty(),
245 );
246
247 match result {
248 Ok(msg) => {
249 let size = msg.bytes;
250
251 let ingress_ip = match msg.cmsgs().find_map(|cmsg| match cmsg {
252 ControlMessageOwned::Ipv4PacketInfo(pktinfo) => {
253 Some(Ipv4Addr::from(pktinfo.ipi_addr.s_addr.to_be()).into())
254 }
255 ControlMessageOwned::Ipv6PacketInfo(pktinfo) => {
256 Some(Ipv6Addr::from(pktinfo.ipi6_addr.s6_addr).into())
257 }
258 _ => None,
259 }) {
260 Some(ingress_ip) => ingress_ip,
261 None => continue,
262 };
263
264 if !ingress_addresses.contains(&ingress_ip) {
265 continue;
266 }
267
268 fn convert_sockaddr(addr: SockaddrStorage) -> Option<SocketAddr> {
269 if let Some(addr) = addr.as_sockaddr_in() {
270 return Some(SocketAddr::V4((*addr).into()));
271 }
272 if let Some(addr) = addr.as_sockaddr_in6() {
273 return Some(SocketAddr::V6((*addr).into()));
274 }
275 None
276 }
277
278 let addr = match msg.address.and_then(convert_sockaddr) {
279 Some(addr) => addr,
280 None => continue,
281 };
282
283 let data = &mut input_buffer[..size];
284 accept_remote(AcceptedType::Data(addr, data))
285 }
286 Err(Errno::EWOULDBLOCK) => break,
287 Err(err) => break log::error!("UDP accept error: {}", err), }
289 }
290 }
291}
292
293impl Local for LocalResource {
294 type Remote = RemoteResource;
295
296 fn listen_with(
297 config: TransportListen,
298 #[cfg(not(target_os = "linux"))] addr: SocketAddr,
299 #[cfg(target_os = "linux")] mut addr: SocketAddr,
300 ) -> io::Result<ListeningInfo<Self>> {
301 let config = match config {
302 TransportListen::Udp(config) => config,
303 _ => panic!("Internal error: Got wrong config"),
304 };
305
306 let multicast = match addr {
307 SocketAddr::V4(addr) if addr.ip().is_multicast() => Some(addr),
308 _ => None,
309 };
310
311 let socket = Socket::new(
312 match addr {
313 SocketAddr::V4 { .. } => Domain::IPV4,
314 SocketAddr::V6 { .. } => Domain::IPV6,
315 },
316 Type::DGRAM,
317 Some(Protocol::UDP),
318 )?;
319 socket.set_nonblocking(true)?;
320
321 if config.reuse_address || multicast.is_some() {
322 socket.set_reuse_address(true)?;
323 }
324 #[cfg(unix)]
325 if config.reuse_port || multicast.is_some() {
326 socket.set_reuse_port(true)?;
327 }
328 socket.set_broadcast(config.send_broadcasts)?;
329
330 #[cfg(target_os = "linux")]
331 let ingress_addresses = if config.receive_broadcasts {
332 match addr {
334 SocketAddr::V4 { .. } => {
335 socket::setsockopt(socket.as_raw_fd(), sockopt::Ipv4PacketInfo, &true)?
336 }
337 SocketAddr::V6 { .. } => {
338 socket::setsockopt(socket.as_raw_fd(), sockopt::Ipv6RecvPacketInfo, &true)?
339 }
340 }
341
342 let ifaddr = getifaddrs()?.find_map(|ifaddr| {
344 ifaddr.address.and_then(|ss| {
345 match (
346 ss.as_sockaddr_in().map(|si| Ipv4Addr::from(si.ip())),
347 ss.as_sockaddr_in6().map(|si| si.ip()),
348 ) {
349 (Some(ip4), _) if IpAddr::V4(ip4) == addr.ip() => Some(ifaddr),
350 (_, Some(ip6)) if IpAddr::V6(ip6) == addr.ip() => Some(ifaddr),
351 _ => None,
352 }
353 })
354 });
355
356 match ifaddr {
357 None => return Err(ErrorKind::AddrNotAvailable.into()),
358 Some(ifaddr) => {
359 let mut ingress_addresses = vec![addr.ip()];
361
362 if let Some(broadcast_ss) = ifaddr.broadcast {
364 if let Some(si) = broadcast_ss.as_sockaddr_in() {
365 ingress_addresses.push(Ipv4Addr::from(si.ip()).into());
366 ingress_addresses.push(Ipv4Addr::BROADCAST.into());
367 }
368 if let Some(si) = broadcast_ss.as_sockaddr_in6() {
369 ingress_addresses.push(si.ip().into());
370 }
371 }
372
373 socket::setsockopt(
375 socket.as_raw_fd(),
376 sockopt::BindToDevice,
377 &ifaddr.interface_name.into(),
378 )?;
379
380 addr.set_ip(match addr {
382 SocketAddr::V4 { .. } => Ipv4Addr::UNSPECIFIED.into(),
383 SocketAddr::V6 { .. } => Ipv6Addr::UNSPECIFIED.into(),
384 });
385
386 Some(ingress_addresses)
387 }
388 }
389 }
390 else {
391 None
392 };
393
394 if let Some(multicast) = multicast {
395 socket.join_multicast_v4(multicast.ip(), &Ipv4Addr::UNSPECIFIED)?;
396 socket.bind(&SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, addr.port()).into())?;
397 }
398 else {
399 socket.bind(&addr.into())?;
400 }
401
402 let socket = UdpSocket::from_std(socket.into());
403 let local_addr = socket.local_addr().unwrap();
404 Ok(ListeningInfo {
405 local: {
406 LocalResource {
407 socket,
408 #[cfg(target_os = "linux")]
409 ingress_addresses,
410 }
411 },
412 local_addr,
413 })
414 }
415
416 fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) {
417 #[cfg(target_os = "linux")]
418 if let Some(ingress_addresses) = &self.ingress_addresses {
419 self.accept_filtered(ingress_addresses, accept_remote);
420 return;
421 }
422
423 let buffer: MaybeUninit<[u8; MAX_LOCAL_PAYLOAD_LEN]> = MaybeUninit::uninit();
424 let mut input_buffer = unsafe { buffer.assume_init() }; loop {
427 match self.socket.recv_from(&mut input_buffer) {
428 Ok((size, addr)) => {
429 let data = &mut input_buffer[..size];
430 accept_remote(AcceptedType::Data(addr, data))
431 }
432 Err(ref err) if err.kind() == ErrorKind::WouldBlock => break,
433 Err(err) => break log::error!("UDP accept error: {}", err), };
435 }
436 }
437
438 fn send_to(&self, addr: SocketAddr, data: &[u8]) -> SendStatus {
439 send_packet(data, |data| self.socket.send_to(data, addr))
440 }
441}
442
443impl Drop for LocalResource {
444 fn drop(&mut self) {
445 if let SocketAddr::V4(addr) = self.socket.local_addr().unwrap() {
446 if addr.ip().is_multicast() {
447 self.socket.leave_multicast_v4(addr.ip(), &Ipv4Addr::UNSPECIFIED).unwrap();
448 }
449 }
450 }
451}
452
453fn send_packet(data: &[u8], send_method: impl Fn(&[u8]) -> io::Result<usize>) -> SendStatus {
454 loop {
455 match send_method(data) {
456 Ok(_) => break SendStatus::Sent,
457 Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => {
459 break SendStatus::ResourceNotFound
460 }
461 Err(ref err) if err.kind() == ErrorKind::WouldBlock => continue,
462 Err(ref err) if err.kind() == ErrorKind::Other => {
463 break SendStatus::MaxPacketSizeExceeded
464 }
465 Err(err) => {
466 log::error!("UDP send error: {}", err);
467 break SendStatus::ResourceNotFound; }
469 }
470 }
471}