// ruvix_queue/ring.rs

//! Lock-free ring buffer implementation.
//!
//! This module provides the core ring buffer data structure used by kernel queues.
//! The design is inspired by io_uring's submission/completion queues.

6use core::sync::atomic::{AtomicU32, Ordering};
7
8use ruvix_types::{KernelError, MsgPriority, RegionHandle};
9
10use crate::Result;
11
/// A lock-free ring buffer entry.
///
/// Each entry contains a header followed by payload data. The header includes
/// length, priority, flags, and sequence number for ordering.
///
/// The `#[repr(C)]` layout is u32 + u8 + u8 + u16 = 8 bytes with no padding;
/// `RingEntry::HEADER_SIZE` and the on-ring slot layout rely on this.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct RingEntry {
    /// Length of the payload in bytes.
    pub length: u32,
    /// Message priority (0-3).
    pub priority: u8,
    /// Entry flags (see the `FLAG_*` constants on `RingEntry`).
    pub flags: u8,
    /// Sequence number for ordering (low 16 bits of the ring's counter).
    pub sequence: u16,
}
28
29impl RingEntry {
30    /// Size of a ring entry header in bytes.
31    pub const HEADER_SIZE: usize = core::mem::size_of::<Self>();
32
33    /// Flag indicating this entry contains a descriptor instead of inline data.
34    pub const FLAG_DESCRIPTOR: u8 = 1 << 0;
35
36    /// Flag indicating this entry is part of a multi-entry message.
37    pub const FLAG_CONTINUATION: u8 = 1 << 1;
38
39    /// Flag indicating this is the final entry of a multi-entry message.
40    pub const FLAG_FINAL: u8 = 1 << 2;
41
42    /// Create a new ring entry for inline data.
43    #[inline]
44    pub const fn new_inline(length: u32, priority: MsgPriority, sequence: u16) -> Self {
45        Self {
46            length,
47            priority: priority as u8,
48            flags: 0,
49            sequence,
50        }
51    }
52
53    /// Create a new ring entry for a descriptor.
54    #[inline]
55    pub const fn new_descriptor(priority: MsgPriority, sequence: u16) -> Self {
56        Self {
57            length: 24, // Size of MessageDescriptor
58            priority: priority as u8,
59            flags: Self::FLAG_DESCRIPTOR,
60            sequence,
61        }
62    }
63
64    /// Check if this entry contains a descriptor.
65    #[inline]
66    pub const fn is_descriptor(&self) -> bool {
67        (self.flags & Self::FLAG_DESCRIPTOR) != 0
68    }
69
70    /// Check if this is a continuation entry.
71    #[inline]
72    pub const fn is_continuation(&self) -> bool {
73        (self.flags & Self::FLAG_CONTINUATION) != 0
74    }
75
76    /// Get the priority as MsgPriority.
77    #[inline]
78    pub fn priority(&self) -> MsgPriority {
79        MsgPriority::from_u8(self.priority).unwrap_or(MsgPriority::Normal)
80    }
81}
82
/// Statistics for ring buffer operations.
///
/// Counters are cumulative over the lifetime of the ring and are only
/// maintained when the `stats` feature is enabled.
#[derive(Debug, Clone, Default)]
pub struct RingStats {
    /// Total messages enqueued.
    pub enqueued: u64,
    /// Total messages dequeued.
    pub dequeued: u64,
    /// Total payload bytes enqueued.
    pub bytes_enqueued: u64,
    /// Total payload bytes dequeued.
    pub bytes_dequeued: u64,
    /// Number of times an enqueue failed because the ring was full.
    pub full_count: u64,
    /// Number of times a dequeue failed because the ring was empty.
    pub empty_count: u64,
}
99
/// A lock-free ring buffer for message passing.
///
/// The ring buffer uses atomic head/tail pointers for lock-free operation.
/// Messages are stored inline after the entry header, or as descriptors
/// pointing to shared region data.
///
/// Slots are `entry_size` bytes apart: an 8-byte `RingEntry` header followed
/// by up to `max_msg_size` payload bytes.
pub struct RingBuffer {
    /// The backing memory region handle.
    region: RegionHandle,

    /// Ring size in entries (must be power of 2).
    size: u32,

    /// Mask for index calculation (size - 1); valid because size is a power of 2.
    mask: u32,

    /// Maximum message payload size in bytes.
    max_msg_size: u32,

    /// Per-slot stride in bytes (header + max payload).
    entry_size: u32,

    /// Submission queue head (producer writes); advanced when an entry is published.
    sq_head: AtomicU32,

    /// Submission queue tail (consumer advances); advanced after an entry is drained.
    sq_tail: AtomicU32,

    /// Next sequence number (monotonically increasing; truncated to u16 per entry).
    sequence: AtomicU32,

    /// Pointer to the ring buffer memory.
    /// NOTE(review): the memory is supplied by the caller of `new`; its
    /// lifetime and exclusivity are the caller's responsibility — confirm
    /// at the call sites.
    #[cfg(feature = "std")]
    buffer: *mut u8,

    /// Buffer length in bytes.
    #[cfg(feature = "std")]
    buffer_len: usize,

    /// Statistics (only tracked with the `stats` feature).
    #[cfg(feature = "stats")]
    stats: RingStats,
}
142
// SAFETY: all index state (`sq_head`, `sq_tail`, `sequence`) is atomic, and
// the raw `buffer` pointer is only dereferenced from `&mut self`
// (enqueue/dequeue) or `&self` (peek) with Acquire/Release ordering on the
// indices guarding the accessed slot.
// NOTE(review): soundness also assumes the caller keeps the backing memory
// alive and unaliased for the buffer's lifetime — confirm at the call sites.
unsafe impl Send for RingBuffer {}
unsafe impl Sync for RingBuffer {}
147
148impl RingBuffer {
149    /// Create a new ring buffer.
150    ///
151    /// # Arguments
152    ///
153    /// * `region` - Handle to the backing region
154    /// * `size` - Number of entries (must be power of 2)
155    /// * `max_msg_size` - Maximum message size in bytes
156    /// * `buffer` - Pointer to the ring buffer memory
157    /// * `buffer_len` - Length of the buffer
158    ///
159    /// # Errors
160    ///
161    /// Returns `InvalidParameter` if size is not a power of 2.
162    #[cfg(feature = "std")]
163    pub fn new(
164        region: RegionHandle,
165        size: u32,
166        max_msg_size: u32,
167        buffer: *mut u8,
168        buffer_len: usize,
169    ) -> Result<Self> {
170        // Validate size is power of 2
171        if size == 0 || (size & (size - 1)) != 0 {
172            return Err(KernelError::InvalidArgument);
173        }
174
175        let entry_size = RingEntry::HEADER_SIZE as u32 + max_msg_size;
176        let required_size = (size as usize) * (entry_size as usize);
177
178        if buffer_len < required_size {
179            return Err(KernelError::OutOfMemory);
180        }
181
182        Ok(Self {
183            region,
184            size,
185            mask: size - 1,
186            max_msg_size,
187            entry_size,
188            sq_head: AtomicU32::new(0),
189            sq_tail: AtomicU32::new(0),
190            sequence: AtomicU32::new(0),
191            buffer,
192            buffer_len,
193            #[cfg(feature = "stats")]
194            stats: RingStats::default(),
195        })
196    }
197
198    /// Returns the region handle.
199    #[inline]
200    pub fn region(&self) -> RegionHandle {
201        self.region
202    }
203
204    /// Returns the ring size.
205    #[inline]
206    pub fn size(&self) -> u32 {
207        self.size
208    }
209
210    /// Returns the maximum message size.
211    #[inline]
212    pub fn max_msg_size(&self) -> u32 {
213        self.max_msg_size
214    }
215
216    /// Returns the number of entries currently in the ring.
217    #[inline]
218    pub fn len(&self) -> u32 {
219        let head = self.sq_head.load(Ordering::Acquire);
220        let tail = self.sq_tail.load(Ordering::Acquire);
221        head.wrapping_sub(tail)
222    }
223
224    /// Returns true if the ring is empty.
225    #[inline]
226    pub fn is_empty(&self) -> bool {
227        self.len() == 0
228    }
229
230    /// Returns true if the ring is full.
231    #[inline]
232    pub fn is_full(&self) -> bool {
233        self.len() >= self.size
234    }
235
236    /// Returns the number of free slots.
237    #[inline]
238    pub fn available(&self) -> u32 {
239        self.size.saturating_sub(self.len())
240    }
241
242    /// Enqueue a message into the ring buffer.
243    ///
244    /// # Arguments
245    ///
246    /// * `data` - Message payload
247    /// * `priority` - Message priority
248    ///
249    /// # Errors
250    ///
251    /// Returns `QueueFull` if the ring is full.
252    /// Returns `MessageTooLarge` if the message exceeds max_msg_size.
253    #[cfg(feature = "std")]
254    pub fn enqueue(&mut self, data: &[u8], priority: MsgPriority) -> Result<()> {
255        if data.len() > self.max_msg_size as usize {
256            return Err(KernelError::MessageTooLarge);
257        }
258
259        // Check if there's space
260        let head = self.sq_head.load(Ordering::Relaxed);
261        let tail = self.sq_tail.load(Ordering::Acquire);
262
263        if head.wrapping_sub(tail) >= self.size {
264            #[cfg(feature = "stats")]
265            {
266                self.stats.full_count += 1;
267            }
268            return Err(KernelError::QueueFull);
269        }
270
271        // Calculate entry offset
272        let index = head & self.mask;
273        let offset = (index as usize) * (self.entry_size as usize);
274
275        // Get sequence number
276        let seq = self.sequence.fetch_add(1, Ordering::Relaxed) as u16;
277
278        // Create entry header
279        let entry = RingEntry::new_inline(data.len() as u32, priority, seq);
280
281        // Write header and data
282        // SAFETY: We've verified bounds and have exclusive write access
283        unsafe {
284            let entry_ptr = self.buffer.add(offset);
285            core::ptr::write(entry_ptr as *mut RingEntry, entry);
286
287            let data_ptr = entry_ptr.add(RingEntry::HEADER_SIZE);
288            core::ptr::copy_nonoverlapping(data.as_ptr(), data_ptr, data.len());
289        }
290
291        // Publish the entry
292        self.sq_head.store(head.wrapping_add(1), Ordering::Release);
293
294        #[cfg(feature = "stats")]
295        {
296            self.stats.enqueued += 1;
297            self.stats.bytes_enqueued += data.len() as u64;
298        }
299
300        Ok(())
301    }
302
303    /// Enqueue a descriptor into the ring buffer.
304    ///
305    /// This is used for zero-copy message passing when the data is in a shared region.
306    ///
307    /// # Arguments
308    ///
309    /// * `descriptor` - The message descriptor (region, offset, length)
310    /// * `priority` - Message priority
311    ///
312    /// # Errors
313    ///
314    /// Returns `QueueFull` if the ring is full.
315    #[cfg(feature = "std")]
316    pub fn enqueue_descriptor(
317        &mut self,
318        descriptor: &crate::MessageDescriptor,
319        priority: MsgPriority,
320    ) -> Result<()> {
321        // Check if there's space
322        let head = self.sq_head.load(Ordering::Relaxed);
323        let tail = self.sq_tail.load(Ordering::Acquire);
324
325        if head.wrapping_sub(tail) >= self.size {
326            #[cfg(feature = "stats")]
327            {
328                self.stats.full_count += 1;
329            }
330            return Err(KernelError::QueueFull);
331        }
332
333        // Calculate entry offset
334        let index = head & self.mask;
335        let offset = (index as usize) * (self.entry_size as usize);
336
337        // Get sequence number
338        let seq = self.sequence.fetch_add(1, Ordering::Relaxed) as u16;
339
340        // Create entry header
341        let entry = RingEntry::new_descriptor(priority, seq);
342
343        // Write header and descriptor
344        // SAFETY: We've verified bounds and have exclusive write access
345        unsafe {
346            let entry_ptr = self.buffer.add(offset);
347            core::ptr::write(entry_ptr as *mut RingEntry, entry);
348
349            let desc_ptr = entry_ptr.add(RingEntry::HEADER_SIZE);
350            core::ptr::write(desc_ptr as *mut crate::MessageDescriptor, *descriptor);
351        }
352
353        // Publish the entry
354        self.sq_head.store(head.wrapping_add(1), Ordering::Release);
355
356        #[cfg(feature = "stats")]
357        {
358            self.stats.enqueued += 1;
359            self.stats.bytes_enqueued += descriptor.length as u64;
360        }
361
362        Ok(())
363    }
364
365    /// Dequeue a message from the ring buffer.
366    ///
367    /// # Arguments
368    ///
369    /// * `buf` - Buffer to receive the message data
370    ///
371    /// # Returns
372    ///
373    /// On success, returns the ring entry header. For inline data, the actual
374    /// data is copied to `buf`. For descriptors, the descriptor is in `buf`.
375    ///
376    /// # Errors
377    ///
378    /// Returns `QueueEmpty` if the ring is empty.
379    #[cfg(feature = "std")]
380    pub fn dequeue(&mut self, buf: &mut [u8]) -> Result<RingEntry> {
381        // Check if there's data
382        let head = self.sq_head.load(Ordering::Acquire);
383        let tail = self.sq_tail.load(Ordering::Relaxed);
384
385        if head == tail {
386            #[cfg(feature = "stats")]
387            {
388                self.stats.empty_count += 1;
389            }
390            return Err(KernelError::QueueEmpty);
391        }
392
393        // Calculate entry offset
394        let index = tail & self.mask;
395        let offset = (index as usize) * (self.entry_size as usize);
396
397        // Read entry
398        // SAFETY: We've verified there's a valid entry at this position
399        let entry = unsafe {
400            let entry_ptr = self.buffer.add(offset);
401            core::ptr::read(entry_ptr as *const RingEntry)
402        };
403
404        // Read payload
405        let payload_len = entry.length as usize;
406        if payload_len > buf.len() {
407            return Err(KernelError::MessageTooLarge);
408        }
409
410        // SAFETY: We've verified bounds
411        unsafe {
412            let data_ptr = self.buffer.add(offset + RingEntry::HEADER_SIZE);
413            core::ptr::copy_nonoverlapping(data_ptr, buf.as_mut_ptr(), payload_len);
414        }
415
416        // Advance tail
417        self.sq_tail.store(tail.wrapping_add(1), Ordering::Release);
418
419        #[cfg(feature = "stats")]
420        {
421            self.stats.dequeued += 1;
422            self.stats.bytes_dequeued += payload_len as u64;
423        }
424
425        Ok(entry)
426    }
427
428    /// Peek at the next entry without removing it.
429    #[cfg(feature = "std")]
430    pub fn peek(&self) -> Option<RingEntry> {
431        let head = self.sq_head.load(Ordering::Acquire);
432        let tail = self.sq_tail.load(Ordering::Relaxed);
433
434        if head == tail {
435            return None;
436        }
437
438        let index = tail & self.mask;
439        let offset = (index as usize) * (self.entry_size as usize);
440
441        // SAFETY: We've verified there's a valid entry
442        let entry = unsafe {
443            let entry_ptr = self.buffer.add(offset);
444            core::ptr::read(entry_ptr as *const RingEntry)
445        };
446
447        Some(entry)
448    }
449
450    /// Get ring buffer statistics.
451    #[cfg(feature = "stats")]
452    pub fn stats(&self) -> &RingStats {
453        &self.stats
454    }
455}
456
#[cfg(test)]
mod tests {
    use super::*;

    /// Construct a ring over `backing` with `entries` slots of `max_msg` payload bytes.
    #[cfg(feature = "std")]
    fn make_ring(backing: &mut Vec<u8>, entries: u32, max_msg: u32) -> RingBuffer {
        RingBuffer::new(
            RegionHandle::null(),
            entries,
            max_msg,
            backing.as_mut_ptr(),
            backing.len(),
        )
        .unwrap()
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_entry_size() {
        // u32 + u8 + u8 + u16 under repr(C) packs into exactly 8 bytes.
        assert_eq!(RingEntry::HEADER_SIZE, 8);
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_basic() {
        // 64 slots, each an 8-byte header plus a 4096-byte payload area.
        let mut backing = vec![0u8; 64 * (RingEntry::HEADER_SIZE + 4096)];
        let mut ring = make_ring(&mut backing, 64, 4096);

        // Freshly built ring is empty with all slots available.
        assert!(ring.is_empty());
        assert!(!ring.is_full());
        assert_eq!(ring.len(), 0);
        assert_eq!(ring.available(), 64);

        // One enqueue makes it non-empty.
        ring.enqueue(b"hello", MsgPriority::Normal).unwrap();
        assert!(!ring.is_empty());
        assert_eq!(ring.len(), 1);

        // Dequeue round-trips the payload and drains the ring.
        let mut out = [0u8; 4096];
        let entry = ring.dequeue(&mut out).unwrap();
        assert_eq!(entry.length, 5);
        assert_eq!(&out[..5], b"hello");
        assert!(ring.is_empty());
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_full() {
        let mut backing = vec![0u8; 4 * 1024]; // Plenty for 4 slots of 256 bytes
        let mut ring = make_ring(&mut backing, 4, 256);

        // Occupy every slot.
        for i in 0..4 {
            ring.enqueue(format!("msg{}", i).as_bytes(), MsgPriority::Normal)
                .unwrap();
        }
        assert!(ring.is_full());

        // The fifth enqueue must be rejected with QueueFull.
        assert!(matches!(
            ring.enqueue(b"overflow", MsgPriority::Normal),
            Err(KernelError::QueueFull)
        ));
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_wraparound() {
        let mut backing = vec![0u8; 4 * 1024];
        let mut ring = make_ring(&mut backing, 4, 256);
        let mut scratch = [0u8; 256];

        // Fill and drain well past `size` to exercise mask-based wraparound.
        for round in 0..10 {
            let payloads: Vec<String> = (0..4).map(|i| format!("r{}m{}", round, i)).collect();

            for p in &payloads {
                ring.enqueue(p.as_bytes(), MsgPriority::Normal).unwrap();
            }
            for p in &payloads {
                let entry = ring.dequeue(&mut scratch).unwrap();
                assert_eq!(&scratch[..entry.length as usize], p.as_bytes());
            }

            assert!(ring.is_empty());
        }
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_priority() {
        let mut backing = vec![0u8; 8 * 1024];
        let mut ring = make_ring(&mut backing, 8, 256);

        ring.enqueue(b"low", MsgPriority::Low).unwrap();
        ring.enqueue(b"high", MsgPriority::High).unwrap();
        ring.enqueue(b"urgent", MsgPriority::Urgent).unwrap();

        // Note: This basic ring doesn't reorder by priority.
        // Priority ordering is handled at a higher level in KernelQueue.
        let mut scratch = [0u8; 256];
        for expected in [MsgPriority::Low, MsgPriority::High, MsgPriority::Urgent] {
            let entry = ring.dequeue(&mut scratch).unwrap();
            assert_eq!(entry.priority(), expected);
        }
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_invalid_size() {
        let mut backing = vec![0u8; 1024];

        // Neither a non-power-of-two nor zero is an acceptable ring size.
        for bad_size in [3u32, 0] {
            let result = RingBuffer::new(
                RegionHandle::null(),
                bad_size,
                256,
                backing.as_mut_ptr(),
                backing.len(),
            );
            assert!(matches!(result, Err(KernelError::InvalidArgument)));
        }
    }
}