solana_streamer/
packet.rs

1//! The `packet` module defines data structures and methods to pull data from the network.
2#[cfg(unix)]
3use nix::poll::{poll, PollFd, PollTimeout};
4#[cfg(any(
5    target_os = "linux",
6    target_os = "android",
7    target_os = "dragonfly",
8    target_os = "freebsd",
9))]
10use nix::{poll::ppoll, sys::time::TimeSpec};
11use {
12    crate::{recvmmsg::recv_mmsg, socket::SocketAddrSpace},
13    std::{
14        io::{ErrorKind, Result},
15        net::UdpSocket,
16        time::{Duration, Instant},
17    },
18};
19pub use {
20    solana_packet::{Meta, Packet, PACKET_DATA_SIZE},
21    solana_perf::packet::{
22        PacketBatch, PacketBatchRecycler, PacketRef, PacketRefMut, PinnedPacketBatch, NUM_PACKETS,
23        PACKETS_PER_BATCH,
24    },
25};
26
/// Receive multiple messages from `socket` into the buffer provided in `batch`.
///
/// This is the fallback used on non-unix targets; it drives `recv_mmsg` in a loop without
/// any polling.
///
/// Socket mode side effect: the socket is switched to blocking on entry so that the first
/// read waits for data, and switched to non-blocking as soon as a read succeeds so that the
/// remaining reads only drain what is already queued. It is only switched back to blocking
/// by the next call of this function.
///
/// The first, blocking read is bounded only by the read timeout configured on the socket.
/// Call `sock.set_read_timeout(Some(Duration::from_secs(1)));` or similar beforehand if the
/// call must not block indefinitely.
///
/// On success `batch` is truncated to the number of packets read, which is also returned.
/// An error from the very first read is returned as is.
#[cfg(not(unix))]
pub(crate) fn recv_from(
    batch: &mut PinnedPacketBatch,
    socket: &UdpSocket,
    // If max_wait is None, reads from the socket until either:
    //   * 64 packets are read (PACKETS_PER_BATCH == 64), or
    //   * There are no more data available to read from the socket.
    max_wait: Option<Duration>,
) -> Result<usize> {
    // Block until the socket becomes readable (see the side effect described above).
    socket.set_nonblocking(false)?;
    trace!("receiving on {}", socket.local_addr().unwrap());

    // Start time and wait budget travel together; `None` means "do not coalesce".
    let wait_clock = max_wait.map(|limit| (Instant::now(), limit));

    // Make sure there is room for a full batch before reading into it.
    batch.resize(PACKETS_PER_BATCH, Packet::default());

    let mut received = 0;
    loop {
        match recv_mmsg(socket, &mut batch[received..]) {
            Ok(npkts) => {
                if received == 0 {
                    // First successful read: from here on only drain what is queued.
                    socket.set_nonblocking(true)?;
                }
                trace!("got {npkts} packets");
                received += npkts;
                // Try to batch into big enough buffers
                // will cause less re-shuffling later on.
                if received >= PACKETS_PER_BATCH {
                    break;
                }
            }
            Err(e) if received == 0 => {
                // Nothing was read at all: report the failure to the caller.
                trace!("recv_from err {e:?}");
                return Err(e);
            }
            Err(e) => {
                // Some packets are already buffered. Without a wait budget an empty
                // socket ends the batch; every other case retries until the budget
                // check below stops the loop.
                if wait_clock.is_none() && e.kind() == ErrorKind::WouldBlock {
                    break;
                }
            }
        }
        if let Some((started, limit)) = wait_clock {
            if started.elapsed() > limit {
                break;
            }
        }
    }

    batch.truncate(received);
    Ok(received)
}
87
88/// Receive multiple messages from `sock` into buffer provided in `batch`.
89/// This is a wrapper around recvmmsg(7) call.
90#[cfg(unix)]
91pub(crate) fn recv_from(
92    batch: &mut PinnedPacketBatch,
93    socket: &UdpSocket,
94    // If max_wait is None, reads from the socket until either:
95    //   * 64 packets are read (PACKETS_PER_BATCH == 64), or
96    //   * There are no more data available to read from the socket.
97    max_wait: Option<Duration>,
98    poll_fd: &mut [PollFd],
99) -> Result<usize> {
100    use crate::streamer::SOCKET_READ_TIMEOUT;
101
102    // Implementation note:
103    // This is a reimplementation of the above (now, non-unix) `recv_from` function, and
104    // is explicitly meant to preserve the existing behavior, refactored for performance.
105    //
106    // This implementation is broken into two separate functions:
107    // 1. `recv_from_coalesce` - when `max_wait` is provided.
108    // 2. `recv_from_once` - when `max_wait` is not provided.
109    //
110    // This is done to avoid excessive branching in the main loop.
111
112    /// The initial socket polling timeout.
113    ///
114    /// The socket will be polled for this duration in the event that the initial
115    /// `recv_mmsg` call fails with `WouldBlock`.
116    ///
117    /// This is meant to emulate the blocking behavior of the original `recv_from` function.
118    /// The original implementation explicitly sets the socket its given as blocking, and implicitly
119    /// expects that the caller will set `socket.set_read_timeout(Some(Duration::from_millis(SOCKET_READ_TIMEOUT)))`
120    /// some time before invocation.
121    ///
122    /// Given that we are using `poll` in this implementation, and we assume the socket is set to
123    /// non-blocking, we don't need to worry about `recv_mmsg` hanging indefinitely.
124    const SOCKET_READ_TIMEOUT_MS: u16 = SOCKET_READ_TIMEOUT.as_millis() as u16;
125
126    /// Read and batch packets from the socket until batch size is [`PACKETS_PER_BATCH`] or there are no more packets to read.
127    ///
128    /// Upon calling, this will attempt to read packets from the socket, and poll for [`SOCKET_READ_TIMEOUT`]
129    /// when [`ErrorKind::WouldBlock`] is encountered.
130    ///
131    /// On subsequent iterations, when [`ErrorKind::WouldBlock`] is encountered:
132    /// - If any packets were read, the function will exit.
133    /// - If no packets were read, the function will return an error.
134    fn recv_from_once(
135        batch: &mut PinnedPacketBatch,
136        socket: &UdpSocket,
137        poll_fd: &mut [PollFd],
138    ) -> Result<usize> {
139        let mut i = 0;
140        let mut did_poll = false;
141
142        loop {
143            match recv_mmsg(socket, &mut batch[i..]) {
144                Ok(npkts) => {
145                    i += npkts;
146                    if i >= PACKETS_PER_BATCH {
147                        break;
148                    }
149                }
150                Err(e) if e.kind() == ErrorKind::WouldBlock => {
151                    // If we have read any packets, we can exit.
152                    if i > 0 {
153                        break;
154                    }
155                    // If we have already polled once, return the error.
156                    if did_poll {
157                        return Err(e);
158                    }
159                    did_poll = true;
160                    // If we have not read any packets or polled, poll for `SOCKET_READ_TIMEOUT`.
161                    if poll(poll_fd, PollTimeout::from(SOCKET_READ_TIMEOUT_MS))? == 0 {
162                        return Err(e);
163                    }
164                }
165                Err(e) => return Err(e),
166            }
167        }
168
169        Ok(i)
170    }
171
172    /// Read and batch packets from the socket until batch size is [`PACKETS_PER_BATCH`] or `max_wait` is reached.
173    ///
174    /// Upon calling, this will attempt to read packets from the socket, and poll for [`SOCKET_READ_TIMEOUT`]
175    /// when [`ErrorKind::WouldBlock`] is encountered.
176    ///
177    /// On subsequent iterations, when [`ErrorKind::WouldBlock`] is encountered, poll for the
178    /// saturating duration since the start of the loop.
179    fn recv_from_coalesce(
180        batch: &mut PinnedPacketBatch,
181        socket: &UdpSocket,
182        max_wait: Duration,
183        poll_fd: &mut [PollFd],
184    ) -> Result<usize> {
185        #[cfg(any(
186            target_os = "linux",
187            target_os = "android",
188            target_os = "dragonfly",
189            target_os = "freebsd",
190        ))]
191        const MIN_POLL_DURATION: Duration = Duration::from_micros(100);
192        #[cfg(not(any(
193            target_os = "linux",
194            target_os = "android",
195            target_os = "dragonfly",
196            target_os = "freebsd",
197        )))]
198        // `ppoll` is not supported on non-linuxish platforms, so we use `poll`, which only
199        // supports millisecond precision.
200        const MIN_POLL_DURATION: Duration = Duration::from_millis(1);
201
202        let mut i = 0;
203        let deadline = Instant::now() + max_wait;
204
205        loop {
206            match recv_mmsg(socket, &mut batch[i..]) {
207                Ok(npkts) => {
208                    i += npkts;
209                    if i >= PACKETS_PER_BATCH {
210                        break;
211                    }
212                }
213                Err(e) if e.kind() == ErrorKind::WouldBlock => {
214                    let timeout = if i == 0 {
215                        // This emulates the behavior of the original `recv_from` function,
216                        // where it anticipates that the first read of the socket will block for
217                        // `crate::streamer::SOCKET_READ_TIMEOUT` before failing with
218                        // `ErrorKind::WouldBlock`. The condition `i == 0` indicates that we are just
219                        // after the initial read, which did not result in any packets being read.
220                        SOCKET_READ_TIMEOUT
221                    } else {
222                        let remaining = deadline.saturating_duration_since(Instant::now());
223                        // Avoid excessively short ppoll calls.
224                        if remaining < MIN_POLL_DURATION {
225                            // Deadline reached.
226                            break;
227                        }
228                        remaining
229                    };
230                    #[cfg(any(
231                        target_os = "linux",
232                        target_os = "android",
233                        target_os = "dragonfly",
234                        target_os = "freebsd",
235                    ))]
236                    {
237                        // Use `ppoll` for its sub-millisecond precision, which ensures that
238                        // short coalescing waits (e.g., `max_wait` = 1ms, common in the codebase)
239                        // are effective.
240                        //
241                        // The `poll()` syscall takes an integer millisecond timeout. After a
242                        // `recv_mmsg` call, with `max_wait` = 1ms, the remaining wait time is
243                        // virtually guaranteed to be a sub-millisecond duration. `poll` would
244                        // truncate this remainder to 0ms, preventing any actual polling.
245                        // `ppoll` makes coalescing in 1ms windows actually viable.
246                        if ppoll(poll_fd, Some(TimeSpec::from_duration(timeout)), None)? == 0 {
247                            break;
248                        }
249                    }
250                    #[cfg(not(any(
251                        target_os = "linux",
252                        target_os = "android",
253                        target_os = "dragonfly",
254                        target_os = "freebsd",
255                    )))]
256                    {
257                        if poll(poll_fd, PollTimeout::from(timeout.as_millis() as u16))? == 0 {
258                            break;
259                        }
260                    }
261                }
262                Err(e) => return Err(e),
263            }
264        }
265
266        Ok(i)
267    }
268
269    trace!("receiving on {}", socket.local_addr().unwrap());
270
271    let i = match max_wait {
272        Some(max_wait) => recv_from_coalesce(batch, socket, max_wait, poll_fd),
273        None => recv_from_once(batch, socket, poll_fd),
274    }?;
275
276    batch.truncate(i);
277
278    Ok(i)
279}
280pub fn send_to(
281    batch: &PinnedPacketBatch,
282    socket: &UdpSocket,
283    socket_addr_space: &SocketAddrSpace,
284) -> Result<()> {
285    for p in batch.iter() {
286        let addr = p.meta().socket_addr();
287        if socket_addr_space.check(&addr) {
288            if let Some(data) = p.data(..) {
289                socket.send_to(data, addr)?;
290            }
291        }
292    }
293    Ok(())
294}
295
#[cfg(test)]
mod tests {
    use {
        super::{recv_from as recv_from_impl, *},
        solana_net_utils::sockets::bind_to_localhost_unique,
        std::{
            io::{self, Write},
            net::SocketAddr,
        },
    };

    /// Platform-neutral front end for the `recv_from` under test.
    ///
    /// On unix the real function additionally wants a poll descriptor set; it is built here
    /// from `socket`, watching for readable data (`POLLIN`).
    fn recv_from(
        batch: &mut PinnedPacketBatch,
        socket: &UdpSocket,
        max_wait: Option<Duration>,
    ) -> Result<usize> {
        #[cfg(unix)]
        {
            use {nix::poll::PollFlags, std::os::fd::AsFd};

            let readable = PollFd::new(socket.as_fd(), PollFlags::POLLIN);
            let mut fds = [readable];
            recv_from_impl(batch, socket, max_wait, &mut fds)
        }
        #[cfg(not(unix))]
        {
            recv_from_impl(batch, socket, max_wait)
        }
    }

    /// `set_addr` must update the address stored in the packet metadata.
    #[test]
    fn test_packets_set_addr() {
        let expected: SocketAddr = "127.0.0.1:123".parse().unwrap();
        let mut batch = PinnedPacketBatch::new(vec![Packet::default()]);

        batch.set_addr(&expected);

        assert_eq!(batch[0].meta().socket_addr(), expected);
    }

    /// A full batch sent over localhost is received with the right sizes and sender address.
    #[test]
    pub fn packet_send_recv() {
        agave_logger::setup();
        let receiver = bind_to_localhost_unique().expect("should bind - receiver");
        let receiver_addr = receiver.local_addr().unwrap();
        let sender = bind_to_localhost_unique().expect("should bind - sender");
        let sender_addr = sender.local_addr().unwrap();

        // Address a full batch of maximum-size packets to the receiver and send it.
        let mut batch = PinnedPacketBatch::with_capacity(PACKETS_PER_BATCH);
        batch.resize(PACKETS_PER_BATCH, Packet::default());
        batch.iter_mut().for_each(|pkt| {
            let meta = pkt.meta_mut();
            meta.set_socket_addr(&receiver_addr);
            meta.size = PACKET_DATA_SIZE;
        });
        send_to(&batch, &sender, &SocketAddrSpace::Unspecified).unwrap();

        // Wipe the metadata so that everything checked below must come from the receive path.
        for pkt in batch.iter_mut() {
            *pkt.meta_mut() = Meta::default();
        }

        let max_wait = Some(Duration::from_millis(1));
        let recvd = recv_from(&mut batch, &receiver, max_wait).unwrap();
        assert_eq!(recvd, batch.len());

        batch.iter().for_each(|pkt| {
            assert_eq!(pkt.meta().size, PACKET_DATA_SIZE);
            assert_eq!(pkt.meta().socket_addr(), sender_addr);
        });
    }

    /// `Debug` formatting of the packet types must not fail.
    #[test]
    pub fn debug_trait() {
        let mut sink = io::sink();
        write!(sink, "{:?}", Packet::default()).unwrap();
        write!(sink, "{:?}", PinnedPacketBatch::default()).unwrap();
    }

    /// Packets with equal size and content compare equal; a differing byte breaks equality.
    #[test]
    fn test_packet_partial_eq() {
        let one_byte_packet = |byte: u8| {
            let mut packet = Packet::default();
            packet.meta_mut().size = 1;
            packet.buffer_mut()[0] = byte;
            packet
        };

        let p1 = one_byte_packet(0);
        let mut p2 = one_byte_packet(0);
        assert!(p1 == p2);

        p2.buffer_mut()[0] = 4;
        assert!(p1 != p2);
    }

    /// Should only get PACKETS_PER_BATCH packets per call even if a lot more were sent,
    /// and regardless of packet size.
    #[test]
    fn test_packet_resize() {
        agave_logger::setup();
        let receiver = bind_to_localhost_unique().expect("should bind - receiver");
        let receiver_addr = receiver.local_addr().unwrap();
        let sender = bind_to_localhost_unique().expect("should bind - sender");

        // A batch holding a single one-byte packet, sent twice as often as one receive
        // call is allowed to return.
        let mut single = PinnedPacketBatch::with_capacity(1);
        single.resize(1, Packet::default());
        for pkt in single.iter_mut() {
            pkt.meta_mut().set_socket_addr(&receiver_addr);
            pkt.meta_mut().size = 1;
        }
        for _ in 0..2 * PACKETS_PER_BATCH {
            send_to(&single, &sender, &SocketAddrSpace::Unspecified).unwrap();
        }

        let mut batch = PinnedPacketBatch::with_capacity(PACKETS_PER_BATCH);
        batch.resize(PACKETS_PER_BATCH, Packet::default());
        let max_wait = Some(Duration::from_millis(100));
        let recvd = recv_from(&mut batch, &receiver, max_wait).unwrap();

        // Check we only got PACKETS_PER_BATCH packets
        assert_eq!(recvd, PACKETS_PER_BATCH);
        assert_eq!(batch.capacity(), PACKETS_PER_BATCH);
    }
}