solana_streamer/packet.rs
1//! The `packet` module defines data structures and methods to pull data from the network.
2#[cfg(unix)]
3use nix::poll::{poll, PollFd, PollTimeout};
4#[cfg(any(
5 target_os = "linux",
6 target_os = "android",
7 target_os = "dragonfly",
8 target_os = "freebsd",
9))]
10use nix::{poll::ppoll, sys::time::TimeSpec};
11use {
12 crate::{recvmmsg::recv_mmsg, socket::SocketAddrSpace},
13 std::{
14 io::{ErrorKind, Result},
15 net::UdpSocket,
16 time::{Duration, Instant},
17 },
18};
19pub use {
20 solana_packet::{Meta, Packet, PACKET_DATA_SIZE},
21 solana_perf::packet::{
22 PacketBatch, PacketBatchRecycler, PacketRef, PacketRefMut, PinnedPacketBatch, NUM_PACKETS,
23 PACKETS_PER_BATCH,
24 },
25};
26
/// Receive multiple messages from `socket` into `batch` (implementation for non-unix targets).
///
/// This is a loop around the crate's `recv_mmsg` that works without `poll`:
///
/// * the socket is first switched to blocking mode, so the initial read waits for data;
/// * when a read succeeds while nothing has been collected yet, the socket is switched to
///   non-blocking mode and reading continues until a stop condition is hit.
///
/// Side effect: after a successful read the socket is left in non-blocking mode; it is only
/// switched back to blocking at the start of the next call.
///
/// This function does not configure a read timeout. Call
/// `sock.set_read_timeout(Some(Duration::from_secs(1)));` or similar beforehand if the initial
/// blocking read has to time out.
///
/// `batch` is resized to `PACKETS_PER_BATCH` on every iteration and, on success, truncated to
/// the number of packets received, which is also the return value.
///
/// # Errors
///
/// Returns any error from toggling the blocking mode, and any `recv_mmsg` error that occurs
/// before a single packet has been collected.
#[cfg(not(unix))]
pub(crate) fn recv_from(
    batch: &mut PinnedPacketBatch,
    socket: &UdpSocket,
    // If max_wait is None, reads from the socket until either:
    // * 64 packets are read (PACKETS_PER_BATCH == 64), or
    // * there is no more data available to read from the socket.
    // If max_wait is Some, keeps retrying until the batch is full or more than `max_wait`
    // has elapsed since the start of the call.
    max_wait: Option<Duration>,
) -> Result<usize> {
    let mut received = 0;

    // Start in blocking mode so the very first read waits for data to arrive.
    socket.set_nonblocking(false)?;
    trace!("receiving on {}", socket.local_addr().unwrap());

    // The clock is only needed (and only started) when a coalescing window was requested.
    let started = max_wait.map(|_| Instant::now());

    loop {
        batch.resize(PACKETS_PER_BATCH, Packet::default());
        match recv_mmsg(socket, &mut batch[received..]) {
            Ok(npkts) => {
                // First successful read: drain whatever else is queued without blocking.
                if received == 0 {
                    socket.set_nonblocking(true)?;
                }
                trace!("got {npkts} packets");
                received += npkts;
                // Try to batch into big enough buffers
                // will cause less re-shuffling later on.
                if received >= PACKETS_PER_BATCH {
                    break;
                }
            }
            // Nothing collected yet: surface the error to the caller.
            Err(e) if received == 0 => {
                trace!("recv_from err {e:?}");
                return Err(e);
            }
            // Something was already collected. Without a coalescing window, a drained socket
            // ends the batch. Every other error is ignored here and the read is retried.
            // NOTE(review): a persistent non-`WouldBlock` error with `max_wait == None` would
            // keep this loop spinning — confirm whether that is intended.
            Err(e) => {
                if max_wait.is_none() && e.kind() == ErrorKind::WouldBlock {
                    break;
                }
            }
        }

        // With a coalescing window, stop as soon as it has been exceeded.
        if let (Some(started), Some(limit)) = (started, max_wait) {
            if started.elapsed() > limit {
                break;
            }
        }
    }

    batch.truncate(received);
    Ok(received)
}
87
88/// Receive multiple messages from `sock` into buffer provided in `batch`.
89/// This is a wrapper around recvmmsg(7) call.
90#[cfg(unix)]
91pub(crate) fn recv_from(
92 batch: &mut PinnedPacketBatch,
93 socket: &UdpSocket,
94 // If max_wait is None, reads from the socket until either:
95 // * 64 packets are read (PACKETS_PER_BATCH == 64), or
96 // * There are no more data available to read from the socket.
97 max_wait: Option<Duration>,
98 poll_fd: &mut [PollFd],
99) -> Result<usize> {
100 use crate::streamer::SOCKET_READ_TIMEOUT;
101
102 // Implementation note:
103 // This is a reimplementation of the above (now, non-unix) `recv_from` function, and
104 // is explicitly meant to preserve the existing behavior, refactored for performance.
105 //
106 // This implementation is broken into two separate functions:
107 // 1. `recv_from_coalesce` - when `max_wait` is provided.
108 // 2. `recv_from_once` - when `max_wait` is not provided.
109 //
110 // This is done to avoid excessive branching in the main loop.
111
112 /// The initial socket polling timeout.
113 ///
114 /// The socket will be polled for this duration in the event that the initial
115 /// `recv_mmsg` call fails with `WouldBlock`.
116 ///
117 /// This is meant to emulate the blocking behavior of the original `recv_from` function.
118 /// The original implementation explicitly sets the socket its given as blocking, and implicitly
119 /// expects that the caller will set `socket.set_read_timeout(Some(Duration::from_millis(SOCKET_READ_TIMEOUT)))`
120 /// some time before invocation.
121 ///
122 /// Given that we are using `poll` in this implementation, and we assume the socket is set to
123 /// non-blocking, we don't need to worry about `recv_mmsg` hanging indefinitely.
124 const SOCKET_READ_TIMEOUT_MS: u16 = SOCKET_READ_TIMEOUT.as_millis() as u16;
125
126 /// Read and batch packets from the socket until batch size is [`PACKETS_PER_BATCH`] or there are no more packets to read.
127 ///
128 /// Upon calling, this will attempt to read packets from the socket, and poll for [`SOCKET_READ_TIMEOUT`]
129 /// when [`ErrorKind::WouldBlock`] is encountered.
130 ///
131 /// On subsequent iterations, when [`ErrorKind::WouldBlock`] is encountered:
132 /// - If any packets were read, the function will exit.
133 /// - If no packets were read, the function will return an error.
134 fn recv_from_once(
135 batch: &mut PinnedPacketBatch,
136 socket: &UdpSocket,
137 poll_fd: &mut [PollFd],
138 ) -> Result<usize> {
139 let mut i = 0;
140 let mut did_poll = false;
141
142 loop {
143 match recv_mmsg(socket, &mut batch[i..]) {
144 Ok(npkts) => {
145 i += npkts;
146 if i >= PACKETS_PER_BATCH {
147 break;
148 }
149 }
150 Err(e) if e.kind() == ErrorKind::WouldBlock => {
151 // If we have read any packets, we can exit.
152 if i > 0 {
153 break;
154 }
155 // If we have already polled once, return the error.
156 if did_poll {
157 return Err(e);
158 }
159 did_poll = true;
160 // If we have not read any packets or polled, poll for `SOCKET_READ_TIMEOUT`.
161 if poll(poll_fd, PollTimeout::from(SOCKET_READ_TIMEOUT_MS))? == 0 {
162 return Err(e);
163 }
164 }
165 Err(e) => return Err(e),
166 }
167 }
168
169 Ok(i)
170 }
171
172 /// Read and batch packets from the socket until batch size is [`PACKETS_PER_BATCH`] or `max_wait` is reached.
173 ///
174 /// Upon calling, this will attempt to read packets from the socket, and poll for [`SOCKET_READ_TIMEOUT`]
175 /// when [`ErrorKind::WouldBlock`] is encountered.
176 ///
177 /// On subsequent iterations, when [`ErrorKind::WouldBlock`] is encountered, poll for the
178 /// saturating duration since the start of the loop.
179 fn recv_from_coalesce(
180 batch: &mut PinnedPacketBatch,
181 socket: &UdpSocket,
182 max_wait: Duration,
183 poll_fd: &mut [PollFd],
184 ) -> Result<usize> {
185 #[cfg(any(
186 target_os = "linux",
187 target_os = "android",
188 target_os = "dragonfly",
189 target_os = "freebsd",
190 ))]
191 const MIN_POLL_DURATION: Duration = Duration::from_micros(100);
192 #[cfg(not(any(
193 target_os = "linux",
194 target_os = "android",
195 target_os = "dragonfly",
196 target_os = "freebsd",
197 )))]
198 // `ppoll` is not supported on non-linuxish platforms, so we use `poll`, which only
199 // supports millisecond precision.
200 const MIN_POLL_DURATION: Duration = Duration::from_millis(1);
201
202 let mut i = 0;
203 let deadline = Instant::now() + max_wait;
204
205 loop {
206 match recv_mmsg(socket, &mut batch[i..]) {
207 Ok(npkts) => {
208 i += npkts;
209 if i >= PACKETS_PER_BATCH {
210 break;
211 }
212 }
213 Err(e) if e.kind() == ErrorKind::WouldBlock => {
214 let timeout = if i == 0 {
215 // This emulates the behavior of the original `recv_from` function,
216 // where it anticipates that the first read of the socket will block for
217 // `crate::streamer::SOCKET_READ_TIMEOUT` before failing with
218 // `ErrorKind::WouldBlock`. The condition `i == 0` indicates that we are just
219 // after the initial read, which did not result in any packets being read.
220 SOCKET_READ_TIMEOUT
221 } else {
222 let remaining = deadline.saturating_duration_since(Instant::now());
223 // Avoid excessively short ppoll calls.
224 if remaining < MIN_POLL_DURATION {
225 // Deadline reached.
226 break;
227 }
228 remaining
229 };
230 #[cfg(any(
231 target_os = "linux",
232 target_os = "android",
233 target_os = "dragonfly",
234 target_os = "freebsd",
235 ))]
236 {
237 // Use `ppoll` for its sub-millisecond precision, which ensures that
238 // short coalescing waits (e.g., `max_wait` = 1ms, common in the codebase)
239 // are effective.
240 //
241 // The `poll()` syscall takes an integer millisecond timeout. After a
242 // `recv_mmsg` call, with `max_wait` = 1ms, the remaining wait time is
243 // virtually guaranteed to be a sub-millisecond duration. `poll` would
244 // truncate this remainder to 0ms, preventing any actual polling.
245 // `ppoll` makes coalescing in 1ms windows actually viable.
246 if ppoll(poll_fd, Some(TimeSpec::from_duration(timeout)), None)? == 0 {
247 break;
248 }
249 }
250 #[cfg(not(any(
251 target_os = "linux",
252 target_os = "android",
253 target_os = "dragonfly",
254 target_os = "freebsd",
255 )))]
256 {
257 if poll(poll_fd, PollTimeout::from(timeout.as_millis() as u16))? == 0 {
258 break;
259 }
260 }
261 }
262 Err(e) => return Err(e),
263 }
264 }
265
266 Ok(i)
267 }
268
269 trace!("receiving on {}", socket.local_addr().unwrap());
270
271 let i = match max_wait {
272 Some(max_wait) => recv_from_coalesce(batch, socket, max_wait, poll_fd),
273 None => recv_from_once(batch, socket, poll_fd),
274 }?;
275
276 batch.truncate(i);
277
278 Ok(i)
279}
280pub fn send_to(
281 batch: &PinnedPacketBatch,
282 socket: &UdpSocket,
283 socket_addr_space: &SocketAddrSpace,
284) -> Result<()> {
285 for p in batch.iter() {
286 let addr = p.meta().socket_addr();
287 if socket_addr_space.check(&addr) {
288 if let Some(data) = p.data(..) {
289 socket.send_to(data, addr)?;
290 }
291 }
292 }
293 Ok(())
294}
295
#[cfg(test)]
mod tests {
    use {
        super::{recv_from as recv_from_impl, *},
        solana_net_utils::sockets::bind_to_localhost_unique,
        std::{
            io::{self, Write},
            net::SocketAddr,
        },
    };

    /// Platform-neutral shim over the module's `recv_from`: on unix it builds the `POLLIN`
    /// descriptor the real implementation expects, elsewhere it forwards the call unchanged.
    fn recv_from(
        batch: &mut PinnedPacketBatch,
        socket: &UdpSocket,
        max_wait: Option<Duration>,
    ) -> Result<usize> {
        #[cfg(unix)]
        {
            use {nix::poll::PollFlags, std::os::fd::AsFd};

            let mut readable = [PollFd::new(socket.as_fd(), PollFlags::POLLIN)];
            recv_from_impl(batch, socket, max_wait, &mut readable)
        }
        #[cfg(not(unix))]
        {
            recv_from_impl(batch, socket, max_wait)
        }
    }

    /// Build a batch holding `len` default-initialised packets.
    fn default_batch(len: usize) -> PinnedPacketBatch {
        let mut batch = PinnedPacketBatch::with_capacity(len);
        batch.resize(len, Packet::default());
        batch
    }

    #[test]
    fn test_packets_set_addr() {
        // `set_addr` must actually overwrite the address stored in the packet metadata.
        let send_addr: SocketAddr = "127.0.0.1:123".parse().unwrap();
        let mut packet_batch = PinnedPacketBatch::new(vec![Packet::default()]);
        packet_batch.set_addr(&send_addr);
        assert_eq!(packet_batch[0].meta().socket_addr(), send_addr);
    }

    #[test]
    pub fn packet_send_recv() {
        agave_logger::setup();
        let receiver = bind_to_localhost_unique().expect("should bind - receiver");
        let receiver_addr = receiver.local_addr().unwrap();
        let sender = bind_to_localhost_unique().expect("should bind - sender");
        let sender_addr = sender.local_addr().unwrap();

        // Address a full batch of maximum-size packets to the receiver and send it.
        let mut batch = default_batch(PACKETS_PER_BATCH);
        batch.iter_mut().for_each(|packet| {
            let meta = packet.meta_mut();
            meta.set_socket_addr(&receiver_addr);
            meta.size = PACKET_DATA_SIZE;
        });
        send_to(&batch, &sender, &SocketAddrSpace::Unspecified).unwrap();

        // Wipe the metadata so everything asserted below must come from the receive path.
        for packet in batch.iter_mut() {
            *packet.meta_mut() = Meta::default();
        }
        let max_wait = Some(Duration::from_millis(1));
        let recvd = recv_from(&mut batch, &receiver, max_wait).unwrap();
        assert_eq!(recvd, batch.len());

        for packet in batch.iter() {
            assert_eq!(packet.meta().size, PACKET_DATA_SIZE);
            assert_eq!(packet.meta().socket_addr(), sender_addr);
        }
    }

    #[test]
    pub fn debug_trait() {
        // Formatting with `{:?}` must not fail for either type.
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", PinnedPacketBatch::default()).unwrap();
    }

    #[test]
    fn test_packet_partial_eq() {
        let mut lhs = Packet::default();
        let mut rhs = Packet::default();

        // Identical size and payload byte: the packets compare equal.
        for packet in [&mut lhs, &mut rhs] {
            packet.meta_mut().size = 1;
            packet.buffer_mut()[0] = 0;
        }
        assert!(lhs == rhs);

        // A differing payload byte breaks equality.
        rhs.buffer_mut()[0] = 4;
        assert!(lhs != rhs);
    }

    #[test]
    fn test_packet_resize() {
        agave_logger::setup();
        let receiver = bind_to_localhost_unique().expect("should bind - receiver");
        let receiver_addr = receiver.local_addr().unwrap();
        let sender = bind_to_localhost_unique().expect("should bind - sender");
        let mut batch = default_batch(PACKETS_PER_BATCH);

        // Should only get PACKETS_PER_BATCH packets per iteration even
        // if a lot more were sent, and regardless of packet size
        let mut single = default_batch(1);
        for packet in single.iter_mut() {
            packet.meta_mut().set_socket_addr(&receiver_addr);
            packet.meta_mut().size = 1;
        }
        for _ in 0..2 * PACKETS_PER_BATCH {
            send_to(&single, &sender, &SocketAddrSpace::Unspecified).unwrap();
        }

        let max_wait = Some(Duration::from_millis(100));
        let recvd = recv_from(&mut batch, &receiver, max_wait).unwrap();
        // Check we only got PACKETS_PER_BATCH packets
        assert_eq!(recvd, PACKETS_PER_BATCH);
        assert_eq!(batch.capacity(), PACKETS_PER_BATCH);
    }
}