async_transport/
lib.rs

1//! Uniform interface to send/recv UDP packets with ECN information.
2
3#![warn(rust_2018_idioms)]
4#![allow(dead_code)]
5//#![warn(missing_docs)]
6
7#[allow(unused_macros)]
8macro_rules! ready {
9    ($e:expr $(,)?) => {
10        match $e {
11            std::task::Poll::Ready(t) => t,
12            std::task::Poll::Pending => return std::task::Poll::Pending,
13        }
14    };
15}
16
17#[cfg(unix)]
18use std::os::unix::io::AsRawFd;
19#[cfg(windows)]
20use std::os::windows::io::AsRawSocket;
21use std::sync::atomic::AtomicU64;
22use std::{
23    net::{IpAddr, Ipv6Addr, SocketAddr},
24    sync::atomic::{AtomicUsize, Ordering},
25    time::{Duration, Instant},
26};
27
28use tracing::warn;
29
30#[cfg(unix)]
31mod cmsg;
32#[cfg(unix)]
33#[path = "unix.rs"]
34mod imp;
35
36#[cfg(windows)]
37#[path = "windows.rs"]
38mod imp;
39
40// No ECN support
41#[cfg(not(any(unix, windows)))]
42#[path = "fallback.rs"]
43mod imp;
44
45mod proto;
46mod runtime;
47
48pub use imp::UdpSocketState;
49pub use proto::{EcnCodepoint, Transmit};
50#[cfg(not(feature = "metal-io"))]
51pub use runtime::AsyncUdpSocket;
52pub use runtime::UdpSocket;
53
54/// Number of UDP packets to send/receive at a time
55pub const BATCH_SIZE: usize = imp::BATCH_SIZE;
56
57/// The capabilities a UDP socket supports on a certain platform
58#[derive(Debug)]
59pub struct Capabilities {
60    max_gso_segments: AtomicUsize,
61    gro_segments: usize,
62}
63
64impl Capabilities {
65    pub fn new() -> Self {
66        imp::capabilities()
67    }
68
69    /// The maximum amount of segments which can be transmitted if a platform
70    /// supports Generic Send Offload (GSO).
71    ///
72    /// This is 1 if the platform doesn't support GSO. Subject to change if errors are detected
73    /// while using GSO.
74    #[inline]
75    pub fn max_gso_segments(&self) -> usize {
76        self.max_gso_segments.load(Ordering::Relaxed)
77    }
78
79    /// The number of segments to read when GRO is enabled. Used as a factor to
80    /// compute the receive buffer size.
81    ///
82    /// Returns 1 if the platform doesn't support GRO.
83    #[inline]
84    pub fn gro_segments(&self) -> usize {
85        self.gro_segments
86    }
87}
88
89impl Default for Capabilities {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95#[derive(Debug, Copy, Clone)]
96pub struct RecvMeta {
97    pub addr: SocketAddr,
98    pub len: usize,
99    pub stride: usize,
100    pub ecn: Option<EcnCodepoint>,
101    /// The destination IP address which was encoded in this datagram
102    pub dst_ip: Option<IpAddr>,
103}
104
105impl Default for RecvMeta {
106    /// Constructs a value with arbitrary fields, intended to be overwritten
107    fn default() -> Self {
108        Self {
109            addr: SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0),
110            len: 0,
111            stride: 0,
112            ecn: None,
113            dst_ip: None,
114        }
115    }
116}
117
118/// Log at most 1 IO error per minute
119const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60);
120
121/// Logs a warning message when sendmsg fails
122///
123/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`]
124/// has elapsed since the last error was logged.
125fn log_sendmsg_error(
126    epoch: &Instant,
127    last_send_error: &AtomicU64,
128    err: impl core::fmt::Debug,
129    transmit: &Transmit,
130) {
131    let d = last_send_error.load(Ordering::Relaxed);
132    let last = epoch.checked_add(Duration::from_nanos(d)).unwrap();
133    let now = Instant::now();
134    let interval = now.saturating_duration_since(last);
135    if interval > IO_ERROR_LOG_INTERVAL {
136        last_send_error.store(interval.as_nanos() as u64, Ordering::Relaxed);
137        warn!(
138        "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}",
139            err, transmit.destination, transmit.src_ip, transmit.ecn, transmit.contents.len(), transmit.segment_size);
140    }
141}
142
143/// A borrowed UDP socket
144///
145/// On Unix, constructible via `From<T: AsRawFd>`. On Windows, constructible via `From<T:
146/// AsRawSocket>`.
147// Wrapper around socket2 to avoid making it a public dependency and incurring stability risk
148pub struct UdpSockRef<'a>(socket2::SockRef<'a>);
149
150#[cfg(unix)]
151impl<'s, S> From<&'s S> for UdpSockRef<'s>
152where
153    S: AsRawFd + std::os::fd::AsFd,
154{
155    fn from(socket: &'s S) -> Self {
156        Self(socket.into())
157    }
158}
159
160#[cfg(windows)]
161impl<'s, S> From<&'s S> for UdpSockRef<'s>
162where
163    S: AsRawSocket + std::os::windows::io::AsSocket,
164{
165    fn from(socket: &'s S) -> Self {
166        Self(socket.into())
167    }
168}