use crate::region::Region;
use crate::sync::{AtomicU32, Ordering};
/// Control block that sits in front of a bip-buffer's data area.
///
/// The struct is `repr(C)` with explicit padding so that it occupies exactly
/// [`BIPBUF_HEADER_SIZE`] bytes: `write`, `watermark` and `capacity` live in the
/// first 64 bytes and `read` starts the second 64 bytes.
// NOTE(review): the split presumably keeps the cursors stored by the granting side and
// the cursor stored by the reading side on separate cache lines — confirm the intended
// cache-line size for the target platforms.
#[repr(C, align(64))]
pub struct BipBufHeader {
    /// Offset, in bytes from the start of the data area, of the next byte to be committed.
    pub write: AtomicU32,
    /// End offset of the readable tail while the buffer is wrapped; `0` when not wrapped.
    pub watermark: AtomicU32,
    /// Size of the data area in bytes; set once by [`BipBufHeader::init`].
    pub capacity: u32,
    _pad0: [u8; 52],
    /// Offset, in bytes from the start of the data area, of the next unread byte.
    pub read: AtomicU32,
    _pad1: [u8; 60],
}

/// Size in bytes of [`BipBufHeader`]; the data area starts this many bytes after the header.
pub const BIPBUF_HEADER_SIZE: usize = 128;

// Compile-time guard: the declared layout must match the advertised header size.
#[cfg(not(loom))]
const _: () = assert!(core::mem::size_of::<BipBufHeader>() == BIPBUF_HEADER_SIZE);

impl BipBufHeader {
    /// Puts the header into the empty state for a data area of `capacity` bytes.
    ///
    /// Every field, including the padding, is overwritten.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    pub fn init(&mut self, capacity: u32) {
        assert!(capacity > 0, "capacity must be > 0");
        *self = Self {
            write: AtomicU32::new(0),
            watermark: AtomicU32::new(0),
            capacity,
            _pad0: [0; 52],
            read: AtomicU32::new(0),
            _pad1: [0; 60],
        };
    }

    /// Rewinds all three cursors to zero while leaving `capacity` untouched.
    pub fn reset(&mut self) {
        let Self {
            write,
            watermark,
            read,
            ..
        } = self;
        for cursor in [write, watermark, read] {
            *cursor = AtomicU32::new(0);
        }
    }
}
/// A bip-buffer bundled with the [`Region`] it was created over.
///
/// All buffer operations go through the contained [`BipBufRaw`], reachable via
/// [`BipBuf::inner`] or through the handles returned by [`BipBuf::split`].
pub struct BipBuf {
    // Stored but never read, hence the `dead_code` allowance.
    // NOTE(review): presumably retained so the backing memory outlives the raw pointers
    // held by `inner` — confirm against `Region`'s ownership semantics.
    #[allow(dead_code)]
    region: Region,
    inner: BipBufRaw,
}

// NOTE(review): these impls are unchecked promises. The atomics only coordinate one
// granting side with one reading side; nothing in this type enforces that discipline.
unsafe impl Send for BipBuf {}
unsafe impl Sync for BipBuf {}

impl BipBuf {
    /// Returns the data-area offset for a header placed at `header_offset`.
    ///
    /// Panics unless the whole header lies inside a region of `region_len` bytes.
    /// The addition is checked so a huge offset cannot wrap around and pass the test.
    fn data_offset_within(region_len: usize, header_offset: usize) -> usize {
        match header_offset.checked_add(BIPBUF_HEADER_SIZE) {
            Some(data_offset) if data_offset <= region_len => data_offset,
            _ => panic!("region too small for bipbuf"),
        }
    }

    /// Panics unless `capacity` bytes starting at `data_offset` fit in `region_len` bytes.
    fn assert_data_fits(region_len: usize, data_offset: usize, capacity: u32) {
        let fits = data_offset
            .checked_add(capacity as usize)
            .is_some_and(|end| end <= region_len);
        assert!(fits, "region too small for bipbuf");
    }

    /// Initializes a fresh bip-buffer inside `region` and returns a handle to it.
    ///
    /// The header is written at `header_offset` and the `capacity`-byte data area
    /// follows it immediately.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero, if `header_offset` is not a multiple of 64, or
    /// if header plus data do not fit inside `region` (including when the offsets
    /// would overflow `usize`).
    ///
    /// # Safety
    ///
    /// The header in `region` is overwritten through a raw pointer, so the caller
    /// must guarantee that the memory is valid for reads and writes and that nothing
    /// else is using this part of the region while it is initialized.
    // NOTE(review): only the offset is checked for alignment; the region base address
    // is assumed to be 64-byte aligned as well — confirm.
    pub unsafe fn init(region: Region, header_offset: usize, capacity: u32) -> Self {
        assert!(capacity > 0, "capacity must be > 0");
        assert!(
            header_offset.is_multiple_of(64),
            "header_offset must be 64-byte aligned"
        );
        let region_len = region.len();
        let data_offset = Self::data_offset_within(region_len, header_offset);
        Self::assert_data_fits(region_len, data_offset, capacity);

        let header_ptr = region.offset(header_offset) as *mut BipBufHeader;
        let data_ptr = region.offset(data_offset);
        // SAFETY: both ranges were bounds-checked against the region above; validity
        // and exclusivity of the memory are the caller's obligations.
        unsafe { (*header_ptr).init(capacity) };
        // SAFETY: the header was just initialized and `data_ptr` addresses `capacity`
        // in-bounds bytes.
        let inner = unsafe { BipBufRaw::from_raw(header_ptr, data_ptr) };
        Self { region, inner }
    }

    /// Attaches to a bip-buffer whose header already exists at `header_offset`.
    ///
    /// The capacity is read from the existing header; no header field is modified.
    ///
    /// # Panics
    ///
    /// Panics if `header_offset` is not a multiple of 64, if the header does not fit
    /// inside `region`, if the stored capacity is zero, or if the data area described
    /// by the header does not fit inside `region`.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `region` is valid for reads and writes and that
    /// a header was previously initialized at `header_offset`.
    pub unsafe fn attach(region: Region, header_offset: usize) -> Self {
        assert!(
            header_offset.is_multiple_of(64),
            "header_offset must be 64-byte aligned"
        );
        let region_len = region.len();
        // Validate the header range *before* touching it, so a bad offset cannot
        // cause an out-of-bounds read of `capacity`.
        let data_offset = Self::data_offset_within(region_len, header_offset);

        let header_ptr = region.offset(header_offset) as *mut BipBufHeader;
        // SAFETY: the header range lies inside the region (checked above); the caller
        // promises it holds an initialized header.
        let capacity = unsafe { (*header_ptr).capacity };
        assert!(capacity > 0, "invalid bipbuf capacity");
        Self::assert_data_fits(region_len, data_offset, capacity);

        let data_ptr = region.offset(data_offset);
        // SAFETY: header and data ranges were both bounds-checked against the region.
        let inner = unsafe { BipBufRaw::from_raw(header_ptr, data_ptr) };
        Self { region, inner }
    }

    /// Borrows the underlying pointer-level buffer.
    #[inline]
    pub fn inner(&self) -> &BipBufRaw {
        &self.inner
    }

    /// Creates a producer handle and a consumer handle that both borrow this buffer.
    ///
    /// This takes `&self`, so nothing prevents calling it more than once and ending
    /// up with several producers or consumers.
    // NOTE(review): the raw operations appear to assume a single producer and a single
    // consumer — callers should split only once; confirm.
    pub fn split(&self) -> (BipBufProducer<'_>, BipBufConsumer<'_>) {
        (BipBufProducer { buf: self }, BipBufConsumer { buf: self })
    }

    /// Returns the size of the data area in bytes, as recorded in the header.
    #[inline]
    pub fn capacity(&self) -> u32 {
        self.inner.capacity()
    }

    /// Rewinds the write, watermark and read cursors to zero.
    ///
    /// The three stores are independent atomics, not one atomic step.
    // NOTE(review): presumably only safe while neither side is using the buffer — confirm.
    pub fn reset(&self) {
        self.inner.reset();
    }
}
/// Write-side handle returned by `BipBuf::split`; borrows the buffer for `'a`.
pub struct BipBufProducer<'a> {
// Buffer this handle forwards its grant/commit calls to.
buf: &'a BipBuf,
}
/// Read-side handle returned by `BipBuf::split`; borrows the buffer for `'a`.
pub struct BipBufConsumer<'a> {
// Buffer this handle forwards its read/release calls to.
buf: &'a BipBuf,
}
/// Unit marker type named for the "buffer full" condition.
// NOTE(review): nothing in the visible part of this file constructs or returns this
// type (`try_grant` reports failure as `None`) — confirm whether external code uses it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BipBufFull;
impl BipBufProducer<'_> {
    /// Reports whether a grant of `len` bytes would currently succeed.
    ///
    /// Forwards to [`BipBufRaw::can_grant`], which only loads the cursors.
    pub fn can_grant(&self, len: u32) -> bool {
        let raw = self.buf.inner();
        raw.can_grant(len)
    }

    /// Requests a contiguous writable slice of exactly `len` bytes.
    ///
    /// Forwards to [`BipBufRaw::try_grant`]; `None` means no contiguous space of
    /// that size is available right now. The bytes become readable only after
    /// [`commit`](Self::commit).
    pub fn try_grant(&mut self, len: u32) -> Option<&mut [u8]> {
        let raw = self.buf.inner();
        raw.try_grant(len)
    }

    /// Publishes `len` bytes of the most recent grant by advancing the write cursor.
    ///
    /// Forwards to [`BipBufRaw::commit`], which panics if the cursor would pass the
    /// buffer capacity.
    pub fn commit(&mut self, len: u32) {
        let raw = self.buf.inner();
        raw.commit(len);
    }
}
impl BipBufConsumer<'_> {
    /// Returns the next contiguous run of committed bytes, or `None` if there is none.
    ///
    /// Forwards to [`BipBufRaw::try_read`]. The bytes stay in the buffer until
    /// [`release`](Self::release) is called.
    pub fn try_read(&mut self) -> Option<&[u8]> {
        let raw = self.buf.inner();
        raw.try_read()
    }

    /// Marks `len` bytes as consumed by advancing the read cursor.
    ///
    /// Forwards to [`BipBufRaw::release`], which panics if the cursor would pass the
    /// buffer capacity.
    pub fn release(&mut self, len: u32) {
        let raw = self.buf.inner();
        raw.release(len);
    }
}
/// Pointer-level bip-buffer: a [`BipBufHeader`] plus the data area it describes.
///
/// All cursor state lives in the header; this type only stores the two raw pointers.
/// Readable data is `[read, write)` in the normal state. After a wrap it is the tail
/// `[read, watermark)` followed by the front `[0, write)`.
pub struct BipBufRaw {
    header: *mut BipBufHeader,
    data: *mut u8,
}

// NOTE(review): these impls are unchecked promises. The cursor protocol below only
// coordinates one granting side with one reading side; that discipline is the caller's
// responsibility and is not enforced by this type.
unsafe impl Send for BipBufRaw {}
unsafe impl Sync for BipBufRaw {}

impl BipBufRaw {
    /// Wraps a header pointer and a data pointer without touching either.
    ///
    /// # Safety
    ///
    /// Every other method dereferences `header` and builds slices inside
    /// `data .. data + capacity`. The caller must therefore ensure that `header`
    /// points to an initialized, suitably aligned [`BipBufHeader`], that `data`
    /// points to at least `capacity` bytes, and that both stay valid for as long
    /// as the returned value is used.
    #[inline]
    pub unsafe fn from_raw(header: *mut BipBufHeader, data: *mut u8) -> Self {
        Self { header, data }
    }

    /// Shared view of the header.
    #[inline]
    fn header(&self) -> &BipBufHeader {
        // SAFETY: validity of `self.header` is part of the `from_raw` contract.
        unsafe { &*self.header }
    }

    /// Returns the size of the data area in bytes, as recorded in the header.
    #[inline]
    pub fn capacity(&self) -> u32 {
        self.header().capacity
    }

    /// Requests a contiguous writable slice of exactly `len` bytes.
    ///
    /// * `len == 0` always yields an empty slice.
    /// * `len > capacity` is rejected up front without touching any cursor, matching
    ///   [`can_grant`](Self::can_grant).
    /// * If the space after `write` is too small and `read != 0`, the buffer wraps:
    ///   `watermark` is set to the old `write` and `write` becomes `0`. The wrap is
    ///   kept even when the request then fails because the front is too small.
    ///
    /// Returns `None` when no contiguous space of `len` bytes is available. The
    /// granted bytes become visible to the reader only after [`commit`](Self::commit).
    // `&self -> &mut [u8]` is deliberate: the slice points into memory behind a raw
    // pointer, not into `self`.
    #[allow(clippy::mut_from_ref)]
    pub fn try_grant(&self, len: u32) -> Option<&mut [u8]> {
        if len == 0 {
            return Some(&mut []);
        }
        let header = self.header();
        let capacity = header.capacity;
        if len > capacity {
            // Can never fit; do not wrap the buffer for an impossible request.
            return None;
        }
        let write = header.write.load(Ordering::Relaxed);
        let read = header.read.load(Ordering::Acquire);

        let start = if write >= read {
            let space_at_end = capacity - write;
            if space_at_end >= len {
                write
            } else {
                // Wrapping with `read == 0` would leave `write == read`, which is
                // indistinguishable from an empty buffer.
                if read == 0 {
                    return None;
                }
                // Publish the watermark before the rewound write cursor so a reader
                // that observes `write == 0` also observes the watermark.
                header.watermark.store(write, Ordering::Release);
                header.write.store(0, Ordering::Release);
                // Strictly less: the write cursor must never catch up with `read`.
                if len >= read {
                    return None;
                }
                0
            }
        } else if len < read - write {
            // Wrapped: only the gap before `read` is free. Written as a subtraction
            // (`write < read` here) because `write + len` can overflow `u32`.
            write
        } else {
            return None;
        };

        // SAFETY: every branch above established `start + len <= capacity`, and the
        // data area holds `capacity` bytes per the `from_raw` contract.
        let ptr = unsafe { self.data.add(start as usize) };
        // SAFETY: see above; exclusivity of the range follows from the cursor checks
        // under the single-producer/single-consumer discipline.
        Some(unsafe { core::slice::from_raw_parts_mut(ptr, len as usize) })
    }

    /// Reports whether [`try_grant`](Self::try_grant) would succeed for `len` bytes.
    ///
    /// Only loads the cursors; unlike `try_grant` it never performs the wrap itself.
    pub fn can_grant(&self, len: u32) -> bool {
        if len == 0 {
            return true;
        }
        let header = self.header();
        let capacity = header.capacity;
        if len > capacity {
            return false;
        }
        let write = header.write.load(Ordering::Relaxed);
        let read = header.read.load(Ordering::Acquire);
        if write >= read {
            let space_at_end = capacity - write;
            // Either it fits at the end, or a wrap is allowed and the front is big enough.
            space_at_end >= len || (read != 0 && len < read)
        } else {
            // Overflow-free form of `write + len < read`.
            len < read - write
        }
    }

    /// Publishes `len` bytes by advancing the write cursor.
    ///
    /// The only validation is against the capacity; whether `len` bytes were
    /// actually granted is not checked.
    ///
    /// # Panics
    ///
    /// Panics if `write + len` overflows `u32` or exceeds the capacity.
    pub fn commit(&self, len: u32) {
        let header = self.header();
        let write = header.write.load(Ordering::Relaxed);
        let new_write = write.checked_add(len).expect("commit: write overflow");
        assert!(
            new_write <= header.capacity,
            "commit: write ({new_write}) exceeds capacity ({})",
            header.capacity,
        );
        header.write.store(new_write, Ordering::Release);
    }

    /// Returns the next contiguous run of committed bytes, or `None` if there is none.
    ///
    /// While a watermark is set the tail `[read, watermark)` is returned first. Once
    /// the tail is exhausted this call itself resets `read` and `watermark` to `0`
    /// and continues with the front `[0, write)`.
    pub fn try_read(&self) -> Option<&[u8]> {
        let header = self.header();
        let read = header.read.load(Ordering::Relaxed);
        let write = header.write.load(Ordering::Acquire);
        let watermark = header.watermark.load(Ordering::Acquire);
        if watermark != 0 {
            if read < watermark {
                let len = watermark - read;
                // SAFETY: `[read, watermark)` lies inside the data area; the watermark
                // is a former write cursor.
                let ptr = unsafe { self.data.add(read as usize) };
                return Some(unsafe { core::slice::from_raw_parts(ptr, len as usize) });
            }
            // Tail fully consumed: leave the wrapped state.
            header.read.store(0, Ordering::Release);
            header.watermark.store(0, Ordering::Release);
            // Reload: the write cursor may have advanced since the first load.
            let current_write = header.write.load(Ordering::Acquire);
            if current_write > 0 {
                let ptr = self.data;
                // SAFETY: `[0, current_write)` was committed and lies inside the data area.
                return Some(unsafe { core::slice::from_raw_parts(ptr, current_write as usize) });
            }
            return None;
        }
        if read < write {
            let len = write - read;
            // SAFETY: `[read, write)` was committed and lies inside the data area.
            let ptr = unsafe { self.data.add(read as usize) };
            Some(unsafe { core::slice::from_raw_parts(ptr, len as usize) })
        } else {
            None
        }
    }

    /// Marks `len` bytes as consumed by advancing the read cursor.
    ///
    /// If a watermark is set and the new cursor reaches it, `read` and `watermark`
    /// are both reset to `0` so reading continues at the front of the buffer.
    ///
    /// # Panics
    ///
    /// Panics if `read + len` overflows `u32` or exceeds the capacity.
    pub fn release(&self, len: u32) {
        let header = self.header();
        let read = header.read.load(Ordering::Relaxed);
        let new_read = read.checked_add(len).expect("release: read overflow");
        assert!(
            new_read <= header.capacity,
            "release: read ({new_read}) exceeds capacity ({})",
            header.capacity,
        );
        let watermark = header.watermark.load(Ordering::Acquire);
        if watermark != 0 && new_read >= watermark {
            header.read.store(0, Ordering::Release);
            header.watermark.store(0, Ordering::Release);
        } else {
            header.read.store(new_read, Ordering::Release);
        }
    }

    /// Rewinds the write, watermark and read cursors to zero.
    ///
    /// The three stores are independent atomics, not one atomic step.
    // NOTE(review): presumably only safe while neither side is using the buffer — confirm.
    pub fn reset(&self) {
        let header = self.header();
        header.write.store(0, Ordering::Release);
        header.watermark.store(0, Ordering::Release);
        header.read.store(0, Ordering::Release);
    }

    /// Reports whether there is currently nothing to read.
    ///
    /// Agrees with [`try_read`](Self::try_read): in the wrapped state the buffer is
    /// empty once the tail is exhausted (`read >= watermark`) and nothing has been
    /// committed at the front (`write == 0`).
    #[inline]
    pub fn is_empty(&self) -> bool {
        let header = self.header();
        let read = header.read.load(Ordering::Relaxed);
        let write = header.write.load(Ordering::Acquire);
        let watermark = header.watermark.load(Ordering::Acquire);
        if watermark != 0 {
            read >= watermark && write == 0
        } else {
            read == write
        }
    }
}
// Single-threaded unit tests for the bip-buffer; compiled only for `cargo test`
// builds without the `loom` cfg.
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
use crate::region::HeapRegion;
// Builds a heap-backed region sized for one header plus `capacity` data bytes and
// initializes a buffer at offset 0. The region is returned alongside the buffer so
// the caller keeps it alive for the duration of the test.
fn make_bipbuf(capacity: u32) -> (HeapRegion, BipBuf) {
let size = BIPBUF_HEADER_SIZE + capacity as usize;
let region = HeapRegion::new_zeroed(size);
let buf = unsafe { BipBuf::init(region.region(), 0, capacity) };
(region, buf)
}
// The header must occupy exactly 128 bytes.
#[test]
fn header_size() {
assert_eq!(core::mem::size_of::<BipBufHeader>(), 128);
}
// One grant/commit followed by one read/release round-trips the bytes and leaves
// the buffer with nothing to read.
#[test]
fn basic_write_read() {
let (_region, buf) = make_bipbuf(256);
let (mut producer, mut consumer) = buf.split();
let grant = producer.try_grant(10).unwrap();
grant.copy_from_slice(b"helloworld");
producer.commit(10);
let data = consumer.try_read().unwrap();
assert_eq!(&data[..10], b"helloworld");
consumer.release(10);
assert!(consumer.try_read().is_none());
}
// Several commits without an intervening read are returned as one contiguous slice.
#[test]
fn multiple_writes_and_reads() {
let (_region, buf) = make_bipbuf(256);
let (mut producer, mut consumer) = buf.split();
for i in 0..3u8 {
let grant = producer.try_grant(4).unwrap();
grant.copy_from_slice(&[i, i + 1, i + 2, i + 3]);
producer.commit(4);
}
let data = consumer.try_read().unwrap();
assert_eq!(data.len(), 12);
assert_eq!(&data[0..4], &[0, 1, 2, 3]);
assert_eq!(&data[4..8], &[1, 2, 3, 4]);
assert_eq!(&data[8..12], &[2, 3, 4, 5]);
consumer.release(12);
}
// A grant that does not fit at the end wraps to the front; the unread tail is
// delivered before the wrapped data.
#[test]
fn wraparound() {
let (_region, buf) = make_bipbuf(32);
let (mut producer, mut consumer) = buf.split();
let grant = producer.try_grant(24).unwrap();
for (i, byte) in grant.iter_mut().enumerate() {
*byte = i as u8;
}
producer.commit(24);
let data = consumer.try_read().unwrap();
assert_eq!(data.len(), 24);
// Release only 20 of the 24 bytes so 4 tail bytes remain unread.
consumer.release(20);
// 8 bytes are free at the end, so this 16-byte grant must come from the front.
let grant = producer.try_grant(16).unwrap();
for (i, byte) in grant.iter_mut().enumerate() {
*byte = 100 + i as u8;
}
producer.commit(16);
let data = consumer.try_read().unwrap();
assert_eq!(data.len(), 4);
assert_eq!(data, &[20, 21, 22, 23]);
consumer.release(4);
let data = consumer.try_read().unwrap();
assert_eq!(data.len(), 16);
for (i, &byte) in data.iter().enumerate() {
assert_eq!(byte, 100 + i as u8);
}
consumer.release(16);
assert!(consumer.try_read().is_none());
}
// With the whole capacity committed and nothing released, no further grant succeeds.
#[test]
fn full_buffer_returns_none() {
let (_region, buf) = make_bipbuf(32);
let (mut producer, _consumer) = buf.split();
let grant = producer.try_grant(32).unwrap();
grant.fill(0xAB);
producer.commit(32);
assert!(producer.try_grant(1).is_none());
}
// A zero-length grant succeeds and yields an empty slice.
#[test]
fn zero_length_grant() {
let (_region, buf) = make_bipbuf(32);
let (mut producer, _consumer) = buf.split();
let grant = producer.try_grant(0).unwrap();
assert_eq!(grant.len(), 0);
}
// A grant of exactly `capacity` bytes is allowed on an empty buffer.
#[test]
fn exact_capacity_grant() {
let (_region, buf) = make_bipbuf(64);
let (mut producer, mut consumer) = buf.split();
let grant = producer.try_grant(64).unwrap();
grant.fill(0xFF);
producer.commit(64);
let data = consumer.try_read().unwrap();
assert_eq!(data.len(), 64);
assert!(data.iter().all(|&b| b == 0xFF));
consumer.release(64);
}
// A request larger than the capacity is refused by both `try_grant` and `can_grant`.
#[test]
fn grant_too_large() {
let (_region, buf) = make_bipbuf(32);
let (mut producer, _consumer) = buf.split();
assert!(producer.try_grant(33).is_none());
assert!(!producer.can_grant(33));
}
// `can_grant` may answer "yes, after wrapping" but must not perform the wrap itself:
// `write` and `watermark` are unchanged afterwards.
#[test]
fn can_grant_is_non_mutating() {
let (_region, buf) = make_bipbuf(32);
let (mut producer, mut consumer) = buf.split();
let grant = producer.try_grant(24).unwrap();
grant.fill(0xAA);
producer.commit(24);
consumer.release(20);
let header = unsafe { &*buf.inner().header };
assert_eq!(header.write.load(Ordering::Relaxed), 24);
assert_eq!(header.watermark.load(Ordering::Relaxed), 0);
assert!(producer.can_grant(16));
assert_eq!(header.write.load(Ordering::Relaxed), 24);
assert_eq!(header.watermark.load(Ordering::Relaxed), 0);
}
// Ten write/read rounds of 8 bytes in a 64-byte buffer, forcing the cursors to
// travel past the end of the data area.
#[test]
fn interleaved_operations() {
let (_region, buf) = make_bipbuf(64);
let (mut producer, mut consumer) = buf.split();
for round in 0..10u8 {
let grant = producer.try_grant(8).unwrap();
for (i, byte) in grant.iter_mut().enumerate() {
*byte = round * 8 + i as u8;
}
producer.commit(8);
let data = consumer.try_read().unwrap();
assert_eq!(data.len(), 8);
for (i, &byte) in data.iter().enumerate() {
assert_eq!(byte, round * 8 + i as u8);
}
consumer.release(8);
}
}
// With `read == 0` the producer must not wrap: the oversized request fails, and a
// request that fits in the remaining tail still succeeds afterwards.
#[test]
fn wraparound_edge_case_read_at_zero() {
let (_region, buf) = make_bipbuf(16);
let (mut producer, _consumer) = buf.split();
let grant = producer.try_grant(12).unwrap();
grant.fill(0xAA);
producer.commit(12);
assert!(producer.try_grant(8).is_none());
let grant = producer.try_grant(4).unwrap();
grant.fill(0xBB);
producer.commit(4);
}
// A grant that wraps but then does not fit at the front returns `None` while
// leaving the buffer wrapped (`write == 0`, `watermark` set); the unread tail is
// still delivered.
#[test]
fn failed_wrap_grant_keeps_wrapped_state() {
let region = HeapRegion::new_zeroed(BIPBUF_HEADER_SIZE + 32);
let header_ptr = region.region().as_ptr() as *mut BipBufHeader;
let data_ptr = unsafe { region.region().as_ptr().add(BIPBUF_HEADER_SIZE) };
unsafe { (*header_ptr).init(32) };
let raw = unsafe { BipBufRaw::from_raw(header_ptr, data_ptr) };
let grant = raw.try_grant(24).unwrap();
grant.fill(0xAA);
raw.commit(24);
let tail = raw.try_read().unwrap();
assert_eq!(tail.len(), 24);
raw.release(20);
// 21 bytes fit neither in the 8-byte tail gap nor in the 20 bytes before `read`.
assert!(raw.try_grant(21).is_none());
let header = unsafe { &*header_ptr };
assert_eq!(header.write.load(Ordering::Acquire), 0);
assert_eq!(header.watermark.load(Ordering::Acquire), 24);
let remaining = raw.try_read().unwrap();
assert_eq!(remaining.len(), 4);
raw.release(4);
assert!(raw.try_read().is_none());
}
// 1000 rounds of 4-byte messages carrying a running counter; every message must be
// read back in order and none may be lost.
#[test]
fn stress_many_small_messages() {
let (_region, buf) = make_bipbuf(1024);
let (mut producer, mut consumer) = buf.split();
let mut write_count = 0u32;
let mut read_count = 0u32;
for _ in 0..1000 {
if let Some(grant) = producer.try_grant(4) {
grant.copy_from_slice(&write_count.to_le_bytes());
producer.commit(4);
write_count += 1;
}
if let Some(data) = consumer.try_read() {
let msg_count = data.len() / 4;
for i in 0..msg_count {
let val = u32::from_le_bytes([
data[i * 4],
data[i * 4 + 1],
data[i * 4 + 2],
data[i * 4 + 3],
]);
assert_eq!(val, read_count);
read_count += 1;
}
consumer.release((msg_count * 4) as u32);
}
}
// Drain whatever is still buffered after the main loop.
while let Some(data) = consumer.try_read() {
let msg_count = data.len() / 4;
for i in 0..msg_count {
let val = u32::from_le_bytes([
data[i * 4],
data[i * 4 + 1],
data[i * 4 + 2],
data[i * 4 + 3],
]);
assert_eq!(val, read_count);
read_count += 1;
}
consumer.release((msg_count * 4) as u32);
}
assert_eq!(write_count, read_count);
}
// Same round trip as `basic_write_read`, driven through `BipBufRaw` directly.
#[test]
fn raw_api_basic() {
let region = HeapRegion::new_zeroed(BIPBUF_HEADER_SIZE + 64);
let header_ptr = region.region().as_ptr() as *mut BipBufHeader;
let data_ptr = unsafe { region.region().as_ptr().add(BIPBUF_HEADER_SIZE) };
unsafe { (*header_ptr).init(64) };
let raw = unsafe { BipBufRaw::from_raw(header_ptr, data_ptr) };
let grant = raw.try_grant(10).unwrap();
grant.copy_from_slice(b"0123456789");
raw.commit(10);
let data = raw.try_read().unwrap();
assert_eq!(&data[..10], b"0123456789");
raw.release(10);
assert!(raw.is_empty());
}
// Wraparound through `BipBufRaw`: the 4-byte tail is read first, then the 20 bytes
// written at the front, after which the buffer reports empty.
#[test]
fn raw_api_wraparound() {
let region = HeapRegion::new_zeroed(BIPBUF_HEADER_SIZE + 32);
let header_ptr = region.region().as_ptr() as *mut BipBufHeader;
let data_ptr = unsafe { region.region().as_ptr().add(BIPBUF_HEADER_SIZE) };
unsafe { (*header_ptr).init(32) };
let raw = unsafe { BipBufRaw::from_raw(header_ptr, data_ptr) };
let grant = raw.try_grant(28).unwrap();
grant.fill(0xAA);
raw.commit(28);
let data = raw.try_read().unwrap();
assert_eq!(data.len(), 28);
raw.release(24);
let grant = raw.try_grant(20).unwrap();
grant.fill(0xBB);
raw.commit(20);
let data = raw.try_read().unwrap();
assert_eq!(data.len(), 4);
assert!(data.iter().all(|&b| b == 0xAA));
raw.release(4);
let data = raw.try_read().unwrap();
assert_eq!(data.len(), 20);
assert!(data.iter().all(|&b| b == 0xBB));
raw.release(20);
assert!(raw.is_empty());
}
// Forces a wrapped state by writing the cursors directly, then checks that the
// tail `[read, watermark)` is returned before the front `[0, write)`.
#[test]
fn watermark_priority_reads_tail_before_front() {
let region = HeapRegion::new_zeroed(BIPBUF_HEADER_SIZE + 64);
let header_ptr = region.region().as_ptr() as *mut BipBufHeader;
let data_ptr = unsafe { region.region().as_ptr().add(BIPBUF_HEADER_SIZE) };
unsafe { (*header_ptr).init(64) };
let raw = unsafe { BipBufRaw::from_raw(header_ptr, data_ptr) };
unsafe {
(*header_ptr).read.store(20, Ordering::Release);
(*header_ptr).watermark.store(40, Ordering::Release);
(*header_ptr).write.store(10, Ordering::Release);
}
// Tail bytes carry their own offset; front bytes carry 100 + offset.
for i in 20u8..40 {
unsafe { *data_ptr.add(i as usize) = i };
}
for i in 0u8..10 {
unsafe { *data_ptr.add(i as usize) = 100 + i };
}
let tail = raw.try_read().expect("expected tail first");
assert_eq!(tail.len(), 20);
assert_eq!(tail[0], 20);
assert_eq!(tail[19], 39);
raw.release(20);
let front = raw.try_read().expect("expected wrapped front second");
assert_eq!(front.len(), 10);
assert_eq!(front[0], 100);
assert_eq!(front[9], 109);
}
// After `BipBufHeader::reset` the buffer is empty again and the full capacity can
// be granted.
#[test]
fn reset_clears_state() {
let region = HeapRegion::new_zeroed(BIPBUF_HEADER_SIZE + 64);
let header_ptr = region.region().as_ptr() as *mut BipBufHeader;
let data_ptr = unsafe { region.region().as_ptr().add(BIPBUF_HEADER_SIZE) };
unsafe { (*header_ptr).init(64) };
let raw = unsafe { BipBufRaw::from_raw(header_ptr, data_ptr) };
let grant = raw.try_grant(32).unwrap();
grant.fill(0xFF);
raw.commit(32);
unsafe { (*header_ptr).reset() };
assert!(raw.is_empty());
assert!(raw.try_read().is_none());
let grant = raw.try_grant(64).unwrap();
assert_eq!(grant.len(), 64);
}
}