Skip to main content

binger_udp/
socket.rs

1use crate::sys::{self, Fd};
2use std::io;
3use std::net::SocketAddr;
4#[cfg(unix)]
5use std::os::fd::AsRawFd;
6#[cfg(all(unix, not(feature = "tokio")))]
7use std::os::fd::IntoRawFd;
8#[cfg(windows)]
9use std::os::windows::io::AsRawSocket;
10#[cfg(all(windows, not(feature = "tokio")))]
11use std::os::windows::io::IntoRawSocket;
12
13use crate::batch::{RecvBatchRaw, SendBatchRaw};
14#[cfg(feature = "metrics")]
15use crate::metrics::BingerMetrics;
16use crate::sockaddr;
17
18/// Socket configuration for [`BingerUdp`].
19///
20/// Create a `Config` with [`Config::new()`] or [`Config::default()`], then
21/// customize it using the builder-style `with_*` methods.
22///
23/// # Default values
24///
25/// | Field | Default |
26/// |-------|---------|
27/// | `batch_size` | 32 |
28/// | `send_buf_size` (kernel `SO_SNDBUF`) | Not set (OS default) |
29/// | `recv_os_buf_size` (kernel `SO_RCVBUF`) | Not set (OS default) |
30/// | `adaptive_batching` | `false` |
31/// | `metrics_enabled` | `false` |
32///
33/// # Example
34///
35/// ```rust
36/// use binger_udp::Config;
37///
38/// let config = Config::new()
39///     .with_batch_size(64)
40///     .with_adaptive_batching(true);
41/// ```
42#[derive(Debug, Clone)]
43pub struct Config {
44    pub(crate) batch_size: usize,
45    pub(crate) send_buf_size: Option<usize>,
46    pub(crate) recv_os_buf_size: Option<usize>,
47    pub(crate) adaptive_batching: bool,
48    #[cfg(feature = "metrics")]
49    pub(crate) metrics_enabled: bool,
50}
51
52impl Default for Config {
53    fn default() -> Self {
54        Self {
55            batch_size: 32,
56            send_buf_size: None,
57            recv_os_buf_size: None,
58            adaptive_batching: false,
59            #[cfg(feature = "metrics")]
60            metrics_enabled: false,
61        }
62    }
63}
64
65impl Config {
66    /// Creates a new `Config` with default values.
67    ///
68    /// Equivalent to [`Config::default()`].
69    #[must_use]
70    pub fn new() -> Self {
71        Self::default()
72    }
73
74    /// Sets the target batch size for [`BingerUdp::send_batch`] and
75    /// [`BingerUdp::recv_batch`] operations.
76    ///
77    /// The actual batch size used may be adjusted if adaptive batching is
78    /// enabled via [`Config::with_adaptive_batching`].
79    ///
80    /// Default: `32`.
81    #[must_use]
82    pub fn with_batch_size(mut self, n: usize) -> Self {
83        self.batch_size = n;
84        self
85    }
86
87    /// Sets the kernel send buffer size (`SO_SNDBUF`) for the socket.
88    ///
89    /// This is an OS-level socket option that controls how much data the kernel
90    /// buffers for sending. Larger values can improve throughput at the cost
91    /// of memory.
92    ///
93    /// Default: `None` (OS default is used).
94    #[must_use]
95    pub fn with_send_buf_size(mut self, n: usize) -> Self {
96        self.send_buf_size = Some(n);
97        self
98    }
99
100    /// Sets the kernel receive buffer size (`SO_RCVBUF`) for the socket.
101    ///
102    /// This is an OS-level socket option. The kernel may double the requested
103    /// value. Useful for high-throughput receivers that want to avoid packet
104    /// drops in the kernel.
105    ///
106    /// Default: `None` (OS default is used).
107    #[must_use]
108    pub fn with_recv_os_buf_size(mut self, n: usize) -> Self {
109        self.recv_os_buf_size = Some(n);
110        self
111    }
112
113    /// Enables or disables adaptive batching.
114    ///
115    /// When enabled, the effective batch size is dynamically adjusted based on
116    /// the rate of `WouldBlock` events. The batch size is reduced when the
117    /// `WouldBlock` rate exceeds 30%, and increased when it drops below 10%.
118    /// Adjustments occur at most every 100 ms.
119    ///
120    /// Default: `false`.
121    ///
122    /// # Related
123    ///
124    /// * [`BingerUdp::recommended_batch_size`] — queries the current
125    ///   effective batch size.
126    #[must_use]
127    pub fn with_adaptive_batching(mut self, enabled: bool) -> Self {
128        self.adaptive_batching = enabled;
129        self
130    }
131
132    /// Enables or disables built-in metrics collection.
133    ///
134    /// When enabled, [`BingerUdp::metrics`] returns a reference to the
135    /// [`BingerMetrics`] instance with atomic counters for packets
136    /// sent/received, batch operations, syscalls, and error events.
137    ///
138    /// Default: `false`.
139    ///
140    /// Only available with the `metrics` feature.
141    #[cfg(feature = "metrics")]
142    #[must_use]
143    pub fn with_metrics(mut self, enabled: bool) -> Self {
144        self.metrics_enabled = enabled;
145        self
146    }
147}
148
149/// Reports which platform-optimized syscalls and features are available
150/// at compile time.
151///
152/// `PlatformCaps` is obtained via [`platform_capabilities()`] or
153/// [`BingerUdp::capabilities()`]. Use it to dynamically select code paths
154/// or to log which backend is active.
155///
156/// Some fields are conditionally compiled:
157///
158/// | Field | Platform / Feature |
159/// |-------|--------------------|
160/// | `supports_sendmsg_x`, `supports_recvmsg_x` | `target_os = "macos"` |
161/// | `supports_wsa_send_msg`, `supports_wsa_recv_msg` | `target_os = "windows"` |
162/// | `supports_timestamping` | `feature = "timestamping"` |
163/// | `supports_pktinfo` | `feature = "pktinfo"` |
164///
165/// # Example
166///
167/// ```rust
168/// use binger_udp::platform_capabilities;
169///
170/// let caps = platform_capabilities();
171/// println!("Backend: {}", caps.backend_name);
172/// ```
173#[allow(clippy::struct_excessive_bools)]
174pub struct PlatformCaps {
175    /// Whether `sendmmsg` is available (Linux only).
176    pub supports_sendmmsg: bool,
177    /// Whether `recvmmsg` is available (Linux only).
178    pub supports_recvmmsg: bool,
179    /// Whether `sendmsg_x` is available (macOS only, runtime-detected via dlsym).
180    #[cfg(target_os = "macos")]
181    pub supports_sendmsg_x: bool,
182    /// Whether `recvmsg_x` is available (macOS only, runtime-detected via dlsym).
183    #[cfg(target_os = "macos")]
184    pub supports_recvmsg_x: bool,
185    /// Whether `WSASendMsg` is available (Windows only, runtime-detected via `WSAIoctl`).
186    #[cfg(target_os = "windows")]
187    pub supports_wsa_send_msg: bool,
188    /// Whether `WSARecvMsg` is available (Windows only, runtime-detected via `WSAIoctl`).
189    #[cfg(target_os = "windows")]
190    pub supports_wsa_recv_msg: bool,
191    /// Whether Generic Segmentation Offload (GSO) is available (Linux, requires `gso` feature).
192    pub supports_gso: bool,
193    /// Whether Generic Receive Offload (GRO) is available (Linux, requires `gro` feature).
194    pub supports_gro: bool,
195    /// Whether `SO_BUSY_POLL` is available (Linux, requires `busy-poll` feature).
196    pub supports_busy_poll: bool,
197    /// Whether `SO_MAX_PACING_RATE` is available (Linux, requires `pacing` feature).
198    pub supports_pacing: bool,
199    /// Whether kernel timestamping is available (Linux, requires `timestamping` feature).
200    #[cfg(feature = "timestamping")]
201    pub supports_timestamping: bool,
202    /// Whether `IP_PKTINFO` / `IPV6_RECVPKTINFO` is available (Linux, requires `pktinfo` feature).
203    #[cfg(feature = "pktinfo")]
204    pub supports_pktinfo: bool,
205    /// Maximum batch size supported by the backend.
206    ///
207    /// Linux backends typically support up to 1024; other platforms up to 32.
208    pub max_batch_size: usize,
209    /// Human-readable name of the active backend.
210    ///
211    /// Examples: `"sendmmsg/recvmmsg (Linux)"`, `"fallback (loop sendto/recvfrom)"`.
212    pub backend_name: &'static str,
213}
214
215/// Returns the [`PlatformCaps`] for the current platform and enabled features.
216///
217/// This function is stateless — it returns compile-time constants that reflect
218/// the target OS and the feature flags the crate was built with.
219///
220/// # Example
221///
222/// ```rust
223/// use binger_udp::platform_capabilities;
224///
225/// let caps = platform_capabilities();
226/// assert!(!caps.backend_name.is_empty());
227/// ```
228#[must_use]
229pub fn platform_capabilities() -> PlatformCaps {
230    PlatformCaps {
231        supports_sendmmsg: cfg!(target_os = "linux"),
232        supports_recvmmsg: cfg!(target_os = "linux"),
233        #[cfg(target_os = "macos")]
234        supports_sendmsg_x: true,
235        #[cfg(target_os = "macos")]
236        supports_recvmsg_x: true,
237        #[cfg(target_os = "windows")]
238        supports_wsa_send_msg: true,
239        #[cfg(target_os = "windows")]
240        supports_wsa_recv_msg: true,
241        supports_gso: cfg!(all(target_os = "linux", feature = "gso")),
242        supports_gro: cfg!(all(target_os = "linux", feature = "gro")),
243        supports_busy_poll: cfg!(all(target_os = "linux", feature = "busy-poll")),
244        supports_pacing: cfg!(all(target_os = "linux", feature = "pacing")),
245        #[cfg(feature = "timestamping")]
246        supports_timestamping: cfg!(all(target_os = "linux", feature = "timestamping")),
247        #[cfg(feature = "pktinfo")]
248        supports_pktinfo: cfg!(all(target_os = "linux", feature = "pktinfo")),
249        max_batch_size: if cfg!(target_os = "linux") { 1024 } else { 32 },
250        backend_name: backends(),
251    }
252}
253
254const fn backends() -> &'static str {
255    #[cfg(target_os = "linux")]
256    {
257        "sendmmsg/recvmmsg (Linux)"
258    }
259    #[cfg(target_os = "macos")]
260    {
261        "sendmsg_x/recvmsg_x (macOS)"
262    }
263    #[cfg(target_os = "windows")]
264    {
265        "WSASendMsg/WSARecvMsg (Windows)"
266    }
267    #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
268    {
269        "fallback (loop sendto/recvfrom)"
270    }
271}
272
273struct AdaptiveState {
274    target_size: usize,
275    would_block_count: u64,
276    total_send_count: u64,
277    last_adjustment: std::time::Instant,
278}
279
280impl AdaptiveState {
281    const MIN_BATCH: usize = 1;
282    const MAX_BATCH: usize = 1024;
283    const ADJUSTMENT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
284
285    fn new(initial: usize) -> Self {
286        Self {
287            target_size: initial.clamp(Self::MIN_BATCH, Self::MAX_BATCH),
288            would_block_count: 0,
289            total_send_count: 0,
290            last_adjustment: std::time::Instant::now(),
291        }
292    }
293
294    fn record_would_block(&mut self) {
295        self.would_block_count += 1;
296        self.total_send_count += 1;
297    }
298
299    fn record_event(&mut self) {
300        self.total_send_count += 1;
301    }
302
303    #[allow(clippy::cast_precision_loss)]
304    fn maybe_adjust(&mut self) {
305        if self.last_adjustment.elapsed() < Self::ADJUSTMENT_INTERVAL {
306            return;
307        }
308        if self.total_send_count == 0 {
309            return;
310        }
311        let wb_rate = self.would_block_count as f64 / self.total_send_count as f64;
312        if wb_rate > 0.3 {
313            self.target_size = (self.target_size / 2).max(Self::MIN_BATCH);
314        } else if wb_rate < 0.1 && self.target_size < Self::MAX_BATCH {
315            self.target_size = (self.target_size * 3 / 2).min(Self::MAX_BATCH);
316        }
317        self.would_block_count = 0;
318        self.total_send_count = 0;
319        self.last_adjustment = std::time::Instant::now();
320    }
321
322    fn recommended_size(&self) -> usize {
323        self.target_size
324    }
325}
326
327/// A batch-native UDP socket that automatically selects the most efficient
328/// platform syscall for send and receive operations.
329///
330/// # Platform backends
331///
332/// | Platform | Send backend | Recv backend |
333/// |----------|-------------|--------------|
334/// | Linux (connected) | `sendmsg` with GSO | `recvmmsg` with GRO |
335/// | Linux (multi-dest) | `sendmmsg` | `recvmmsg` |
336/// | macOS | `sendmsg_x` (via dlsym) | `recvmsg_x` (via dlsym) |
337/// | Windows | `WSASendMsg` (via `WSAIoctl`) | `WSARecvMsg` (via `WSAIoctl`) |
338/// | Fallback | `sendto` (loop) | `recvfrom` (loop) |
339///
340/// # Async support
341///
342/// With the default `tokio` feature, all `async` methods use Tokio's
343/// readiness-based I/O ([`tokio::net::UdpSocket::readable`] /
344/// [`tokio::net::UdpSocket::writable`]) without busy-looping.
345///
346/// # Example
347///
348/// ```rust,no_run
349/// use binger_udp::{BingerUdp, SendBatch, RecvBatch, Config};
350///
351/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
352/// let socket = BingerUdp::from_std(
353///     std::net::UdpSocket::bind("0.0.0.0:0")?,
354///     Config::default(),
355/// )?;
356///
357/// let mut send = SendBatch::<32>::new();
358/// send.push(b"hello", "192.168.1.1:8080".parse().unwrap())?;
359/// socket.send_batch(&mut send).await?;
360/// # Ok(())
361/// # }
362/// ```
363pub struct BingerUdp {
364    fd: Fd,
365    #[cfg(feature = "tokio")]
366    tokio_sock: tokio::net::UdpSocket,
367    #[cfg(feature = "metrics")]
368    metrics: Option<BingerMetrics>,
369    adaptive_send: Option<std::sync::Mutex<AdaptiveState>>,
370    adaptive_recv: Option<std::sync::Mutex<AdaptiveState>>,
371}
372
373impl BingerUdp {
374    /// Get raw socket handle from a borrowed `UdpSocket`.
375    fn raw_fd_std(socket: &std::net::UdpSocket) -> Fd {
376        #[cfg(unix)]
377        {
378            socket.as_raw_fd()
379        }
380        #[cfg(windows)]
381        {
382            socket.as_raw_socket() as Fd
383        }
384    }
385
386    /// Get raw socket handle from an owned `UdpSocket` (consumes it).
387    #[cfg(not(feature = "tokio"))]
388    fn raw_fd_std_owned(socket: std::net::UdpSocket) -> Fd {
389        #[cfg(unix)]
390        {
391            socket.into_raw_fd()
392        }
393        #[cfg(windows)]
394        {
395            socket.into_raw_socket() as Fd
396        }
397    }
398
399    /// Creates a [`BingerUdp`] from a standard [`std::net::UdpSocket`] and
400    /// [`Config`].
401    ///
402    /// The socket is set to non-blocking mode. OS-level buffer sizes specified
403    /// in the config (`SO_SNDBUF`, `SO_RCVBUF`) are applied before constructing
404    /// the wrapper.
405    ///
406    /// With the default `tokio` feature, the socket is converted into a
407    /// [`tokio::net::UdpSocket`] for async readiness-based I/O.
408    ///
409    /// This is the primary (and only) way to create a `BingerUdp` instance.
410    ///
411    /// # Errors
412    ///
413    /// Returns an I/O error if:
414    /// * Setting the socket to non-blocking fails.
415    /// * Setting OS socket buffer sizes fails.
416    /// * Converting to a tokio socket fails (with `tokio` feature).
417    ///
418    /// # Example
419    ///
420    /// ```rust,no_run
421    /// use binger_udp::{BingerUdp, Config};
422    ///
423    /// let socket = BingerUdp::from_std(
424    ///     std::net::UdpSocket::bind("0.0.0.0:0").unwrap(),
425    ///     Config::default(),
426    /// ).unwrap();
427    /// ```
428    #[allow(clippy::needless_pass_by_value)]
429    pub fn from_std(socket: std::net::UdpSocket, config: Config) -> io::Result<Self> {
430        socket.set_nonblocking(true)?;
431
432        if let Some(size) = config.send_buf_size {
433            sockaddr::raw_setsockopt(
434                Self::raw_fd_std(&socket),
435                sys::SOL_SOCKET,
436                sys::SO_SNDBUF,
437                size as libc::c_int,
438            )?;
439        }
440        if let Some(size) = config.recv_os_buf_size {
441            sockaddr::raw_setsockopt(
442                Self::raw_fd_std(&socket),
443                sys::SOL_SOCKET,
444                sys::SO_RCVBUF,
445                size as libc::c_int,
446            )?;
447        }
448
449        #[cfg(feature = "tokio")]
450        let fd = Self::raw_fd_std(&socket);
451        #[cfg(feature = "tokio")]
452        let tokio_sock = tokio::net::UdpSocket::from_std(socket)?;
453
454        #[cfg(not(feature = "tokio"))]
455        let fd = Self::raw_fd_std_owned(socket);
456
457        Ok(Self {
458            fd,
459            #[cfg(feature = "tokio")]
460            tokio_sock,
461            #[cfg(feature = "metrics")]
462            metrics: if config.metrics_enabled {
463                Some(BingerMetrics::default())
464            } else {
465                None
466            },
467            adaptive_send: if config.adaptive_batching {
468                Some(std::sync::Mutex::new(AdaptiveState::new(config.batch_size)))
469            } else {
470                None
471            },
472            adaptive_recv: if config.adaptive_batching {
473                Some(std::sync::Mutex::new(AdaptiveState::new(config.batch_size)))
474            } else {
475                None
476            },
477        })
478    }
479
480    /// Sends all packets in the batch, retrying on `WouldBlock`.
481    ///
482    /// This is the primary send API. It calls [`BingerUdp::try_send_batch`] in a
483    /// loop, waiting for the socket to become writable whenever the kernel
484    /// returns `WouldBlock`.
485    ///
486    /// The `batch` argument can be a [`SendBatch<N>`](crate::batch::SendBatch)
487    /// or [`SendBatchRaw`] (the former dereferences via [`std::ops::DerefMut`]).
488    ///
489    /// Returns the total number of packets sent on success (always equal to
490    /// `batch.len()` since retries are transparent).
491    ///
492    /// # Errors
493    ///
494    /// Returns the underlying I/O error on failure. `WouldBlock` is handled
495    /// internally and never returned to the caller.
496    ///
497    /// # Related
498    ///
499    /// * [`BingerUdp::try_send_batch`] — non-blocking variant.
500    /// * [`BingerUdp::send_to`] — single-packet convenience wrapper.
501    pub async fn send_batch(&self, batch: &mut SendBatchRaw) -> io::Result<usize> {
502        loop {
503            match self.try_send_batch(batch) {
504                Ok(n) => return Ok(n),
505                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
506                    if let Some(ref state) = self.adaptive_send {
507                        if let Ok(mut s) = state.lock() {
508                            s.record_would_block();
509                            s.maybe_adjust();
510                        }
511                    }
512                    self.wait_writable().await?;
513                }
514                Err(e) => return Err(e),
515            }
516        }
517    }
518
519    /// Receives a batch of packets, retrying on `WouldBlock`.
520    ///
521    /// This is the primary receive API. It calls [`BingerUdp::try_recv_batch`]
522    /// in a loop, waiting for the socket to become readable whenever the kernel
523    /// returns `WouldBlock`.
524    ///
525    /// The `batch` argument can be a [`RecvBatch<N>`](crate::batch::RecvBatch)
526    /// or [`RecvBatchRaw`] (the former dereferences via [`std::ops::DerefMut`]).
527    ///
528    /// Returns the number of packets actually received (may be less than
529    /// `batch.capacity()`).
530    ///
531    /// # Errors
532    ///
533    /// Returns the underlying I/O error on failure. `WouldBlock` is handled
534    /// internally and never returned to the caller.
535    ///
536    /// # Related
537    ///
538    /// * [`BingerUdp::try_recv_batch`] — non-blocking variant.
539    /// * [`BingerUdp::recv_from`] — single-packet convenience wrapper.
540    pub async fn recv_batch(&self, batch: &mut RecvBatchRaw) -> io::Result<usize> {
541        loop {
542            match self.try_recv_batch(batch) {
543                Ok(0) => {
544                    if let Some(ref state) = self.adaptive_recv {
545                        if let Ok(mut s) = state.lock() {
546                            s.record_would_block();
547                            s.maybe_adjust();
548                        }
549                    }
550                    self.wait_readable().await?;
551                }
552                Ok(n) => return Ok(n),
553                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
554                    if let Some(ref state) = self.adaptive_recv {
555                        if let Ok(mut s) = state.lock() {
556                            s.record_would_block();
557                            s.maybe_adjust();
558                        }
559                    }
560                    self.wait_readable().await?;
561                }
562                Err(e) => return Err(e),
563            }
564        }
565    }
566
567    /// Attempts to send a batch of packets without retrying on `WouldBlock`.
568    ///
569    /// Unlike [`BingerUdp::send_batch`], this method returns
570    /// [`io::ErrorKind::WouldBlock`] immediately if the socket is not ready
571    /// for writing. This is useful for integrating with custom event loops or
572    /// `select!`-based concurrency.
573    ///
574    /// The `batch` argument can be a [`SendBatch<N>`](crate::batch::SendBatch)
575    /// or [`SendBatchRaw`].
576    ///
577    /// Returns the number of packets actually sent (may be less than
578    /// `batch.len()` on a partial write, or 0 if `WouldBlock`).
579    ///
580    /// # Errors
581    ///
582    /// Returns the underlying I/O error on failure, including `WouldBlock`.
583    ///
584    /// # Related
585    ///
586    /// * [`BingerUdp::send_batch`] — retry-on-WouldBlock variant.
587    pub fn try_send_batch(&self, batch: &mut SendBatchRaw) -> io::Result<usize> {
588        #[cfg(not(feature = "metrics"))]
589        let sent = crate::platform::try_send_batch(self.fd, batch)?;
590        #[cfg(feature = "metrics")]
591        let sent = crate::platform::try_send_batch(self.fd, batch).map_err(|e| {
592            if let Some(ref m) = self.metrics {
593                m.inc_send_errors();
594                if e.kind() == io::ErrorKind::WouldBlock {
595                    m.inc_send_would_block();
596                }
597            }
598            e
599        })?;
600
601        if let Some(ref state) = self.adaptive_send {
602            if let Ok(mut s) = state.lock() {
603                s.record_event();
604            }
605        }
606
607        #[cfg(feature = "metrics")]
608        if let Some(ref m) = self.metrics {
609            m.inc_packets_sent(sent as u64);
610            m.inc_batches_sent();
611            m.inc_send_syscalls();
612        }
613
614        Ok(sent)
615    }
616
617    /// Attempts to receive a batch of packets without retrying on `WouldBlock`.
618    ///
619    /// Unlike [`BingerUdp::recv_batch`], this method returns
620    /// [`io::ErrorKind::WouldBlock`] immediately if the socket is not ready
621    /// for reading.
622    ///
623    /// The `batch` argument can be a [`RecvBatch<N>`](crate::batch::RecvBatch)
624    /// or [`RecvBatchRaw`].
625    ///
626    /// Returns the number of packets actually received (0 if `WouldBlock`).
627    ///
628    /// # Errors
629    ///
630    /// Returns the underlying I/O error on failure, including `WouldBlock`.
631    ///
632    /// # Related
633    ///
634    /// * [`BingerUdp::recv_batch`] — retry-on-WouldBlock variant.
635    pub fn try_recv_batch(&self, batch: &mut RecvBatchRaw) -> io::Result<usize> {
636        #[cfg(not(feature = "metrics"))]
637        let received = crate::platform::try_recv_batch(self.fd, batch)?;
638        #[cfg(feature = "metrics")]
639        let received = crate::platform::try_recv_batch(self.fd, batch).map_err(|e| {
640            if let Some(ref m) = self.metrics {
641                m.inc_recv_errors();
642                if e.kind() == io::ErrorKind::WouldBlock {
643                    m.inc_recv_would_block();
644                }
645            }
646            e
647        })?;
648
649        if let Some(ref state) = self.adaptive_recv {
650            if let Ok(mut s) = state.lock() {
651                s.record_event();
652            }
653        }
654
655        #[cfg(feature = "metrics")]
656        if let Some(ref m) = self.metrics {
657            m.inc_packets_received(received as u64);
658            m.inc_batches_received();
659            m.inc_recv_syscalls();
660        }
661
662        Ok(received)
663    }
664
665    /// Sends a single UDP datagram to the specified address, retrying on
666    /// `WouldBlock`.
667    ///
668    /// This is a convenience wrapper around [`BingerUdp::send_batch`] that
669    /// constructs a single-element batch internally.
670    ///
671    /// # Errors
672    ///
673    /// Returns the underlying I/O error on failure.
674    ///
675    /// # Panics
676    ///
677    /// Panics if the internal single-element batch capacity is exceeded, which
678    /// should never happen.
679    ///
680    /// # Related
681    ///
682    /// * [`BingerUdp::send_batch`] — batch variant for higher throughput.
683    /// * [`BingerUdp::try_send_to`] — non-blocking single-packet variant.
684    pub async fn send_to(&self, buf: &[u8], addr: SocketAddr) -> io::Result<usize> {
685        let mut batch = SendBatchRaw::with_capacity(1);
686        batch.push(buf, Some(addr)).expect("capacity 1");
687        self.send_batch(&mut batch).await?;
688        Ok(buf.len())
689    }
690
691    /// Receives a single UDP datagram into the buffer, retrying on `WouldBlock`.
692    ///
693    /// This is a convenience wrapper around [`BingerUdp::recv_batch`] that
694    /// constructs a single-element batch internally.
695    ///
696    /// Returns the number of bytes received and the source address.
697    ///
698    /// # Errors
699    ///
700    /// Returns the underlying I/O error on failure.
701    ///
702    /// # Related
703    ///
704    /// * [`BingerUdp::recv_batch`] — batch variant for higher throughput.
705    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
706        let mut batch = RecvBatchRaw::with_capacity(1, buf.len());
707        self.recv_batch(&mut batch).await?;
708        let data = batch.data(0);
709        let addr = batch.addr(0);
710        let len = data.len().min(buf.len());
711        buf[..len].copy_from_slice(&data[..len]);
712        Ok((len, addr))
713    }
714
715    /// Attempts to send a single UDP datagram without retrying on `WouldBlock`.
716    ///
717    /// This is a convenience wrapper around [`BingerUdp::try_send_batch`] that
718    /// constructs a single-element batch internally.
719    ///
720    /// # Errors
721    ///
722    /// Returns the underlying I/O error on failure, including `WouldBlock`.
723    ///
724    /// # Panics
725    ///
726    /// Panics if the internal single-element batch capacity is exceeded, which
727    /// should never happen.
728    ///
729    /// # Related
730    ///
731    /// * [`BingerUdp::send_to`] — retry-on-WouldBlock variant.
732    /// * [`BingerUdp::try_send_batch`] — batch variant.
733    pub fn try_send_to(&self, buf: &[u8], addr: SocketAddr) -> io::Result<usize> {
734        let mut batch = SendBatchRaw::with_capacity(1);
735        batch.push(buf, Some(addr)).expect("capacity 1");
736        self.try_send_batch(&mut batch)?;
737        Ok(buf.len())
738    }
739
740    /// Connects the UDP socket to a remote address.
741    ///
742    /// A connected UDP socket can only send to and receive from that address.
743    /// This is required for GSO (Generic Segmentation Offload) on Linux.
744    ///
745    /// # Errors
746    ///
747    /// Returns the underlying I/O error on failure (e.g., invalid address).
748    pub fn connect(&self, addr: SocketAddr) -> io::Result<()> {
749        sockaddr::raw_connect(self.fd, addr)
750    }
751
752    /// Returns the local socket address.
753    ///
754    /// # Errors
755    ///
756    /// Returns the underlying IO error on failure.
757    pub fn local_addr(&self) -> io::Result<SocketAddr> {
758        sockaddr::raw_getsockname(self.fd)
759    }
760
761    /// Returns the TTL of the socket.
762    ///
763    /// # Errors
764    ///
765    /// Returns the underlying IO error on failure.
766    pub fn ttl(&self) -> io::Result<u32> {
767        sockaddr::raw_getsockopt(self.fd, sys::IPPROTO_IP, sys::IP_TTL).map(|v| v as u32)
768    }
769
770    /// Sets the TTL of the socket.
771    ///
772    /// # Errors
773    ///
774    /// Returns the underlying IO error on failure.
775    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
776        sockaddr::raw_setsockopt(self.fd, sys::IPPROTO_IP, sys::IP_TTL, ttl as libc::c_int)
777    }
778
779    /// Returns the underlying file descriptor (or socket handle on Windows).
780    ///
781    /// On Unix, this is a `RawFd` (`c_int`). On Windows, this is a `SOCKET`
782    /// (`usize`).
783    ///
784    /// Use this to pass the socket to low-level system calls or integration
785    /// with other I/O libraries.
786    #[must_use]
787    pub fn as_raw_fd(&self) -> Fd {
788        self.fd
789    }
790
791    /// Enables or disables Generic Segmentation Offload (GSO).
792    ///
793    /// GSO allows the kernel to segment a large UDP datagram into MTU-sized
794    /// segments, reducing the number of user-to-kernel transitions. Requires a
795    /// connected socket (see [`BingerUdp::connect`]).
796    ///
797    /// Only available on Linux with the `gso` feature.
798    ///
799    /// # Errors
800    ///
801    /// Returns an I/O error if the setsockopt call fails.
802    #[cfg(all(target_os = "linux", feature = "gso"))]
803    pub fn set_gso(&self, enabled: bool) -> io::Result<()> {
804        sockaddr::raw_setsockopt(
805            self.fd,
806            libc::IPPROTO_UDP,
807            libc::UDP_SEGMENT,
808            i32::from(enabled),
809        )
810    }
811
812    /// Enables or disables Generic Receive Offload (GRO).
813    ///
814    /// GRO allows the kernel to coalesce multiple incoming UDP datagrams into
815    /// a single large buffer, reducing the number of receive syscalls.
816    ///
817    /// Only available on Linux with the `gro` feature.
818    ///
819    /// # Errors
820    ///
821    /// Returns an I/O error if the setsockopt call fails.
822    #[cfg(all(target_os = "linux", feature = "gro"))]
823    pub fn set_gro(&self, enabled: bool) -> io::Result<()> {
824        sockaddr::raw_setsockopt(
825            self.fd,
826            libc::IPPROTO_UDP,
827            libc::UDP_GRO,
828            i32::from(enabled),
829        )
830    }
831
832    /// Sends a large datagram segmented by GSO in a single syscall
833    /// (non-blocking).
834    ///
835    /// The kernel fragments `data` into segments of `segment_size` bytes each
836    /// (the last segment may be smaller). The socket must be connected (see
837    /// [`BingerUdp::connect`]) and GSO must be enabled via
838    /// [`BingerUdp::set_gso(true)`](BingerUdp::set_gso).
839    ///
840    /// Only available on Linux with the `gso` feature and without `miri-safe`.
841    ///
842    /// Returns the total number of bytes sent (the entire `data` payload on
843    /// success).
844    ///
845    /// # Errors
846    ///
847    /// Returns the underlying I/O error on failure, including `WouldBlock`.
848    #[cfg(all(target_os = "linux", feature = "gso", not(feature = "miri-safe")))]
849    pub fn try_send_gso(&self, data: &[u8], segment_size: u16) -> io::Result<usize> {
850        crate::platform::try_send_gso(self.fd, data, segment_size)
851    }
852
853    /// Sends a large datagram segmented by GSO, retrying on `WouldBlock`.
854    ///
855    /// Calls [`BingerUdp::try_send_gso`] in a loop, waiting for the socket
856    /// to become writable on `WouldBlock`.
857    ///
858    /// Only available on Linux with the `gso` feature and without `miri-safe`.
859    ///
860    /// # Errors
861    ///
862    /// Returns the underlying I/O error on failure. `WouldBlock` is handled
863    /// internally.
864    #[cfg(all(target_os = "linux", feature = "gso", not(feature = "miri-safe")))]
865    pub async fn send_gso(&self, data: &[u8], segment_size: u16) -> io::Result<usize> {
866        loop {
867            match self.try_send_gso(data, segment_size) {
868                Ok(n) => return Ok(n),
869                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
870                    self.wait_writable().await?;
871                }
872                Err(e) => return Err(e),
873            }
874        }
875    }
876
877    /// Sets the maximum pacing rate for UDP sends.
878    ///
879    /// `bytes_per_sec` is the rate in bytes per second. Set to `0` to disable
880    /// pacing. Requires the `fq` qdisc (fair queueing) on the egress path.
881    ///
882    /// Only available on Linux with the `pacing` feature.
883    ///
884    /// # Errors
885    ///
886    /// Returns the underlying I/O error if setsockopt fails.
887    #[cfg(all(target_os = "linux", feature = "pacing"))]
888    pub fn set_pacing_rate(&self, bytes_per_sec: u32) -> io::Result<()> {
889        sockaddr::raw_setsockopt_u32(
890            self.fd,
891            libc::SOL_SOCKET,
892            crate::sys::SO_MAX_PACING_RATE,
893            bytes_per_sec,
894        )
895    }
896
897    /// Enables busy-polling for ultra-low latency (`SO_BUSY_POLL`).
898    ///
899    /// `usecs` is the time in microseconds to busy-poll. Higher values reduce
900    /// latency at the cost of CPU usage. Requires `CAP_NET_ADMIN` or the
901    /// `net.core.busy_poll` sysctl to be configured globally.
902    ///
903    /// Only available on Linux with the `busy-poll` feature.
904    ///
905    /// # Errors
906    ///
907    /// Returns the underlying I/O error if setsockopt fails.
908    #[cfg(all(target_os = "linux", feature = "busy-poll"))]
909    pub fn set_busy_poll(&self, usecs: u32) -> io::Result<()> {
910        sockaddr::raw_setsockopt(
911            self.fd,
912            libc::SOL_SOCKET,
913            crate::sys::SO_BUSY_POLL,
914            usecs as libc::c_int,
915        )
916    }
917
918    /// Enables software receive timestamping (`SO_TIMESTAMPNS`).
919    ///
920    /// When enabled, each received packet carries a kernel timestamp
921    /// accessible via [`RecvBatch::timestamp`](crate::batch::RecvBatch::timestamp).
922    ///
923    /// # Errors
924    ///
925    /// Returns the underlying IO error if setsockopt fails.
926    #[cfg(all(target_os = "linux", feature = "timestamping"))]
927    pub fn enable_timestamping(&self, enabled: bool) -> io::Result<()> {
928        sockaddr::raw_setsockopt(
929            self.fd,
930            libc::SOL_SOCKET,
931            crate::sys::SO_TIMESTAMPNS,
932            i32::from(enabled),
933        )
934    }
935
936    /// Enables `IP_PKTINFO` / `IPV6_RECVPKTINFO` for receiving destination addresses.
937    ///
938    /// When enabled, each received packet's destination address is available
939    /// via [`RecvBatch::dst_addr`](crate::batch::RecvBatch::dst_addr).
940    ///
941    /// # Errors
942    ///
943    /// Returns the underlying IO error if setsockopt fails.
944    #[cfg(all(target_os = "linux", feature = "pktinfo"))]
945    pub fn enable_pktinfo(&self, enabled: bool) -> io::Result<()> {
946        sockaddr::raw_setsockopt(
947            self.fd,
948            libc::IPPROTO_IP,
949            libc::IP_PKTINFO,
950            i32::from(enabled),
951        )?;
952        sockaddr::raw_setsockopt(
953            self.fd,
954            libc::IPPROTO_IPV6,
955            libc::IPV6_RECVPKTINFO,
956            i32::from(enabled),
957        )
958    }
959
960    /// Returns the [`PlatformCaps`] for the current platform and enabled features.
961    ///
962    /// This is a shorthand for calling [`platform_capabilities()`] directly.
963    #[must_use]
964    pub fn capabilities(&self) -> PlatformCaps {
965        platform_capabilities()
966    }
967
968    /// Returns the recommended batch size based on adaptive batching state.
969    ///
970    /// When adaptive batching is enabled via
971    /// [`Config::with_adaptive_batching`], this returns the dynamically
972    /// adjusted batch size (pass `true` at construction time). The adjustment happens at most every 100 ms:
973    ///
974    /// * When the `WouldBlock` rate exceeds 30%, the batch size is halved.
975    /// * When the rate drops below 10%, the batch size is increased by 50%.
976    /// * The batch size is clamped to the range `[1, 1024]`.
977    ///
978    /// If adaptive batching is disabled, returns a fixed default of `32`.
979    ///
980    /// # Panics
981    #[must_use]
982    pub fn recommended_batch_size(&self) -> usize {
983        self.adaptive_send
984            .as_ref()
985            .and_then(|s| s.lock().ok())
986            .map_or(32, |g| g.recommended_size())
987    }
988
989    /// Returns a reference to the [`BingerMetrics`] instance, if metrics
990    /// collection is enabled.
991    ///
992    /// Metrics must be enabled at construction time via
993    /// [`Config::with_metrics`]. This requires the `metrics` feature.
994    ///
995    /// Returns `None` if metrics are disabled.
996    #[cfg(feature = "metrics")]
997    #[must_use]
998    pub fn metrics(&self) -> Option<&BingerMetrics> {
999        self.metrics.as_ref()
1000    }
1001
1002    /// Waits until the socket becomes readable.
1003    ///
1004    /// This is a direct wrapper around
1005    /// [`tokio::net::UdpSocket::readable()`]. Use this in `tokio::select!`
1006    /// to be notified when data is available for
1007    /// [`BingerUdp::recv_batch`] or [`BingerUdp::recv_from`].
1008    ///
1009    /// Only available with the default `tokio` feature.
1010    ///
1011    /// # Errors
1012    ///
1013    /// Returns the underlying I/O error on failure.
1014    #[cfg(feature = "tokio")]
1015    pub async fn readable(&self) -> io::Result<()> {
1016        self.tokio_sock.readable().await
1017    }
1018
1019    /// Waits until the socket becomes writable.
1020    ///
1021    /// This is a direct wrapper around
1022    /// [`tokio::net::UdpSocket::writable()`]. Use this in `tokio::select!`
1023    /// to be notified when the socket can accept data for
1024    /// [`BingerUdp::send_batch`] or [`BingerUdp::send_to`].
1025    ///
1026    /// Only available with the default `tokio` feature.
1027    ///
1028    /// # Errors
1029    ///
1030    /// Returns the underlying I/O error on failure.
1031    #[cfg(feature = "tokio")]
1032    pub async fn writable(&self) -> io::Result<()> {
1033        self.tokio_sock.writable().await
1034    }
1035
1036    #[cfg(feature = "tokio")]
1037    async fn wait_writable(&self) -> io::Result<()> {
1038        self.tokio_sock.writable().await?;
1039        Ok(())
1040    }
1041
1042    #[cfg(feature = "tokio")]
1043    async fn wait_readable(&self) -> io::Result<()> {
1044        self.tokio_sock.readable().await?;
1045        Ok(())
1046    }
1047
1048    #[cfg(not(feature = "tokio"))]
1049    async fn wait_writable(&self) -> io::Result<()> {
1050        Err(io::Error::new(
1051            io::ErrorKind::Other,
1052            "tokio feature disabled",
1053        ))
1054    }
1055
1056    #[cfg(not(feature = "tokio"))]
1057    async fn wait_readable(&self) -> io::Result<()> {
1058        Err(io::Error::new(
1059            io::ErrorKind::Other,
1060            "tokio feature disabled",
1061        ))
1062    }
1063}
1064
1065#[cfg(not(feature = "tokio"))]
1066impl Drop for BingerUdp {
1067    /// Closes the underlying file descriptor.
1068    fn drop(&mut self) {
1069        sys::close_fd(self.fd);
1070    }
1071}
1072
1073// SAFETY: BingerUdp wraps a raw fd and optionally a tokio UdpSocket.
1074// Both are safe to send/share across threads — the fd is used with
1075// thread-safe syscalls, and tokio::net::UdpSocket is Send + Sync.
1076unsafe impl Send for BingerUdp {}
1077unsafe impl Sync for BingerUdp {}