1use std::{
3 net::{IpAddr, Ipv6Addr, SocketAddr},
4 sync::atomic::{AtomicUsize, Ordering},
5};
6
7pub use crate::cmsg::{AsPtr, EcnCodepoint, Source, Transmit};
8use imp::LastSendError;
9use tracing::warn;
10
11mod cmsg;
12
13#[path = "unix.rs"]
14mod imp;
15
16pub use imp::{sync, UdpSocket};
17pub mod framed;
18
19pub const BATCH_SIZE_CAP: usize = imp::BATCH_SIZE_CAP;
27
28pub const DEFAULT_BATCH_SIZE: usize = imp::DEFAULT_BATCH_SIZE;
30
31#[derive(Debug)]
33pub struct UdpState {
34 max_gso_segments: AtomicUsize,
35 gro_segments: usize,
36}
37
38impl UdpState {
39 pub fn new() -> Self {
40 imp::udp_state()
41 }
42
43 #[inline]
49 pub fn max_gso_segments(&self) -> usize {
50 self.max_gso_segments.load(Ordering::Relaxed)
51 }
52
53 #[inline]
58 pub fn gro_segments(&self) -> usize {
59 self.gro_segments
60 }
61}
62
63impl Default for UdpState {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69#[derive(Debug, Copy, Clone)]
73pub struct RecvMeta {
74 pub addr: SocketAddr,
76 pub len: usize,
78 pub stride: usize,
80 pub ecn: Option<EcnCodepoint>,
82 pub dst_ip: Option<IpAddr>,
84 pub dst_local_ip: Option<IpAddr>,
86 pub ifindex: u32,
88}
89
90impl Default for RecvMeta {
91 fn default() -> Self {
93 Self {
94 addr: SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0),
95 len: 0,
96 stride: 0,
97 ecn: None,
98 dst_ip: None,
99 dst_local_ip: None,
100 ifindex: 0,
101 }
102 }
103}
104
105const IO_ERROR_LOG_INTERVAL: u64 = 60;
107
108fn log_sendmsg_error<B: AsPtr<u8>>(
113 last_send_error: LastSendError,
114 err: impl core::fmt::Debug,
115 transmit: &Transmit<B>,
116) {
117 if last_send_error.should_log() {
118 warn!(
119 "sendmsg error: {:?}, Transmit: {{ destination: {:?}, src_ip: {:?}, enc: {:?}, len: {:?}, segment_size: {:?} }}",
120 err, transmit.dst, transmit.src, transmit.ecn, transmit.contents.len(), transmit.segment_size);
121 }
122}
123
124#[cfg(test)]
125mod tests {
126 use std::net::Ipv4Addr;
127
128 use super::*;
129
130 #[test]
131 fn test_create() {
132 let s = sync::UdpSocket::bind("0.0.0.0:9909");
133 assert!(s.is_ok());
134 }
135 #[test]
136 fn test_send_recv() {
137 let saddr = "0.0.0.0:9901".parse().unwrap();
138 let a = sync::UdpSocket::bind(saddr).unwrap();
139 let b = sync::UdpSocket::bind("0.0.0.0:0").unwrap();
140 let buf = b"hello world";
141 b.send_to(&buf[..], saddr).unwrap();
142 let mut r = [0; 1024];
144 a.recv_from(&mut r).unwrap();
145 assert_eq!(buf[..], r[..11]);
146 }
147 #[test]
148 fn test_send_recv_msg() {
149 let saddr = "0.0.0.0:9902".parse().unwrap();
150 let a = sync::UdpSocket::bind(saddr).unwrap();
151 let b = sync::UdpSocket::bind("0.0.0.0:0").unwrap();
152 let send_port = b.local_addr().unwrap().port();
153 let send_addr = b.local_addr().unwrap().ip();
154 let buf = b"hello world";
155 let src = Source::Interface(1);
156 let tr = Transmit::new(saddr, *buf).src_ip(src);
157 b.send_msg(&UdpState::new(), tr).unwrap();
158 let mut r = [0; 1024];
160 let meta = a.recv_msg(&mut r).unwrap();
161 assert_eq!(buf[..], r[..11]);
162 assert_eq!(send_port, meta.addr.port());
165 assert_eq!(meta.ifindex, 1);
166 assert!(matches!(
167 meta.dst_local_ip,
168 Some(addr) if addr == send_addr || addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
170 ));
171 }
172 #[test]
173 fn test_send_recv_msg_ip() {
174 let saddr = "0.0.0.0:9903".parse().unwrap();
175 let a = sync::UdpSocket::bind(saddr).unwrap();
176 let b = sync::UdpSocket::bind("0.0.0.0:0").unwrap();
177 let send_port = b.local_addr().unwrap().port();
178 let send_addr = b.local_addr().unwrap().ip();
179 let buf = b"hello world";
180 let src = Source::Ip("0.0.0.0".parse().unwrap());
181 let tr = Transmit::new(saddr, *buf).src_ip(src);
182 b.send_msg(&UdpState::new(), tr).unwrap();
183 let mut r = [0; 1024];
185 let meta = a.recv_msg(&mut r).unwrap();
186 assert_eq!(buf[..], r[..11]);
187 assert_eq!(send_port, meta.addr.port());
190 assert_eq!(meta.ifindex, 1);
191 assert!(matches!(
192 meta.dst_local_ip,
193 Some(addr) if addr == send_addr || addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
195 ));
196 }
197}