// solana_streamer/nonblocking/recvmmsg.rs
use {
4 crate::{
5 packet::{Meta, Packet},
6 recvmmsg::NUM_RCVMMSGS,
7 },
8 std::{cmp, io},
9 tokio::net::UdpSocket,
10};
11
12pub async fn recv_mmsg(
13 socket: &UdpSocket,
14 packets: &mut [Packet],
15) -> io::Result<usize> {
16 debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default()));
17 let count = cmp::min(NUM_RCVMMSGS, packets.len());
18 socket.readable().await?;
19 let mut i = 0;
20 for p in packets.iter_mut().take(count) {
21 p.meta.size = 0;
22 match socket.try_recv_from(p.buffer_mut()) {
23 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
24 break;
25 }
26 Err(e) => {
27 return Err(e);
28 }
29 Ok((nrecv, from)) => {
30 p.meta.size = nrecv;
31 p.meta.set_socket_addr(&from);
32 }
33 }
34 i += 1;
35 }
36 Ok(i)
37}
38
#[cfg(test)]
mod tests {
    use {
        crate::{nonblocking::recvmmsg::*, packet::PACKET_DATA_SIZE},
        std::{net::SocketAddr, time::Instant},
        tokio::net::UdpSocket,
    };

    /// `(reader socket, reader address, sender socket, sender address)`.
    type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr);

    /// Number of packet slots handed to `recv_mmsg` in every call below.
    const TEST_NUM_MSGS: usize = 32;

    /// Binds a reader and a sender socket to `ip_str` and returns both
    /// together with their local addresses.
    async fn test_setup_reader_sender(ip_str: &str) -> io::Result<TestConfig> {
        let reader = UdpSocket::bind(ip_str).await?;
        let reader_addr = reader.local_addr()?;
        let sender = UdpSocket::bind(ip_str).await?;
        let sender_addr = sender.local_addr()?;
        Ok((reader, reader_addr, sender, sender_addr))
    }

    /// Binds a socket to `127.0.0.1:0` and returns it with its local
    /// address, panicking if either step fails.
    async fn bind_loopback() -> (UdpSocket, SocketAddr) {
        let socket = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
        let local = socket.local_addr().unwrap();
        (socket, local)
    }

    /// Sends `count` zero-filled datagrams of `PACKET_DATA_SIZE` bytes from
    /// `sender` to `dest`.
    async fn send_zeroed(sender: &UdpSocket, dest: &SocketAddr, count: usize) {
        for _ in 0..count {
            let data = [0; PACKET_DATA_SIZE];
            sender.send_to(&data[..], dest).await.unwrap();
        }
    }

    /// Asserts that every packet in `packets` is `PACKET_DATA_SIZE` bytes
    /// long and was received from `from`.
    fn assert_full_packets_from(packets: &[Packet], from: SocketAddr) {
        for packet in packets {
            assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
            assert_eq!(packet.meta.socket_addr(), from);
        }
    }

    /// Restores default metadata on every packet so the buffer satisfies the
    /// `debug_assert!` at the top of `recv_mmsg` before it is reused.
    fn reset_meta(packets: &mut [Packet]) {
        for pkt in packets {
            pkt.meta = Meta::default();
        }
    }

    /// Sends fewer datagrams than there are slots and expects a single call
    /// to return all of them.
    async fn test_one_iter((reader, addr, sender, saddr): TestConfig) {
        let sent = TEST_NUM_MSGS - 1;
        send_zeroed(&sender, &addr, sent).await;

        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(sent, recv);
        assert_full_packets_from(&packets[..recv], saddr);
    }

    #[tokio::test]
    async fn test_recv_mmsg_one_iter() {
        let ipv4 = test_setup_reader_sender("127.0.0.1:0").await.unwrap();
        test_one_iter(ipv4).await;

        // A failed IPv6 setup is logged and skipped rather than failing the test.
        match test_setup_reader_sender("::1:0").await {
            Err(e) => warn!("Failed to configure IPv6: {:?}", e),
            Ok(config) => test_one_iter(config).await,
        }
    }

    /// Sends more datagrams than there are slots and expects the first call
    /// to fill every slot and a second call to return the remainder.
    async fn test_multi_iter((reader, addr, sender, saddr): TestConfig) {
        let sent = TEST_NUM_MSGS + 10;
        send_zeroed(&sender, &addr, sent).await;

        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        assert_full_packets_from(&packets[..recv], saddr);

        reset_meta(&mut packets);
        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(sent - TEST_NUM_MSGS, recv);
        assert_full_packets_from(&packets[..recv], saddr);
    }

    #[tokio::test]
    async fn test_recv_mmsg_multi_iter() {
        let ipv4 = test_setup_reader_sender("127.0.0.1:0").await.unwrap();
        test_multi_iter(ipv4).await;

        // A failed IPv6 setup is logged and skipped rather than failing the test.
        match test_setup_reader_sender("::1:0").await {
            Err(e) => warn!("Failed to configure IPv6: {:?}", e),
            Ok(config) => test_multi_iter(config).await,
        }
    }

    #[tokio::test]
    async fn test_recv_mmsg_multi_iter_timeout() {
        let (reader, addr) = bind_loopback().await;
        let (sender, saddr) = bind_loopback().await;
        let sent = TEST_NUM_MSGS;
        send_zeroed(&sender, &addr, sent).await;

        let start = Instant::now();
        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        assert_full_packets_from(&packets[..recv], saddr);

        reset_meta(&mut packets);
        // Every datagram sent was consumed above; the result of this call is
        // ignored and only the total elapsed time is checked.
        // NOTE(review): presumably this returns promptly because the first
        // call never observed `WouldBlock`, leaving the readiness state set;
        // confirm against tokio's readiness semantics.
        let _recv = recv_mmsg(&reader, &mut packets[..]).await;
        assert!(start.elapsed().as_secs() < 5);
    }

    #[tokio::test]
    async fn test_recv_mmsg_multi_addrs() {
        let (reader, addr) = bind_loopback().await;
        let (sender1, saddr1) = bind_loopback().await;
        let (sender2, saddr2) = bind_loopback().await;
        let sent1 = TEST_NUM_MSGS - 1;
        let sent2 = TEST_NUM_MSGS + 1;

        send_zeroed(&sender1, &addr, sent1).await;
        send_zeroed(&sender2, &addr, sent2).await;

        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];

        // The first batch holds everything from sender1 followed by the first
        // datagram(s) from sender2.
        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(TEST_NUM_MSGS, recv);
        let (from_first, from_second) = packets[..recv].split_at(sent1);
        assert_full_packets_from(from_first, saddr1);
        assert_full_packets_from(from_second, saddr2);

        // The second batch holds whatever sender2 traffic is left over.
        reset_meta(&mut packets);
        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
        assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
        assert_full_packets_from(&packets[..recv], saddr2);
    }
}