Skip to main content

binger_udp/
batch.rs

1use std::net::SocketAddr;
2
3use crate::error::BingerError;
4
/// A kernel-provided software timestamp for a received UDP datagram.
///
/// Represents the [`libc::timespec`] structure attached to a packet when the
/// `SO_TIMESTAMPNS` socket option is enabled. The timestamp is recorded
/// by the network stack at the moment of reception.
///
/// Only available on Linux with the `timestamping` feature enabled.
///
/// # Fields
///
/// * `tv_sec` — Seconds since the Unix epoch.
/// * `tv_nsec` — Sub-second nanoseconds.
///
/// # Related
///
/// * [`RecvBatch::timestamp`] — retrieves the timestamp for a received packet.
/// * `BingerUdp::enable_timestamping` — enables kernel timestamping on the socket (Linux only).
#[cfg(feature = "timestamping")]
#[derive(Clone, Copy, Debug, Default)]
pub struct Timestamp {
    /// Whole seconds since the Unix epoch.
    pub tv_sec: i64,
    /// Nanoseconds past `tv_sec`; within `0..1_000_000_000` for a normalized value.
    pub tv_nsec: i64,
}

#[cfg(feature = "timestamping")]
impl Timestamp {
    /// Converts the timestamp to a [`std::time::Duration`] relative to the Unix epoch.
    ///
    /// This is useful for comparing timestamps or computing inter-packet arrival
    /// times. The duration is always non-negative:
    ///
    /// * A timestamp that lies before the epoch (the combined value of both
    ///   fields is negative) saturates to [`std::time::Duration::ZERO`].
    /// * A non-normalized `tv_nsec` (negative, or `>= 1_000_000_000`) is folded
    ///   into the seconds component instead of being truncated.
    ///
    /// This method never panics, because both public fields can hold arbitrary
    /// `i64` values and every combination is handled.
    #[must_use]
    // The two `as` casts below are provably lossless; see the comment at the cast site.
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    pub fn as_duration(&self) -> std::time::Duration {
        const NANOS_PER_SEC: i128 = 1_000_000_000;

        // i128 holds `i64 * 1e9 + i64` for every possible field value, so this
        // cannot overflow.
        let total_nanos = i128::from(self.tv_sec) * NANOS_PER_SEC + i128::from(self.tv_nsec);
        if total_nanos <= 0 {
            return std::time::Duration::ZERO;
        }

        // `total_nanos` is positive here. The quotient is at most
        // `i64::MAX + i64::MAX / 1e9`, which fits in u64, and the remainder is
        // below 1e9, which fits in u32 — so neither cast can lose information
        // and `Duration::new` never has to carry.
        std::time::Duration::new(
            (total_nanos / NANOS_PER_SEC) as u64,
            (total_nanos % NANOS_PER_SEC) as u32,
        )
    }
}
40
41/// A fixed-capacity batch of outgoing UDP datagrams.
42///
43/// The capacity `N` is a compile-time constant, allowing the batch to be
44/// allocated on the stack with no heap allocation in the hot path.
45///
46/// `SendBatch` dereferences to [`SendBatchRaw`] via [`std::ops::DerefMut`], so it can be
47/// passed directly to [`BingerUdp::send_batch`](crate::socket::BingerUdp::send_batch)
48/// and [`BingerUdp::try_send_batch`](crate::socket::BingerUdp::try_send_batch).
49///
50/// # Example
51///
52/// ```rust,no_run
53/// use binger_udp::SendBatch;
54///
55/// let mut batch = SendBatch::<32>::new();
56/// let addr = "192.168.1.1:8080".parse().unwrap();
57/// batch.push(b"hello", addr).unwrap();
58/// assert_eq!(batch.len(), 1);
59/// ```
60#[allow(clippy::module_name_repetitions)]
61pub struct SendBatch<const N: usize> {
62    raw: SendBatchRaw,
63}
64
65impl<const N: usize> SendBatch<N> {
66    /// Creates a new empty send batch with capacity `N`.
67    ///
68    /// No heap allocation occurs — the internal slot array is pre-allocated
69    /// once to the compile-time capacity.
70    #[must_use]
71    pub fn new() -> Self {
72        Self {
73            raw: SendBatchRaw::with_capacity(N),
74        }
75    }
76
77    /// Pushes a buffer and destination address into the send batch.
78    ///
79    /// Use this method for connectionless (multi-destination) sends. Each call
80    /// adds one datagram to the batch.
81    ///
82    /// # Errors
83    ///
84    /// Returns [`BingerError::BatchFull`] if the batch has reached its capacity
85    /// of `N` entries.
86    ///
87    /// # Related
88    ///
89    /// * [`SendBatch::push_connected`] — push without a destination for
90    ///   pre-connected sockets.
91    pub fn push(&mut self, buf: &[u8], addr: SocketAddr) -> Result<(), BingerError> {
92        self.raw.push(buf, Some(addr))
93    }
94
95    /// Pushes a buffer into the send batch for a pre-connected socket.
96    ///
97    /// Unlike [`SendBatch::push`], this variant omits the destination address
98    /// because the socket is already connected. This is required when using
99    /// GSO (Generic Segmentation Offload) on Linux.
100    ///
101    /// # Errors
102    ///
103    /// Returns [`BingerError::BatchFull`] if the batch has reached its capacity
104    /// of `N` entries.
105    pub fn push_connected(&mut self, buf: &[u8]) -> Result<(), BingerError> {
106        self.raw.push(buf, None)
107    }
108
109    /// Returns the number of packets currently in the batch.
110    #[must_use]
111    pub fn len(&self) -> usize {
112        self.raw.len()
113    }
114
115    /// Returns `true` if the batch contains no packets.
116    #[must_use]
117    pub fn is_empty(&self) -> bool {
118        self.len() == 0
119    }
120
121    /// Returns the maximum number of packets this batch can hold.
122    ///
123    /// This is always the compile-time constant `N`.
124    #[must_use]
125    pub const fn capacity(&self) -> usize {
126        N
127    }
128
129    /// Removes all packets from the batch, resetting it for reuse.
130    ///
131    /// After calling `clear`, [`SendBatch::len`] returns `0` and the batch
132    /// can be re-filled with new packets. No memory is deallocated.
133    pub fn clear(&mut self) {
134        self.raw.clear();
135    }
136}
137
138impl<const N: usize> Default for SendBatch<N> {
139    fn default() -> Self {
140        Self::new()
141    }
142}
143
144impl<const N: usize> std::ops::Deref for SendBatch<N> {
145    type Target = SendBatchRaw;
146
147    /// Delegates to [`SendBatchRaw`].
148    fn deref(&self) -> &Self::Target {
149        &self.raw
150    }
151}
152
153impl<const N: usize> std::ops::DerefMut for SendBatch<N> {
154    /// Delegates to [`SendBatchRaw`].
155    fn deref_mut(&mut self) -> &mut Self::Target {
156        &mut self.raw
157    }
158}
159
160/// A fixed-capacity batch for receiving UDP datagrams.
161///
162/// The capacity `N` is a compile-time constant. Each slot is pre-allocated
163/// with a buffer of `buf_size` bytes, so no allocation occurs during receive.
164///
165/// `RecvBatch` dereferences to [`RecvBatchRaw`] via [`std::ops::DerefMut`], so it can be
166/// passed directly to [`BingerUdp::recv_batch`](crate::socket::BingerUdp::recv_batch)
167/// and [`BingerUdp::try_recv_batch`](crate::socket::BingerUdp::try_recv_batch).
168///
169/// # Example
170///
171/// ```rust,no_run
172/// use binger_udp::RecvBatch;
173///
174/// let mut batch = RecvBatch::<32>::new(2048);
175/// // After recv_batch fills the batch:
176/// for i in 0..batch.len() {
177///     let data = batch.data(i);
178///     let addr = batch.addr(i);
179/// }
180/// ```
181#[allow(clippy::module_name_repetitions)]
182pub struct RecvBatch<const N: usize> {
183    raw: RecvBatchRaw,
184}
185
186impl<const N: usize> RecvBatch<N> {
187    /// Creates a new empty receive batch with capacity `N`.
188    ///
189    /// Each of the `N` slots is pre-allocated with a buffer of `buf_size` bytes.
190    /// This allocation happens once at construction time; no further allocation
191    /// occurs during receive operations.
192    ///
193    /// `buf_size` should be large enough to hold the largest expected datagram.
194    /// Datagrams exceeding `buf_size` will be truncated.
195    #[must_use]
196    pub fn new(buf_size: usize) -> Self {
197        Self {
198            raw: RecvBatchRaw::with_capacity(N, buf_size),
199        }
200    }
201
202    /// Returns the number of packets received in the last batch operation.
203    #[must_use]
204    pub fn len(&self) -> usize {
205        self.raw.len()
206    }
207
208    /// Returns `true` if no packets were received in the last batch operation.
209    #[must_use]
210    pub fn is_empty(&self) -> bool {
211        self.len() == 0
212    }
213
214    /// Returns the maximum number of packets this batch can hold.
215    ///
216    /// This is always the compile-time constant `N`.
217    #[must_use]
218    pub const fn capacity(&self) -> usize {
219        N
220    }
221
222    /// Returns the data payload of the packet at index `idx`.
223    ///
224    /// The returned slice length matches the actual received datagram size,
225    /// not the buffer capacity.
226    ///
227    /// # Panics
228    ///
229    /// Panics if `idx` is out of bounds.
230    #[must_use]
231    pub fn data(&self, idx: usize) -> &[u8] {
232        self.raw.data(idx)
233    }
234
235    /// Returns the source address of the packet at index `idx`.
236    ///
237    /// # Panics
238    ///
239    /// Panics if `idx` is out of bounds.
240    #[must_use]
241    pub fn addr(&self, idx: usize) -> SocketAddr {
242        self.raw.addr(idx)
243    }
244
245    /// Removes all received packets, resetting the batch for reuse.
246    ///
247    /// After `clear`, [`RecvBatch::len`] returns `0`. Internal buffers are
248    /// retained and will be overwritten on the next receive.
249    pub fn clear(&mut self) {
250        self.raw.clear();
251    }
252
253    /// Iterates over all received packets, yielding `(data, source_addr)` pairs.
254    pub fn iter(&self) -> impl Iterator<Item = (&[u8], SocketAddr)> + '_ {
255        (0..self.len()).map(|i| (self.data(i), self.addr(i)))
256    }
257
258    /// Returns the kernel-provided software timestamp for the packet at `idx`,
259    /// if available.
260    ///
261    /// Requires the `timestamping` feature and `BingerUdp::enable_timestamping(true)`
262    /// to be called on the socket before receiving (Linux only).
263    ///
264    /// # Panics
265    ///
266    /// Panics if `idx` is out of bounds.
267    #[cfg(feature = "timestamping")]
268    #[must_use]
269    pub fn timestamp(&self, idx: usize) -> Option<Timestamp> {
270        self.raw.timestamp(idx)
271    }
272
273    /// Returns the destination address (local interface address) for the packet
274    /// at `idx`, if available.
275    ///
276    /// Requires the `pktinfo` feature and `BingerUdp::enable_pktinfo(true)`
277    /// to be called on the socket before receiving (Linux only). Useful for servers that
278    /// need to know which local address a packet was sent to (e.g., multi-homed
279    /// hosts).
280    ///
281    /// # Panics
282    ///
283    /// Panics if `idx` is out of bounds.
284    #[cfg(feature = "pktinfo")]
285    #[must_use]
286    pub fn dst_addr(&self, idx: usize) -> Option<SocketAddr> {
287        self.raw.dst_addr(idx)
288    }
289}
290
291impl<const N: usize> std::ops::Deref for RecvBatch<N> {
292    type Target = RecvBatchRaw;
293
294    /// Delegates to [`RecvBatchRaw`].
295    fn deref(&self) -> &Self::Target {
296        &self.raw
297    }
298}
299
300impl<const N: usize> std::ops::DerefMut for RecvBatch<N> {
301    /// Delegates to [`RecvBatchRaw`].
302    fn deref_mut(&mut self) -> &mut Self::Target {
303        &mut self.raw
304    }
305}
306
307struct SendSlot {
308    data_ptr: *const u8,
309    data_len: usize,
310    addr: Option<SocketAddr>,
311    _marker: std::marker::PhantomData<*const [u8]>,
312}
313
314/// A raw send batch with dynamic capacity.
315///
316/// This is the underlying type behind [`SendBatch<N>`]. It manages a
317/// dynamically-allocated slot array. The typed [`SendBatch<N>`] wrapper
318/// provides a compile-time capacity via [`std::ops::DerefMut`] to this type.
319///
320/// Most users should use [`SendBatch<N>`] instead.
321pub struct SendBatchRaw {
322    slots: Vec<SendSlot>,
323    len: usize,
324}
325
326impl SendBatchRaw {
327    /// Creates a new raw send batch with the given capacity.
328    ///
329    /// The internal slot vector is pre-allocated to `capacity` elements.
330    #[must_use]
331    pub fn with_capacity(capacity: usize) -> Self {
332        Self {
333            slots: Vec::with_capacity(capacity),
334            len: 0,
335        }
336    }
337
338    /// Pushes a buffer and optional destination address into the raw send batch.
339    ///
340    /// When `addr` is `None`, the packet will be sent on a connected socket
341    /// (equivalent to [`SendBatch::push_connected`]). When `addr` is `Some(...)`,
342    /// the packet is sent to that specific address (connectionless mode).
343    ///
344    /// # Errors
345    ///
346    /// Returns [`BingerError::BatchFull`] if the batch has reached its capacity.
347    pub fn push(&mut self, data: &[u8], addr: Option<SocketAddr>) -> Result<(), BingerError> {
348        if self.len >= self.slots.capacity() {
349            return Err(BingerError::BatchFull {
350                capacity: self.slots.capacity(),
351            });
352        }
353        let slot = SendSlot {
354            data_ptr: data.as_ptr(),
355            data_len: data.len(),
356            addr,
357            _marker: std::marker::PhantomData,
358        };
359        if self.slots.len() <= self.len {
360            self.slots.push(slot);
361        } else {
362            self.slots[self.len] = slot;
363        }
364        self.len += 1;
365        Ok(())
366    }
367
368    /// Returns the number of packets currently in the batch.
369    #[must_use]
370    pub fn len(&self) -> usize {
371        self.len
372    }
373
374    /// Returns `true` if the batch contains no packets.
375    #[must_use]
376    pub fn is_empty(&self) -> bool {
377        self.len == 0
378    }
379
380    /// Returns the data and optional destination address at index `idx`.
381    ///
382    /// # Panics
383    ///
384    /// Panics if `idx` is out of bounds.
385    #[must_use]
386    pub fn entry(&self, idx: usize) -> (&[u8], Option<SocketAddr>) {
387        let slot = &self.slots[idx];
388        let data = unsafe { std::slice::from_raw_parts(slot.data_ptr, slot.data_len) };
389        (data, slot.addr)
390    }
391
392    /// Removes all packets from the batch, resetting it for reuse.
393    ///
394    /// No memory is deallocated.
395    pub fn clear(&mut self) {
396        self.len = 0;
397    }
398}
399
/// One receive slot: a reusable buffer plus the metadata of the packet
/// currently stored in it.
struct RecvSlot {
    /// Backing storage; allocated once with the batch's `buf_size`.
    buf: Vec<u8>,
    /// Source address of the stored packet.
    addr: SocketAddr,
    /// Number of valid bytes at the start of `buf`.
    ///
    /// Stored as `usize` so that any length that fits the buffer is kept
    /// exactly; a narrower type would silently truncate large values.
    recv_len: usize,
    #[cfg(feature = "timestamping")]
    timestamp: Option<Timestamp>,
    #[cfg(feature = "pktinfo")]
    dst_addr: Option<SocketAddr>,
}

/// A raw receive batch with dynamic capacity.
///
/// This is the underlying type behind [`RecvBatch<N>`]. It manages a
/// dynamically-allocated array of receive slots, each with a pre-allocated
/// buffer. The typed [`RecvBatch<N>`] wrapper provides a compile-time capacity
/// via [`std::ops::DerefMut`] to this type.
///
/// Most users should use [`RecvBatch<N>`] instead.
pub struct RecvBatchRaw {
    /// All slots; the vector's length is the batch capacity.
    slots: Vec<RecvSlot>,
    /// Number of packets recorded by the last receive (see [`RecvBatchRaw::set_len`]).
    len: usize,
}

impl RecvBatchRaw {
    /// Creates a new raw receive batch with the given capacity and buffer size.
    ///
    /// All `capacity` slots are pre-allocated, each with a zero-filled buffer
    /// of `buf_size` bytes. This allocation happens once at construction.
    #[must_use]
    pub fn with_capacity(capacity: usize, buf_size: usize) -> Self {
        let slots = (0..capacity)
            .map(|_| RecvSlot {
                buf: vec![0u8; buf_size],
                // Placeholder until a receive fills the slot.
                addr: SocketAddr::from(([0, 0, 0, 0], 0)),
                recv_len: 0,
                #[cfg(feature = "timestamping")]
                timestamp: None,
                #[cfg(feature = "pktinfo")]
                dst_addr: None,
            })
            .collect();
        Self { slots, len: 0 }
    }

    /// Returns the maximum number of packets this batch can hold.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.slots.len()
    }

    /// Returns the number of packets received in the last batch operation.
    #[must_use]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if no packets were received in the last batch operation.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Sets the number of received packets.
    ///
    /// This is called internally by the platform receive functions to record
    /// how many packets were received. The value is stored as given; it is
    /// not checked against [`RecvBatchRaw::capacity`].
    pub fn set_len(&mut self, len: usize) {
        self.len = len;
    }

    /// Sets the actual received byte count for slot `idx`.
    ///
    /// The count is stored exactly, whatever its magnitude.
    ///
    /// # Safety
    ///
    /// `idx` must be within bounds and `n` must not exceed the buffer size
    /// of the slot at `idx`.
    pub unsafe fn set_recv_len(&mut self, idx: usize, n: usize) {
        self.slots[idx].recv_len = n;
    }

    /// Returns a mutable reference to the buffer and source address of slot `idx`.
    ///
    /// This is used internally by the platform receive functions to populate
    /// the received data and its source address. The returned slice spans the
    /// slot's whole buffer.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    pub fn buffer_mut(&mut self, idx: usize) -> (&mut [u8], &mut SocketAddr) {
        let slot = &mut self.slots[idx];
        (&mut slot.buf, &mut slot.addr)
    }

    /// Returns the data payload of the packet at index `idx`.
    ///
    /// The returned slice length matches the actual received datagram size.
    /// A slot that has not been filled yields an empty slice.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds, or if the recorded receive length
    /// exceeds the slot's buffer size.
    #[must_use]
    pub fn data(&self, idx: usize) -> &[u8] {
        let slot = &self.slots[idx];
        &slot.buf[..slot.recv_len]
    }

    /// Returns the source address of the packet at index `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    #[must_use]
    pub fn addr(&self, idx: usize) -> SocketAddr {
        self.slots[idx].addr
    }

    /// Removes all received packets, resetting the batch for reuse.
    ///
    /// All recv lengths are reset to 0 and metadata (timestamps, dst addresses)
    /// are cleared. Internal buffers are retained; source addresses are left
    /// as they were.
    pub fn clear(&mut self) {
        self.len = 0;
        for slot in &mut self.slots {
            slot.recv_len = 0;
            #[cfg(feature = "timestamping")]
            {
                slot.timestamp = None;
            }
            #[cfg(feature = "pktinfo")]
            {
                slot.dst_addr = None;
            }
        }
    }

    /// Returns the kernel-provided software timestamp for the packet at `idx`,
    /// if available.
    ///
    /// Requires the `timestamping` feature.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    #[cfg(feature = "timestamping")]
    #[must_use]
    pub fn timestamp(&self, idx: usize) -> Option<Timestamp> {
        self.slots[idx].timestamp
    }

    /// Records (or clears, with `None`) the timestamp of slot `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    #[cfg(feature = "timestamping")]
    // Not referenced from this file; kept for use elsewhere in the crate.
    #[allow(dead_code)]
    pub(crate) fn set_timestamp(&mut self, idx: usize, ts: Option<Timestamp>) {
        self.slots[idx].timestamp = ts;
    }

    /// Returns the destination address (local interface address) for the packet
    /// at `idx`, if available.
    ///
    /// Requires the `pktinfo` feature.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    #[cfg(feature = "pktinfo")]
    #[must_use]
    pub fn dst_addr(&self, idx: usize) -> Option<SocketAddr> {
        self.slots[idx].dst_addr
    }

    /// Records (or clears, with `None`) the destination address of slot `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds.
    #[cfg(feature = "pktinfo")]
    // Not referenced from this file; kept for use elsewhere in the crate.
    #[allow(dead_code)]
    pub(crate) fn set_dst_addr(&mut self, idx: usize, addr: Option<SocketAddr>) {
        self.slots[idx].dst_addr = addr;
    }
}
567
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    /// Parses a socket-address literal, panicking if it is malformed.
    fn sock(text: &str) -> SocketAddr {
        text.parse().unwrap()
    }

    /// Emulates what a receive call does for a single slot: copies `payload`
    /// into the slot buffer, stores the source address and records the
    /// payload length.
    fn fill_slot(raw: &mut RecvBatchRaw, idx: usize, payload: &[u8], from: SocketAddr) {
        let (buf, slot_addr) = raw.buffer_mut(idx);
        buf[..payload.len()].copy_from_slice(payload);
        *slot_addr = from;
        // SAFETY: `idx` was just accepted by `buffer_mut`, and the copy above
        // proves the payload fits into the slot buffer.
        unsafe {
            raw.set_recv_len(idx, payload.len());
        }
    }

    // -----------------------------------------------------------------------
    // SendBatch<N> tests
    // -----------------------------------------------------------------------

    #[test]
    fn send_batch_new_is_empty() {
        let batch = SendBatch::<4>::new();
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
        assert_eq!(batch.capacity(), 4);
    }

    #[test]
    fn send_batch_push_increments_len() {
        let mut batch = SendBatch::<4>::new();
        batch.push(b"hello", sock("127.0.0.1:8080")).unwrap();
        assert_eq!(batch.len(), 1);
        assert!(!batch.is_empty());
    }

    #[test]
    fn send_batch_push_multiple() {
        let payloads: [&[u8]; 3] = [b"a", b"b", b"c"];
        let addr = sock("127.0.0.1:8080");
        let mut batch = SendBatch::<4>::new();
        for payload in payloads.iter() {
            batch.push(payload, addr).unwrap();
        }
        assert_eq!(batch.len(), 3);
        for (idx, expected) in payloads.iter().enumerate() {
            assert_eq!(batch.entry(idx).0, *expected);
        }
    }

    #[test]
    fn send_batch_push_batch_full() {
        let addr = sock("127.0.0.1:8080");
        let mut batch = SendBatch::<2>::new();
        batch.push(b"a", addr).unwrap();
        batch.push(b"b", addr).unwrap();
        let err = batch.push(b"c", addr).unwrap_err();
        assert!(matches!(err, BingerError::BatchFull { capacity: 2 }));
    }

    #[test]
    fn send_batch_push_connected_addr_is_none() {
        let mut batch = SendBatch::<4>::new();
        batch.push_connected(b"hello").unwrap();
        assert_eq!(batch.len(), 1);
        let (data, addr_opt) = batch.entry(0);
        assert_eq!(data, b"hello");
        assert!(addr_opt.is_none());
    }

    #[test]
    fn send_batch_clear_resets_len() {
        let addr = sock("127.0.0.1:8080");
        let mut batch = SendBatch::<4>::new();
        batch.push(b"a", addr).unwrap();
        batch.push(b"b", addr).unwrap();
        assert_eq!(batch.len(), 2);

        batch.clear();
        assert_eq!(batch.len(), 0);
        assert!(batch.is_empty());
    }

    #[test]
    fn send_batch_default_trait() {
        let batch: SendBatch<4> = SendBatch::default();
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
        assert_eq!(batch.capacity(), 4);
    }

    #[test]
    fn send_batch_capacity_returns_const_generic() {
        assert_eq!(SendBatch::<16>::new().capacity(), 16);
        assert_eq!(SendBatch::<1>::new().capacity(), 1);
    }

    #[test]
    fn send_batch_deref_to_raw() {
        let addr = sock("127.0.0.1:8080");
        let mut batch = SendBatch::<4>::new();
        batch.push(b"data", addr).unwrap();
        // `entry` only exists on the raw type, so this goes through `Deref`.
        let (data, addr_opt) = batch.entry(0);
        assert_eq!(data, b"data");
        assert_eq!(addr_opt, Some(addr));
    }

    // -----------------------------------------------------------------------
    // RecvBatch<N> tests
    // -----------------------------------------------------------------------

    #[test]
    fn recv_batch_new_has_correct_capacity() {
        let batch = RecvBatch::<4>::new(1024);
        assert_eq!(batch.capacity(), 4);
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
    }

    #[test]
    fn recv_batch_data_returns_empty_slice_initial() {
        let batch = RecvBatch::<4>::new(1024);
        for idx in 0..4 {
            assert!(batch.data(idx).is_empty());
        }
    }

    #[test]
    fn recv_batch_addr_returns_default_initial() {
        let batch = RecvBatch::<4>::new(1024);
        let unspecified: SocketAddr = ([0, 0, 0, 0], 0).into();
        assert_eq!(batch.addr(0), unspecified);
        assert_eq!(batch.addr(3), unspecified);
    }

    #[test]
    fn recv_batch_simulate_receive() {
        let addr = sock("127.0.0.1:9000");
        let mut batch = RecvBatch::<4>::new(64);

        fill_slot(&mut batch, 0, b"hello", addr);
        batch.set_len(1);

        assert_eq!(batch.len(), 1);
        assert_eq!(batch.data(0), b"hello");
        assert_eq!(batch.addr(0), addr);
    }

    #[test]
    fn recv_batch_clear_resets_len_and_recv_data() {
        let mut batch = RecvBatch::<4>::new(64);
        fill_slot(&mut batch, 0, b"hello", sock("127.0.0.1:9000"));
        batch.set_len(1);

        batch.clear();

        assert_eq!(batch.len(), 0);
        assert!(batch.is_empty());
        assert!(batch.data(0).is_empty());
    }

    #[test]
    fn recv_batch_iter_empty() {
        let batch = RecvBatch::<4>::new(64);
        let items: Vec<_> = batch.iter().collect();
        assert!(items.is_empty());
    }

    #[test]
    fn recv_batch_iter_yields_correct_items() {
        let addr1 = sock("127.0.0.1:9001");
        let addr2 = sock("127.0.0.1:9002");
        let mut batch = RecvBatch::<4>::new(64);

        fill_slot(&mut batch, 0, b"ab", addr1);
        fill_slot(&mut batch, 1, b"cde", addr2);
        batch.set_len(2);

        let items: Vec<_> = batch.iter().collect();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0], (&b"ab"[..], addr1));
        assert_eq!(items[1], (&b"cde"[..], addr2));
    }

    #[test]
    fn recv_batch_capacity_returns_const_generic() {
        assert_eq!(RecvBatch::<8>::new(512).capacity(), 8);
        assert_eq!(RecvBatch::<2>::new(512).capacity(), 2);
    }

    // -----------------------------------------------------------------------
    // SendBatchRaw tests
    // -----------------------------------------------------------------------

    #[test]
    fn send_raw_with_capacity_creates_empty() {
        let raw = SendBatchRaw::with_capacity(4);
        assert!(raw.is_empty());
        assert_eq!(raw.len(), 0);
    }

    #[test]
    fn send_raw_push_with_addr() {
        let addr = sock("10.0.0.1:1234");
        let mut raw = SendBatchRaw::with_capacity(4);
        raw.push(b"test data", Some(addr)).unwrap();
        assert_eq!(raw.len(), 1);

        let (data, addr_opt) = raw.entry(0);
        assert_eq!(data, b"test data");
        assert_eq!(addr_opt, Some(addr));
    }

    #[test]
    fn send_raw_push_without_addr() {
        let mut raw = SendBatchRaw::with_capacity(4);
        raw.push(b"no-destination", None).unwrap();

        let (data, addr_opt) = raw.entry(0);
        assert_eq!(data, b"no-destination");
        assert!(addr_opt.is_none());
    }

    #[test]
    fn send_raw_batch_full() {
        let dest = Some(sock("127.0.0.1:8080"));
        let mut raw = SendBatchRaw::with_capacity(2);
        raw.push(b"a", dest).unwrap();
        raw.push(b"b", dest).unwrap();
        let err = raw.push(b"c", dest).unwrap_err();
        assert!(matches!(err, BingerError::BatchFull { capacity: 2 }));
    }

    #[test]
    fn send_raw_clear_resets_len() {
        let dest = Some(sock("127.0.0.1:8080"));
        let mut raw = SendBatchRaw::with_capacity(4);
        raw.push(b"a", dest).unwrap();
        raw.push(b"b", dest).unwrap();
        assert_eq!(raw.len(), 2);

        raw.clear();
        assert_eq!(raw.len(), 0);
        assert!(raw.is_empty());
    }

    #[test]
    fn send_raw_after_clear_can_push_again() {
        let dest = Some(sock("127.0.0.1:8080"));
        let mut raw = SendBatchRaw::with_capacity(2);
        raw.push(b"a", dest).unwrap();
        raw.push(b"b", dest).unwrap();
        raw.clear();

        raw.push(b"c", dest).unwrap();
        assert_eq!(raw.len(), 1);
        assert_eq!(raw.entry(0).0, b"c");
    }

    // -----------------------------------------------------------------------
    // RecvBatchRaw tests
    // -----------------------------------------------------------------------

    #[test]
    fn recv_raw_with_capacity() {
        let raw = RecvBatchRaw::with_capacity(3, 128);
        assert_eq!(raw.capacity(), 3);
        assert!(raw.is_empty());
        assert_eq!(raw.len(), 0);
    }

    #[test]
    fn recv_raw_buffer_mut_returns_slice_and_addr_mut() {
        let mut raw = RecvBatchRaw::with_capacity(2, 64);
        let new_addr = sock("192.168.1.1:9999");

        let (buf, addr) = raw.buffer_mut(0);
        assert_eq!(buf.len(), 64);
        assert_eq!(*addr, SocketAddr::from(([0, 0, 0, 0], 0)));
        *addr = new_addr;

        // A second borrow must observe the address written through the first.
        let (_, addr_back) = raw.buffer_mut(0);
        assert_eq!(*addr_back, new_addr);
    }

    #[test]
    fn recv_raw_set_recv_len_and_data() {
        let mut raw = RecvBatchRaw::with_capacity(1, 32);
        let (buf, _addr) = raw.buffer_mut(0);
        buf[..4].copy_from_slice(b"data");

        // SAFETY: idx=0 is within bounds, 4 <= buf_size=32
        unsafe {
            raw.set_recv_len(0, 4);
        }

        let payload = raw.data(0);
        assert_eq!(payload, b"data");
        assert_eq!(payload.len(), 4);
    }

    #[test]
    fn recv_raw_set_len_updates_len() {
        let mut raw = RecvBatchRaw::with_capacity(4, 32);
        assert_eq!(raw.len(), 0);

        raw.set_len(2);
        assert_eq!(raw.len(), 2);
        assert!(!raw.is_empty());

        raw.set_len(0);
        assert_eq!(raw.len(), 0);
        assert!(raw.is_empty());
    }

    #[test]
    fn recv_raw_clear_resets_len_and_recv_len() {
        let addr = sock("10.0.0.1:5555");
        let mut raw = RecvBatchRaw::with_capacity(2, 32);
        fill_slot(&mut raw, 0, b"foo", addr);
        fill_slot(&mut raw, 1, b"bar", addr);
        raw.set_len(2);

        raw.clear();

        assert_eq!(raw.len(), 0);
        assert!(raw.is_empty());
        assert!(raw.data(0).is_empty());
        assert!(raw.data(1).is_empty());
    }

    #[test]
    fn recv_raw_untouched_slot_data_is_empty() {
        let raw = RecvBatchRaw::with_capacity(4, 32);
        assert!(raw.data(0).is_empty());
        assert!(raw.data(3).is_empty());
    }

    // -----------------------------------------------------------------------
    // Timestamp tests (only when feature "timestamping" is enabled)
    // -----------------------------------------------------------------------

    #[cfg(feature = "timestamping")]
    #[test]
    fn timestamp_as_duration_basic() {
        let dur = Timestamp {
            tv_sec: 1,
            tv_nsec: 500_000_000,
        }
        .as_duration();
        assert_eq!(dur.as_secs(), 1);
        assert_eq!(dur.subsec_nanos(), 500_000_000);
    }

    #[cfg(feature = "timestamping")]
    #[test]
    fn timestamp_as_duration_zero() {
        let dur = Timestamp {
            tv_sec: 0,
            tv_nsec: 0,
        }
        .as_duration();
        assert_eq!(dur.as_secs(), 0);
        assert_eq!(dur.subsec_nanos(), 0);
    }

    #[cfg(feature = "timestamping")]
    #[test]
    fn timestamp_as_duration_large_values() {
        let dur = Timestamp {
            tv_sec: 1_000_000,
            tv_nsec: 123_456_789,
        }
        .as_duration();
        assert_eq!(dur.as_secs(), 1_000_000);
        assert_eq!(dur.subsec_nanos(), 123_456_789);
    }

    #[cfg(feature = "timestamping")]
    #[test]
    fn timestamp_default_is_zero() {
        let ts: Timestamp = Timestamp::default();
        assert_eq!(ts.tv_sec, 0);
        assert_eq!(ts.tv_nsec, 0);
    }

    #[cfg(feature = "timestamping")]
    #[test]
    fn timestamp_clone_copy_debug() {
        let ts = Timestamp {
            tv_sec: 42,
            tv_nsec: 7,
        };
        // `ts` stays usable after this line, which is what exercises `Copy`.
        let cloned = ts;
        assert_eq!(cloned.tv_sec, 42);
        assert_eq!(cloned.tv_nsec, 7);

        let debug_str = format!("{ts:?}");
        assert!(debug_str.contains("42"));
        assert!(debug_str.contains('7'));
    }
}
997}