push_packet/channels/copy.rs
1//! Module containing the implmentation for a copy [`Receiver`].
2use std::os::fd::{AsRawFd, BorrowedFd};
3
4use aya::maps::{MapData, RingBuf};
5use nix::poll::PollFlags;
6
7use crate::{channels::ChannelError, events::copy::CopyEvent};
8
9/// A receiver that receives [`CopyEvent`]s from the underlying [`RingBuf`]. Note that
10/// [`CopyEvent`]s hold a reference to the underlying [`RingBuf`] and must be dropped to restore
11/// capacity.
12pub struct Receiver {
13 pub(crate) ring_buf: RingBuf<MapData>,
14}
15
16impl From<RingBuf<MapData>> for Receiver {
17 fn from(value: RingBuf<MapData>) -> Self {
18 Self { ring_buf: value }
19 }
20}
21
22impl Receiver {
23 /// Attempts to receive a [`CopyEvent`] without blocking.
24 ///
25 /// # Errors
26 /// Returns [`ChannelError::Empty`] if there is not a
27 /// [`aya::maps::ring_buf::RingBufItem`] available.
28 pub fn try_recv(&mut self) -> Result<CopyEvent<'_>, ChannelError> {
29 self.ring_buf
30 .next()
31 .map(std::convert::Into::into)
32 .ok_or(ChannelError::Empty)
33 }
34
35 /// Blocks until a [`CopyEvent`] is available
36 ///
37 /// # Errors
38 ///
39 /// Returns [`ChannelError::Disconnected`] if the connection is dropped.
40 pub fn recv(&mut self) -> Result<CopyEvent<'_>, ChannelError> {
41 let ptr = &raw mut self.ring_buf;
42 loop {
43 // Safety: The &mut self is held through the whole fn, this satisfies the borrow
44 // checker.
45 if let Some(item) = unsafe { (*ptr).next() } {
46 return Ok(item.into());
47 }
48 // Safety: This version of aya doesn't expose as_fd(), wait for release.
49 let borrowed_fd = unsafe { BorrowedFd::borrow_raw(self.ring_buf.as_raw_fd()) };
50 crate::channels::poll::poll_fd(borrowed_fd, PollFlags::POLLIN)?;
51 }
52 }
53}