use std::{collections::VecDeque, os::fd::BorrowedFd, sync::Arc};
use crossbeam_queue::ArrayQueue;
use nix::poll::PollFlags;
use xdpilone::{CompletionQueue, FillQueue, RingRx, RingTx, xdp::XdpDesc};
use crate::{af_xdp::OwnedUmem, cast, channels::ChannelError, events::route::RouteEvent};
/// Maximum number of RX descriptors requested from the ring in a single
/// `receive` call; also the initial capacity of the receiver's descriptor
/// cache.
const CACHE_CAPACITY: u32 = 64;
/// Batch size, in frames, used when moving frame addresses between the free
/// list and the fill / completion queues.
const FREE_LIST_BATCH: u32 = 64;
/// Receiving half of the route channel.
///
/// Reads descriptors from the RX ring into a local cache, hands them out one
/// at a time as [`RouteEvent`]s, and moves free frame addresses from the free
/// list into the fill queue.
pub struct Receiver {
/// RX ring that received-packet descriptors are read from.
rx: RingRx,
/// Fill queue that free frame addresses are written to.
fill_queue: FillQueue,
/// UMEM handle; every returned [`RouteEvent`] borrows it.
umem: Arc<OwnedUmem>,
/// Descriptors already taken off the RX ring but not yet handed out.
cache: VecDeque<XdpDesc>,
/// Queue of free frame addresses; every returned [`RouteEvent`] borrows it.
/// NOTE(review): presumably the same queue instance the `Sender` holds —
/// confirm at the construction site.
free_list: Arc<ArrayQueue<u64>>,
}
impl Receiver {
pub(crate) fn new(
rx: RingRx,
fill_queue: FillQueue,
umem: Arc<OwnedUmem>,
free_list: Arc<ArrayQueue<u64>>,
) -> Self {
Self {
rx,
fill_queue,
umem,
cache: VecDeque::with_capacity(
CACHE_CAPACITY
.try_into()
.expect("u32 must fit in usize for targets"),
),
free_list,
}
}
fn replenish_fill_queue(&mut self) {
if self.free_list.len()
< FREE_LIST_BATCH
.try_into()
.expect("u32 must fit in usize for targets")
{
return;
}
{
let mut wf = self.fill_queue.fill(FREE_LIST_BATCH);
while let Some(addr) = self.free_list.pop() {
if !wf.insert_once(addr) {
self.free_list
.push(addr)
.expect("Frame count == free list cap");
break;
}
}
wf.commit();
}
if self.fill_queue.needs_wakeup() {
self.fill_queue.wake();
}
}
#[allow(clippy::missing_panics_doc)]
pub fn recv(&mut self) -> Result<RouteEvent<'_>, ChannelError> {
self.replenish_fill_queue();
while self.cache.is_empty() {
{
let mut reader = self.rx.receive(CACHE_CAPACITY);
self.cache.extend(reader.by_ref());
reader.release();
}
if self.cache.is_empty() {
let fd = unsafe { BorrowedFd::borrow_raw(self.rx.as_raw_fd()) };
crate::channels::poll::poll_fd(fd, PollFlags::POLLIN)?;
}
}
let XdpDesc { addr, len, .. } = self
.cache
.pop_front()
.expect("cache non-empty after loop and poll");
Ok(RouteEvent {
address: addr,
len,
umem: &self.umem,
free_list: &self.free_list,
})
}
pub fn try_recv(&mut self) -> Result<RouteEvent<'_>, ChannelError> {
self.replenish_fill_queue();
if self.cache.is_empty() {
let mut reader = self.rx.receive(CACHE_CAPACITY);
self.cache.extend(reader.by_ref());
reader.release();
}
let XdpDesc { addr, len, .. } = self.cache.pop_front().ok_or(ChannelError::Empty)?;
Ok(RouteEvent {
len,
address: addr,
umem: &self.umem,
free_list: &self.free_list,
})
}
}
/// Sending half of the route channel.
///
/// Copies payloads into free UMEM frames, queues them on the TX ring, and
/// returns completed frame addresses from the completion queue to the free
/// list.
pub struct Sender {
/// TX ring that outgoing descriptors are written to.
tx: RingTx,
/// Completion queue that transmitted frame addresses are read back from.
completion_queue: CompletionQueue,
/// UMEM handle that payload bytes are written into.
umem: Arc<OwnedUmem>,
/// Queue of free frame addresses that frames are taken from and returned to.
/// NOTE(review): presumably the same queue instance the `Receiver` holds —
/// confirm at the construction site.
free_list: Arc<ArrayQueue<u64>>,
}
impl Sender {
    /// Builds a sender from its TX ring, completion queue, UMEM handle and
    /// free list.
    pub(crate) fn new(
        tx: RingTx,
        completion_queue: CompletionQueue,
        umem: Arc<OwnedUmem>,
        free_list: Arc<ArrayQueue<u64>>,
    ) -> Self {
        Self {
            tx,
            completion_queue,
            umem,
            free_list,
        }
    }

    /// Batched reclaim: returns completed frames to the free list only once a
    /// full `FREE_LIST_BATCH` of completions is ready.
    fn drain_completion_queue(&mut self) {
        self.reclaim_completed(FREE_LIST_BATCH);
    }

    /// Returns up to `FREE_LIST_BATCH` completed frame addresses to the free
    /// list, provided at least `min_ready` completions are ready.
    ///
    /// When fewer are ready the reader is dropped without `release`, so
    /// nothing is consumed from the completion queue.
    fn reclaim_completed(&mut self, min_ready: u32) {
        let mut reader = self.completion_queue.complete(FREE_LIST_BATCH);
        if reader.capacity() < min_ready {
            return;
        }
        while let Some(addr) = reader.read() {
            self.free_list.push(addr).expect("free list = frame count");
        }
        reader.release();
    }

    /// Tries to obtain a free frame address.
    ///
    /// Runs the batched reclaim first. If the free list is still empty, any
    /// ready completions are reclaimed regardless of batch size; otherwise a
    /// sender could starve while fewer than `FREE_LIST_BATCH` completed frames
    /// sit unclaimed in the completion queue.
    fn acquire_frame(&mut self) -> Option<u64> {
        self.drain_completion_queue();
        if let Some(address) = self.free_list.pop() {
            return Some(address);
        }
        self.reclaim_completed(1);
        self.free_list.pop()
    }

    /// Puts an unused frame address back onto the free list.
    fn return_frame(&self, address: u64) {
        self.free_list
            .push(address)
            .expect("free list cap = frame count");
    }

    /// Copies `bytes` into the UMEM frame at `address`.
    fn write_frame(&self, address: u64, bytes: &[u8]) {
        // SAFETY: `address` was just popped from the free list, so this sender
        // is the only user of that frame until it is queued or returned.
        // NOTE(review): `write_at`'s full safety contract lives in
        // `crate::af_xdp`; in particular nothing here checks that `bytes`
        // fits in one frame — confirm the caller-side guarantee.
        unsafe {
            self.umem
                .write_at(cast::umem_offset_to_usize(address), bytes);
        }
    }

    /// Tries to place one descriptor on the TX ring; returns `false` when the
    /// ring has no free slot (nothing is committed in that case).
    fn try_enqueue(&mut self, address: u64, len: u32) -> bool {
        let mut writer = self.tx.transmit(1);
        let queued = writer.insert_once(XdpDesc {
            addr: address,
            len,
            options: 0,
        });
        if queued {
            writer.commit();
        }
        queued
    }

    /// Wakes the TX ring if it asks for a wakeup.
    fn kick_tx(&mut self) {
        if self.tx.needs_wakeup() {
            self.tx.wake();
        }
    }

    /// Waits for `POLLIN` or `POLLOUT` on the TX file descriptor.
    fn wait_tx_ready(&mut self) -> Result<(), ChannelError> {
        // SAFETY: the raw descriptor comes from `self.tx`, which is alive for
        // this whole call, and `fd` does not escape this function.
        // NOTE(review): assumes `self.tx` keeps the socket open for as long as
        // it exists — confirm.
        let fd = unsafe { BorrowedFd::borrow_raw(self.tx.as_raw_fd()) };
        crate::channels::poll::poll_fd(fd, PollFlags::POLLIN | PollFlags::POLLOUT)?;
        Ok(())
    }

    /// Queues `data` for transmission without blocking.
    ///
    /// # Errors
    ///
    /// Returns [`ChannelError::Empty`] when no free frame is available or the
    /// TX ring has no free slot. In the latter case the frame is returned to
    /// the free list.
    ///
    /// # Panics
    ///
    /// Panics if a frame address cannot be pushed onto the free list.
    /// NOTE(review): `cast::packet_len_to_u32` may also panic on an
    /// unrepresentable length — confirm.
    pub fn try_send(&mut self, data: impl AsRef<[u8]>) -> Result<(), ChannelError> {
        let bytes = data.as_ref();
        // Convert the length before taking a frame so a failing conversion
        // cannot strand a popped frame or follow a write.
        let len = cast::packet_len_to_u32(bytes.len());
        let address = self.acquire_frame().ok_or(ChannelError::Empty)?;
        self.write_frame(address, bytes);
        if !self.try_enqueue(address, len) {
            self.return_frame(address);
            return Err(ChannelError::Empty);
        }
        self.kick_tx();
        Ok(())
    }

    /// Queues `data` for transmission, blocking until a free frame and a TX
    /// slot are both available.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `poll_fd`. If polling fails after a
    /// frame was taken, the frame is returned to the free list first so it is
    /// not leaked.
    ///
    /// # Panics
    ///
    /// Panics if a frame address cannot be pushed onto the free list.
    /// NOTE(review): `cast::packet_len_to_u32` may also panic on an
    /// unrepresentable length — confirm.
    pub fn send(&mut self, data: impl AsRef<[u8]>) -> Result<(), ChannelError> {
        let bytes = data.as_ref();
        let len = cast::packet_len_to_u32(bytes.len());
        let address = loop {
            if let Some(address) = self.acquire_frame() {
                break address;
            }
            self.wait_tx_ready()?;
        };
        self.write_frame(address, bytes);
        while !self.try_enqueue(address, len) {
            if let Err(err) = self.wait_tx_ready() {
                // Give the frame back before bailing out.
                self.return_frame(address);
                return Err(err);
            }
        }
        self.kick_tx();
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use crate::channels::route::{Receiver, Sender};

    /// Compiles only when `T` can be moved to another thread.
    fn require_send<T>()
    where
        T: Send,
    {
    }

    /// The sending half must be transferable across threads.
    #[test]
    fn sender_is_send() {
        require_send::<Sender>();
    }

    /// The receiving half must be transferable across threads.
    #[test]
    fn receiver_is_send() {
        require_send::<Receiver>();
    }
}