use std::{
borrow::Borrow,
ffi::{c_int, c_void},
fmt::{self, Debug, Formatter},
io, mem,
ops::Deref,
os::fd::{AsFd as _, AsRawFd, BorrowedFd, RawFd},
ptr,
ptr::NonNull,
slice,
sync::atomic::{AtomicU32, AtomicUsize, Ordering},
};
use libc::{munmap, off_t, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE};
use crate::{
generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ},
maps::{MapData, MapError},
sys::{mmap, SyscallError},
util::page_size,
};
/// Userspace handle for reading entries out of a BPF ring buffer map.
///
/// Bundles the underlying map with the two memory mappings created by `RingBuf::new`:
/// one for the consumer position and one for the producer position plus the record data.
/// Entries are pulled one at a time with `next`.
#[doc(alias = "BPF_MAP_TYPE_RINGBUF")]
pub struct RingBuf<T> {
// The map handle; when `T: Borrow<MapData>` its file descriptor backs both mappings.
map: T,
// Read cursor into the ring, backed by a read-write mapping of the consumer page.
consumer: ConsumerPos,
// Read-only mapping of the producer position and the record data.
producer: ProducerData,
}
impl<T: Borrow<MapData>> RingBuf<T> {
    /// Builds a ring buffer reader on top of `map`.
    ///
    /// Maps one page at file offset 0 for the consumer position and, starting one page
    /// in, the producer page followed by the data area. The size of the data area is
    /// taken from the map's `max_entries`.
    ///
    /// # Errors
    ///
    /// Returns a [`MapError`] if either mapping cannot be created.
    pub(crate) fn new(map: T) -> Result<Self, MapError> {
        // Keep the borrow of `map` confined to this block so it can be moved afterwards.
        let (consumer, producer) = {
            let map_data: &MapData = map.borrow();
            let fd = map_data.fd().as_fd();
            let page = page_size();
            let ring_bytes = map_data.obj.max_entries();

            let consumer = ConsumerPos::new(ConsumerMetadata::new(fd, 0, page)?);
            let producer = ProducerData::new(fd, page, page, ring_bytes)?;
            (consumer, producer)
        };

        Ok(Self {
            map,
            consumer,
            producer,
        })
    }
}
impl<T> RingBuf<T> {
    /// Returns the next available entry, or `None` if nothing can be read right now.
    ///
    /// The returned item keeps `self` mutably borrowed, so it has to be dropped before
    /// `next` can be called again; dropping it marks the entry as consumed.
    // Deliberately not `Iterator::next`: the item borrows from `self`, which the
    // `Iterator` trait cannot express.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<RingBufItem<'_>> {
        self.producer.next(&mut self.consumer)
    }
}
impl<T: Borrow<MapData>> AsRawFd for RingBuf<T> {
    /// Returns the raw file descriptor of the underlying map.
    fn as_raw_fd(&self) -> RawFd {
        let map_data: &MapData = self.map.borrow();
        map_data.fd().as_fd().as_raw_fd()
    }
}
/// A single entry read from a `RingBuf`.
///
/// Dereferences to the entry's bytes. Dropping the item advances the consumer position
/// past the entry.
pub struct RingBufItem<'a> {
// Payload bytes of the entry, borrowed from the ring's memory mapping.
data: &'a [u8],
// Consumer cursor that is advanced when this item is dropped.
consumer: &'a mut ConsumerPos,
}
impl Deref for RingBufItem<'_> {
    type Target = [u8];

    /// Exposes the entry's payload bytes.
    fn deref(&self) -> &Self::Target {
        self.data
    }
}
impl Drop for RingBufItem<'_> {
    /// Marks the entry as consumed by advancing the consumer position past it.
    fn drop(&mut self) {
        let payload_len = self.data.len();
        self.consumer.consume(payload_len);
    }
}
impl Debug for RingBufItem<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let Self {
data,
consumer:
ConsumerPos {
pos,
metadata: ConsumerMetadata { mmap: _ },
},
} = self;
f.debug_struct("RingBufItem")
.field("pos", pos)
.field("len", &data.len())
.finish()
}
}
/// The memory mapping that holds the shared copy of the consumer position.
struct ConsumerMetadata {
// Read-write shared mapping; its first word is accessed as an `AtomicUsize` through
// the `AsRef<AtomicUsize>` impl.
mmap: MMap,
}
impl ConsumerMetadata {
fn new(fd: BorrowedFd<'_>, offset: usize, page_size: usize) -> Result<Self, MapError> {
let mmap = MMap::new(
fd,
page_size,
PROT_READ | PROT_WRITE,
MAP_SHARED,
offset.try_into().unwrap(),
)?;
Ok(Self { mmap })
}
}
impl AsRef<AtomicUsize> for ConsumerMetadata {
fn as_ref(&self) -> &AtomicUsize {
let Self {
mmap: MMap { ptr, .. },
} = self;
unsafe { ptr.cast::<AtomicUsize>().as_ref() }
}
}
/// The consumer's position in the ring, kept both locally and in the shared mapping.
struct ConsumerPos {
// Local copy of the position; written back to `metadata` on every `consume`.
pos: usize,
// Mapping that holds the shared copy of the position.
metadata: ConsumerMetadata,
}
impl ConsumerPos {
    /// Creates the cursor, seeding the local position from the value currently stored in
    /// the shared mapping.
    fn new(metadata: ConsumerMetadata) -> Self {
        let pos = metadata.as_ref().load(Ordering::SeqCst);
        Self { pos, metadata }
    }

    /// Advances the position past an entry whose payload is `len` bytes long and
    /// publishes the new position to the shared mapping.
    ///
    /// The step is the record header size plus `len`, rounded up to a multiple of 8.
    ///
    /// # Panics
    ///
    /// Panics if `BPF_RINGBUF_HDR_SZ` does not fit in `usize`.
    fn consume(&mut self, len: usize) {
        let Self { pos, metadata } = self;

        // Rounds `n` up to the nearest multiple of `multiple`.
        fn next_multiple_of(n: usize, multiple: usize) -> usize {
            match n % multiple {
                0 => n,
                rem => n + (multiple - rem),
            }
        }

        let advance = next_multiple_of(usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap() + len, 8);
        // The position is a free-running counter that is only ever masked or compared for
        // equality, so it must be allowed to wrap around; a plain `+=` would panic in
        // overflow-checked builds once the counter reaches `usize::MAX` (reachable on
        // 32-bit targets).
        *pos = pos.wrapping_add(advance);
        metadata.as_ref().store(*pos, Ordering::SeqCst);
    }
}
/// Mapping of the producer side of the ring plus the state needed to read from it.
struct ProducerData {
// Read-only shared mapping; its first word is read as the producer position and the
// record data starts `data_offset` bytes in.
mmap: MMap,
// Byte offset within `mmap` at which the record data begins.
data_offset: usize,
// Last producer position loaded from `mmap`; it is only re-read once the consumer
// position has caught up with it.
pos_cache: usize,
// `byte_size - 1`; masks positions and header lengths down to the ring size.
mask: u32,
}
impl ProducerData {
    /// Maps the producer side of the ring buffer behind `fd`.
    ///
    /// `offset` is the file offset at which the producer page starts, `page_size` the
    /// size of that page and `byte_size` the size of the ring's data area. The mapping
    /// covers the producer page followed by twice `byte_size` bytes of data: the kernel
    /// exposes the data area twice back-to-back so that a record wrapping around the end
    /// of the ring can still be read as one contiguous slice.
    ///
    /// # Errors
    ///
    /// Returns the [`MapError`] produced by `MMap::new` if the mapping fails.
    ///
    /// # Panics
    ///
    /// Panics if `byte_size` does not fit in `usize` or `offset` does not fit in `off_t`.
    /// Debug builds additionally assert that `byte_size` is a power of two.
    fn new(
        fd: BorrowedFd<'_>,
        offset: usize,
        page_size: usize,
        byte_size: u32,
    ) -> Result<Self, MapError> {
        let len = page_size + 2 * usize::try_from(byte_size).unwrap();
        let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?;

        // With a power-of-two size, `byte_size - 1` is a bitmask for offsets into the ring.
        debug_assert!(byte_size.is_power_of_two());
        let mask = byte_size - 1;

        // Seed the cache with the real producer position. The ring may already have been
        // used (for example when the map is reused or loaded from a pin), in which case
        // the consumer position is non-zero. Starting the cache at 0 would then make
        // `data_available` see "consumer != cache" and report data without ever loading
        // the producer position, so a stale header would be read from an empty ring.
        //
        // SAFETY: `mmap.ptr` is the non-null base of a live mapping of at least
        // `page_size` bytes whose first word is the producer position.
        let pos_cache = unsafe { mmap.ptr.cast::<AtomicUsize>().as_ref() }.load(Ordering::Acquire);

        Ok(Self {
            mmap,
            data_offset: page_size,
            pos_cache,
            mask,
        })
    }

    /// Returns the next readable entry, or `None` if the ring is empty or the next entry
    /// is still being written.
    ///
    /// Discarded entries are skipped, advancing `consumer` past each of them. The returned
    /// item borrows both the mapping and `consumer`; it advances `consumer` when dropped.
    ///
    /// # Panics
    ///
    /// Panics if `data_offset` lies outside the mapping or if an entry described by a
    /// header does not fit inside the mapped data area.
    fn next<'a>(&'a mut self, consumer: &'a mut ConsumerPos) -> Option<RingBufItem<'a>> {
        /// Outcome of inspecting the record header at the consumer position.
        enum Item<'a> {
            /// The producer has reserved the record but not committed it yet.
            Busy,
            /// The record was discarded; `len` bytes of payload must be skipped.
            Discard { len: usize },
            /// A committed record with its payload bytes.
            Data(&'a [u8]),
        }

        /// Reports whether the producer is ahead of the consumer.
        ///
        /// The producer position is only re-read once the consumer has caught up with the
        /// cached value, which avoids touching the shared word while cached data remains.
        fn data_available(
            producer: &AtomicUsize,
            cache: &mut usize,
            consumer: &ConsumerPos,
        ) -> bool {
            let ConsumerPos { pos: consumer, .. } = consumer;
            if consumer == cache {
                *cache = producer.load(Ordering::Acquire);
            }
            // Compared for equality rather than ordering: positions are free-running
            // counters, so the producer is assumed to be logically ahead whenever they differ.
            consumer != cache
        }

        /// Decodes the record header located at the consumer position within `data`.
        fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> {
            let ConsumerPos { pos, .. } = pos;
            let offset = pos & usize::try_from(mask).unwrap();
            let must_get_data = |offset, len| {
                data.get(offset..offset + len).unwrap_or_else(|| {
                    panic!("{:?} not in {:?}", offset..offset + len, 0..data.len())
                })
            };
            let header_ptr =
                must_get_data(offset, mem::size_of::<AtomicU32>()).as_ptr() as *const AtomicU32;
            // SAFETY: `header_ptr` points at `size_of::<AtomicU32>()` bytes that were just
            // bounds-checked against the mapped data area. The Acquire load makes the
            // payload written before the header was committed visible to this thread.
            let header = unsafe { &*header_ptr }.load(Ordering::Acquire);
            if header & BPF_RINGBUF_BUSY_BIT != 0 {
                Item::Busy
            } else {
                let len = usize::try_from(header & mask).unwrap();
                if header & BPF_RINGBUF_DISCARD_BIT != 0 {
                    Item::Discard { len }
                } else {
                    let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap();
                    let data = must_get_data(data_offset, len);
                    Item::Data(data)
                }
            }
        }

        let Self {
            ref mmap,
            data_offset,
            pos_cache,
            mask,
        } = self;
        // SAFETY: `mmap.ptr` is the non-null base of the live producer mapping, whose first
        // word is the producer position.
        let pos = unsafe { mmap.ptr.cast().as_ref() };
        let mmap_data = mmap.as_ref();
        let data_pages = mmap_data.get(*data_offset..).unwrap_or_else(|| {
            panic!(
                "offset {} out of bounds, data len {}",
                data_offset,
                mmap_data.len()
            )
        });
        while data_available(pos, pos_cache, consumer) {
            match read_item(data_pages, *mask, consumer) {
                Item::Busy => return None,
                Item::Discard { len } => consumer.consume(len),
                Item::Data(data) => return Some(RingBufItem { data, consumer }),
            }
        }
        None
    }
}
/// An owned memory-mapped region; it is released with `munmap` when dropped.
struct MMap {
// Base address of the mapping; `MMap::new` rejects a null result, so it is never null.
ptr: NonNull<c_void>,
// Length of the mapping in bytes, as requested from `mmap`.
len: usize,
}
// `NonNull` is neither `Send` nor `Sync`, so both are asserted manually here.
// NOTE(review): soundness presumably relies on all access to the shared words going
// through atomics — confirm against every user of `ptr`.
unsafe impl Send for MMap {}
unsafe impl Sync for MMap {}
impl MMap {
fn new(
fd: BorrowedFd<'_>,
len: usize,
prot: c_int,
flags: c_int,
offset: off_t,
) -> Result<Self, MapError> {
match unsafe { mmap(ptr::null_mut(), len, prot, flags, fd, offset) } {
MAP_FAILED => Err(MapError::SyscallError(SyscallError {
call: "mmap",
io_error: io::Error::last_os_error(),
})),
ptr => Ok(Self {
ptr: NonNull::new(ptr).ok_or(
MapError::SyscallError(SyscallError {
call: "mmap",
io_error: io::Error::new(
io::ErrorKind::Other,
"mmap returned null pointer",
),
}),
)?,
len,
}),
}
}
}
impl AsRef<[u8]> for MMap {
    /// Views the whole mapping as a byte slice.
    fn as_ref(&self) -> &[u8] {
        let base = self.ptr.as_ptr().cast::<u8>();
        // SAFETY: `base` is the non-null start of a mapping of `self.len` bytes that stays
        // mapped until `self` is dropped.
        unsafe { slice::from_raw_parts(base, self.len) }
    }
}
impl Drop for MMap {
    /// Unmaps the region. The result of `munmap` is ignored.
    fn drop(&mut self) {
        let base = self.ptr.as_ptr();
        let length = self.len;
        // SAFETY: `base` and `length` describe the mapping created in `MMap::new`, which
        // this value owns.
        unsafe { munmap(base, length) };
    }
}