use std::{
borrow::Borrow,
fmt::{self, Debug, Formatter},
ops::Deref,
os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd},
sync::atomic::{AtomicU32, AtomicUsize, Ordering},
};
use aya_obj::generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ};
use libc::{MAP_SHARED, PROT_READ, PROT_WRITE};
use crate::{
maps::{MapData, MapError},
util::{MMap, page_size},
};
/// Userspace reader for a BPF ring buffer map.
///
/// Entries are obtained one at a time through [`RingBuf::next`].
#[doc(alias = "BPF_MAP_TYPE_RINGBUF")]
pub struct RingBuf<T> {
/// The underlying map handle; its file descriptor backs both mappings below.
map: T,
/// Consumer position together with the shared mapping it is stored into.
consumer: ConsumerPos,
/// Read-only mapping that holds the producer position and the entry data.
producer: ProducerData,
}
impl<T: Borrow<MapData>> RingBuf<T> {
    /// Builds a ring buffer reader on top of `map`.
    ///
    /// Two mappings are created from the map's file descriptor: a single page at offset 0 for
    /// the consumer position and, starting one page in, the producer region sized from the
    /// map's `max_entries`.
    ///
    /// # Errors
    ///
    /// Returns a [`MapError`] if either mapping cannot be created.
    pub(crate) fn new(map: T) -> Result<Self, MapError> {
        // Keep every borrow of `map` inside this block so that `map` can be moved into the
        // returned value afterwards.
        let (consumer, producer) = {
            let map_data: &MapData = map.borrow();
            let page = page_size();
            let fd = map_data.fd().as_fd();
            let ring_bytes = map_data.obj.max_entries();
            let consumer = ConsumerMetadata::new(fd, 0, page).map(ConsumerPos::new)?;
            let producer = ProducerData::new(fd, page, page, ring_bytes)?;
            (consumer, producer)
        };
        Ok(Self {
            map,
            consumer,
            producer,
        })
    }

    /// Returns the [`MapData`] this ring buffer was created from.
    pub(crate) fn map_data(&self) -> &MapData {
        let Self { map, .. } = self;
        map.borrow()
    }
}
impl<T> RingBuf<T> {
    /// Returns the next entry that is ready to be read, if any.
    ///
    /// `None` means there is nothing to read right now: either the consumer has caught up with
    /// the producer, or the entry at the consumer position is still marked busy.
    ///
    /// The returned item keeps `self` mutably borrowed; dropping it advances the consumer
    /// position past the entry.
    #[expect(
        clippy::should_implement_trait,
        reason = "this is not an iterator; it yields a borrow-tied item"
    )]
    pub fn next(&mut self) -> Option<RingBufItem<'_>> {
        // The two fields are borrowed disjointly; the map handle itself is not needed here.
        self.producer.next(&mut self.consumer)
    }
}
impl<T: Borrow<MapData>> AsFd for RingBuf<T> {
    /// Borrows the file descriptor of the underlying map.
    fn as_fd(&self) -> BorrowedFd<'_> {
        let map_data: &MapData = self.map.borrow();
        map_data.fd().as_fd()
    }
}
impl<T: Borrow<MapData>> AsRawFd for RingBuf<T> {
fn as_raw_fd(&self) -> RawFd {
self.as_fd().as_raw_fd()
}
}
/// A single entry read from a [`RingBuf`].
///
/// Dereferences to the entry's bytes. Dropping the item advances the consumer position past
/// the entry.
pub struct RingBufItem<'a> {
/// The entry's payload bytes.
data: &'a [u8],
/// The consumer position that is advanced when this item is dropped.
consumer: &'a mut ConsumerPos,
}
impl Deref for RingBufItem<'_> {
    type Target = [u8];

    /// Gives access to the entry's payload bytes.
    fn deref(&self) -> &Self::Target {
        self.data
    }
}
impl Drop for RingBufItem<'_> {
    /// Marks the entry as consumed by advancing the consumer position past it.
    fn drop(&mut self) {
        let payload_len = self.data.len();
        self.consumer.consume(payload_len);
    }
}
impl Debug for RingBufItem<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let Self {
data,
consumer:
ConsumerPos {
pos,
metadata: ConsumerMetadata { mmap: _ },
},
} = self;
f.debug_struct("RingBufItem")
.field("pos", pos)
.field("len", &data.len())
.finish()
}
}
/// Readable and writable shared mapping through which the consumer position is stored.
struct ConsumerMetadata {
/// The mapping itself; its start is accessed as an `AtomicUsize`.
mmap: MMap,
}
impl ConsumerMetadata {
    /// Maps `page_size` bytes of `fd`, starting at `offset`, as shared readable and writable
    /// memory.
    ///
    /// # Errors
    ///
    /// Propagates a failure of `MMap::new` as a [`MapError`].
    ///
    /// # Panics
    ///
    /// Panics if `offset` does not fit the offset type expected by `MMap::new`.
    fn new(fd: BorrowedFd<'_>, offset: usize, page_size: usize) -> Result<Self, MapError> {
        let mmap_offset = offset.try_into().unwrap();
        let protection = PROT_READ | PROT_WRITE;
        // `?` is kept (rather than `Result::map`) so any error conversion still applies.
        let mmap = MMap::new(fd, page_size, protection, MAP_SHARED, mmap_offset)?;
        Ok(Self { mmap })
    }
}
impl AsRef<AtomicUsize> for ConsumerMetadata {
    /// Views the start of the mapping as the atomic consumer position.
    fn as_ref(&self) -> &AtomicUsize {
        let pos_ptr = self.mmap.ptr().cast::<AtomicUsize>();
        // SAFETY: relies on the mapping being at least `size_of::<AtomicUsize>()` bytes long,
        // suitably aligned, and alive for as long as `self` is borrowed.
        // NOTE(review): these properties come from `MMap`, which is not visible here — confirm.
        unsafe { pos_ptr.as_ref() }
    }
}
/// The consumer position, kept both as a local value and in the shared mapping.
struct ConsumerPos {
/// Local copy of the position; written back to `metadata` on every `consume`.
pos: usize,
/// Shared mapping the position is first loaded from and later stored to.
metadata: ConsumerMetadata,
}
impl ConsumerPos {
fn new(metadata: ConsumerMetadata) -> Self {
let pos = metadata.as_ref().load(Ordering::SeqCst);
Self { pos, metadata }
}
fn consume(&mut self, len: usize) {
let Self { pos, metadata } = self;
*pos += (usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap() + len).next_multiple_of(8);
metadata.as_ref().store(*pos, Ordering::SeqCst);
}
}
/// Read-only view of the producer side of the ring buffer.
struct ProducerData {
/// Mapping whose start holds the producer position and which also contains the data area.
mmap: MMap,
/// Offset of the data area within `mmap`.
data_offset: usize,
/// Last producer position loaded from `mmap`; refreshed only once the consumer reaches it.
pos_cache: usize,
/// Ring size minus one; applied to positions and entry headers as a bit mask.
mask: u32,
}
impl ProducerData {
/// Maps the producer side of the ring buffer and seeds the cached producer position.
///
/// The mapping is read-only and shared, starts at `offset` within `fd`, and spans
/// `page_size + 2 * byte_size` bytes. `page_size` is recorded as the offset of the data area
/// inside the mapping, and `byte_size - 1` is kept as the mask.
///
/// # Errors
///
/// Propagates a failure of `MMap::new` as a `MapError`.
///
/// # Panics
///
/// Panics if `byte_size` or `offset` cannot be converted to the integer types required here.
/// In debug builds it also panics when `byte_size` is not a power of two.
fn new(
fd: BorrowedFd<'_>,
offset: usize,
page_size: usize,
byte_size: u32,
) -> Result<Self, MapError> {
// NOTE(review): the data area is counted twice; presumably the kernel maps the data pages
// twice back to back so entries that wrap past the end stay contiguous — confirm.
let len = page_size + 2 * usize::try_from(byte_size).unwrap();
let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?;
// Start from the position the mapping currently reports rather than assuming zero.
let pos_cache = load_producer_pos(&mmap);
// A power-of-two size is what makes `byte_size - 1` usable as a bit mask.
debug_assert!(byte_size.is_power_of_two());
let mask = byte_size - 1;
Ok(Self {
mmap,
data_offset: page_size,
pos_cache,
mask,
})
}
/// Returns the next entry that is ready to be read, stepping over discarded entries.
///
/// `None` is returned when the consumer position equals the freshly loaded producer position,
/// or when the entry at the consumer position still has its busy bit set. A returned item
/// holds the mutable borrow of `consumer`.
///
/// # Panics
///
/// Panics if the recorded data offset lies outside the mapping, or if an entry's header or
/// payload range falls outside the data area.
fn next<'a>(&'a mut self, consumer: &'a mut ConsumerPos) -> Option<RingBufItem<'a>> {
let Self {
mmap,
data_offset,
pos_cache,
mask,
} = self;
// Only shared access to the mapping is needed from here on.
let mmap = &*mmap;
let mmap_data = mmap.as_ref();
// Everything from `data_offset` onwards is treated as entry storage.
#[expect(
clippy::panic,
reason = "invalid ring buffer layout is a fatal internal error"
)]
let data_pages = mmap_data.get(*data_offset..).unwrap_or_else(|| {
panic!(
"offset {} out of bounds, data len {}",
data_offset,
mmap_data.len()
)
});
while data_available(mmap, pos_cache, consumer) {
match read_item(data_pages, *mask, consumer) {
// An unfinished entry stops the read; the consumer position is left untouched.
Item::Busy => return None,
// Discarded entries are stepped over without being handed to the caller.
Item::Discard { len } => consumer.consume(len),
// The position is not advanced here; that happens when the returned item is dropped.
Item::Data(data) => return Some(RingBufItem { data, consumer }),
}
}
// Explicit `return` because the helper items below follow it in this block.
return None;
/// Classification of the entry found at the consumer position.
enum Item<'a> {
/// The header's busy bit is set.
Busy,
/// The header's discard bit is set; `len` is the length taken from the header.
Discard { len: usize },
/// A readable entry and its payload bytes.
Data(&'a [u8]),
}
/// Reports whether the consumer position differs from the producer position.
///
/// The cached producer position is reloaded from the mapping only when the consumer
/// position equals the cached value.
fn data_available(
producer: &MMap,
producer_cache: &mut usize,
consumer: &ConsumerPos,
) -> bool {
let ConsumerPos { pos: consumer, .. } = consumer;
if consumer == producer_cache {
*producer_cache = load_producer_pos(producer);
}
// The positions are compared for inequality only; they are never ordered.
consumer != producer_cache
}
/// Reads the entry header at the consumer position and classifies the entry.
///
/// `data` is the data area, `mask` the ring size minus one, and `pos` the consumer position.
fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> {
let ConsumerPos { pos, .. } = pos;
// Reduce the position to an offset inside the ring.
let offset = pos & usize::try_from(mask).unwrap();
// Bounds-checked slicing that panics with the offending range when it does not fit.
#[expect(
clippy::panic,
reason = "invalid ring buffer layout is a fatal internal error"
)]
let must_get_data = |offset, len| {
data.get(offset..offset + len).unwrap_or_else(|| {
panic!("{:?} not in {:?}", offset..offset + len, 0..data.len())
})
};
let header_ptr: *const AtomicU32 = must_get_data(offset, size_of::<AtomicU32>())
.as_ptr()
.cast();
// SAFETY: `header_ptr` points at the start of a bounds-checked slice of
// `size_of::<AtomicU32>()` bytes inside the mapping.
// NOTE(review): 4-byte alignment of `offset` is assumed rather than checked here, and the
// Acquire load presumably pairs with the producer's store of the header — confirm.
let header = unsafe { &*header_ptr }.load(Ordering::Acquire);
if header & BPF_RINGBUF_BUSY_BIT != 0 {
Item::Busy
} else {
// The length is whatever header bits `mask` selects.
let len = usize::try_from(header & mask).unwrap();
if header & BPF_RINGBUF_DISCARD_BIT != 0 {
Item::Discard { len }
} else {
// The payload starts immediately after the header.
let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap();
let data = must_get_data(data_offset, len);
Item::Data(data)
}
}
}
}
}
/// Loads the producer position from the start of the `producer` mapping.
fn load_producer_pos(producer: &MMap) -> usize {
    let pos_ptr = producer.ptr().cast::<AtomicUsize>();
    // SAFETY: relies on the mapping being at least `size_of::<AtomicUsize>()` bytes long,
    // suitably aligned, and alive for the duration of this call.
    // NOTE(review): these properties come from `MMap`, which is not visible here — confirm.
    let shared_pos = unsafe { pos_ptr.as_ref() };
    shared_pos.load(Ordering::Acquire)
}