// ruvix_queue/ring_optimized.rs
//! Optimized lock-free ring buffer with power-of-2 masking (ADR-087).
//!
//! This module provides a high-performance ring buffer using:
//! - Power-of-2 size for fast modulo via bitwise AND
//! - Cache-line aligned entries for optimal memory access
//! - Zero-copy descriptor mode for large messages
//! - Lock-free operation with atomic head/tail pointers
//!
//! Performance targets:
//! - Enqueue: <200ns
//! - Dequeue: <200ns
//! - Single cache line access per operation

use core::sync::atomic::{AtomicU32, Ordering};

use ruvix_types::{KernelError, MsgPriority, RegionHandle};

use crate::Result;
/// Cache line size in bytes; ring slots are sized and aligned to this value.
const CACHE_LINE_SIZE: usize = 64;
22
23/// Optimized ring entry header (8 bytes).
24#[derive(Debug, Clone, Copy)]
25#[repr(C)]
26pub struct OptimizedRingEntry {
27    /// Length of the payload in bytes.
28    pub length: u16,
29    /// Message priority (0-3).
30    pub priority: u8,
31    /// Entry flags.
32    pub flags: u8,
33    /// Sequence number for ordering.
34    pub sequence: u32,
35}
36
37impl OptimizedRingEntry {
38    /// Size of the entry header.
39    pub const HEADER_SIZE: usize = 8;
40
41    /// Flag indicating this entry contains a descriptor.
42    pub const FLAG_DESCRIPTOR: u8 = 1 << 0;
43
44    /// Flag indicating entry is valid (ready to consume).
45    pub const FLAG_VALID: u8 = 1 << 1;
46
47    /// Creates a new inline data entry.
48    #[inline]
49    pub const fn new_inline(length: u16, priority: MsgPriority, sequence: u32) -> Self {
50        Self {
51            length,
52            priority: priority as u8,
53            flags: Self::FLAG_VALID,
54            sequence,
55        }
56    }
57
58    /// Creates a new descriptor entry.
59    #[inline]
60    pub const fn new_descriptor(priority: MsgPriority, sequence: u32) -> Self {
61        Self {
62            length: 24, // Size of MessageDescriptor
63            priority: priority as u8,
64            flags: Self::FLAG_DESCRIPTOR | Self::FLAG_VALID,
65            sequence,
66        }
67    }
68
69    /// Creates an empty (invalid) entry.
70    #[inline]
71    pub const fn empty() -> Self {
72        Self {
73            length: 0,
74            priority: 0,
75            flags: 0,
76            sequence: 0,
77        }
78    }
79
80    /// Returns true if this entry is valid.
81    #[inline]
82    pub const fn is_valid(&self) -> bool {
83        (self.flags & Self::FLAG_VALID) != 0
84    }
85
86    /// Returns true if this entry contains a descriptor.
87    #[inline]
88    pub const fn is_descriptor(&self) -> bool {
89        (self.flags & Self::FLAG_DESCRIPTOR) != 0
90    }
91
92    /// Gets the priority as MsgPriority.
93    #[inline]
94    pub fn priority(&self) -> MsgPriority {
95        MsgPriority::from_u8(self.priority).unwrap_or(MsgPriority::Normal)
96    }
97}
98
99/// Cache-line aligned ring slot.
100///
101/// Each slot contains the entry header and inline payload data.
102/// Aligned to cache line for single cache line access per operation.
103#[derive(Clone, Copy)]
104#[repr(C, align(64))]
105pub struct OptimizedRingSlot {
106    /// Entry header.
107    pub entry: OptimizedRingEntry,
108    /// Inline payload data (56 bytes max for 64-byte slot).
109    pub payload: [u8; 56],
110}
111
112impl OptimizedRingSlot {
113    /// Maximum inline payload size.
114    pub const MAX_INLINE_SIZE: usize = 56;
115
116    /// Creates an empty slot.
117    #[inline]
118    pub const fn empty() -> Self {
119        Self {
120            entry: OptimizedRingEntry::empty(),
121            payload: [0; 56],
122        }
123    }
124}
125
// Compile-time assertion that a slot occupies exactly one cache line (64 B);
// the build fails if field or padding changes alter the slot size.
const _: () = assert!(core::mem::size_of::<OptimizedRingSlot>() == CACHE_LINE_SIZE);
128
/// Optimized lock-free ring buffer with power-of-2 size.
///
/// Uses atomic operations for thread-safe enqueue/dequeue.
/// Power-of-2 size enables fast modulo via bitwise AND.
pub struct OptimizedRingBuffer<const N: usize = 64> {
    /// The backing memory region handle.
    region: RegionHandle,

    /// Ring slots array (each slot cache-line aligned, 64 bytes).
    slots: [OptimizedRingSlot; N],

    /// Mask for index calculation (always N - 1, valid because N is a
    /// power of 2).
    mask: u32,

    /// Producer head (where next enqueue goes). Free-running counter;
    /// wrapped into the slot array via `mask`.
    head: AtomicU32,

    /// Consumer tail (where next dequeue comes from). Never passes `head`.
    tail: AtomicU32,

    /// Sequence counter stamped onto each enqueued entry for ordering.
    sequence: AtomicU32,
}
152
153impl<const N: usize> OptimizedRingBuffer<N> {
154    /// Creates a new optimized ring buffer.
155    ///
156    /// # Arguments
157    ///
158    /// * `region` - Handle to the backing region
159    ///
160    /// # Panics
161    ///
162    /// Panics if N is not a power of 2.
163    #[must_use]
164    pub fn new(region: RegionHandle) -> Self {
165        // Compile-time check would be ideal, but we verify at runtime
166        assert!(N > 0 && (N & (N - 1)) == 0, "Ring size must be power of 2");
167
168        Self {
169            region,
170            slots: [OptimizedRingSlot::empty(); N],
171            mask: (N - 1) as u32,
172            head: AtomicU32::new(0),
173            tail: AtomicU32::new(0),
174            sequence: AtomicU32::new(0),
175        }
176    }
177
178    /// Returns the region handle.
179    #[inline]
180    pub fn region(&self) -> RegionHandle {
181        self.region
182    }
183
184    /// Returns the ring capacity.
185    #[inline]
186    pub const fn capacity(&self) -> usize {
187        N
188    }
189
190    /// Returns the number of entries currently in the ring.
191    #[inline]
192    pub fn len(&self) -> u32 {
193        let head = self.head.load(Ordering::Acquire);
194        let tail = self.tail.load(Ordering::Acquire);
195        head.wrapping_sub(tail)
196    }
197
198    /// Returns true if the ring is empty.
199    #[inline]
200    pub fn is_empty(&self) -> bool {
201        self.len() == 0
202    }
203
204    /// Returns true if the ring is full.
205    #[inline]
206    pub fn is_full(&self) -> bool {
207        self.len() >= N as u32
208    }
209
210    /// Returns the number of free slots.
211    #[inline]
212    pub fn available(&self) -> u32 {
213        (N as u32).saturating_sub(self.len())
214    }
215
216    /// Enqueues a message into the ring buffer (<200ns target).
217    ///
218    /// # Arguments
219    ///
220    /// * `data` - Message payload (max 56 bytes for inline)
221    /// * `priority` - Message priority
222    ///
223    /// # Errors
224    ///
225    /// Returns `QueueFull` if the ring is full.
226    /// Returns `MessageTooLarge` if the message exceeds inline limit.
227    #[inline]
228    pub fn enqueue(&mut self, data: &[u8], priority: MsgPriority) -> Result<()> {
229        if data.len() > OptimizedRingSlot::MAX_INLINE_SIZE {
230            return Err(KernelError::MessageTooLarge);
231        }
232
233        // Check if there's space (single atomic load)
234        let head = self.head.load(Ordering::Relaxed);
235        let tail = self.tail.load(Ordering::Acquire);
236
237        if head.wrapping_sub(tail) >= N as u32 {
238            return Err(KernelError::QueueFull);
239        }
240
241        // Calculate slot index using mask (fast modulo)
242        let index = (head & self.mask) as usize;
243
244        // Get sequence number
245        let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
246
247        // Write to slot (single cache line)
248        let slot = &mut self.slots[index];
249        slot.entry = OptimizedRingEntry::new_inline(data.len() as u16, priority, seq);
250        slot.payload[..data.len()].copy_from_slice(data);
251
252        // Publish the entry (release semantics)
253        self.head.store(head.wrapping_add(1), Ordering::Release);
254
255        Ok(())
256    }
257
258    /// Dequeues a message from the ring buffer (<200ns target).
259    ///
260    /// # Arguments
261    ///
262    /// * `buf` - Buffer to receive the message data
263    ///
264    /// # Returns
265    ///
266    /// On success, returns the entry header and number of bytes copied.
267    ///
268    /// # Errors
269    ///
270    /// Returns `QueueEmpty` if the ring is empty.
271    #[inline]
272    pub fn dequeue(&mut self, buf: &mut [u8]) -> Result<(OptimizedRingEntry, usize)> {
273        // Check if there's data (single atomic load)
274        let head = self.head.load(Ordering::Acquire);
275        let tail = self.tail.load(Ordering::Relaxed);
276
277        if head == tail {
278            return Err(KernelError::QueueEmpty);
279        }
280
281        // Calculate slot index using mask (fast modulo)
282        let index = (tail & self.mask) as usize;
283
284        // Read from slot (single cache line)
285        let slot = &self.slots[index];
286        let entry = slot.entry;
287
288        let copy_len = (entry.length as usize).min(buf.len());
289        buf[..copy_len].copy_from_slice(&slot.payload[..copy_len]);
290
291        // Advance tail (release semantics)
292        self.tail.store(tail.wrapping_add(1), Ordering::Release);
293
294        Ok((entry, copy_len))
295    }
296
297    /// Peeks at the next entry without removing it.
298    #[inline]
299    pub fn peek(&self) -> Option<&OptimizedRingEntry> {
300        let head = self.head.load(Ordering::Acquire);
301        let tail = self.tail.load(Ordering::Relaxed);
302
303        if head == tail {
304            return None;
305        }
306
307        let index = (tail & self.mask) as usize;
308        Some(&self.slots[index].entry)
309    }
310
311    /// Tries to enqueue without blocking, returns immediately.
312    #[inline]
313    pub fn try_enqueue(&mut self, data: &[u8], priority: MsgPriority) -> Result<()> {
314        self.enqueue(data, priority)
315    }
316
317    /// Tries to dequeue without blocking, returns immediately.
318    #[inline]
319    pub fn try_dequeue(&mut self, buf: &mut [u8]) -> Result<(OptimizedRingEntry, usize)> {
320        self.dequeue(buf)
321    }
322
323    /// Clears all entries from the ring.
324    pub fn clear(&mut self) {
325        let head = self.head.load(Ordering::Relaxed);
326        self.tail.store(head, Ordering::Release);
327    }
328}
329
// Default: a ring with no backing region (null handle), as used by the tests.
impl<const N: usize> Default for OptimizedRingBuffer<N> {
    fn default() -> Self {
        Self::new(RegionHandle::null())
    }
}
335
// SAFETY: head/tail/sequence are only mutated through atomics, and all
// mutating methods take &mut self (exclusive access). NOTE(review): these
// blanket impls also assert that `RegionHandle` and the slot array are safe
// to send/share across threads — confirm RegionHandle is itself Send + Sync,
// otherwise these impls are unsound.
unsafe impl<const N: usize> Send for OptimizedRingBuffer<N> {}
unsafe impl<const N: usize> Sync for OptimizedRingBuffer<N> {}
339
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_ring_slot_size() {
        // A slot must occupy, and be aligned to, exactly one cache line.
        let size = core::mem::size_of::<OptimizedRingSlot>();
        let align = core::mem::align_of::<OptimizedRingSlot>();
        assert_eq!(size, 64);
        assert_eq!(align, 64);
    }

    #[test]
    fn test_ring_entry_size() {
        // Header layout under repr(C) must stay at 8 bytes.
        assert_eq!(core::mem::size_of::<OptimizedRingEntry>(), 8);
    }

    #[test]
    fn test_optimized_ring_basic() {
        let mut rb = OptimizedRingBuffer::<64>::new(RegionHandle::null());

        // A fresh ring is empty with full capacity available.
        assert!(rb.is_empty());
        assert!(!rb.is_full());
        assert_eq!(rb.len(), 0);
        assert_eq!(rb.available(), 64);

        // Round-trip a single message.
        rb.enqueue(b"hello", MsgPriority::Normal).unwrap();
        assert!(!rb.is_empty());
        assert_eq!(rb.len(), 1);

        let mut out = [0u8; 56];
        let (header, copied) = rb.dequeue(&mut out).unwrap();
        assert_eq!(copied, 5);
        assert_eq!(&out[..copied], b"hello");
        assert_eq!(header.length, 5);
        assert!(rb.is_empty());
    }

    #[test]
    fn test_optimized_ring_full() {
        let mut rb = OptimizedRingBuffer::<4>::new(RegionHandle::null());

        // Occupy every slot.
        for value in 0u8..4 {
            rb.enqueue(&[value; 8], MsgPriority::Normal).unwrap();
        }
        assert!(rb.is_full());

        // One more enqueue must be rejected.
        assert!(matches!(
            rb.enqueue(b"overflow", MsgPriority::Normal),
            Err(KernelError::QueueFull)
        ));
    }

    #[test]
    fn test_optimized_ring_wraparound() {
        let mut rb = OptimizedRingBuffer::<4>::new(RegionHandle::null());
        let mut out = [0u8; 56];

        // Repeated fill/drain cycles force the indices to wrap many times.
        for round in 0u8..10 {
            for i in 0u8..4 {
                rb.enqueue(&[round, i], MsgPriority::Normal).unwrap();
            }
            for i in 0u8..4 {
                let (_, copied) = rb.dequeue(&mut out).unwrap();
                assert_eq!(copied, 2);
                assert_eq!(out[..2], [round, i]);
            }
            assert!(rb.is_empty());
        }
    }

    #[test]
    fn test_optimized_ring_priority() {
        let mut rb = OptimizedRingBuffer::<8>::new(RegionHandle::null());

        rb.enqueue(b"low", MsgPriority::Low).unwrap();
        rb.enqueue(b"high", MsgPriority::High).unwrap();
        rb.enqueue(b"urgent", MsgPriority::Urgent).unwrap();

        // FIFO: priorities come back in insertion order, not sorted.
        let mut scratch = [0u8; 56];
        for want in [MsgPriority::Low, MsgPriority::High, MsgPriority::Urgent] {
            let (header, _) = rb.dequeue(&mut scratch).unwrap();
            assert_eq!(header.priority(), want);
        }
    }

    #[test]
    fn test_optimized_ring_peek() {
        let mut rb = OptimizedRingBuffer::<4>::new(RegionHandle::null());

        assert!(rb.peek().is_none());

        rb.enqueue(b"test", MsgPriority::Normal).unwrap();

        {
            let front = rb.peek().unwrap();
            assert_eq!(front.length, 4);
            assert!(front.is_valid());
        }

        // Peeking must not consume the entry.
        assert_eq!(rb.len(), 1);
    }

    #[test]
    fn test_optimized_ring_message_too_large() {
        let mut rb = OptimizedRingBuffer::<4>::new(RegionHandle::null());

        // Anything over the 56-byte inline limit is rejected up front.
        let oversized = [0u8; 64];
        assert!(matches!(
            rb.enqueue(&oversized, MsgPriority::Normal),
            Err(KernelError::MessageTooLarge)
        ));
    }

    #[test]
    fn test_optimized_ring_clear() {
        let mut rb = OptimizedRingBuffer::<4>::new(RegionHandle::null());

        for i in 0u8..4 {
            rb.enqueue(&[i], MsgPriority::Normal).unwrap();
        }
        assert!(rb.is_full());

        rb.clear();

        assert!(rb.is_empty());
        assert_eq!(rb.len(), 0);
    }

    #[test]
    fn test_power_of_2_sizes() {
        // Each of these must pass the power-of-2 assertion in `new`.
        let _r8 = OptimizedRingBuffer::<8>::default();
        let _r16 = OptimizedRingBuffer::<16>::default();
        let _r32 = OptimizedRingBuffer::<32>::default();
        let _r64 = OptimizedRingBuffer::<64>::default();
        let _r128 = OptimizedRingBuffer::<128>::default();
        let _r256 = OptimizedRingBuffer::<256>::default();
    }
}