use core::sync::atomic::{AtomicU32, Ordering};
use ruvix_types::{KernelError, MsgPriority, RegionHandle};
use crate::Result;
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct RingEntry {
pub length: u32,
pub priority: u8,
pub flags: u8,
pub sequence: u16,
}
impl RingEntry {
pub const HEADER_SIZE: usize = core::mem::size_of::<Self>();
pub const FLAG_DESCRIPTOR: u8 = 1 << 0;
pub const FLAG_CONTINUATION: u8 = 1 << 1;
pub const FLAG_FINAL: u8 = 1 << 2;
#[inline]
pub const fn new_inline(length: u32, priority: MsgPriority, sequence: u16) -> Self {
Self {
length,
priority: priority as u8,
flags: 0,
sequence,
}
}
#[inline]
pub const fn new_descriptor(priority: MsgPriority, sequence: u16) -> Self {
Self {
length: 24, priority: priority as u8,
flags: Self::FLAG_DESCRIPTOR,
sequence,
}
}
#[inline]
pub const fn is_descriptor(&self) -> bool {
(self.flags & Self::FLAG_DESCRIPTOR) != 0
}
#[inline]
pub const fn is_continuation(&self) -> bool {
(self.flags & Self::FLAG_CONTINUATION) != 0
}
#[inline]
pub fn priority(&self) -> MsgPriority {
MsgPriority::from_u8(self.priority).unwrap_or(MsgPriority::Normal)
}
}
#[derive(Debug, Clone, Default)]
pub struct RingStats {
pub enqueued: u64,
pub dequeued: u64,
pub bytes_enqueued: u64,
pub bytes_dequeued: u64,
pub full_count: u64,
pub empty_count: u64,
}
pub struct RingBuffer {
region: RegionHandle,
size: u32,
mask: u32,
max_msg_size: u32,
entry_size: u32,
sq_head: AtomicU32,
sq_tail: AtomicU32,
sequence: AtomicU32,
#[cfg(feature = "std")]
buffer: *mut u8,
#[cfg(feature = "std")]
buffer_len: usize,
#[cfg(feature = "stats")]
stats: RingStats,
}
unsafe impl Send for RingBuffer {}
unsafe impl Sync for RingBuffer {}
impl RingBuffer {
#[cfg(feature = "std")]
pub fn new(
region: RegionHandle,
size: u32,
max_msg_size: u32,
buffer: *mut u8,
buffer_len: usize,
) -> Result<Self> {
if size == 0 || (size & (size - 1)) != 0 {
return Err(KernelError::InvalidArgument);
}
let entry_size = RingEntry::HEADER_SIZE as u32 + max_msg_size;
let required_size = (size as usize) * (entry_size as usize);
if buffer_len < required_size {
return Err(KernelError::OutOfMemory);
}
Ok(Self {
region,
size,
mask: size - 1,
max_msg_size,
entry_size,
sq_head: AtomicU32::new(0),
sq_tail: AtomicU32::new(0),
sequence: AtomicU32::new(0),
buffer,
buffer_len,
#[cfg(feature = "stats")]
stats: RingStats::default(),
})
}
#[inline]
pub fn region(&self) -> RegionHandle {
self.region
}
#[inline]
pub fn size(&self) -> u32 {
self.size
}
#[inline]
pub fn max_msg_size(&self) -> u32 {
self.max_msg_size
}
#[inline]
pub fn len(&self) -> u32 {
let head = self.sq_head.load(Ordering::Acquire);
let tail = self.sq_tail.load(Ordering::Acquire);
head.wrapping_sub(tail)
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn is_full(&self) -> bool {
self.len() >= self.size
}
#[inline]
pub fn available(&self) -> u32 {
self.size.saturating_sub(self.len())
}
#[cfg(feature = "std")]
pub fn enqueue(&mut self, data: &[u8], priority: MsgPriority) -> Result<()> {
if data.len() > self.max_msg_size as usize {
return Err(KernelError::MessageTooLarge);
}
let head = self.sq_head.load(Ordering::Relaxed);
let tail = self.sq_tail.load(Ordering::Acquire);
if head.wrapping_sub(tail) >= self.size {
#[cfg(feature = "stats")]
{
self.stats.full_count += 1;
}
return Err(KernelError::QueueFull);
}
let index = head & self.mask;
let offset = (index as usize) * (self.entry_size as usize);
let seq = self.sequence.fetch_add(1, Ordering::Relaxed) as u16;
let entry = RingEntry::new_inline(data.len() as u32, priority, seq);
unsafe {
let entry_ptr = self.buffer.add(offset);
core::ptr::write(entry_ptr as *mut RingEntry, entry);
let data_ptr = entry_ptr.add(RingEntry::HEADER_SIZE);
core::ptr::copy_nonoverlapping(data.as_ptr(), data_ptr, data.len());
}
self.sq_head.store(head.wrapping_add(1), Ordering::Release);
#[cfg(feature = "stats")]
{
self.stats.enqueued += 1;
self.stats.bytes_enqueued += data.len() as u64;
}
Ok(())
}
#[cfg(feature = "std")]
pub fn enqueue_descriptor(
&mut self,
descriptor: &crate::MessageDescriptor,
priority: MsgPriority,
) -> Result<()> {
let head = self.sq_head.load(Ordering::Relaxed);
let tail = self.sq_tail.load(Ordering::Acquire);
if head.wrapping_sub(tail) >= self.size {
#[cfg(feature = "stats")]
{
self.stats.full_count += 1;
}
return Err(KernelError::QueueFull);
}
let index = head & self.mask;
let offset = (index as usize) * (self.entry_size as usize);
let seq = self.sequence.fetch_add(1, Ordering::Relaxed) as u16;
let entry = RingEntry::new_descriptor(priority, seq);
unsafe {
let entry_ptr = self.buffer.add(offset);
core::ptr::write(entry_ptr as *mut RingEntry, entry);
let desc_ptr = entry_ptr.add(RingEntry::HEADER_SIZE);
core::ptr::write(desc_ptr as *mut crate::MessageDescriptor, *descriptor);
}
self.sq_head.store(head.wrapping_add(1), Ordering::Release);
#[cfg(feature = "stats")]
{
self.stats.enqueued += 1;
self.stats.bytes_enqueued += descriptor.length as u64;
}
Ok(())
}
#[cfg(feature = "std")]
pub fn dequeue(&mut self, buf: &mut [u8]) -> Result<RingEntry> {
let head = self.sq_head.load(Ordering::Acquire);
let tail = self.sq_tail.load(Ordering::Relaxed);
if head == tail {
#[cfg(feature = "stats")]
{
self.stats.empty_count += 1;
}
return Err(KernelError::QueueEmpty);
}
let index = tail & self.mask;
let offset = (index as usize) * (self.entry_size as usize);
let entry = unsafe {
let entry_ptr = self.buffer.add(offset);
core::ptr::read(entry_ptr as *const RingEntry)
};
let payload_len = entry.length as usize;
if payload_len > buf.len() {
return Err(KernelError::MessageTooLarge);
}
unsafe {
let data_ptr = self.buffer.add(offset + RingEntry::HEADER_SIZE);
core::ptr::copy_nonoverlapping(data_ptr, buf.as_mut_ptr(), payload_len);
}
self.sq_tail.store(tail.wrapping_add(1), Ordering::Release);
#[cfg(feature = "stats")]
{
self.stats.dequeued += 1;
self.stats.bytes_dequeued += payload_len as u64;
}
Ok(entry)
}
#[cfg(feature = "std")]
pub fn peek(&self) -> Option<RingEntry> {
let head = self.sq_head.load(Ordering::Acquire);
let tail = self.sq_tail.load(Ordering::Relaxed);
if head == tail {
return None;
}
let index = tail & self.mask;
let offset = (index as usize) * (self.entry_size as usize);
let entry = unsafe {
let entry_ptr = self.buffer.add(offset);
core::ptr::read(entry_ptr as *const RingEntry)
};
Some(entry)
}
#[cfg(feature = "stats")]
pub fn stats(&self) -> &RingStats {
&self.stats
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "std")]
#[test]
fn test_ring_entry_size() {
assert_eq!(RingEntry::HEADER_SIZE, 8);
}
#[cfg(feature = "std")]
#[test]
fn test_ring_buffer_basic() {
let mut backing = vec![0u8; 64 * (RingEntry::HEADER_SIZE + 4096)];
let mut ring = RingBuffer::new(
RegionHandle::null(),
64,
4096,
backing.as_mut_ptr(),
backing.len(),
)
.unwrap();
assert!(ring.is_empty());
assert!(!ring.is_full());
assert_eq!(ring.len(), 0);
assert_eq!(ring.available(), 64);
ring.enqueue(b"hello", MsgPriority::Normal).unwrap();
assert_eq!(ring.len(), 1);
assert!(!ring.is_empty());
let mut buf = [0u8; 4096];
let entry = ring.dequeue(&mut buf).unwrap();
assert_eq!(entry.length, 5);
assert_eq!(&buf[..5], b"hello");
assert!(ring.is_empty());
}
#[cfg(feature = "std")]
#[test]
fn test_ring_buffer_full() {
let mut backing = vec![0u8; 4 * 1024]; let mut ring = RingBuffer::new(
RegionHandle::null(),
4,
256,
backing.as_mut_ptr(),
backing.len(),
)
.unwrap();
for i in 0..4 {
let msg = format!("msg{}", i);
ring.enqueue(msg.as_bytes(), MsgPriority::Normal).unwrap();
}
assert!(ring.is_full());
let result = ring.enqueue(b"overflow", MsgPriority::Normal);
assert!(matches!(result, Err(KernelError::QueueFull)));
}
#[cfg(feature = "std")]
#[test]
fn test_ring_buffer_wraparound() {
let mut backing = vec![0u8; 4 * 1024];
let mut ring = RingBuffer::new(
RegionHandle::null(),
4,
256,
backing.as_mut_ptr(),
backing.len(),
)
.unwrap();
let mut buf = [0u8; 256];
for round in 0..10 {
for i in 0..4 {
let msg = format!("r{}m{}", round, i);
ring.enqueue(msg.as_bytes(), MsgPriority::Normal).unwrap();
}
for i in 0..4 {
let entry = ring.dequeue(&mut buf).unwrap();
let expected = format!("r{}m{}", round, i);
assert_eq!(&buf[..entry.length as usize], expected.as_bytes());
}
assert!(ring.is_empty());
}
}
#[cfg(feature = "std")]
#[test]
fn test_ring_buffer_priority() {
let mut backing = vec![0u8; 8 * 1024];
let mut ring = RingBuffer::new(
RegionHandle::null(),
8,
256,
backing.as_mut_ptr(),
backing.len(),
)
.unwrap();
ring.enqueue(b"low", MsgPriority::Low).unwrap();
ring.enqueue(b"high", MsgPriority::High).unwrap();
ring.enqueue(b"urgent", MsgPriority::Urgent).unwrap();
let mut buf = [0u8; 256];
let e1 = ring.dequeue(&mut buf).unwrap();
assert_eq!(e1.priority(), MsgPriority::Low);
let e2 = ring.dequeue(&mut buf).unwrap();
assert_eq!(e2.priority(), MsgPriority::High);
let e3 = ring.dequeue(&mut buf).unwrap();
assert_eq!(e3.priority(), MsgPriority::Urgent);
}
#[cfg(feature = "std")]
#[test]
fn test_ring_buffer_invalid_size() {
let mut backing = vec![0u8; 1024];
let result = RingBuffer::new(
RegionHandle::null(),
3,
256,
backing.as_mut_ptr(),
backing.len(),
);
assert!(matches!(result, Err(KernelError::InvalidArgument)));
let result = RingBuffer::new(
RegionHandle::null(),
0,
256,
backing.as_mut_ptr(),
backing.len(),
);
assert!(matches!(result, Err(KernelError::InvalidArgument)));
}
}