1#![warn(rust_2018_idioms)]
4#![allow(dead_code)]
// Not every platform backend polls through this macro, so it may go unused on some targets.
#[allow(unused_macros)]
/// Unwraps a `Poll::Ready` value, or returns `Poll::Pending` from the *enclosing* function.
///
/// Accepts a single expression evaluating to `std::task::Poll<T>`, with an optional trailing
/// comma. The macro expression itself evaluates to the inner `T`.
macro_rules! ready {
    ($poll:expr $(,)?) => {
        match $poll {
            // Not ready yet: bubble the pending state up to the caller right away.
            ::std::task::Poll::Pending => return ::std::task::Poll::Pending,
            ::std::task::Poll::Ready(value) => value,
        }
    };
}
16
17#[cfg(unix)]
18use std::os::unix::io::AsRawFd;
19#[cfg(windows)]
20use std::os::windows::io::AsRawSocket;
21use std::sync::atomic::AtomicU64;
22use std::{
23 net::{IpAddr, Ipv6Addr, SocketAddr},
24 sync::atomic::{AtomicUsize, Ordering},
25 time::{Duration, Instant},
26};
27
28use tracing::warn;
29
30#[cfg(unix)]
31mod cmsg;
32#[cfg(unix)]
33#[path = "unix.rs"]
34mod imp;
35
36#[cfg(windows)]
37#[path = "windows.rs"]
38mod imp;
39
40#[cfg(not(any(unix, windows)))]
42#[path = "fallback.rs"]
43mod imp;
44
45mod proto;
46mod runtime;
47
48pub use imp::UdpSocketState;
49pub use proto::{EcnCodepoint, Transmit};
50#[cfg(not(feature = "metal-io"))]
51pub use runtime::AsyncUdpSocket;
52pub use runtime::UdpSocket;
53
/// Batch size of the platform-specific implementation, re-exported from the selected `imp`
/// module (`unix.rs`, `windows.rs` or `fallback.rs`).
///
/// NOTE(review): presumably the number of datagrams handled per send/receive call — confirm
/// against the `imp` modules.
pub const BATCH_SIZE: usize = imp::BATCH_SIZE;
56
/// Segmentation-offload capabilities reported by the platform-specific implementation.
///
/// Instances are produced by `imp::capabilities()` (see [`Capabilities::new`]).
#[derive(Debug)]
pub struct Capabilities {
    /// Upper bound on GSO segments per transmit.
    ///
    /// Stored atomically so it can be changed through a shared reference.
    /// NOTE(review): presumably lowered at runtime by the platform code when segmentation
    /// offload turns out to be unavailable — confirm in the `imp` modules.
    max_gso_segments: AtomicUsize,
    /// Number of GRO segments.
    ///
    /// NOTE(review): presumably how many coalesced datagrams a single receive may return —
    /// confirm in the `imp` modules.
    gro_segments: usize,
}
63
64impl Capabilities {
65 pub fn new() -> Self {
66 imp::capabilities()
67 }
68
69 #[inline]
75 pub fn max_gso_segments(&self) -> usize {
76 self.max_gso_segments.load(Ordering::Relaxed)
77 }
78
79 #[inline]
84 pub fn gro_segments(&self) -> usize {
85 self.gro_segments
86 }
87}
88
/// The default capabilities are whatever [`Capabilities::new`] reports for this platform.
impl Default for Capabilities {
    fn default() -> Self {
        // Delegate so `default()` and `new()` can never disagree.
        Self::new()
    }
}
94
/// Metadata accompanying received data.
///
/// NOTE(review): the field descriptions below are inferred from names and types; confirm them
/// against the platform-specific receive code in the `imp` modules.
#[derive(Debug, Copy, Clone)]
pub struct RecvMeta {
    /// Socket address associated with the received data (presumably the sender — confirm).
    pub addr: SocketAddr,
    /// Length in bytes (presumably the number of bytes written into the receive buffer — confirm).
    pub len: usize,
    /// Stride in bytes (presumably the size of each datagram when several were coalesced into
    /// one buffer — confirm).
    pub stride: usize,
    /// ECN codepoint of the received data, if one is available.
    pub ecn: Option<EcnCodepoint>,
    /// Destination IP address, if available (presumably the local address the data was sent
    /// to — confirm which platforms populate it).
    pub dst_ip: Option<IpAddr>,
}
104
105impl Default for RecvMeta {
106 fn default() -> Self {
108 Self {
109 addr: SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0),
110 len: 0,
111 stride: 0,
112 ecn: None,
113 dst_ip: None,
114 }
115 }
116}
117
/// Minimum time that must pass between two `sendmsg` error log messages; used by
/// `log_sendmsg_error` as its rate limit.
const IO_ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60);
120
121fn log_sendmsg_error(
126 epoch: &Instant,
127 last_send_error: &AtomicU64,
128 err: impl core::fmt::Debug,
129 transmit: &Transmit,
130) {
131 let d = last_send_error.load(Ordering::Relaxed);
132 let last = epoch.checked_add(Duration::from_nanos(d)).unwrap();
133 let now = Instant::now();
134 let interval = now.saturating_duration_since(last);
135 if interval > IO_ERROR_LOG_INTERVAL {
136 last_send_error.store(interval.as_nanos() as u64, Ordering::Relaxed);
137 warn!(
138 "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}",
139 err, transmit.destination, transmit.src_ip, transmit.ecn, transmit.contents.len(), transmit.segment_size);
140 }
141}
142
/// A borrowed socket handle: a newtype around [`socket2::SockRef`].
///
/// Construct it with the `From<&S>` impls in this file: on Unix from any
/// `S: AsRawFd + AsFd`, on Windows from any `S: AsRawSocket + AsSocket`.
pub struct UdpSockRef<'a>(socket2::SockRef<'a>);
149
150#[cfg(unix)]
151impl<'s, S> From<&'s S> for UdpSockRef<'s>
152where
153 S: AsRawFd + std::os::fd::AsFd,
154{
155 fn from(socket: &'s S) -> Self {
156 Self(socket.into())
157 }
158}
159
/// Borrows any Windows socket-handle-backed socket as a [`UdpSockRef`].
#[cfg(windows)]
impl<'s, S: AsRawSocket + std::os::windows::io::AsSocket> From<&'s S> for UdpSockRef<'s> {
    fn from(socket: &'s S) -> Self {
        // Delegate the borrow to socket2's own `From<&S>` conversion and wrap the result.
        UdpSockRef(socket2::SockRef::from(socket))
    }
}