squinn_udp/
lib.rs

1//! Uniform interface to send/recv UDP packets with ECN information.
2#![warn(unreachable_pub)]
3#![warn(clippy::use_self)]
4
5#[cfg(unix)]
6use std::os::unix::io::AsFd;
7#[cfg(windows)]
8use std::os::windows::io::AsSocket;
9use std::{   
10    sync::Mutex,
11    time::{Duration, Instant},
12};
13
14use scionnet::{IpAddr, Ipv6Addr, SocketAddr};
15
16use bytes::Bytes;
17use tracing::warn;
18
19#[cfg(unix)]
20mod cmsg;
21#[cfg(unix)]
22#[path = "unix.rs"]
23mod imp;
24
25#[cfg(windows)]
26#[path = "windows.rs"]
27mod imp;
28
29// No ECN support
30#[cfg(not(any(unix, windows)))]
31#[path = "fallback.rs"]
32mod imp;
33
34pub use imp::UdpSocketState;
35
36/// Number of UDP packets to send/receive at a time
37pub const BATCH_SIZE: usize = imp::BATCH_SIZE;
38
39/// Metadata for a single buffer filled with bytes received from the network
40///
41/// This associated buffer can contain one or more datagrams, see [`stride`].
42///
43/// [`stride`]: RecvMeta::stride
44#[derive(Debug, Copy, Clone)]
45pub struct RecvMeta {
46    /// The source address of the datagram(s) contained in the buffer
47    pub addr: SocketAddr,
48    /// The number of bytes the associated buffer has
49    pub len: usize,
50    /// The size of a single datagram in the associated buffer
51    ///
52    /// When GRO (Generic Receive Offload) is used this indicates the size of a single
53    /// datagram inside the buffer. If the buffer is larger, that is if [`len`] is greater
54    /// then this value, then the individual datagrams contained have their boundaries at
55    /// `stride` increments from the start. The last datagram could be smaller than
56    /// `stride`.
57    ///
58    /// [`len`]: RecvMeta::len
59    pub stride: usize,
60    /// The Explicit Congestion Notification bits for the datagram(s) in the buffer
61    pub ecn: Option<EcnCodepoint>,
62    /// The destination IP address which was encoded in this datagram
63    pub dst_ip: Option<IpAddr>,
64}
65
66impl Default for RecvMeta {
67    /// Constructs a value with arbitrary fields, intended to be overwritten
68    fn default() -> Self {
69        Self {
70            addr: SocketAddr::new_ip(Ipv6Addr::UNSPECIFIED.into(), 0),
71            len: 0,
72            stride: 0,
73            ecn: None,
74            dst_ip: None,
75        }
76    }
77}
78
79/// An outgoing packet
80#[derive(Debug, Clone)]
81pub struct Transmit {
82    /// The socket this datagram should be sent to
83    pub destination: SocketAddr,
84    /// Explicit congestion notification bits to set on the packet
85    pub ecn: Option<EcnCodepoint>,
86    /// Contents of the datagram
87    pub contents: Bytes,
88    /// The segment size if this transmission contains multiple datagrams.
89    /// This is `None` if the transmit only contains a single datagram
90    pub segment_size: Option<usize>,
91    /// Optional source IP address for the datagram
92    pub src_ip: Option<IpAddr>,
93}
94
95/// Log at most 1 IO error per minute
96const IO_ERROR_LOG_INTERVAL: Duration = std::time::Duration::from_secs(60);
97
98/// Logs a warning message when sendmsg fails
99///
100/// Logging will only be performed if at least [`IO_ERROR_LOG_INTERVAL`]
101/// has elapsed since the last error was logged.
102fn log_sendmsg_error(
103    last_send_error: &Mutex<Instant>,
104    err: impl core::fmt::Debug,
105    transmit: &Transmit,
106) {
107    let now = Instant::now();
108    let last_send_error = &mut *last_send_error.lock().expect("poisend lock");
109    if now.saturating_duration_since(*last_send_error) > IO_ERROR_LOG_INTERVAL {
110        *last_send_error = now;
111        warn!(
112        "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}",
113            err, transmit.destination, transmit.src_ip, transmit.ecn, transmit.contents.len(), transmit.segment_size);
114    }
115}
116
117/// A borrowed UDP socket
118///
119/// On Unix, constructible via `From<T: AsRawFd>`. On Windows, constructible via `From<T:
120/// AsRawSocket>`.
121// Wrapper around socket2 to avoid making it a public dependency and incurring stability risk
122pub struct UdpSockRef<'a>(socket2::SockRef<'a>);
123
124#[cfg(unix)]
125impl<'s, S> From<&'s S> for UdpSockRef<'s>
126where
127    S: AsFd,
128{
129    fn from(socket: &'s S) -> Self {
130        Self(socket.into())
131    }
132}
133
134#[cfg(windows)]
135impl<'s, S> From<&'s S> for UdpSockRef<'s>
136where
137    S: AsSocket,
138{
139    fn from(socket: &'s S) -> Self {
140        Self(socket.into())
141    }
142}
143
144/// Explicit congestion notification codepoint
145#[repr(u8)]
146#[derive(Debug, Copy, Clone, Eq, PartialEq)]
147pub enum EcnCodepoint {
148    #[doc(hidden)]
149    Ect0 = 0b10,
150    #[doc(hidden)]
151    Ect1 = 0b01,
152    #[doc(hidden)]
153    Ce = 0b11,
154}
155
156impl EcnCodepoint {
157    /// Create new object from the given bits
158    pub fn from_bits(x: u8) -> Option<Self> {
159        use self::EcnCodepoint::*;
160        Some(match x & 0b11 {
161            0b10 => Ect0,
162            0b01 => Ect1,
163            0b11 => Ce,
164            _ => {
165                return None;
166            }
167        })
168    }
169}