solana_streamer/nonblocking/
recvmmsg.rs

1//! The `recvmmsg` module provides a nonblocking recvmmsg() API implementation
2
3use {
4    crate::{
5        packet::{Meta, Packet},
6        recvmmsg::NUM_RCVMMSGS,
7    },
8    std::{cmp, io},
9    tokio::net::UdpSocket,
10};
11
12pub async fn recv_mmsg(
13    socket: &UdpSocket,
14    packets: &mut [Packet],
15) -> io::Result</*num packets:*/ usize> {
16    debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default()));
17    let count = cmp::min(NUM_RCVMMSGS, packets.len());
18    socket.readable().await?;
19    let mut i = 0;
20    for p in packets.iter_mut().take(count) {
21        p.meta.size = 0;
22        match socket.try_recv_from(p.buffer_mut()) {
23            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
24                break;
25            }
26            Err(e) => {
27                return Err(e);
28            }
29            Ok((nrecv, from)) => {
30                p.meta.size = nrecv;
31                p.meta.set_socket_addr(&from);
32            }
33        }
34        i += 1;
35    }
36    Ok(i)
37}
38
39#[cfg(test)]
40mod tests {
41    use {
42        crate::{nonblocking::recvmmsg::*, packet::PACKET_DATA_SIZE},
43        std::{net::SocketAddr, time::Instant},
44        tokio::net::UdpSocket,
45    };
46
47    type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr);
48
49    async fn test_setup_reader_sender(ip_str: &str) -> io::Result<TestConfig> {
50        let reader = UdpSocket::bind(ip_str).await?;
51        let addr = reader.local_addr()?;
52        let sender = UdpSocket::bind(ip_str).await?;
53        let saddr = sender.local_addr()?;
54        Ok((reader, addr, sender, saddr))
55    }
56
57    const TEST_NUM_MSGS: usize = 32;
58
59    async fn test_one_iter((reader, addr, sender, saddr): TestConfig) {
60        let sent = TEST_NUM_MSGS - 1;
61        for _ in 0..sent {
62            let data = [0; PACKET_DATA_SIZE];
63            sender.send_to(&data[..], &addr).await.unwrap();
64        }
65
66        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
67        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
68        assert_eq!(sent, recv);
69        for packet in packets.iter().take(recv) {
70            assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
71            assert_eq!(packet.meta.socket_addr(), saddr);
72        }
73    }
74
75    #[tokio::test]
76    async fn test_recv_mmsg_one_iter() {
77        test_one_iter(test_setup_reader_sender("127.0.0.1:0").await.unwrap()).await;
78
79        match test_setup_reader_sender("::1:0").await {
80            Ok(config) => test_one_iter(config).await,
81            Err(e) => warn!("Failed to configure IPv6: {:?}", e),
82        }
83    }
84
85    async fn test_multi_iter((reader, addr, sender, saddr): TestConfig) {
86        let sent = TEST_NUM_MSGS + 10;
87        for _ in 0..sent {
88            let data = [0; PACKET_DATA_SIZE];
89            sender.send_to(&data[..], &addr).await.unwrap();
90        }
91
92        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
93        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
94        assert_eq!(TEST_NUM_MSGS, recv);
95        for packet in packets.iter().take(recv) {
96            assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
97            assert_eq!(packet.meta.socket_addr(), saddr);
98        }
99
100        packets
101            .iter_mut()
102            .for_each(|pkt| pkt.meta = Meta::default());
103        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
104        assert_eq!(sent - TEST_NUM_MSGS, recv);
105        for packet in packets.iter().take(recv) {
106            assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
107            assert_eq!(packet.meta.socket_addr(), saddr);
108        }
109    }
110
111    #[tokio::test]
112    async fn test_recv_mmsg_multi_iter() {
113        test_multi_iter(test_setup_reader_sender("127.0.0.1:0").await.unwrap()).await;
114
115        match test_setup_reader_sender("::1:0").await {
116            Ok(config) => test_multi_iter(config).await,
117            Err(e) => warn!("Failed to configure IPv6: {:?}", e),
118        }
119    }
120
121    #[tokio::test]
122    async fn test_recv_mmsg_multi_iter_timeout() {
123        let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
124        let addr = reader.local_addr().unwrap();
125        let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
126        let saddr = sender.local_addr().unwrap();
127        let sent = TEST_NUM_MSGS;
128        for _ in 0..sent {
129            let data = [0; PACKET_DATA_SIZE];
130            sender.send_to(&data[..], &addr).await.unwrap();
131        }
132
133        let start = Instant::now();
134        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
135        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
136        assert_eq!(TEST_NUM_MSGS, recv);
137        for packet in packets.iter().take(recv) {
138            assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
139            assert_eq!(packet.meta.socket_addr(), saddr);
140        }
141
142        packets
143            .iter_mut()
144            .for_each(|pkt| pkt.meta = Meta::default());
145        let _recv = recv_mmsg(&reader, &mut packets[..]).await;
146        assert!(start.elapsed().as_secs() < 5);
147    }
148
149    #[tokio::test]
150    async fn test_recv_mmsg_multi_addrs() {
151        let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
152        let addr = reader.local_addr().unwrap();
153
154        let sender1 = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
155        let saddr1 = sender1.local_addr().unwrap();
156        let sent1 = TEST_NUM_MSGS - 1;
157
158        let sender2 = UdpSocket::bind("127.0.0.1:0").await.expect("bind");
159        let saddr2 = sender2.local_addr().unwrap();
160        let sent2 = TEST_NUM_MSGS + 1;
161
162        for _ in 0..sent1 {
163            let data = [0; PACKET_DATA_SIZE];
164            sender1.send_to(&data[..], &addr).await.unwrap();
165        }
166
167        for _ in 0..sent2 {
168            let data = [0; PACKET_DATA_SIZE];
169            sender2.send_to(&data[..], &addr).await.unwrap();
170        }
171
172        let mut packets = vec![Packet::default(); TEST_NUM_MSGS];
173
174        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
175        assert_eq!(TEST_NUM_MSGS, recv);
176        for packet in packets.iter().take(sent1) {
177            assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
178            assert_eq!(packet.meta.socket_addr(), saddr1);
179        }
180        for packet in packets.iter().skip(sent1).take(recv - sent1) {
181            assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
182            assert_eq!(packet.meta.socket_addr(), saddr2);
183        }
184
185        packets
186            .iter_mut()
187            .for_each(|pkt| pkt.meta = Meta::default());
188        let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap();
189        assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
190        for packet in packets.iter().take(recv) {
191            assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
192            assert_eq!(packet.meta.socket_addr(), saddr2);
193        }
194    }
195}