1use std::net::SocketAddr;
2
3use crate::error::BingerError;
4
5#[cfg(feature = "timestamping")]
23#[derive(Clone, Copy, Debug, Default)]
24pub struct Timestamp {
25 pub tv_sec: i64,
26 pub tv_nsec: i64,
27}
28
29#[cfg(feature = "timestamping")]
30impl Timestamp {
31 #[must_use]
36 pub fn as_duration(&self) -> std::time::Duration {
37 std::time::Duration::new(self.tv_sec as u64, self.tv_nsec as u32)
38 }
39}
40
41#[allow(clippy::module_name_repetitions)]
61pub struct SendBatch<const N: usize> {
62 raw: SendBatchRaw,
63}
64
65impl<const N: usize> SendBatch<N> {
66 #[must_use]
71 pub fn new() -> Self {
72 Self {
73 raw: SendBatchRaw::with_capacity(N),
74 }
75 }
76
77 pub fn push(&mut self, buf: &[u8], addr: SocketAddr) -> Result<(), BingerError> {
92 self.raw.push(buf, Some(addr))
93 }
94
95 pub fn push_connected(&mut self, buf: &[u8]) -> Result<(), BingerError> {
106 self.raw.push(buf, None)
107 }
108
109 #[must_use]
111 pub fn len(&self) -> usize {
112 self.raw.len()
113 }
114
115 #[must_use]
117 pub fn is_empty(&self) -> bool {
118 self.len() == 0
119 }
120
121 #[must_use]
125 pub const fn capacity(&self) -> usize {
126 N
127 }
128
129 pub fn clear(&mut self) {
134 self.raw.clear();
135 }
136}
137
138impl<const N: usize> Default for SendBatch<N> {
139 fn default() -> Self {
140 Self::new()
141 }
142}
143
144impl<const N: usize> std::ops::Deref for SendBatch<N> {
145 type Target = SendBatchRaw;
146
147 fn deref(&self) -> &Self::Target {
149 &self.raw
150 }
151}
152
153impl<const N: usize> std::ops::DerefMut for SendBatch<N> {
154 fn deref_mut(&mut self) -> &mut Self::Target {
156 &mut self.raw
157 }
158}
159
160#[allow(clippy::module_name_repetitions)]
182pub struct RecvBatch<const N: usize> {
183 raw: RecvBatchRaw,
184}
185
186impl<const N: usize> RecvBatch<N> {
187 #[must_use]
196 pub fn new(buf_size: usize) -> Self {
197 Self {
198 raw: RecvBatchRaw::with_capacity(N, buf_size),
199 }
200 }
201
202 #[must_use]
204 pub fn len(&self) -> usize {
205 self.raw.len()
206 }
207
208 #[must_use]
210 pub fn is_empty(&self) -> bool {
211 self.len() == 0
212 }
213
214 #[must_use]
218 pub const fn capacity(&self) -> usize {
219 N
220 }
221
222 #[must_use]
231 pub fn data(&self, idx: usize) -> &[u8] {
232 self.raw.data(idx)
233 }
234
235 #[must_use]
241 pub fn addr(&self, idx: usize) -> SocketAddr {
242 self.raw.addr(idx)
243 }
244
245 pub fn clear(&mut self) {
250 self.raw.clear();
251 }
252
253 pub fn iter(&self) -> impl Iterator<Item = (&[u8], SocketAddr)> + '_ {
255 (0..self.len()).map(|i| (self.data(i), self.addr(i)))
256 }
257
258 #[cfg(feature = "timestamping")]
268 #[must_use]
269 pub fn timestamp(&self, idx: usize) -> Option<Timestamp> {
270 self.raw.timestamp(idx)
271 }
272
273 #[cfg(feature = "pktinfo")]
285 #[must_use]
286 pub fn dst_addr(&self, idx: usize) -> Option<SocketAddr> {
287 self.raw.dst_addr(idx)
288 }
289}
290
291impl<const N: usize> std::ops::Deref for RecvBatch<N> {
292 type Target = RecvBatchRaw;
293
294 fn deref(&self) -> &Self::Target {
296 &self.raw
297 }
298}
299
300impl<const N: usize> std::ops::DerefMut for RecvBatch<N> {
301 fn deref_mut(&mut self) -> &mut Self::Target {
303 &mut self.raw
304 }
305}
306
307struct SendSlot {
308 data_ptr: *const u8,
309 data_len: usize,
310 addr: Option<SocketAddr>,
311 _marker: std::marker::PhantomData<*const [u8]>,
312}
313
314pub struct SendBatchRaw {
322 slots: Vec<SendSlot>,
323 len: usize,
324}
325
326impl SendBatchRaw {
327 #[must_use]
331 pub fn with_capacity(capacity: usize) -> Self {
332 Self {
333 slots: Vec::with_capacity(capacity),
334 len: 0,
335 }
336 }
337
338 pub fn push(&mut self, data: &[u8], addr: Option<SocketAddr>) -> Result<(), BingerError> {
348 if self.len >= self.slots.capacity() {
349 return Err(BingerError::BatchFull {
350 capacity: self.slots.capacity(),
351 });
352 }
353 let slot = SendSlot {
354 data_ptr: data.as_ptr(),
355 data_len: data.len(),
356 addr,
357 _marker: std::marker::PhantomData,
358 };
359 if self.slots.len() <= self.len {
360 self.slots.push(slot);
361 } else {
362 self.slots[self.len] = slot;
363 }
364 self.len += 1;
365 Ok(())
366 }
367
368 #[must_use]
370 pub fn len(&self) -> usize {
371 self.len
372 }
373
374 #[must_use]
376 pub fn is_empty(&self) -> bool {
377 self.len == 0
378 }
379
380 #[must_use]
386 pub fn entry(&self, idx: usize) -> (&[u8], Option<SocketAddr>) {
387 let slot = &self.slots[idx];
388 let data = unsafe { std::slice::from_raw_parts(slot.data_ptr, slot.data_len) };
389 (data, slot.addr)
390 }
391
392 pub fn clear(&mut self) {
396 self.len = 0;
397 }
398}
399
400struct RecvSlot {
401 buf: Vec<u8>,
402 addr: SocketAddr,
403 recv_len: u16,
404 #[cfg(feature = "timestamping")]
405 timestamp: Option<Timestamp>,
406 #[cfg(feature = "pktinfo")]
407 dst_addr: Option<SocketAddr>,
408}
409
410pub struct RecvBatchRaw {
419 slots: Vec<RecvSlot>,
420 len: usize,
421}
422
423impl RecvBatchRaw {
424 #[must_use]
429 pub fn with_capacity(capacity: usize, buf_size: usize) -> Self {
430 let slots = (0..capacity)
431 .map(|_| RecvSlot {
432 buf: vec![0u8; buf_size],
433 addr: SocketAddr::from(([0, 0, 0, 0], 0)),
434 recv_len: 0,
435 #[cfg(feature = "timestamping")]
436 timestamp: None,
437 #[cfg(feature = "pktinfo")]
438 dst_addr: None,
439 })
440 .collect();
441 Self { slots, len: 0 }
442 }
443
444 #[must_use]
446 pub fn capacity(&self) -> usize {
447 self.slots.len()
448 }
449
450 #[must_use]
452 pub fn len(&self) -> usize {
453 self.len
454 }
455
456 #[must_use]
458 pub fn is_empty(&self) -> bool {
459 self.len == 0
460 }
461
462 pub fn set_len(&mut self, len: usize) {
467 self.len = len;
468 }
469
470 pub unsafe fn set_recv_len(&mut self, idx: usize, n: usize) {
477 self.slots[idx].recv_len = n as u16;
478 }
479
480 pub fn buffer_mut(&mut self, idx: usize) -> (&mut [u8], &mut SocketAddr) {
489 let slot = &mut self.slots[idx];
490 (&mut slot.buf, &mut slot.addr)
491 }
492
493 #[must_use]
501 pub fn data(&self, idx: usize) -> &[u8] {
502 let slot = &self.slots[idx];
503 &slot.buf[..slot.recv_len as usize]
504 }
505
506 #[must_use]
512 pub fn addr(&self, idx: usize) -> SocketAddr {
513 self.slots[idx].addr
514 }
515
516 pub fn clear(&mut self) {
521 self.len = 0;
522 for slot in &mut self.slots {
523 slot.recv_len = 0;
524 #[cfg(feature = "timestamping")]
525 {
526 slot.timestamp = None;
527 }
528 #[cfg(feature = "pktinfo")]
529 {
530 slot.dst_addr = None;
531 }
532 }
533 }
534
535 #[cfg(feature = "timestamping")]
540 #[must_use]
541 pub fn timestamp(&self, idx: usize) -> Option<Timestamp> {
542 self.slots[idx].timestamp
543 }
544
545 #[cfg(feature = "timestamping")]
546 #[allow(dead_code)]
547 pub(crate) fn set_timestamp(&mut self, idx: usize, ts: Option<Timestamp>) {
548 self.slots[idx].timestamp = ts;
549 }
550
551 #[cfg(feature = "pktinfo")]
556 #[must_use]
557 pub fn dst_addr(&self, idx: usize) -> Option<SocketAddr> {
558 self.slots[idx].dst_addr
559 }
560
561 #[cfg(feature = "pktinfo")]
562 #[allow(dead_code)]
563 pub(crate) fn set_dst_addr(&mut self, idx: usize, addr: Option<SocketAddr>) {
564 self.slots[idx].dst_addr = addr;
565 }
566}
567
568#[cfg(test)]
569mod tests {
570 use super::*;
571 use std::net::SocketAddr;
572
573 #[test]
578 fn send_batch_new_is_empty() {
579 let batch = SendBatch::<4>::new();
580 assert_eq!(batch.len(), 0);
581 assert!(batch.is_empty());
582 assert_eq!(batch.capacity(), 4);
583 }
584
585 #[test]
586 fn send_batch_push_increments_len() {
587 let mut batch = SendBatch::<4>::new();
588 let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
589 batch.push(b"hello", addr).unwrap();
590 assert_eq!(batch.len(), 1);
591 assert!(!batch.is_empty());
592 }
593
594 #[test]
595 fn send_batch_push_multiple() {
596 let mut batch = SendBatch::<4>::new();
597 let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
598 batch.push(b"a", addr).unwrap();
599 batch.push(b"b", addr).unwrap();
600 batch.push(b"c", addr).unwrap();
601 assert_eq!(batch.len(), 3);
602 assert_eq!(batch.entry(0).0, b"a");
603 assert_eq!(batch.entry(1).0, b"b");
604 assert_eq!(batch.entry(2).0, b"c");
605 }
606
607 #[test]
608 fn send_batch_push_batch_full() {
609 let mut batch = SendBatch::<2>::new();
610 let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
611 batch.push(b"a", addr).unwrap();
612 batch.push(b"b", addr).unwrap();
613 let err = batch.push(b"c", addr).unwrap_err();
614 assert!(matches!(err, BingerError::BatchFull { capacity: 2 }));
615 }
616
617 #[test]
618 fn send_batch_push_connected_addr_is_none() {
619 let mut batch = SendBatch::<4>::new();
620 batch.push_connected(b"hello").unwrap();
621 assert_eq!(batch.len(), 1);
622 let (data, addr_opt) = batch.entry(0);
623 assert_eq!(data, b"hello");
624 assert!(addr_opt.is_none());
625 }
626
627 #[test]
628 fn send_batch_clear_resets_len() {
629 let mut batch = SendBatch::<4>::new();
630 let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
631 batch.push(b"a", addr).unwrap();
632 batch.push(b"b", addr).unwrap();
633 assert_eq!(batch.len(), 2);
634 batch.clear();
635 assert_eq!(batch.len(), 0);
636 assert!(batch.is_empty());
637 }
638
639 #[test]
640 fn send_batch_default_trait() {
641 let batch: SendBatch<4> = SendBatch::default();
642 assert_eq!(batch.len(), 0);
643 assert!(batch.is_empty());
644 assert_eq!(batch.capacity(), 4);
645 }
646
647 #[test]
648 fn send_batch_capacity_returns_const_generic() {
649 let batch = SendBatch::<16>::new();
650 assert_eq!(batch.capacity(), 16);
651
652 let batch_small = SendBatch::<1>::new();
653 assert_eq!(batch_small.capacity(), 1);
654 }
655
656 #[test]
657 fn send_batch_deref_to_raw() {
658 let mut batch = SendBatch::<4>::new();
659 let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
660 batch.push(b"data", addr).unwrap();
661 let (data, addr_opt) = batch.entry(0);
662 assert_eq!(data, b"data");
663 assert_eq!(addr_opt, Some(addr));
664 }
665
666 #[test]
671 fn recv_batch_new_has_correct_capacity() {
672 let batch = RecvBatch::<4>::new(1024);
673 assert_eq!(batch.capacity(), 4);
674 assert_eq!(batch.len(), 0);
675 assert!(batch.is_empty());
676 }
677
678 #[test]
679 fn recv_batch_data_returns_empty_slice_initial() {
680 let batch = RecvBatch::<4>::new(1024);
681 assert!(batch.data(0).is_empty());
682 assert!(batch.data(1).is_empty());
683 assert!(batch.data(2).is_empty());
684 assert!(batch.data(3).is_empty());
685 }
686
687 #[test]
688 fn recv_batch_addr_returns_default_initial() {
689 let batch = RecvBatch::<4>::new(1024);
690 let default: SocketAddr = ([0, 0, 0, 0], 0).into();
691 assert_eq!(batch.addr(0), default);
692 assert_eq!(batch.addr(3), default);
693 }
694
695 #[test]
696 fn recv_batch_simulate_receive() {
697 let mut batch = RecvBatch::<4>::new(64);
698 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
699
700 let (buf, slot_addr) = batch.buffer_mut(0);
701 buf[..5].copy_from_slice(b"hello");
702 *slot_addr = addr;
703 unsafe {
705 batch.set_recv_len(0, 5);
706 }
707 batch.set_len(1);
708
709 assert_eq!(batch.len(), 1);
710 assert_eq!(batch.data(0), b"hello");
711 assert_eq!(batch.addr(0), addr);
712 }
713
714 #[test]
715 fn recv_batch_clear_resets_len_and_recv_data() {
716 let mut batch = RecvBatch::<4>::new(64);
717 let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
718
719 let (buf, slot_addr) = batch.buffer_mut(0);
720 buf[..5].copy_from_slice(b"hello");
721 *slot_addr = addr;
722 unsafe {
723 batch.set_recv_len(0, 5);
724 }
725 batch.set_len(1);
726
727 batch.clear();
728 assert_eq!(batch.len(), 0);
729 assert!(batch.is_empty());
730 assert!(batch.data(0).is_empty());
731 }
732
733 #[test]
734 fn recv_batch_iter_empty() {
735 let batch = RecvBatch::<4>::new(64);
736 let items: Vec<_> = batch.iter().collect();
737 assert!(items.is_empty());
738 }
739
740 #[test]
741 fn recv_batch_iter_yields_correct_items() {
742 let mut batch = RecvBatch::<4>::new(64);
743 let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
744 let addr2: SocketAddr = "127.0.0.1:9002".parse().unwrap();
745
746 let (buf, slot_addr) = batch.buffer_mut(0);
747 buf[..2].copy_from_slice(b"ab");
748 *slot_addr = addr1;
749 unsafe {
750 batch.set_recv_len(0, 2);
751 }
752
753 let (buf, slot_addr) = batch.buffer_mut(1);
754 buf[..3].copy_from_slice(b"cde");
755 *slot_addr = addr2;
756 unsafe {
757 batch.set_recv_len(1, 3);
758 }
759
760 batch.set_len(2);
761
762 let items: Vec<_> = batch.iter().collect();
763 assert_eq!(items.len(), 2);
764 assert_eq!(items[0], (&b"ab"[..], addr1));
765 assert_eq!(items[1], (&b"cde"[..], addr2));
766 }
767
768 #[test]
769 fn recv_batch_capacity_returns_const_generic() {
770 let batch = RecvBatch::<8>::new(512);
771 assert_eq!(batch.capacity(), 8);
772
773 let batch_small = RecvBatch::<2>::new(512);
774 assert_eq!(batch_small.capacity(), 2);
775 }
776
777 #[test]
782 fn send_raw_with_capacity_creates_empty() {
783 let raw = SendBatchRaw::with_capacity(4);
784 assert_eq!(raw.len(), 0);
785 assert!(raw.is_empty());
786 }
787
788 #[test]
789 fn send_raw_push_with_addr() {
790 let mut raw = SendBatchRaw::with_capacity(4);
791 let addr: SocketAddr = "10.0.0.1:1234".parse().unwrap();
792 raw.push(b"test data", Some(addr)).unwrap();
793 assert_eq!(raw.len(), 1);
794
795 let (data, addr_opt) = raw.entry(0);
796 assert_eq!(data, b"test data");
797 assert_eq!(addr_opt, Some(addr));
798 }
799
800 #[test]
801 fn send_raw_push_without_addr() {
802 let mut raw = SendBatchRaw::with_capacity(4);
803 raw.push(b"no-destination", None).unwrap();
804
805 let (data, addr_opt) = raw.entry(0);
806 assert_eq!(data, b"no-destination");
807 assert!(addr_opt.is_none());
808 }
809
810 #[test]
811 fn send_raw_batch_full() {
812 let mut raw = SendBatchRaw::with_capacity(2);
813 let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
814 raw.push(b"a", Some(addr)).unwrap();
815 raw.push(b"b", Some(addr)).unwrap();
816 let err = raw.push(b"c", Some(addr)).unwrap_err();
817 assert!(matches!(err, BingerError::BatchFull { capacity: 2 }));
818 }
819
820 #[test]
821 fn send_raw_clear_resets_len() {
822 let mut raw = SendBatchRaw::with_capacity(4);
823 let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
824 raw.push(b"a", Some(addr)).unwrap();
825 raw.push(b"b", Some(addr)).unwrap();
826 assert_eq!(raw.len(), 2);
827 raw.clear();
828 assert_eq!(raw.len(), 0);
829 assert!(raw.is_empty());
830 }
831
832 #[test]
833 fn send_raw_after_clear_can_push_again() {
834 let mut raw = SendBatchRaw::with_capacity(2);
835 let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
836 raw.push(b"a", Some(addr)).unwrap();
837 raw.push(b"b", Some(addr)).unwrap();
838 raw.clear();
839 raw.push(b"c", Some(addr)).unwrap();
840 assert_eq!(raw.len(), 1);
841 assert_eq!(raw.entry(0).0, b"c");
842 }
843
844 #[test]
849 fn recv_raw_with_capacity() {
850 let raw = RecvBatchRaw::with_capacity(3, 128);
851 assert_eq!(raw.capacity(), 3);
852 assert_eq!(raw.len(), 0);
853 assert!(raw.is_empty());
854 }
855
856 #[test]
857 fn recv_raw_buffer_mut_returns_slice_and_addr_mut() {
858 let mut raw = RecvBatchRaw::with_capacity(2, 64);
859 let (buf, addr) = raw.buffer_mut(0);
860 assert_eq!(buf.len(), 64);
861 assert_eq!(*addr, SocketAddr::from(([0, 0, 0, 0], 0)));
862
863 let new_addr: SocketAddr = "192.168.1.1:9999".parse().unwrap();
864 *addr = new_addr;
865
866 let (_, addr_back) = raw.buffer_mut(0);
867 assert_eq!(*addr_back, new_addr);
868 }
869
870 #[test]
871 fn recv_raw_set_recv_len_and_data() {
872 let mut raw = RecvBatchRaw::with_capacity(1, 32);
873 let (buf, _addr) = raw.buffer_mut(0);
874 buf[..4].copy_from_slice(b"data");
875
876 unsafe {
878 raw.set_recv_len(0, 4);
879 }
880
881 assert_eq!(raw.data(0), b"data");
882 assert_eq!(raw.data(0).len(), 4);
883 }
884
885 #[test]
886 fn recv_raw_set_len_updates_len() {
887 let mut raw = RecvBatchRaw::with_capacity(4, 32);
888 assert_eq!(raw.len(), 0);
889
890 raw.set_len(2);
891 assert_eq!(raw.len(), 2);
892 assert!(!raw.is_empty());
893
894 raw.set_len(0);
895 assert_eq!(raw.len(), 0);
896 assert!(raw.is_empty());
897 }
898
899 #[test]
900 fn recv_raw_clear_resets_len_and_recv_len() {
901 let mut raw = RecvBatchRaw::with_capacity(2, 32);
902 let addr: SocketAddr = "10.0.0.1:5555".parse().unwrap();
903
904 let (buf, slot_addr) = raw.buffer_mut(0);
905 buf[..3].copy_from_slice(b"foo");
906 *slot_addr = addr;
907 unsafe {
908 raw.set_recv_len(0, 3);
909 }
910
911 let (buf, slot_addr) = raw.buffer_mut(1);
912 buf[..3].copy_from_slice(b"bar");
913 *slot_addr = addr;
914 unsafe {
915 raw.set_recv_len(1, 3);
916 }
917
918 raw.set_len(2);
919
920 raw.clear();
921
922 assert_eq!(raw.len(), 0);
923 assert!(raw.is_empty());
924 assert!(raw.data(0).is_empty());
925 assert!(raw.data(1).is_empty());
926 }
927
928 #[test]
929 fn recv_raw_untouched_slot_data_is_empty() {
930 let raw = RecvBatchRaw::with_capacity(4, 32);
931 assert!(raw.data(0).is_empty());
932 assert!(raw.data(3).is_empty());
933 }
934
935 #[cfg(feature = "timestamping")]
940 #[test]
941 fn timestamp_as_duration_basic() {
942 let ts = Timestamp {
943 tv_sec: 1,
944 tv_nsec: 500_000_000,
945 };
946 let dur = ts.as_duration();
947 assert_eq!(dur.as_secs(), 1);
948 assert_eq!(dur.subsec_nanos(), 500_000_000);
949 }
950
951 #[cfg(feature = "timestamping")]
952 #[test]
953 fn timestamp_as_duration_zero() {
954 let ts = Timestamp {
955 tv_sec: 0,
956 tv_nsec: 0,
957 };
958 let dur = ts.as_duration();
959 assert_eq!(dur.as_secs(), 0);
960 assert_eq!(dur.subsec_nanos(), 0);
961 }
962
963 #[cfg(feature = "timestamping")]
964 #[test]
965 fn timestamp_as_duration_large_values() {
966 let ts = Timestamp {
967 tv_sec: 1_000_000,
968 tv_nsec: 123_456_789,
969 };
970 let dur = ts.as_duration();
971 assert_eq!(dur.as_secs(), 1_000_000);
972 assert_eq!(dur.subsec_nanos(), 123_456_789);
973 }
974
975 #[cfg(feature = "timestamping")]
976 #[test]
977 fn timestamp_default_is_zero() {
978 let ts: Timestamp = Timestamp::default();
979 assert_eq!(ts.tv_sec, 0);
980 assert_eq!(ts.tv_nsec, 0);
981 }
982
983 #[cfg(feature = "timestamping")]
984 #[test]
985 fn timestamp_clone_copy_debug() {
986 let ts = Timestamp {
987 tv_sec: 42,
988 tv_nsec: 7,
989 };
990 let cloned = ts;
991 assert_eq!(cloned.tv_sec, 42);
992 assert_eq!(cloned.tv_nsec, 7);
993 let debug_str = format!("{ts:?}");
994 assert!(debug_str.contains("42"));
995 assert!(debug_str.contains('7'));
996 }
997}