use core::sync::atomic::{AtomicU32, Ordering};
use ruvix_types::{KernelError, MsgPriority, RegionHandle};
use crate::Result;
const CACHE_LINE_SIZE: usize = 64;
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct OptimizedRingEntry {
pub length: u16,
pub priority: u8,
pub flags: u8,
pub sequence: u32,
}
impl OptimizedRingEntry {
pub const HEADER_SIZE: usize = 8;
pub const FLAG_DESCRIPTOR: u8 = 1 << 0;
pub const FLAG_VALID: u8 = 1 << 1;
#[inline]
pub const fn new_inline(length: u16, priority: MsgPriority, sequence: u32) -> Self {
Self {
length,
priority: priority as u8,
flags: Self::FLAG_VALID,
sequence,
}
}
#[inline]
pub const fn new_descriptor(priority: MsgPriority, sequence: u32) -> Self {
Self {
length: 24, priority: priority as u8,
flags: Self::FLAG_DESCRIPTOR | Self::FLAG_VALID,
sequence,
}
}
#[inline]
pub const fn empty() -> Self {
Self {
length: 0,
priority: 0,
flags: 0,
sequence: 0,
}
}
#[inline]
pub const fn is_valid(&self) -> bool {
(self.flags & Self::FLAG_VALID) != 0
}
#[inline]
pub const fn is_descriptor(&self) -> bool {
(self.flags & Self::FLAG_DESCRIPTOR) != 0
}
#[inline]
pub fn priority(&self) -> MsgPriority {
MsgPriority::from_u8(self.priority).unwrap_or(MsgPriority::Normal)
}
}
#[derive(Clone, Copy)]
#[repr(C, align(64))]
pub struct OptimizedRingSlot {
pub entry: OptimizedRingEntry,
pub payload: [u8; 56],
}
impl OptimizedRingSlot {
pub const MAX_INLINE_SIZE: usize = 56;
#[inline]
pub const fn empty() -> Self {
Self {
entry: OptimizedRingEntry::empty(),
payload: [0; 56],
}
}
}
const _: () = assert!(core::mem::size_of::<OptimizedRingSlot>() == CACHE_LINE_SIZE);
pub struct OptimizedRingBuffer<const N: usize = 64> {
region: RegionHandle,
slots: [OptimizedRingSlot; N],
mask: u32,
head: AtomicU32,
tail: AtomicU32,
sequence: AtomicU32,
}
impl<const N: usize> OptimizedRingBuffer<N> {
#[must_use]
pub fn new(region: RegionHandle) -> Self {
assert!(N > 0 && (N & (N - 1)) == 0, "Ring size must be power of 2");
Self {
region,
slots: [OptimizedRingSlot::empty(); N],
mask: (N - 1) as u32,
head: AtomicU32::new(0),
tail: AtomicU32::new(0),
sequence: AtomicU32::new(0),
}
}
#[inline]
pub fn region(&self) -> RegionHandle {
self.region
}
#[inline]
pub const fn capacity(&self) -> usize {
N
}
#[inline]
pub fn len(&self) -> u32 {
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Acquire);
head.wrapping_sub(tail)
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn is_full(&self) -> bool {
self.len() >= N as u32
}
#[inline]
pub fn available(&self) -> u32 {
(N as u32).saturating_sub(self.len())
}
#[inline]
pub fn enqueue(&mut self, data: &[u8], priority: MsgPriority) -> Result<()> {
if data.len() > OptimizedRingSlot::MAX_INLINE_SIZE {
return Err(KernelError::MessageTooLarge);
}
let head = self.head.load(Ordering::Relaxed);
let tail = self.tail.load(Ordering::Acquire);
if head.wrapping_sub(tail) >= N as u32 {
return Err(KernelError::QueueFull);
}
let index = (head & self.mask) as usize;
let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
let slot = &mut self.slots[index];
slot.entry = OptimizedRingEntry::new_inline(data.len() as u16, priority, seq);
slot.payload[..data.len()].copy_from_slice(data);
self.head.store(head.wrapping_add(1), Ordering::Release);
Ok(())
}
#[inline]
pub fn dequeue(&mut self, buf: &mut [u8]) -> Result<(OptimizedRingEntry, usize)> {
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Relaxed);
if head == tail {
return Err(KernelError::QueueEmpty);
}
let index = (tail & self.mask) as usize;
let slot = &self.slots[index];
let entry = slot.entry;
let copy_len = (entry.length as usize).min(buf.len());
buf[..copy_len].copy_from_slice(&slot.payload[..copy_len]);
self.tail.store(tail.wrapping_add(1), Ordering::Release);
Ok((entry, copy_len))
}
#[inline]
pub fn peek(&self) -> Option<&OptimizedRingEntry> {
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Relaxed);
if head == tail {
return None;
}
let index = (tail & self.mask) as usize;
Some(&self.slots[index].entry)
}
#[inline]
pub fn try_enqueue(&mut self, data: &[u8], priority: MsgPriority) -> Result<()> {
self.enqueue(data, priority)
}
#[inline]
pub fn try_dequeue(&mut self, buf: &mut [u8]) -> Result<(OptimizedRingEntry, usize)> {
self.dequeue(buf)
}
pub fn clear(&mut self) {
let head = self.head.load(Ordering::Relaxed);
self.tail.store(head, Ordering::Release);
}
}
impl<const N: usize> Default for OptimizedRingBuffer<N> {
fn default() -> Self {
Self::new(RegionHandle::null())
}
}
unsafe impl<const N: usize> Send for OptimizedRingBuffer<N> {}
unsafe impl<const N: usize> Sync for OptimizedRingBuffer<N> {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ring_slot_size() {
assert_eq!(core::mem::size_of::<OptimizedRingSlot>(), 64);
assert_eq!(core::mem::align_of::<OptimizedRingSlot>(), 64);
}
#[test]
fn test_ring_entry_size() {
assert_eq!(core::mem::size_of::<OptimizedRingEntry>(), 8);
}
#[test]
fn test_optimized_ring_basic() {
let mut ring = OptimizedRingBuffer::<64>::new(RegionHandle::null());
assert!(ring.is_empty());
assert!(!ring.is_full());
assert_eq!(ring.len(), 0);
assert_eq!(ring.available(), 64);
ring.enqueue(b"hello", MsgPriority::Normal).unwrap();
assert_eq!(ring.len(), 1);
assert!(!ring.is_empty());
let mut buf = [0u8; 56];
let (entry, len) = ring.dequeue(&mut buf).unwrap();
assert_eq!(len, 5);
assert_eq!(&buf[..5], b"hello");
assert_eq!(entry.length, 5);
assert!(ring.is_empty());
}
#[test]
fn test_optimized_ring_full() {
let mut ring = OptimizedRingBuffer::<4>::new(RegionHandle::null());
for i in 0..4 {
let msg = [i as u8; 8];
ring.enqueue(&msg, MsgPriority::Normal).unwrap();
}
assert!(ring.is_full());
let result = ring.enqueue(b"overflow", MsgPriority::Normal);
assert!(matches!(result, Err(KernelError::QueueFull)));
}
#[test]
fn test_optimized_ring_wraparound() {
let mut ring = OptimizedRingBuffer::<4>::new(RegionHandle::null());
let mut buf = [0u8; 56];
for round in 0..10 {
for i in 0..4 {
let msg = [round as u8, i as u8];
ring.enqueue(&msg, MsgPriority::Normal).unwrap();
}
for i in 0..4 {
let (_, len) = ring.dequeue(&mut buf).unwrap();
assert_eq!(len, 2);
assert_eq!(buf[0], round as u8);
assert_eq!(buf[1], i as u8);
}
assert!(ring.is_empty());
}
}
#[test]
fn test_optimized_ring_priority() {
let mut ring = OptimizedRingBuffer::<8>::new(RegionHandle::null());
ring.enqueue(b"low", MsgPriority::Low).unwrap();
ring.enqueue(b"high", MsgPriority::High).unwrap();
ring.enqueue(b"urgent", MsgPriority::Urgent).unwrap();
let mut buf = [0u8; 56];
let (e1, _) = ring.dequeue(&mut buf).unwrap();
assert_eq!(e1.priority(), MsgPriority::Low);
let (e2, _) = ring.dequeue(&mut buf).unwrap();
assert_eq!(e2.priority(), MsgPriority::High);
let (e3, _) = ring.dequeue(&mut buf).unwrap();
assert_eq!(e3.priority(), MsgPriority::Urgent);
}
#[test]
fn test_optimized_ring_peek() {
let mut ring = OptimizedRingBuffer::<4>::new(RegionHandle::null());
assert!(ring.peek().is_none());
ring.enqueue(b"test", MsgPriority::Normal).unwrap();
let peeked = ring.peek().unwrap();
assert_eq!(peeked.length, 4);
assert!(peeked.is_valid());
assert_eq!(ring.len(), 1);
}
#[test]
fn test_optimized_ring_message_too_large() {
let mut ring = OptimizedRingBuffer::<4>::new(RegionHandle::null());
let large_msg = [0u8; 64];
let result = ring.enqueue(&large_msg, MsgPriority::Normal);
assert!(matches!(result, Err(KernelError::MessageTooLarge)));
}
#[test]
fn test_optimized_ring_clear() {
let mut ring = OptimizedRingBuffer::<4>::new(RegionHandle::null());
for i in 0..4 {
ring.enqueue(&[i], MsgPriority::Normal).unwrap();
}
assert!(ring.is_full());
ring.clear();
assert!(ring.is_empty());
assert_eq!(ring.len(), 0);
}
#[test]
fn test_power_of_2_sizes() {
let _r8 = OptimizedRingBuffer::<8>::default();
let _r16 = OptimizedRingBuffer::<16>::default();
let _r32 = OptimizedRingBuffer::<32>::default();
let _r64 = OptimizedRingBuffer::<64>::default();
let _r128 = OptimizedRingBuffer::<128>::default();
let _r256 = OptimizedRingBuffer::<256>::default();
}
}