Skip to main content

push_packet/channels/
copy.rs

//! Module containing the implementation for a copy [`Receiver`].
2use std::os::fd::{AsRawFd, BorrowedFd};
3
4use aya::maps::{MapData, RingBuf};
5use nix::poll::PollFlags;
6
7use crate::{channels::ChannelError, events::copy::CopyEvent};
8
/// A receiver that receives [`CopyEvent`]s from the underlying [`RingBuf`].
///
/// Note that [`CopyEvent`]s hold a reference to the underlying [`RingBuf`] and must be dropped
/// to restore capacity.
pub struct Receiver {
    /// The ring buffer map that events are read from.
    pub(crate) ring_buf: RingBuf<MapData>,
}
15
16impl From<RingBuf<MapData>> for Receiver {
17    fn from(value: RingBuf<MapData>) -> Self {
18        Self { ring_buf: value }
19    }
20}
21
22impl Receiver {
23    /// Attempts to receive a [`CopyEvent`] without blocking.
24    ///
25    /// # Errors
26    /// Returns [`ChannelError::Empty`] if there is not a
27    /// [`aya::maps::ring_buf::RingBufItem`] available.
28    pub fn try_recv(&mut self) -> Result<CopyEvent<'_>, ChannelError> {
29        self.ring_buf
30            .next()
31            .map(std::convert::Into::into)
32            .ok_or(ChannelError::Empty)
33    }
34
35    /// Blocks until a [`CopyEvent`] is available
36    ///
37    /// # Errors
38    ///
39    /// Returns [`ChannelError::Disconnected`] if the connection is dropped.
40    pub fn recv(&mut self) -> Result<CopyEvent<'_>, ChannelError> {
41        let ptr = &raw mut self.ring_buf;
42        loop {
43            // Safety: The &mut self is held through the whole fn, this satisfies the borrow
44            // checker.
45            if let Some(item) = unsafe { (*ptr).next() } {
46                return Ok(item.into());
47            }
48            // Safety: This version of aya doesn't expose as_fd(), wait for release.
49            let borrowed_fd = unsafe { BorrowedFd::borrow_raw(self.ring_buf.as_raw_fd()) };
50            crate::channels::poll::poll_fd(borrowed_fd, PollFlags::POLLIN)?;
51        }
52    }
53}