use crate::{
Umem,
libc::{self, rings},
};
/// Producer-side handle for a socket's fill ring.
///
/// Built by `FillRing::new`, which maps the ring through `map_ring` and keeps
/// the returned mapping next to the producer.
pub struct FillRing {
// Producer through which `enqueue` reserves, writes and submits `u64` entries.
ring: super::XskProducer<u64>,
// Mapping handed back by `map_ring` together with the ring. It is never read
// in this file; presumably it backs the ring's memory and is stored only to
// keep that memory alive (TODO confirm against `crate::mmap::Mmap`).
_mmap: crate::mmap::Mmap,
}
impl FillRing {
    /// Maps the fill ring of `socket` and wraps it in a producer.
    ///
    /// The ring is mapped with `cfg.fill_count` entries using the `fill`
    /// member of `offsets`.
    ///
    /// # Errors
    ///
    /// A failure reported by `map_ring` is returned as
    /// `SocketError::RingMap` tagged with `Ring::Fill`.
    pub(crate) fn new(
        socket: std::os::fd::RawFd,
        cfg: &super::RingConfig,
        offsets: &rings::xdp_mmap_offsets,
    ) -> Result<Self, crate::socket::SocketError> {
        let mapped = super::map_ring(
            socket,
            cfg.fill_count,
            rings::RingPageOffsets::Fill,
            &offsets.fill,
        );
        let (mapping, mut producer) = match mapped {
            Ok(parts) => parts,
            Err(inner) => {
                return Err(crate::socket::SocketError::RingMap {
                    inner,
                    ring: super::Ring::Fill,
                });
            }
        };

        // Seed the cached counters: nothing produced yet, and the consumed
        // counter starts at the ring size. Presumably this makes every slot
        // look free to the first `reserve` call (confirm against
        // `XskProducer::reserve`).
        producer.cached_produced = 0;
        producer.cached_consumed = cfg.fill_count;

        Ok(Self {
            ring: super::XskProducer(producer),
            _mmap: mapping,
        })
    }

    /// Moves up to `num_packets` entries from `umem.available()` into the ring.
    ///
    /// Returns how many entries were actually queued. That is zero when
    /// nothing was requested or available, and it can be lower than the
    /// request when the ring reserves fewer slots than asked for.
    ///
    /// # Safety
    ///
    /// NOTE(review): the exact contract is not visible in this file.
    /// Presumably `umem` must be the `Umem` associated with the socket this
    /// ring was created for; confirm before relying on it.
    ///
    /// # Panics
    ///
    /// Panics if the available queue runs dry while the reserved slots are
    /// being filled. That cannot happen as long as `reserve` never grants
    /// more slots than requested.
    pub unsafe fn enqueue(&mut self, umem: &mut Umem, num_packets: usize) -> usize {
        let free = umem.available();
        let wanted = free.len().min(num_packets);
        if wanted == 0 {
            return 0;
        }

        let (granted, start) = self.ring.reserve(wanted as _);
        if granted == 0 {
            return granted;
        }

        let end = start + granted;
        for slot in start..end {
            let entry = free.pop_front().unwrap();
            self.ring.set(slot, entry);
        }
        // Publish all filled slots in a single submit.
        self.ring.submit(granted as _);

        granted
    }
}
/// A `FillRing` bundled with the descriptor of its socket.
///
/// Keeping the descriptor lets `enqueue` follow a successful queueing with a
/// non-blocking `recvfrom` on the socket when the caller asks for a wakeup.
pub struct WakableFillRing {
// The wrapped ring that performs the actual queueing.
inner: FillRing,
// Raw descriptor given to `new`; used as the target of the `recvfrom` call.
socket: std::os::fd::RawFd,
}
impl WakableFillRing {
    /// Creates the inner `FillRing` for `socket` and remembers the descriptor.
    ///
    /// # Errors
    ///
    /// Propagates any error from `FillRing::new` unchanged.
    pub(crate) fn new(
        socket: std::os::fd::RawFd,
        cfg: &super::RingConfig,
        offsets: &rings::xdp_mmap_offsets,
    ) -> Result<Self, crate::socket::SocketError> {
        FillRing::new(socket, cfg, offsets).map(|inner| Self { inner, socket })
    }

    /// Queues up to `num_packets` entries and optionally pokes the socket.
    ///
    /// Queueing is delegated to `FillRing::enqueue`. If `wakeup` is `true`
    /// and at least one entry was queued, a zero-length, non-blocking
    /// `recvfrom` is issued on the stored socket. Presumably this prompts the
    /// kernel to look at the ring (confirm against the AF_XDP need-wakeup
    /// documentation).
    ///
    /// Returns the number of entries queued.
    ///
    /// # Errors
    ///
    /// Returns the OS error left by a failing `recvfrom`, unless its kind is
    /// `Interrupted`, which is treated as success. When an error is returned
    /// the entries have already been submitted to the ring, but their count
    /// is not reported.
    ///
    /// # Safety
    ///
    /// The caller must uphold whatever `FillRing::enqueue` requires; the
    /// arguments are forwarded to it unchanged.
    #[inline]
    pub unsafe fn enqueue(
        &mut self,
        umem: &mut Umem,
        num_packets: usize,
        wakeup: bool,
    ) -> std::io::Result<usize> {
        // SAFETY: obligations are forwarded to the caller of this function.
        let queued = unsafe { self.inner.enqueue(umem, num_packets) };
        if !wakeup || queued == 0 {
            return Ok(queued);
        }

        // No buffer or address is handed over: null pointers and zero length.
        let rc = unsafe {
            libc::socket::recvfrom(
                self.socket,
                std::ptr::null_mut(),
                0,
                libc::socket::MsgFlags::DONTWAIT,
                std::ptr::null_mut(),
                std::ptr::null_mut(),
            )
        };
        if rc >= 0 {
            return Ok(queued);
        }

        let err = std::io::Error::last_os_error();
        if err.kind() == std::io::ErrorKind::Interrupted {
            Ok(queued)
        } else {
            Err(err)
        }
    }
}