use std::num::NonZeroUsize;
use std::os::fd::BorrowedFd;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, Ordering};
use nix::sys::mman::{MapFlags, ProtFlags};
use super::ffi;
use crate::error::Error;
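/// Consumer-side token: a contiguous run of `n` entries beginning at the
/// free-running index `start`, readable until `consumer_release` is called.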
#[derive(Debug, Clone, Copy)]
pub(crate) struct PeekToken {
pub(crate) start: u32,
pub(crate) n: u32,
}
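/// Producer-side token: `n` reserved slots beginning at the free-running
/// index `start`, published to the kernel by `producer_submit`.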
#[derive(Debug, Clone, Copy)]
pub(crate) struct ReserveToken {
pub(crate) start: u32,
pub(crate) n: u32,
}
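/// A memory-mapped AF_XDP ring (fill, completion, RX, or TX), mirroring the
/// kernel's single-producer/single-consumer layout: free-running `u32`
/// producer/consumer indices masked into a power-of-two descriptor array.
/// Userspace drives exactly one side of each ring; the kernel drives the
/// other. `cached_prod`/`cached_cons` are private copies of the shared
/// indices so the hot paths can batch their atomic accesses.
///
/// A minimal sketch of the producer path (fill/TX), assuming a mapped ring
/// and a caller-supplied `addrs: &[u64]` of UMEM frame offsets:
///
/// ```ignore
/// if let Some(tok) = ring.producer_reserve(addrs.len() as u32) {
///     for (i, &addr) in addrs.iter().enumerate() {
///         ring.write_at(tok, i as u32, addr);
///     }
///     ring.producer_submit(tok);
/// }
/// ```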
pub(crate) struct XdpRing<T: Copy> {
base: NonNull<u8>,
mmap_size: usize,
size: u32,
    mask: u32,
    producer: *const AtomicU32,
    consumer: *const AtomicU32,
    #[allow(dead_code)]
    flags: *const u32,
descs: *mut T,
cached_prod: u32,
cached_cons: u32,
}
impl<T: Copy> XdpRing<T> {
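    /// Maps one kernel ring into this process.
    ///
    /// # Safety
    ///
    /// `fd` must be an AF_XDP socket on which this ring's size has already
    /// been configured via `setsockopt`, `offsets` must be the values the
    /// kernel reported through `XDP_MMAP_OFFSETS` for this ring, `pgoff`
    /// must be the matching `XDP_PGOFF_*`/`XDP_UMEM_PGOFF_*` constant, and
    /// `T` must be the kernel's entry type for the ring (`u64` for
    /// fill/completion, `xdp_desc` for RX/TX).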
pub(crate) unsafe fn mmap(
fd: BorrowedFd<'_>,
size: u32,
offsets: &ffi::xdp_ring_offset,
pgoff: libc::off_t,
) -> Result<Self, Error> {
        debug_assert!(size.is_power_of_two(), "ring size must be a power of two");
        // The descriptor array is the last region in the mapping, so its end
        // is the total length to map.
        let mmap_size = offsets.desc as usize + (size as usize) * std::mem::size_of::<T>();
let base = unsafe {
nix::sys::mman::mmap(
None,
NonZeroUsize::new(mmap_size)
.ok_or_else(|| Error::Config("ring mmap size is 0".into()))?,
ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
MapFlags::MAP_SHARED | MapFlags::MAP_POPULATE,
fd,
pgoff,
)
.map_err(|e| Error::Mmap(e.into()))?
};
let base_ptr = base.as_ptr().cast::<u8>();
let producer = base_ptr
.map_addr(|a| a + offsets.producer as usize)
.cast::<AtomicU32>();
let consumer = base_ptr
.map_addr(|a| a + offsets.consumer as usize)
.cast::<AtomicU32>();
let flags = base_ptr
.map_addr(|a| a + offsets.flags as usize)
.cast::<u32>();
let descs = base_ptr.map_addr(|a| a + offsets.desc as usize).cast::<T>();
Ok(Self {
            base: base.cast(),
mmap_size,
size,
mask: size - 1,
producer,
consumer,
flags,
descs,
cached_prod: 0,
cached_cons: 0,
})
}
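    /// Writable slots according to the cached indices. `wrapping_sub` keeps
    /// the count correct when the free-running indices overflow u32.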
#[inline]
fn free_slots(&self) -> u32 {
self.size - (self.cached_prod.wrapping_sub(self.cached_cons))
}
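    /// Reserves `n` contiguous slots for writing. The shared consumer index
    /// is re-read (Acquire) only when the cached view looks too full, so the
    /// common case touches no shared cache lines.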
#[inline]
pub(crate) fn producer_reserve(&mut self, n: u32) -> Option<ReserveToken> {
if self.free_slots() < n {
self.cached_cons = unsafe { &*self.consumer }.load(Ordering::Acquire);
}
if self.free_slots() < n {
return None;
}
let start = self.cached_prod;
self.cached_prod = self.cached_prod.wrapping_add(n);
Some(ReserveToken { start, n })
}
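    /// Writes `val` into reserved slot `tok.start + offset`; the free-running
    /// index is masked into the descriptor array here, not at reserve time.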
#[inline]
pub(crate) fn write_at(&mut self, tok: ReserveToken, offset: u32, val: T) {
assert!(
offset < tok.n,
"write_at: offset {offset} out of range for ReserveToken {{ start: {}, n: {} }}",
tok.start,
tok.n
);
let idx = tok.start.wrapping_add(offset);
unsafe { self.descs.add((idx & self.mask) as usize).write(val) };
}
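    /// Publishes everything reserved so far by storing the cached producer
    /// index with Release ordering. The token is consumed only to make the
    /// reserve/submit pairing explicit at call sites.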
#[inline]
pub(crate) fn producer_submit(&self, _tok: ReserveToken) {
unsafe { &*self.producer }.store(self.cached_prod, Ordering::Release);
}
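    /// Entries outstanding according to the cached view alone; performs no
    /// shared-memory access.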
#[inline]
pub(crate) fn cached_count(&self) -> u32 {
self.cached_prod.wrapping_sub(self.cached_cons)
}
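    /// Peeks up to `max` entries, re-reading the shared producer index
    /// (Acquire) only when the cached view is empty. A sketch of the consumer
    /// path (RX/completion), where `BATCH` and `handle` are hypothetical
    /// caller-side pieces:
    ///
    /// ```ignore
    /// while let Some(tok) = ring.consumer_peek(BATCH) {
    ///     for i in 0..tok.n {
    ///         handle(ring.read_at(tok, i));
    ///     }
    ///     ring.consumer_release(tok);
    /// }
    /// ```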
#[inline]
pub(crate) fn consumer_peek(&mut self, max: u32) -> Option<PeekToken> {
let mut available = self.cached_prod.wrapping_sub(self.cached_cons);
if available == 0 {
self.cached_prod = unsafe { &*self.producer }.load(Ordering::Acquire);
available = self.cached_prod.wrapping_sub(self.cached_cons);
}
let n = available.min(max);
if n == 0 {
return None;
}
Some(PeekToken {
start: self.cached_cons,
n,
})
}
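    /// Reads the entry at `tok.start + offset` without consuming it; entries
    /// stay visible until `consumer_release`.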
#[inline]
pub(crate) fn read_at(&self, tok: PeekToken, offset: u32) -> T {
assert!(
offset < tok.n,
"read_at: offset {offset} out of range for PeekToken {{ start: {}, n: {} }}",
tok.start,
tok.n
);
let idx = tok.start.wrapping_add(offset);
unsafe { self.descs.add((idx & self.mask) as usize).read() }
}
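    /// Hands the peeked entries back to the kernel by advancing the shared
    /// consumer index with Release ordering.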
#[inline]
pub(crate) fn consumer_release(&mut self, tok: PeekToken) {
self.cached_cons = self.cached_cons.wrapping_add(tok.n);
unsafe { &*self.consumer }.store(self.cached_cons, Ordering::Release);
}
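    /// Reads the kernel-maintained `XDP_RING_NEED_WAKEUP` flag; when set, the
    /// kernel expects a syscall kick (e.g. `sendto` for TX, `poll` for fill)
    /// after new entries are submitted.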
#[inline]
pub(crate) fn needs_wakeup(&self) -> bool {
(unsafe { self.flags.read_volatile() }) & ffi::XDP_RING_NEED_WAKEUP != 0
}
}
impl<T: Copy> Drop for XdpRing<T> {
fn drop(&mut self) {
let _ = unsafe { nix::sys::mman::munmap(self.base.cast(), self.mmap_size) };
}
}
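// SAFETY: the ring exclusively owns its mapping and raw pointers, so moving
// it to another thread is sound when `T` is `Send`. No `Sync` is claimed;
// each ring is intended to be driven from one thread at a time.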
unsafe impl<T: Copy + Send> Send for XdpRing<T> {}
#[cfg(test)]
const _: () = {
const fn assert_send<T: Send>() {}
assert_send::<XdpRing<u64>>();
assert_send::<XdpRing<libc::xdp_desc>>();
};
/// Fill ring: userspace hands free UMEM frame offsets to the kernel.
pub(crate) type FillRing = XdpRing<u64>;
/// Completion ring: the kernel returns offsets of frames it finished sending.
pub(crate) type CompletionRing = XdpRing<u64>;
/// RX ring: the kernel produces descriptors for received frames.
pub(crate) type RxRing = XdpRing<libc::xdp_desc>;
/// TX ring: userspace produces descriptors for frames to transmit.
pub(crate) type TxRing = XdpRing<libc::xdp_desc>;
#[cfg(test)]
mod tests {
use super::*;
use std::alloc::{Layout, alloc_zeroed, dealloc};
struct MockRing<T: Copy> {
ring: XdpRing<T>,
ptr: *mut u8,
layout: Layout,
}
impl<T: Copy> MockRing<T> {
fn new(size: u32) -> Self {
assert!(size.is_power_of_two());
            // Compact mock layout: producer @ 0, consumer @ 4, flags @ 8,
            // descriptors starting at byte 16.
            let desc_offset = 16usize;
            let total = desc_offset + (size as usize) * std::mem::size_of::<T>();
let layout = Layout::from_size_align(total, 8).unwrap();
let ptr = unsafe { alloc_zeroed(layout) };
assert!(!ptr.is_null());
let producer = ptr.cast::<AtomicU32>();
let consumer = unsafe { ptr.add(4) }.cast::<AtomicU32>();
let flags = unsafe { ptr.add(8) }.cast::<u32>();
let descs = unsafe { ptr.add(desc_offset) }.cast::<T>();
let ring = XdpRing {
base: unsafe { NonNull::new_unchecked(ptr) },
                // A zero mmap_size makes the munmap in XdpRing's Drop fail
                // harmlessly (munmap rejects a zero length).
                mmap_size: 0,
                size,
mask: size - 1,
producer,
consumer,
flags,
descs,
cached_prod: 0,
cached_cons: 0,
};
Self { ring, ptr, layout }
}
fn kernel_set_producer(&self, val: u32) {
unsafe { &*self.ring.producer }.store(val, Ordering::Release);
}
fn read_producer(&self) -> u32 {
unsafe { &*self.ring.producer }.load(Ordering::Acquire)
}
fn read_consumer(&self) -> u32 {
unsafe { &*self.ring.consumer }.load(Ordering::Acquire)
}
fn set_flags(&self, val: u32) {
unsafe { std::ptr::write_volatile(self.ring.flags as *mut u32, val) };
}
}
impl<T: Copy> Drop for MockRing<T> {
fn drop(&mut self) {
            // Reassert a zero length so the XdpRing Drop that runs after this
            // one cannot munmap the heap allocation freed below.
            self.ring.mmap_size = 0;
            unsafe { dealloc(self.ptr, self.layout) };
}
}
#[test]
fn producer_reserve_and_submit() {
let mut mock = MockRing::<u64>::new(4);
let tok = mock.ring.producer_reserve(1).unwrap();
assert_eq!(tok.start, 0);
assert_eq!(tok.n, 1);
mock.ring.write_at(tok, 0, 42u64);
mock.ring.producer_submit(tok);
assert_eq!(mock.read_producer(), 1);
}
#[test]
fn producer_reserve_full() {
let mut mock = MockRing::<u64>::new(4);
let tok = mock.ring.producer_reserve(4).unwrap();
mock.ring.producer_submit(tok);
assert!(mock.ring.producer_reserve(1).is_none());
}
#[test]
fn consumer_peek_empty() {
let mut mock = MockRing::<u64>::new(4);
assert!(mock.ring.consumer_peek(4).is_none());
}
#[test]
fn consumer_peek_after_produce() {
let mut mock = MockRing::<u64>::new(4);
mock.kernel_set_producer(2);
let tok = mock.ring.consumer_peek(4).unwrap();
assert_eq!(tok.n, 2);
assert_eq!(tok.start, 0);
}
#[test]
fn consumer_read_and_release() {
let mut mock = MockRing::<u64>::new(4);
unsafe {
mock.ring.descs.add(0).write(100u64);
mock.ring.descs.add(1).write(200u64);
}
mock.kernel_set_producer(2);
let tok = mock.ring.consumer_peek(4).unwrap();
assert_eq!(tok.n, 2);
assert_eq!(mock.ring.read_at(tok, 0), 100);
assert_eq!(mock.ring.read_at(tok, 1), 200);
mock.ring.consumer_release(tok);
assert_eq!(mock.read_consumer(), 2);
}
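    // consumer_peek clamps the token to `max` and only re-reads the shared
    // producer index when the cached view is empty; cached_count reflects
    // the refreshed view.
    #[test]
    fn consumer_peek_respects_max() {
        let mut mock = MockRing::<u64>::new(4);
        mock.kernel_set_producer(2);
        let tok = mock.ring.consumer_peek(1).unwrap();
        assert_eq!(tok.start, 0);
        assert_eq!(tok.n, 1);
        assert_eq!(mock.ring.cached_count(), 2);
        mock.ring.consumer_release(tok);
        let tok2 = mock.ring.consumer_peek(4).unwrap();
        assert_eq!(tok2.start, 1);
        assert_eq!(tok2.n, 1);
    }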
#[test]
#[should_panic(expected = "read_at: offset")]
fn read_at_panics_past_token_end() {
let mut mock = MockRing::<u64>::new(4);
mock.kernel_set_producer(2);
let tok = mock.ring.consumer_peek(4).unwrap();
let _ = mock.ring.read_at(tok, 5);
}
#[test]
#[should_panic(expected = "write_at: offset")]
fn write_at_panics_past_token_end() {
let mut mock = MockRing::<u64>::new(4);
let tok = mock.ring.producer_reserve(2).unwrap();
mock.ring.write_at(tok, 5, 0u64);
}
#[test]
fn index_wrapping() {
let mut mock = MockRing::<u64>::new(4);
let tok3 = mock.ring.producer_reserve(3).unwrap();
mock.ring.producer_submit(tok3);
        // Simulate the kernel consuming all 3 entries.
        unsafe { &*mock.ring.consumer }.store(3, Ordering::Release);
        mock.ring.cached_cons = 3;
let tok = mock.ring.producer_reserve(2).unwrap();
assert_eq!(tok.start, 3);
assert_eq!(tok.n, 2);
mock.ring.write_at(tok, 0, 30u64);
mock.ring.write_at(tok, 1, 40u64);
let v0 = unsafe { mock.ring.descs.add(3).read() };
let v1 = unsafe { mock.ring.descs.add(0).read() };
assert_eq!(v0, 30);
assert_eq!(v1, 40);
}
#[test]
fn needs_wakeup_flag() {
let mock = MockRing::<u64>::new(4);
assert!(!mock.ring.needs_wakeup());
mock.set_flags(ffi::XDP_RING_NEED_WAKEUP);
assert!(mock.ring.needs_wakeup());
mock.set_flags(0);
assert!(!mock.ring.needs_wakeup());
}
#[test]
fn producer_reclaim_after_consume() {
let mut mock = MockRing::<u64>::new(4);
let tok = mock.ring.producer_reserve(4).unwrap();
mock.ring.producer_submit(tok);
assert!(mock.ring.producer_reserve(1).is_none());
        // Simulate the kernel consuming 2 entries; room for 2 reopens.
        unsafe { &*mock.ring.consumer }.store(2, Ordering::Release);
assert!(mock.ring.producer_reserve(2).is_some());
}
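    // Exercises the u32 wraparound that wrapping_add/wrapping_sub must
    // survive: the kernel never resets ring indices, so a long-lived ring
    // does cross u32::MAX. The starting index here is synthetic.
    #[test]
    fn u32_index_wraparound() {
        let mut mock = MockRing::<u64>::new(4);
        let start = u32::MAX - 1;
        mock.ring.cached_prod = start;
        mock.ring.cached_cons = start;
        mock.kernel_set_producer(start);
        unsafe { &*mock.ring.consumer }.store(start, Ordering::Release);
        let tok = mock.ring.producer_reserve(3).unwrap();
        assert_eq!(tok.start, start);
        mock.ring.write_at(tok, 0, 10u64); // start & 3 == 2, lands in slot 2
        mock.ring.write_at(tok, 1, 20u64); // slot 3
        mock.ring.write_at(tok, 2, 30u64); // index wraps past u32::MAX, slot 0
        mock.ring.producer_submit(tok);
        assert_eq!(mock.read_producer(), start.wrapping_add(3)); // == 1
        assert_eq!(unsafe { mock.ring.descs.add(2).read() }, 10);
        assert_eq!(unsafe { mock.ring.descs.add(3).read() }, 20);
        assert_eq!(unsafe { mock.ring.descs.add(0).read() }, 30);
    }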
}