1#![allow(rust_2024_compatibility)]
3#![warn(rust_2018_idioms)]
4#![allow(dead_code)]
5#[allow(unused_macros)]
8macro_rules! ready {
9 ($e:expr $(,)?) => {
10 match $e {
11 std::task::Poll::Ready(t) => t,
12 std::task::Poll::Pending => return std::task::Poll::Pending,
13 }
14 };
15}
16
17#[cfg(unix)]
18use std::os::unix::io::AsRawFd;
19#[cfg(windows)]
20use std::os::windows::io::AsRawSocket;
21use std::sync::atomic::AtomicU64;
22use std::{
23 net::{IpAddr, Ipv6Addr, SocketAddr},
24 sync::atomic::{AtomicUsize, Ordering},
25 time::{Duration, Instant},
26};
27
28use tracing::warn;
29
30#[cfg(unix)]
31mod cmsg;
32#[cfg(unix)]
33#[path = "unix.rs"]
34mod imp;
35
36#[cfg(windows)]
37#[path = "windows.rs"]
38mod imp;
39
40#[cfg(not(any(unix, windows)))]
42#[path = "fallback.rs"]
43mod imp;
44
45mod proto;
46mod runtime;
47
48pub use imp::UdpSocketState;
49pub use proto::{
50 EcnCodepoint, FiveTuple, FourTuple, TaggedBytesMut, TaggedString, Transmit, TransportContext,
51 TransportMessage, TransportProtocol,
52};
53pub use runtime::AsyncUdpSocket;
54pub use runtime::UdpSocket;
55
56pub const BATCH_SIZE: usize = imp::BATCH_SIZE;
58
59#[derive(Debug)]
61pub struct Capabilities {
62 max_gso_segments: AtomicUsize,
63 gro_segments: usize,
64}
65
66impl Capabilities {
67 pub fn new() -> Self {
68 imp::capabilities()
69 }
70
71 #[inline]
77 pub fn max_gso_segments(&self) -> usize {
78 self.max_gso_segments.load(Ordering::Relaxed)
79 }
80
81 #[inline]
86 pub fn gro_segments(&self) -> usize {
87 self.gro_segments
88 }
89}
90
91impl Default for Capabilities {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
97#[derive(Debug, Copy, Clone)]
98pub struct RecvMeta {
99 pub addr: SocketAddr,
100 pub len: usize,
101 pub stride: usize,
102 pub ecn: Option<EcnCodepoint>,
103 pub dst_ip: Option<IpAddr>,
105}
106
107impl Default for RecvMeta {
108 fn default() -> Self {
110 Self {
111 addr: SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0),
112 len: 0,
113 stride: 0,
114 ecn: None,
115 dst_ip: None,
116 }
117 }
118}
119
120const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60);
122
123fn log_sendmsg_error(
128 epoch: &Instant,
129 last_send_error: &AtomicU64,
130 err: impl core::fmt::Debug,
131 transmit: &Transmit,
132) {
133 let d = last_send_error.load(Ordering::Relaxed);
134 let last = epoch.checked_add(Duration::from_nanos(d)).unwrap();
135 let now = Instant::now();
136 let interval = now.saturating_duration_since(last);
137 if interval > IO_ERROR_LOG_INTERVAL {
138 last_send_error.store(interval.as_nanos() as u64, Ordering::Relaxed);
139 warn!(
140 "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}",
141 err,
142 transmit.destination,
143 transmit.src_ip,
144 transmit.ecn,
145 transmit.contents.len(),
146 transmit.segment_size
147 );
148 }
149}
150
151pub struct UdpSockRef<'a>(socket2::SockRef<'a>);
157
158#[cfg(unix)]
159impl<'s, S> From<&'s S> for UdpSockRef<'s>
160where
161 S: AsRawFd + std::os::fd::AsFd,
162{
163 fn from(socket: &'s S) -> Self {
164 Self(socket.into())
165 }
166}
167
168#[cfg(windows)]
169impl<'s, S> From<&'s S> for UdpSockRef<'s>
170where
171 S: AsRawSocket + std::os::windows::io::AsSocket,
172{
173 fn from(socket: &'s S) -> Self {
174 Self(socket.into())
175 }
176}