binger_udp/socket.rs
1use crate::sys::{self, Fd};
2use std::io;
3use std::net::SocketAddr;
4#[cfg(unix)]
5use std::os::fd::AsRawFd;
6#[cfg(all(unix, not(feature = "tokio")))]
7use std::os::fd::IntoRawFd;
8#[cfg(windows)]
9use std::os::windows::io::AsRawSocket;
10#[cfg(all(windows, not(feature = "tokio")))]
11use std::os::windows::io::IntoRawSocket;
12
13use crate::batch::{RecvBatchRaw, SendBatchRaw};
14#[cfg(feature = "metrics")]
15use crate::metrics::BingerMetrics;
16use crate::sockaddr;
17
/// Tunable settings applied when a [`BingerUdp`] socket is constructed.
///
/// Start from [`Config::new()`] (or [`Config::default()`]) and chain the
/// builder-style `with_*` methods to override individual settings.
///
/// # Default values
///
/// | Field | Default |
/// |-------|---------|
/// | `batch_size` | 32 |
/// | `send_buf_size` (kernel `SO_SNDBUF`) | Not set (OS default) |
/// | `recv_os_buf_size` (kernel `SO_RCVBUF`) | Not set (OS default) |
/// | `adaptive_batching` | `false` |
/// | `metrics_enabled` (`metrics` feature only) | `false` |
///
/// # Example
///
/// ```rust
/// use binger_udp::Config;
///
/// let config = Config::new()
///     .with_batch_size(64)
///     .with_adaptive_batching(true);
/// ```
#[derive(Clone, Debug)]
pub struct Config {
    /// Target number of packets per batch; seeds the adaptive state when
    /// adaptive batching is on.
    pub(crate) batch_size: usize,
    /// Requested `SO_SNDBUF` size; `None` leaves the OS default untouched.
    pub(crate) send_buf_size: Option<usize>,
    /// Requested `SO_RCVBUF` size; `None` leaves the OS default untouched.
    pub(crate) recv_os_buf_size: Option<usize>,
    /// Whether the socket tracks `WouldBlock` rates to tune its batch size.
    pub(crate) adaptive_batching: bool,
    /// Whether the socket allocates a metrics instance at construction.
    #[cfg(feature = "metrics")]
    pub(crate) metrics_enabled: bool,
}
51
52impl Default for Config {
53 fn default() -> Self {
54 Self {
55 batch_size: 32,
56 send_buf_size: None,
57 recv_os_buf_size: None,
58 adaptive_batching: false,
59 #[cfg(feature = "metrics")]
60 metrics_enabled: false,
61 }
62 }
63}
64
65impl Config {
66 /// Creates a new `Config` with default values.
67 ///
68 /// Equivalent to [`Config::default()`].
69 #[must_use]
70 pub fn new() -> Self {
71 Self::default()
72 }
73
74 /// Sets the target batch size for [`BingerUdp::send_batch`] and
75 /// [`BingerUdp::recv_batch`] operations.
76 ///
77 /// The actual batch size used may be adjusted if adaptive batching is
78 /// enabled via [`Config::with_adaptive_batching`].
79 ///
80 /// Default: `32`.
81 #[must_use]
82 pub fn with_batch_size(mut self, n: usize) -> Self {
83 self.batch_size = n;
84 self
85 }
86
87 /// Sets the kernel send buffer size (`SO_SNDBUF`) for the socket.
88 ///
89 /// This is an OS-level socket option that controls how much data the kernel
90 /// buffers for sending. Larger values can improve throughput at the cost
91 /// of memory.
92 ///
93 /// Default: `None` (OS default is used).
94 #[must_use]
95 pub fn with_send_buf_size(mut self, n: usize) -> Self {
96 self.send_buf_size = Some(n);
97 self
98 }
99
100 /// Sets the kernel receive buffer size (`SO_RCVBUF`) for the socket.
101 ///
102 /// This is an OS-level socket option. The kernel may double the requested
103 /// value. Useful for high-throughput receivers that want to avoid packet
104 /// drops in the kernel.
105 ///
106 /// Default: `None` (OS default is used).
107 #[must_use]
108 pub fn with_recv_os_buf_size(mut self, n: usize) -> Self {
109 self.recv_os_buf_size = Some(n);
110 self
111 }
112
113 /// Enables or disables adaptive batching.
114 ///
115 /// When enabled, the effective batch size is dynamically adjusted based on
116 /// the rate of `WouldBlock` events. The batch size is reduced when the
117 /// `WouldBlock` rate exceeds 30%, and increased when it drops below 10%.
118 /// Adjustments occur at most every 100 ms.
119 ///
120 /// Default: `false`.
121 ///
122 /// # Related
123 ///
124 /// * [`BingerUdp::recommended_batch_size`] — queries the current
125 /// effective batch size.
126 #[must_use]
127 pub fn with_adaptive_batching(mut self, enabled: bool) -> Self {
128 self.adaptive_batching = enabled;
129 self
130 }
131
132 /// Enables or disables built-in metrics collection.
133 ///
134 /// When enabled, [`BingerUdp::metrics`] returns a reference to the
135 /// [`BingerMetrics`] instance with atomic counters for packets
136 /// sent/received, batch operations, syscalls, and error events.
137 ///
138 /// Default: `false`.
139 ///
140 /// Only available with the `metrics` feature.
141 #[cfg(feature = "metrics")]
142 #[must_use]
143 pub fn with_metrics(mut self, enabled: bool) -> Self {
144 self.metrics_enabled = enabled;
145 self
146 }
147}
148
/// Reports which platform-optimized syscalls and features are available
/// at compile time.
///
/// `PlatformCaps` is obtained via [`platform_capabilities()`] or
/// [`BingerUdp::capabilities()`]. Use it to dynamically select code paths
/// or to log which backend is active; the type is `Debug`, `Copy` and
/// comparable so it can be printed, stored and diffed freely.
///
/// Some fields are conditionally compiled:
///
/// | Field | Platform / Feature |
/// |-------|--------------------|
/// | `supports_sendmsg_x`, `supports_recvmsg_x` | `target_os = "macos"` |
/// | `supports_wsa_send_msg`, `supports_wsa_recv_msg` | `target_os = "windows"` |
/// | `supports_timestamping` | `feature = "timestamping"` |
/// | `supports_pktinfo` | `feature = "pktinfo"` |
///
/// # Example
///
/// ```rust
/// use binger_udp::platform_capabilities;
///
/// let caps = platform_capabilities();
/// println!("Backend: {}", caps.backend_name);
/// println!("{caps:?}");
/// ```
#[allow(clippy::struct_excessive_bools)] // one independent flag per capability
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PlatformCaps {
    /// Whether `sendmmsg` is available (Linux only).
    pub supports_sendmmsg: bool,
    /// Whether `recvmmsg` is available (Linux only).
    pub supports_recvmmsg: bool,
    /// Whether the `sendmsg_x` backend is built in (macOS only).
    ///
    /// NOTE(review): [`platform_capabilities()`] reports this as a constant;
    /// any runtime `dlsym` lookup happens in the platform layer, not here.
    #[cfg(target_os = "macos")]
    pub supports_sendmsg_x: bool,
    /// Whether the `recvmsg_x` backend is built in (macOS only).
    ///
    /// NOTE(review): reported as a constant, see `supports_sendmsg_x`.
    #[cfg(target_os = "macos")]
    pub supports_recvmsg_x: bool,
    /// Whether the `WSASendMsg` backend is built in (Windows only).
    ///
    /// NOTE(review): [`platform_capabilities()`] reports this as a constant;
    /// any runtime `WSAIoctl` lookup happens in the platform layer, not here.
    #[cfg(target_os = "windows")]
    pub supports_wsa_send_msg: bool,
    /// Whether the `WSARecvMsg` backend is built in (Windows only).
    ///
    /// NOTE(review): reported as a constant, see `supports_wsa_send_msg`.
    #[cfg(target_os = "windows")]
    pub supports_wsa_recv_msg: bool,
    /// Whether Generic Segmentation Offload (GSO) is available (Linux, requires `gso` feature).
    pub supports_gso: bool,
    /// Whether Generic Receive Offload (GRO) is available (Linux, requires `gro` feature).
    pub supports_gro: bool,
    /// Whether `SO_BUSY_POLL` is available (Linux, requires `busy-poll` feature).
    pub supports_busy_poll: bool,
    /// Whether `SO_MAX_PACING_RATE` is available (Linux, requires `pacing` feature).
    pub supports_pacing: bool,
    /// Whether kernel timestamping is available (Linux, requires `timestamping` feature).
    #[cfg(feature = "timestamping")]
    pub supports_timestamping: bool,
    /// Whether `IP_PKTINFO` / `IPV6_RECVPKTINFO` is available (Linux, requires `pktinfo` feature).
    #[cfg(feature = "pktinfo")]
    pub supports_pktinfo: bool,
    /// Maximum batch size supported by the backend.
    ///
    /// Reported as 1024 on Linux and 32 on every other platform.
    pub max_batch_size: usize,
    /// Human-readable name of the active backend.
    ///
    /// Examples: `"sendmmsg/recvmmsg (Linux)"`, `"fallback (loop sendto/recvfrom)"`.
    pub backend_name: &'static str,
}
214
215/// Returns the [`PlatformCaps`] for the current platform and enabled features.
216///
217/// This function is stateless — it returns compile-time constants that reflect
218/// the target OS and the feature flags the crate was built with.
219///
220/// # Example
221///
222/// ```rust
223/// use binger_udp::platform_capabilities;
224///
225/// let caps = platform_capabilities();
226/// assert!(!caps.backend_name.is_empty());
227/// ```
228#[must_use]
229pub fn platform_capabilities() -> PlatformCaps {
230 PlatformCaps {
231 supports_sendmmsg: cfg!(target_os = "linux"),
232 supports_recvmmsg: cfg!(target_os = "linux"),
233 #[cfg(target_os = "macos")]
234 supports_sendmsg_x: true,
235 #[cfg(target_os = "macos")]
236 supports_recvmsg_x: true,
237 #[cfg(target_os = "windows")]
238 supports_wsa_send_msg: true,
239 #[cfg(target_os = "windows")]
240 supports_wsa_recv_msg: true,
241 supports_gso: cfg!(all(target_os = "linux", feature = "gso")),
242 supports_gro: cfg!(all(target_os = "linux", feature = "gro")),
243 supports_busy_poll: cfg!(all(target_os = "linux", feature = "busy-poll")),
244 supports_pacing: cfg!(all(target_os = "linux", feature = "pacing")),
245 #[cfg(feature = "timestamping")]
246 supports_timestamping: cfg!(all(target_os = "linux", feature = "timestamping")),
247 #[cfg(feature = "pktinfo")]
248 supports_pktinfo: cfg!(all(target_os = "linux", feature = "pktinfo")),
249 max_batch_size: if cfg!(target_os = "linux") { 1024 } else { 32 },
250 backend_name: backends(),
251 }
252}
253
/// Names the send/receive backend selected for the compilation target.
///
/// Linux, macOS and Windows each map to their batch syscall pair; every
/// other target reports the `sendto`/`recvfrom` loop fallback.
const fn backends() -> &'static str {
    if cfg!(target_os = "linux") {
        "sendmmsg/recvmmsg (Linux)"
    } else if cfg!(target_os = "macos") {
        "sendmsg_x/recvmsg_x (macOS)"
    } else if cfg!(target_os = "windows") {
        "WSASendMsg/WSARecvMsg (Windows)"
    } else {
        "fallback (loop sendto/recvfrom)"
    }
}
272
/// Bookkeeping for adaptive batching in one direction (send or receive).
///
/// Counts I/O attempts and how many of them hit `WouldBlock`, and
/// periodically turns that ratio into a new target batch size.
struct AdaptiveState {
    /// Batch size currently recommended, kept within `[MIN_BATCH, MAX_BATCH]`.
    target_size: usize,
    /// `WouldBlock` events seen since the last adjustment.
    would_block_count: u64,
    /// All recorded events (successful or not) since the last adjustment.
    total_send_count: u64,
    /// When the counters were last evaluated and reset.
    last_adjustment: std::time::Instant,
}

impl AdaptiveState {
    /// Smallest batch size the controller will recommend.
    const MIN_BATCH: usize = 1;
    /// Largest batch size the controller will recommend.
    const MAX_BATCH: usize = 1024;
    /// Minimum time between two adjustments.
    const ADJUSTMENT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);

    /// Creates a controller starting at `initial`, clamped to the valid range.
    fn new(initial: usize) -> Self {
        Self {
            target_size: initial.clamp(Self::MIN_BATCH, Self::MAX_BATCH),
            would_block_count: 0,
            total_send_count: 0,
            last_adjustment: std::time::Instant::now(),
        }
    }

    /// Records an attempt that ended in `WouldBlock`.
    fn record_would_block(&mut self) {
        self.would_block_count += 1;
        self.total_send_count += 1;
    }

    /// Records an attempt that did not block.
    fn record_event(&mut self) {
        self.total_send_count += 1;
    }

    /// Re-evaluates the target size if the adjustment interval has elapsed.
    ///
    /// Halves the target when more than 30% of recorded events were
    /// `WouldBlock`, grows it by 50% (and always by at least one packet)
    /// when fewer than 10% were, then resets the counters. Does nothing if
    /// the interval has not elapsed or no events were recorded.
    #[allow(clippy::cast_precision_loss)] // counters stay far below 2^52 within 100 ms
    fn maybe_adjust(&mut self) {
        if self.last_adjustment.elapsed() < Self::ADJUSTMENT_INTERVAL {
            return;
        }
        if self.total_send_count == 0 {
            return;
        }
        let wb_rate = self.would_block_count as f64 / self.total_send_count as f64;
        if wb_rate > 0.3 {
            self.target_size = (self.target_size / 2).max(Self::MIN_BATCH);
        } else if wb_rate < 0.1 && self.target_size < Self::MAX_BATCH {
            // Integer `1 * 3 / 2` truncates back to 1, which would pin the
            // batch size at the minimum forever; always step by at least one.
            let grown = (self.target_size * 3 / 2).max(self.target_size + 1);
            self.target_size = grown.min(Self::MAX_BATCH);
        }
        self.would_block_count = 0;
        self.total_send_count = 0;
        self.last_adjustment = std::time::Instant::now();
    }

    /// Returns the currently recommended batch size.
    fn recommended_size(&self) -> usize {
        self.target_size
    }
}
326
327/// A batch-native UDP socket that automatically selects the most efficient
328/// platform syscall for send and receive operations.
329///
330/// # Platform backends
331///
332/// | Platform | Send backend | Recv backend |
333/// |----------|-------------|--------------|
334/// | Linux (connected) | `sendmsg` with GSO | `recvmmsg` with GRO |
335/// | Linux (multi-dest) | `sendmmsg` | `recvmmsg` |
336/// | macOS | `sendmsg_x` (via dlsym) | `recvmsg_x` (via dlsym) |
337/// | Windows | `WSASendMsg` (via `WSAIoctl`) | `WSARecvMsg` (via `WSAIoctl`) |
338/// | Fallback | `sendto` (loop) | `recvfrom` (loop) |
339///
340/// # Async support
341///
342/// With the default `tokio` feature, all `async` methods use Tokio's
343/// readiness-based I/O ([`tokio::net::UdpSocket::readable`] /
344/// [`tokio::net::UdpSocket::writable`]) without busy-looping.
345///
346/// # Example
347///
348/// ```rust,no_run
349/// use binger_udp::{BingerUdp, SendBatch, RecvBatch, Config};
350///
351/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
352/// let socket = BingerUdp::from_std(
353/// std::net::UdpSocket::bind("0.0.0.0:0")?,
354/// Config::default(),
355/// )?;
356///
357/// let mut send = SendBatch::<32>::new();
358/// send.push(b"hello", "192.168.1.1:8080".parse().unwrap())?;
359/// socket.send_batch(&mut send).await?;
360/// # Ok(())
361/// # }
362/// ```
363pub struct BingerUdp {
364 fd: Fd,
365 #[cfg(feature = "tokio")]
366 tokio_sock: tokio::net::UdpSocket,
367 #[cfg(feature = "metrics")]
368 metrics: Option<BingerMetrics>,
369 adaptive_send: Option<std::sync::Mutex<AdaptiveState>>,
370 adaptive_recv: Option<std::sync::Mutex<AdaptiveState>>,
371}
372
373impl BingerUdp {
374 /// Get raw socket handle from a borrowed `UdpSocket`.
375 fn raw_fd_std(socket: &std::net::UdpSocket) -> Fd {
376 #[cfg(unix)]
377 {
378 socket.as_raw_fd()
379 }
380 #[cfg(windows)]
381 {
382 socket.as_raw_socket() as Fd
383 }
384 }
385
386 /// Get raw socket handle from an owned `UdpSocket` (consumes it).
387 #[cfg(not(feature = "tokio"))]
388 fn raw_fd_std_owned(socket: std::net::UdpSocket) -> Fd {
389 #[cfg(unix)]
390 {
391 socket.into_raw_fd()
392 }
393 #[cfg(windows)]
394 {
395 socket.into_raw_socket() as Fd
396 }
397 }
398
399 /// Creates a [`BingerUdp`] from a standard [`std::net::UdpSocket`] and
400 /// [`Config`].
401 ///
402 /// The socket is set to non-blocking mode. OS-level buffer sizes specified
403 /// in the config (`SO_SNDBUF`, `SO_RCVBUF`) are applied before constructing
404 /// the wrapper.
405 ///
406 /// With the default `tokio` feature, the socket is converted into a
407 /// [`tokio::net::UdpSocket`] for async readiness-based I/O.
408 ///
409 /// This is the primary (and only) way to create a `BingerUdp` instance.
410 ///
411 /// # Errors
412 ///
413 /// Returns an I/O error if:
414 /// * Setting the socket to non-blocking fails.
415 /// * Setting OS socket buffer sizes fails.
416 /// * Converting to a tokio socket fails (with `tokio` feature).
417 ///
418 /// # Example
419 ///
420 /// ```rust,no_run
421 /// use binger_udp::{BingerUdp, Config};
422 ///
423 /// let socket = BingerUdp::from_std(
424 /// std::net::UdpSocket::bind("0.0.0.0:0").unwrap(),
425 /// Config::default(),
426 /// ).unwrap();
427 /// ```
428 #[allow(clippy::needless_pass_by_value)]
429 pub fn from_std(socket: std::net::UdpSocket, config: Config) -> io::Result<Self> {
430 socket.set_nonblocking(true)?;
431
432 if let Some(size) = config.send_buf_size {
433 sockaddr::raw_setsockopt(
434 Self::raw_fd_std(&socket),
435 sys::SOL_SOCKET,
436 sys::SO_SNDBUF,
437 size as libc::c_int,
438 )?;
439 }
440 if let Some(size) = config.recv_os_buf_size {
441 sockaddr::raw_setsockopt(
442 Self::raw_fd_std(&socket),
443 sys::SOL_SOCKET,
444 sys::SO_RCVBUF,
445 size as libc::c_int,
446 )?;
447 }
448
449 #[cfg(feature = "tokio")]
450 let fd = Self::raw_fd_std(&socket);
451 #[cfg(feature = "tokio")]
452 let tokio_sock = tokio::net::UdpSocket::from_std(socket)?;
453
454 #[cfg(not(feature = "tokio"))]
455 let fd = Self::raw_fd_std_owned(socket);
456
457 Ok(Self {
458 fd,
459 #[cfg(feature = "tokio")]
460 tokio_sock,
461 #[cfg(feature = "metrics")]
462 metrics: if config.metrics_enabled {
463 Some(BingerMetrics::default())
464 } else {
465 None
466 },
467 adaptive_send: if config.adaptive_batching {
468 Some(std::sync::Mutex::new(AdaptiveState::new(config.batch_size)))
469 } else {
470 None
471 },
472 adaptive_recv: if config.adaptive_batching {
473 Some(std::sync::Mutex::new(AdaptiveState::new(config.batch_size)))
474 } else {
475 None
476 },
477 })
478 }
479
480 /// Sends all packets in the batch, retrying on `WouldBlock`.
481 ///
482 /// This is the primary send API. It calls [`BingerUdp::try_send_batch`] in a
483 /// loop, waiting for the socket to become writable whenever the kernel
484 /// returns `WouldBlock`.
485 ///
486 /// The `batch` argument can be a [`SendBatch<N>`](crate::batch::SendBatch)
487 /// or [`SendBatchRaw`] (the former dereferences via [`std::ops::DerefMut`]).
488 ///
489 /// Returns the total number of packets sent on success (always equal to
490 /// `batch.len()` since retries are transparent).
491 ///
492 /// # Errors
493 ///
494 /// Returns the underlying I/O error on failure. `WouldBlock` is handled
495 /// internally and never returned to the caller.
496 ///
497 /// # Related
498 ///
499 /// * [`BingerUdp::try_send_batch`] — non-blocking variant.
500 /// * [`BingerUdp::send_to`] — single-packet convenience wrapper.
501 pub async fn send_batch(&self, batch: &mut SendBatchRaw) -> io::Result<usize> {
502 loop {
503 match self.try_send_batch(batch) {
504 Ok(n) => return Ok(n),
505 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
506 if let Some(ref state) = self.adaptive_send {
507 if let Ok(mut s) = state.lock() {
508 s.record_would_block();
509 s.maybe_adjust();
510 }
511 }
512 self.wait_writable().await?;
513 }
514 Err(e) => return Err(e),
515 }
516 }
517 }
518
519 /// Receives a batch of packets, retrying on `WouldBlock`.
520 ///
521 /// This is the primary receive API. It calls [`BingerUdp::try_recv_batch`]
522 /// in a loop, waiting for the socket to become readable whenever the kernel
523 /// returns `WouldBlock`.
524 ///
525 /// The `batch` argument can be a [`RecvBatch<N>`](crate::batch::RecvBatch)
526 /// or [`RecvBatchRaw`] (the former dereferences via [`std::ops::DerefMut`]).
527 ///
528 /// Returns the number of packets actually received (may be less than
529 /// `batch.capacity()`).
530 ///
531 /// # Errors
532 ///
533 /// Returns the underlying I/O error on failure. `WouldBlock` is handled
534 /// internally and never returned to the caller.
535 ///
536 /// # Related
537 ///
538 /// * [`BingerUdp::try_recv_batch`] — non-blocking variant.
539 /// * [`BingerUdp::recv_from`] — single-packet convenience wrapper.
540 pub async fn recv_batch(&self, batch: &mut RecvBatchRaw) -> io::Result<usize> {
541 loop {
542 match self.try_recv_batch(batch) {
543 Ok(0) => {
544 if let Some(ref state) = self.adaptive_recv {
545 if let Ok(mut s) = state.lock() {
546 s.record_would_block();
547 s.maybe_adjust();
548 }
549 }
550 self.wait_readable().await?;
551 }
552 Ok(n) => return Ok(n),
553 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
554 if let Some(ref state) = self.adaptive_recv {
555 if let Ok(mut s) = state.lock() {
556 s.record_would_block();
557 s.maybe_adjust();
558 }
559 }
560 self.wait_readable().await?;
561 }
562 Err(e) => return Err(e),
563 }
564 }
565 }
566
567 /// Attempts to send a batch of packets without retrying on `WouldBlock`.
568 ///
569 /// Unlike [`BingerUdp::send_batch`], this method returns
570 /// [`io::ErrorKind::WouldBlock`] immediately if the socket is not ready
571 /// for writing. This is useful for integrating with custom event loops or
572 /// `select!`-based concurrency.
573 ///
574 /// The `batch` argument can be a [`SendBatch<N>`](crate::batch::SendBatch)
575 /// or [`SendBatchRaw`].
576 ///
577 /// Returns the number of packets actually sent (may be less than
578 /// `batch.len()` on a partial write, or 0 if `WouldBlock`).
579 ///
580 /// # Errors
581 ///
582 /// Returns the underlying I/O error on failure, including `WouldBlock`.
583 ///
584 /// # Related
585 ///
586 /// * [`BingerUdp::send_batch`] — retry-on-WouldBlock variant.
587 pub fn try_send_batch(&self, batch: &mut SendBatchRaw) -> io::Result<usize> {
588 #[cfg(not(feature = "metrics"))]
589 let sent = crate::platform::try_send_batch(self.fd, batch)?;
590 #[cfg(feature = "metrics")]
591 let sent = crate::platform::try_send_batch(self.fd, batch).map_err(|e| {
592 if let Some(ref m) = self.metrics {
593 m.inc_send_errors();
594 if e.kind() == io::ErrorKind::WouldBlock {
595 m.inc_send_would_block();
596 }
597 }
598 e
599 })?;
600
601 if let Some(ref state) = self.adaptive_send {
602 if let Ok(mut s) = state.lock() {
603 s.record_event();
604 }
605 }
606
607 #[cfg(feature = "metrics")]
608 if let Some(ref m) = self.metrics {
609 m.inc_packets_sent(sent as u64);
610 m.inc_batches_sent();
611 m.inc_send_syscalls();
612 }
613
614 Ok(sent)
615 }
616
617 /// Attempts to receive a batch of packets without retrying on `WouldBlock`.
618 ///
619 /// Unlike [`BingerUdp::recv_batch`], this method returns
620 /// [`io::ErrorKind::WouldBlock`] immediately if the socket is not ready
621 /// for reading.
622 ///
623 /// The `batch` argument can be a [`RecvBatch<N>`](crate::batch::RecvBatch)
624 /// or [`RecvBatchRaw`].
625 ///
626 /// Returns the number of packets actually received (0 if `WouldBlock`).
627 ///
628 /// # Errors
629 ///
630 /// Returns the underlying I/O error on failure, including `WouldBlock`.
631 ///
632 /// # Related
633 ///
634 /// * [`BingerUdp::recv_batch`] — retry-on-WouldBlock variant.
635 pub fn try_recv_batch(&self, batch: &mut RecvBatchRaw) -> io::Result<usize> {
636 #[cfg(not(feature = "metrics"))]
637 let received = crate::platform::try_recv_batch(self.fd, batch)?;
638 #[cfg(feature = "metrics")]
639 let received = crate::platform::try_recv_batch(self.fd, batch).map_err(|e| {
640 if let Some(ref m) = self.metrics {
641 m.inc_recv_errors();
642 if e.kind() == io::ErrorKind::WouldBlock {
643 m.inc_recv_would_block();
644 }
645 }
646 e
647 })?;
648
649 if let Some(ref state) = self.adaptive_recv {
650 if let Ok(mut s) = state.lock() {
651 s.record_event();
652 }
653 }
654
655 #[cfg(feature = "metrics")]
656 if let Some(ref m) = self.metrics {
657 m.inc_packets_received(received as u64);
658 m.inc_batches_received();
659 m.inc_recv_syscalls();
660 }
661
662 Ok(received)
663 }
664
665 /// Sends a single UDP datagram to the specified address, retrying on
666 /// `WouldBlock`.
667 ///
668 /// This is a convenience wrapper around [`BingerUdp::send_batch`] that
669 /// constructs a single-element batch internally.
670 ///
671 /// # Errors
672 ///
673 /// Returns the underlying I/O error on failure.
674 ///
675 /// # Panics
676 ///
677 /// Panics if the internal single-element batch capacity is exceeded, which
678 /// should never happen.
679 ///
680 /// # Related
681 ///
682 /// * [`BingerUdp::send_batch`] — batch variant for higher throughput.
683 /// * [`BingerUdp::try_send_to`] — non-blocking single-packet variant.
684 pub async fn send_to(&self, buf: &[u8], addr: SocketAddr) -> io::Result<usize> {
685 let mut batch = SendBatchRaw::with_capacity(1);
686 batch.push(buf, Some(addr)).expect("capacity 1");
687 self.send_batch(&mut batch).await?;
688 Ok(buf.len())
689 }
690
691 /// Receives a single UDP datagram into the buffer, retrying on `WouldBlock`.
692 ///
693 /// This is a convenience wrapper around [`BingerUdp::recv_batch`] that
694 /// constructs a single-element batch internally.
695 ///
696 /// Returns the number of bytes received and the source address.
697 ///
698 /// # Errors
699 ///
700 /// Returns the underlying I/O error on failure.
701 ///
702 /// # Related
703 ///
704 /// * [`BingerUdp::recv_batch`] — batch variant for higher throughput.
705 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
706 let mut batch = RecvBatchRaw::with_capacity(1, buf.len());
707 self.recv_batch(&mut batch).await?;
708 let data = batch.data(0);
709 let addr = batch.addr(0);
710 let len = data.len().min(buf.len());
711 buf[..len].copy_from_slice(&data[..len]);
712 Ok((len, addr))
713 }
714
715 /// Attempts to send a single UDP datagram without retrying on `WouldBlock`.
716 ///
717 /// This is a convenience wrapper around [`BingerUdp::try_send_batch`] that
718 /// constructs a single-element batch internally.
719 ///
720 /// # Errors
721 ///
722 /// Returns the underlying I/O error on failure, including `WouldBlock`.
723 ///
724 /// # Panics
725 ///
726 /// Panics if the internal single-element batch capacity is exceeded, which
727 /// should never happen.
728 ///
729 /// # Related
730 ///
731 /// * [`BingerUdp::send_to`] — retry-on-WouldBlock variant.
732 /// * [`BingerUdp::try_send_batch`] — batch variant.
733 pub fn try_send_to(&self, buf: &[u8], addr: SocketAddr) -> io::Result<usize> {
734 let mut batch = SendBatchRaw::with_capacity(1);
735 batch.push(buf, Some(addr)).expect("capacity 1");
736 self.try_send_batch(&mut batch)?;
737 Ok(buf.len())
738 }
739
740 /// Connects the UDP socket to a remote address.
741 ///
742 /// A connected UDP socket can only send to and receive from that address.
743 /// This is required for GSO (Generic Segmentation Offload) on Linux.
744 ///
745 /// # Errors
746 ///
747 /// Returns the underlying I/O error on failure (e.g., invalid address).
748 pub fn connect(&self, addr: SocketAddr) -> io::Result<()> {
749 sockaddr::raw_connect(self.fd, addr)
750 }
751
752 /// Returns the local socket address.
753 ///
754 /// # Errors
755 ///
756 /// Returns the underlying IO error on failure.
757 pub fn local_addr(&self) -> io::Result<SocketAddr> {
758 sockaddr::raw_getsockname(self.fd)
759 }
760
761 /// Returns the TTL of the socket.
762 ///
763 /// # Errors
764 ///
765 /// Returns the underlying IO error on failure.
766 pub fn ttl(&self) -> io::Result<u32> {
767 sockaddr::raw_getsockopt(self.fd, sys::IPPROTO_IP, sys::IP_TTL).map(|v| v as u32)
768 }
769
770 /// Sets the TTL of the socket.
771 ///
772 /// # Errors
773 ///
774 /// Returns the underlying IO error on failure.
775 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
776 sockaddr::raw_setsockopt(self.fd, sys::IPPROTO_IP, sys::IP_TTL, ttl as libc::c_int)
777 }
778
779 /// Returns the underlying file descriptor (or socket handle on Windows).
780 ///
781 /// On Unix, this is a `RawFd` (`c_int`). On Windows, this is a `SOCKET`
782 /// (`usize`).
783 ///
784 /// Use this to pass the socket to low-level system calls or integration
785 /// with other I/O libraries.
786 #[must_use]
787 pub fn as_raw_fd(&self) -> Fd {
788 self.fd
789 }
790
791 /// Enables or disables Generic Segmentation Offload (GSO).
792 ///
793 /// GSO allows the kernel to segment a large UDP datagram into MTU-sized
794 /// segments, reducing the number of user-to-kernel transitions. Requires a
795 /// connected socket (see [`BingerUdp::connect`]).
796 ///
797 /// Only available on Linux with the `gso` feature.
798 ///
799 /// # Errors
800 ///
801 /// Returns an I/O error if the setsockopt call fails.
802 #[cfg(all(target_os = "linux", feature = "gso"))]
803 pub fn set_gso(&self, enabled: bool) -> io::Result<()> {
804 sockaddr::raw_setsockopt(
805 self.fd,
806 libc::IPPROTO_UDP,
807 libc::UDP_SEGMENT,
808 i32::from(enabled),
809 )
810 }
811
812 /// Enables or disables Generic Receive Offload (GRO).
813 ///
814 /// GRO allows the kernel to coalesce multiple incoming UDP datagrams into
815 /// a single large buffer, reducing the number of receive syscalls.
816 ///
817 /// Only available on Linux with the `gro` feature.
818 ///
819 /// # Errors
820 ///
821 /// Returns an I/O error if the setsockopt call fails.
822 #[cfg(all(target_os = "linux", feature = "gro"))]
823 pub fn set_gro(&self, enabled: bool) -> io::Result<()> {
824 sockaddr::raw_setsockopt(
825 self.fd,
826 libc::IPPROTO_UDP,
827 libc::UDP_GRO,
828 i32::from(enabled),
829 )
830 }
831
832 /// Sends a large datagram segmented by GSO in a single syscall
833 /// (non-blocking).
834 ///
835 /// The kernel fragments `data` into segments of `segment_size` bytes each
836 /// (the last segment may be smaller). The socket must be connected (see
837 /// [`BingerUdp::connect`]) and GSO must be enabled via
838 /// [`BingerUdp::set_gso(true)`](BingerUdp::set_gso).
839 ///
840 /// Only available on Linux with the `gso` feature and without `miri-safe`.
841 ///
842 /// Returns the total number of bytes sent (the entire `data` payload on
843 /// success).
844 ///
845 /// # Errors
846 ///
847 /// Returns the underlying I/O error on failure, including `WouldBlock`.
848 #[cfg(all(target_os = "linux", feature = "gso", not(feature = "miri-safe")))]
849 pub fn try_send_gso(&self, data: &[u8], segment_size: u16) -> io::Result<usize> {
850 crate::platform::try_send_gso(self.fd, data, segment_size)
851 }
852
853 /// Sends a large datagram segmented by GSO, retrying on `WouldBlock`.
854 ///
855 /// Calls [`BingerUdp::try_send_gso`] in a loop, waiting for the socket
856 /// to become writable on `WouldBlock`.
857 ///
858 /// Only available on Linux with the `gso` feature and without `miri-safe`.
859 ///
860 /// # Errors
861 ///
862 /// Returns the underlying I/O error on failure. `WouldBlock` is handled
863 /// internally.
864 #[cfg(all(target_os = "linux", feature = "gso", not(feature = "miri-safe")))]
865 pub async fn send_gso(&self, data: &[u8], segment_size: u16) -> io::Result<usize> {
866 loop {
867 match self.try_send_gso(data, segment_size) {
868 Ok(n) => return Ok(n),
869 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
870 self.wait_writable().await?;
871 }
872 Err(e) => return Err(e),
873 }
874 }
875 }
876
877 /// Sets the maximum pacing rate for UDP sends.
878 ///
879 /// `bytes_per_sec` is the rate in bytes per second. Set to `0` to disable
880 /// pacing. Requires the `fq` qdisc (fair queueing) on the egress path.
881 ///
882 /// Only available on Linux with the `pacing` feature.
883 ///
884 /// # Errors
885 ///
886 /// Returns the underlying I/O error if setsockopt fails.
887 #[cfg(all(target_os = "linux", feature = "pacing"))]
888 pub fn set_pacing_rate(&self, bytes_per_sec: u32) -> io::Result<()> {
889 sockaddr::raw_setsockopt_u32(
890 self.fd,
891 libc::SOL_SOCKET,
892 crate::sys::SO_MAX_PACING_RATE,
893 bytes_per_sec,
894 )
895 }
896
897 /// Enables busy-polling for ultra-low latency (`SO_BUSY_POLL`).
898 ///
899 /// `usecs` is the time in microseconds to busy-poll. Higher values reduce
900 /// latency at the cost of CPU usage. Requires `CAP_NET_ADMIN` or the
901 /// `net.core.busy_poll` sysctl to be configured globally.
902 ///
903 /// Only available on Linux with the `busy-poll` feature.
904 ///
905 /// # Errors
906 ///
907 /// Returns the underlying I/O error if setsockopt fails.
908 #[cfg(all(target_os = "linux", feature = "busy-poll"))]
909 pub fn set_busy_poll(&self, usecs: u32) -> io::Result<()> {
910 sockaddr::raw_setsockopt(
911 self.fd,
912 libc::SOL_SOCKET,
913 crate::sys::SO_BUSY_POLL,
914 usecs as libc::c_int,
915 )
916 }
917
918 /// Enables software receive timestamping (`SO_TIMESTAMPNS`).
919 ///
920 /// When enabled, each received packet carries a kernel timestamp
921 /// accessible via [`RecvBatch::timestamp`](crate::batch::RecvBatch::timestamp).
922 ///
923 /// # Errors
924 ///
925 /// Returns the underlying IO error if setsockopt fails.
926 #[cfg(all(target_os = "linux", feature = "timestamping"))]
927 pub fn enable_timestamping(&self, enabled: bool) -> io::Result<()> {
928 sockaddr::raw_setsockopt(
929 self.fd,
930 libc::SOL_SOCKET,
931 crate::sys::SO_TIMESTAMPNS,
932 i32::from(enabled),
933 )
934 }
935
936 /// Enables `IP_PKTINFO` / `IPV6_RECVPKTINFO` for receiving destination addresses.
937 ///
938 /// When enabled, each received packet's destination address is available
939 /// via [`RecvBatch::dst_addr`](crate::batch::RecvBatch::dst_addr).
940 ///
941 /// # Errors
942 ///
943 /// Returns the underlying IO error if setsockopt fails.
944 #[cfg(all(target_os = "linux", feature = "pktinfo"))]
945 pub fn enable_pktinfo(&self, enabled: bool) -> io::Result<()> {
946 sockaddr::raw_setsockopt(
947 self.fd,
948 libc::IPPROTO_IP,
949 libc::IP_PKTINFO,
950 i32::from(enabled),
951 )?;
952 sockaddr::raw_setsockopt(
953 self.fd,
954 libc::IPPROTO_IPV6,
955 libc::IPV6_RECVPKTINFO,
956 i32::from(enabled),
957 )
958 }
959
960 /// Returns the [`PlatformCaps`] for the current platform and enabled features.
961 ///
962 /// This is a shorthand for calling [`platform_capabilities()`] directly.
963 #[must_use]
964 pub fn capabilities(&self) -> PlatformCaps {
965 platform_capabilities()
966 }
967
968 /// Returns the recommended batch size based on adaptive batching state.
969 ///
970 /// When adaptive batching is enabled via
971 /// [`Config::with_adaptive_batching`], this returns the dynamically
972 /// adjusted batch size (pass `true` at construction time). The adjustment happens at most every 100 ms:
973 ///
974 /// * When the `WouldBlock` rate exceeds 30%, the batch size is halved.
975 /// * When the rate drops below 10%, the batch size is increased by 50%.
976 /// * The batch size is clamped to the range `[1, 1024]`.
977 ///
978 /// If adaptive batching is disabled, returns a fixed default of `32`.
979 ///
980 /// # Panics
981 #[must_use]
982 pub fn recommended_batch_size(&self) -> usize {
983 self.adaptive_send
984 .as_ref()
985 .and_then(|s| s.lock().ok())
986 .map_or(32, |g| g.recommended_size())
987 }
988
989 /// Returns a reference to the [`BingerMetrics`] instance, if metrics
990 /// collection is enabled.
991 ///
992 /// Metrics must be enabled at construction time via
993 /// [`Config::with_metrics`]. This requires the `metrics` feature.
994 ///
995 /// Returns `None` if metrics are disabled.
996 #[cfg(feature = "metrics")]
997 #[must_use]
998 pub fn metrics(&self) -> Option<&BingerMetrics> {
999 self.metrics.as_ref()
1000 }
1001
1002 /// Waits until the socket becomes readable.
1003 ///
1004 /// This is a direct wrapper around
1005 /// [`tokio::net::UdpSocket::readable()`]. Use this in `tokio::select!`
1006 /// to be notified when data is available for
1007 /// [`BingerUdp::recv_batch`] or [`BingerUdp::recv_from`].
1008 ///
1009 /// Only available with the default `tokio` feature.
1010 ///
1011 /// # Errors
1012 ///
1013 /// Returns the underlying I/O error on failure.
1014 #[cfg(feature = "tokio")]
1015 pub async fn readable(&self) -> io::Result<()> {
1016 self.tokio_sock.readable().await
1017 }
1018
1019 /// Waits until the socket becomes writable.
1020 ///
1021 /// This is a direct wrapper around
1022 /// [`tokio::net::UdpSocket::writable()`]. Use this in `tokio::select!`
1023 /// to be notified when the socket can accept data for
1024 /// [`BingerUdp::send_batch`] or [`BingerUdp::send_to`].
1025 ///
1026 /// Only available with the default `tokio` feature.
1027 ///
1028 /// # Errors
1029 ///
1030 /// Returns the underlying I/O error on failure.
1031 #[cfg(feature = "tokio")]
1032 pub async fn writable(&self) -> io::Result<()> {
1033 self.tokio_sock.writable().await
1034 }
1035
1036 #[cfg(feature = "tokio")]
1037 async fn wait_writable(&self) -> io::Result<()> {
1038 self.tokio_sock.writable().await?;
1039 Ok(())
1040 }
1041
1042 #[cfg(feature = "tokio")]
1043 async fn wait_readable(&self) -> io::Result<()> {
1044 self.tokio_sock.readable().await?;
1045 Ok(())
1046 }
1047
1048 #[cfg(not(feature = "tokio"))]
1049 async fn wait_writable(&self) -> io::Result<()> {
1050 Err(io::Error::new(
1051 io::ErrorKind::Other,
1052 "tokio feature disabled",
1053 ))
1054 }
1055
1056 #[cfg(not(feature = "tokio"))]
1057 async fn wait_readable(&self) -> io::Result<()> {
1058 Err(io::Error::new(
1059 io::ErrorKind::Other,
1060 "tokio feature disabled",
1061 ))
1062 }
1063}
1064
1065#[cfg(not(feature = "tokio"))]
1066impl Drop for BingerUdp {
1067 /// Closes the underlying file descriptor.
1068 fn drop(&mut self) {
1069 sys::close_fd(self.fd);
1070 }
1071}
1072
1073// SAFETY: BingerUdp wraps a raw fd and optionally a tokio UdpSocket.
1074// Both are safe to send/share across threads — the fd is used with
1075// thread-safe syscalls, and tokio::net::UdpSocket is Send + Sync.
1076unsafe impl Send for BingerUdp {}
1077unsafe impl Sync for BingerUdp {}