use core::sync::atomic::{AtomicU32, Ordering};

use ruvix_types::{KernelError, MsgPriority, RegionHandle};

use crate::Result;

/// Header prepended to every message slot in the ring.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct RingEntry {
    /// Payload length in bytes.
    pub length: u32,
    /// Message priority, stored as a raw `MsgPriority` discriminant.
    pub priority: u8,
    /// Entry flags; see the `FLAG_*` constants.
    pub flags: u8,
    /// Sequence number, truncated to 16 bits.
    pub sequence: u16,
}

impl RingEntry {
    /// Size of the entry header in bytes.
    pub const HEADER_SIZE: usize = core::mem::size_of::<Self>();

    /// The entry carries an out-of-line message descriptor, not inline data.
    pub const FLAG_DESCRIPTOR: u8 = 1 << 0;

    /// The entry continues a message started in an earlier entry.
    pub const FLAG_CONTINUATION: u8 = 1 << 1;

    /// The entry is the final fragment of a multi-entry message.
    pub const FLAG_FINAL: u8 = 1 << 2;

    /// Creates a header for an inline message of `length` payload bytes.
    #[inline]
    pub const fn new_inline(length: u32, priority: MsgPriority, sequence: u16) -> Self {
        Self {
            length,
            priority: priority as u8,
            flags: 0,
            sequence,
        }
    }

    /// Creates a header for an out-of-line descriptor entry.
    #[inline]
    pub const fn new_descriptor(priority: MsgPriority, sequence: u16) -> Self {
        Self {
            // Fixed size of the descriptor payload written by
            // `RingBuffer::enqueue_descriptor`.
            length: 24,
            priority: priority as u8,
            flags: Self::FLAG_DESCRIPTOR,
            sequence,
        }
    }

    /// Returns `true` if this entry carries a descriptor instead of inline data.
    #[inline]
    pub const fn is_descriptor(&self) -> bool {
        (self.flags & Self::FLAG_DESCRIPTOR) != 0
    }

    /// Returns `true` if this entry continues an earlier one.
    #[inline]
    pub const fn is_continuation(&self) -> bool {
        (self.flags & Self::FLAG_CONTINUATION) != 0
    }

    /// Decodes the stored priority, falling back to `Normal` for unknown values.
    #[inline]
    pub fn priority(&self) -> MsgPriority {
        MsgPriority::from_u8(self.priority).unwrap_or(MsgPriority::Normal)
    }
}

/// Counters tracked when the `stats` feature is enabled.
#[derive(Debug, Clone, Default)]
pub struct RingStats {
    /// Messages successfully enqueued.
    pub enqueued: u64,
    /// Messages successfully dequeued.
    pub dequeued: u64,
    /// Total payload bytes enqueued.
    pub bytes_enqueued: u64,
    /// Total payload bytes dequeued.
    pub bytes_dequeued: u64,
    /// Enqueue attempts rejected because the ring was full.
    pub full_count: u64,
    /// Dequeue attempts rejected because the ring was empty.
    pub empty_count: u64,
}

/// A fixed-capacity ring of message slots over a caller-provided buffer.
///
/// The slot count is a power of two so indices wrap with a mask; `sq_head`
/// and `sq_tail` are free-running counters whose difference is the length.
pub struct RingBuffer {
    /// Handle of the memory region backing this ring.
    region: RegionHandle,

    /// Number of slots; always a power of two.
    size: u32,

    /// `size - 1`, used to wrap indices.
    mask: u32,

    /// Maximum payload bytes per message.
    max_msg_size: u32,

    /// Bytes per slot: header plus `max_msg_size`.
    entry_size: u32,

    /// Producer index (next slot to write).
    sq_head: AtomicU32,

    /// Consumer index (next slot to read).
    sq_tail: AtomicU32,

    /// Free-running sequence counter for entry headers.
    sequence: AtomicU32,

    /// Raw pointer to the backing buffer.
    #[cfg(feature = "std")]
    buffer: *mut u8,

    /// Length of the backing buffer in bytes.
    #[cfg(feature = "std")]
    buffer_len: usize,

    /// Counters, tracked only with the `stats` feature.
    #[cfg(feature = "stats")]
    stats: RingStats,
}

// SAFETY: the raw buffer pointer is only written behind `&mut self`
// (`enqueue`/`dequeue`) and read in `peek`; index updates use atomics.
unsafe impl Send for RingBuffer {}
unsafe impl Sync for RingBuffer {}

impl RingBuffer {
    /// Creates a ring of `size` slots over `buffer`.
    ///
    /// `size` must be a non-zero power of two, and `buffer_len` must cover
    /// `size * (RingEntry::HEADER_SIZE + max_msg_size)` bytes.
    #[cfg(feature = "std")]
    pub fn new(
        region: RegionHandle,
        size: u32,
        max_msg_size: u32,
        buffer: *mut u8,
        buffer_len: usize,
    ) -> Result<Self> {
        // The capacity must be a non-zero power of two for `mask` to work.
        if size == 0 || (size & (size - 1)) != 0 {
            return Err(KernelError::InvalidArgument);
        }

        let entry_size = RingEntry::HEADER_SIZE as u32 + max_msg_size;
        let required_size = (size as usize) * (entry_size as usize);

        if buffer_len < required_size {
            return Err(KernelError::OutOfMemory);
        }

        Ok(Self {
            region,
            size,
            mask: size - 1,
            max_msg_size,
            entry_size,
            sq_head: AtomicU32::new(0),
            sq_tail: AtomicU32::new(0),
            sequence: AtomicU32::new(0),
            buffer,
            buffer_len,
            #[cfg(feature = "stats")]
            stats: RingStats::default(),
        })
    }

    /// Returns the backing region handle.
    #[inline]
    pub fn region(&self) -> RegionHandle {
        self.region
    }

    /// Returns the slot capacity.
    #[inline]
    pub fn size(&self) -> u32 {
        self.size
    }

    /// Returns the maximum payload bytes per message.
    #[inline]
    pub fn max_msg_size(&self) -> u32 {
        self.max_msg_size
    }

    /// Returns the number of queued entries.
    #[inline]
    pub fn len(&self) -> u32 {
        let head = self.sq_head.load(Ordering::Acquire);
        let tail = self.sq_tail.load(Ordering::Acquire);
        head.wrapping_sub(tail)
    }

    /// Returns `true` if no entries are queued.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if every slot is occupied.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() >= self.size
    }

    /// Returns the number of free slots.
    #[inline]
    pub fn available(&self) -> u32 {
        self.size.saturating_sub(self.len())
    }

    /// Enqueues `data` as an inline message with the given priority.
    ///
    /// Fails with `MessageTooLarge` if `data` exceeds `max_msg_size`, or
    /// with `QueueFull` if no slot is free.
    #[cfg(feature = "std")]
    pub fn enqueue(&mut self, data: &[u8], priority: MsgPriority) -> Result<()> {
        if data.len() > self.max_msg_size as usize {
            return Err(KernelError::MessageTooLarge);
        }

        let head = self.sq_head.load(Ordering::Relaxed);
        let tail = self.sq_tail.load(Ordering::Acquire);

        if head.wrapping_sub(tail) >= self.size {
            #[cfg(feature = "stats")]
            {
                self.stats.full_count += 1;
            }
            return Err(KernelError::QueueFull);
        }

        let index = head & self.mask;
        let offset = (index as usize) * (self.entry_size as usize);

        let seq = self.sequence.fetch_add(1, Ordering::Relaxed) as u16;

        let entry = RingEntry::new_inline(data.len() as u32, priority, seq);

        // SAFETY: `new` verified that the buffer covers `size * entry_size`
        // bytes, and `offset` stays within that range.
        unsafe {
            let entry_ptr = self.buffer.add(offset);
            core::ptr::write(entry_ptr as *mut RingEntry, entry);

            let data_ptr = entry_ptr.add(RingEntry::HEADER_SIZE);
            core::ptr::copy_nonoverlapping(data.as_ptr(), data_ptr, data.len());
        }

        // Publish the new entry to consumers.
        self.sq_head.store(head.wrapping_add(1), Ordering::Release);

        #[cfg(feature = "stats")]
        {
            self.stats.enqueued += 1;
            self.stats.bytes_enqueued += data.len() as u64;
        }

        Ok(())
    }

    /// Enqueues an out-of-line message descriptor with the given priority.
    ///
    /// The descriptor is copied into the slot payload; fails with
    /// `QueueFull` if no slot is free.
    #[cfg(feature = "std")]
    pub fn enqueue_descriptor(
        &mut self,
        descriptor: &crate::MessageDescriptor,
        priority: MsgPriority,
    ) -> Result<()> {
        let head = self.sq_head.load(Ordering::Relaxed);
        let tail = self.sq_tail.load(Ordering::Acquire);

        if head.wrapping_sub(tail) >= self.size {
            #[cfg(feature = "stats")]
            {
                self.stats.full_count += 1;
            }
            return Err(KernelError::QueueFull);
        }

        let index = head & self.mask;
        let offset = (index as usize) * (self.entry_size as usize);

        let seq = self.sequence.fetch_add(1, Ordering::Relaxed) as u16;

        let entry = RingEntry::new_descriptor(priority, seq);

        // SAFETY: as in `enqueue`, `offset` lies within the validated buffer.
        unsafe {
            let entry_ptr = self.buffer.add(offset);
            core::ptr::write(entry_ptr as *mut RingEntry, entry);

            let desc_ptr = entry_ptr.add(RingEntry::HEADER_SIZE);
            core::ptr::write(desc_ptr as *mut crate::MessageDescriptor, *descriptor);
        }

        // Publish the new entry to consumers.
        self.sq_head.store(head.wrapping_add(1), Ordering::Release);

        #[cfg(feature = "stats")]
        {
            self.stats.enqueued += 1;
            self.stats.bytes_enqueued += descriptor.length as u64;
        }

        Ok(())
    }

    /// Dequeues the oldest entry, copying its payload into `buf`.
    ///
    /// Fails with `QueueEmpty` if no entry is queued, or with
    /// `MessageTooLarge` (without consuming the entry) if `buf` is smaller
    /// than the stored payload.
    #[cfg(feature = "std")]
    pub fn dequeue(&mut self, buf: &mut [u8]) -> Result<RingEntry> {
        let head = self.sq_head.load(Ordering::Acquire);
        let tail = self.sq_tail.load(Ordering::Relaxed);

        if head == tail {
            #[cfg(feature = "stats")]
            {
                self.stats.empty_count += 1;
            }
            return Err(KernelError::QueueEmpty);
        }

        let index = tail & self.mask;
        let offset = (index as usize) * (self.entry_size as usize);

        // SAFETY: `offset` lies within the validated buffer, and the producer
        // published an entry header there before advancing `sq_head`.
        let entry = unsafe {
            let entry_ptr = self.buffer.add(offset);
            core::ptr::read(entry_ptr as *const RingEntry)
        };

        let payload_len = entry.length as usize;

        if payload_len > buf.len() {
            return Err(KernelError::MessageTooLarge);
        }

        // SAFETY: the payload follows the header within the same slot.
        unsafe {
            let data_ptr = self.buffer.add(offset + RingEntry::HEADER_SIZE);
            core::ptr::copy_nonoverlapping(data_ptr, buf.as_mut_ptr(), payload_len);
        }

        // Release the slot back to producers.
        self.sq_tail.store(tail.wrapping_add(1), Ordering::Release);

        #[cfg(feature = "stats")]
        {
            self.stats.dequeued += 1;
            self.stats.bytes_dequeued += payload_len as u64;
        }

        Ok(entry)
    }

    /// Returns a copy of the oldest entry's header without consuming it.
    #[cfg(feature = "std")]
    pub fn peek(&self) -> Option<RingEntry> {
        let head = self.sq_head.load(Ordering::Acquire);
        let tail = self.sq_tail.load(Ordering::Relaxed);

        if head == tail {
            return None;
        }

        let index = tail & self.mask;
        let offset = (index as usize) * (self.entry_size as usize);

        // SAFETY: as in `dequeue`, the head slot holds a published entry.
        let entry = unsafe {
            let entry_ptr = self.buffer.add(offset);
            core::ptr::read(entry_ptr as *const RingEntry)
        };

        Some(entry)
    }

    /// Returns the accumulated counters.
    #[cfg(feature = "stats")]
    pub fn stats(&self) -> &RingStats {
        &self.stats
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_entry_size() {
        assert_eq!(RingEntry::HEADER_SIZE, 8);
    }
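
    // Added sketch: exercises the `RingEntry` flag helpers directly. Assumes
    // `MsgPriority::from_u8` round-trips the `High` discriminant.
    #[cfg(feature = "std")]
    #[test]
    fn test_ring_entry_flags() {
        let inline = RingEntry::new_inline(5, MsgPriority::Normal, 1);
        assert!(!inline.is_descriptor());
        assert!(!inline.is_continuation());

        let desc = RingEntry::new_descriptor(MsgPriority::High, 2);
        assert!(desc.is_descriptor());
        assert!(!desc.is_continuation());
        assert_eq!(desc.priority(), MsgPriority::High);
    }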

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_basic() {
        let mut backing = vec![0u8; 64 * (RingEntry::HEADER_SIZE + 4096)];
        let mut ring = RingBuffer::new(
            RegionHandle::null(),
            64,
            4096,
            backing.as_mut_ptr(),
            backing.len(),
        )
        .unwrap();

        assert!(ring.is_empty());
        assert!(!ring.is_full());
        assert_eq!(ring.len(), 0);
        assert_eq!(ring.available(), 64);

        ring.enqueue(b"hello", MsgPriority::Normal).unwrap();
        assert_eq!(ring.len(), 1);
        assert!(!ring.is_empty());

        let mut buf = [0u8; 4096];
        let entry = ring.dequeue(&mut buf).unwrap();
        assert_eq!(entry.length, 5);
        assert_eq!(&buf[..5], b"hello");
        assert!(ring.is_empty());
    }
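
    // Added sketch: covers the non-destructive `peek` path, using the same
    // backing-buffer setup as the tests above.
    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_peek() {
        let mut backing = vec![0u8; 4 * (RingEntry::HEADER_SIZE + 256)];
        let mut ring = RingBuffer::new(
            RegionHandle::null(),
            4,
            256,
            backing.as_mut_ptr(),
            backing.len(),
        )
        .unwrap();

        assert!(ring.peek().is_none());

        ring.enqueue(b"peeked", MsgPriority::Normal).unwrap();

        // Peek returns the head entry without advancing the tail.
        let entry = ring.peek().unwrap();
        assert_eq!(entry.length, 6);
        assert_eq!(ring.len(), 1);

        // The same entry is still available to dequeue.
        let mut buf = [0u8; 256];
        let dequeued = ring.dequeue(&mut buf).unwrap();
        assert_eq!(dequeued.length, 6);
        assert_eq!(&buf[..6], b"peeked");
    }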

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_full() {
        let mut backing = vec![0u8; 4 * 1024];
        let mut ring = RingBuffer::new(
            RegionHandle::null(),
            4,
            256,
            backing.as_mut_ptr(),
            backing.len(),
        )
        .unwrap();

        for i in 0..4 {
            let msg = format!("msg{}", i);
            ring.enqueue(msg.as_bytes(), MsgPriority::Normal).unwrap();
        }

        assert!(ring.is_full());

        let result = ring.enqueue(b"overflow", MsgPriority::Normal);
        assert!(matches!(result, Err(KernelError::QueueFull)));
    }
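
    // Added sketch: covers both `MessageTooLarge` paths. An oversized enqueue
    // is rejected up front; a dequeue into a too-small buffer fails without
    // consuming the entry.
    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_message_too_large() {
        let mut backing = vec![0u8; 4 * (RingEntry::HEADER_SIZE + 16)];
        let mut ring = RingBuffer::new(
            RegionHandle::null(),
            4,
            16,
            backing.as_mut_ptr(),
            backing.len(),
        )
        .unwrap();

        let result = ring.enqueue(&[0u8; 17], MsgPriority::Normal);
        assert!(matches!(result, Err(KernelError::MessageTooLarge)));

        ring.enqueue(b"0123456789", MsgPriority::Normal).unwrap();
        let mut small = [0u8; 4];
        let result = ring.dequeue(&mut small);
        assert!(matches!(result, Err(KernelError::MessageTooLarge)));
        assert_eq!(ring.len(), 1);
    }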

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_wraparound() {
        let mut backing = vec![0u8; 4 * 1024];
        let mut ring = RingBuffer::new(
            RegionHandle::null(),
            4,
            256,
            backing.as_mut_ptr(),
            backing.len(),
        )
        .unwrap();

        let mut buf = [0u8; 256];

        for round in 0..10 {
            for i in 0..4 {
                let msg = format!("r{}m{}", round, i);
                ring.enqueue(msg.as_bytes(), MsgPriority::Normal).unwrap();
            }

            for i in 0..4 {
                let entry = ring.dequeue(&mut buf).unwrap();
                let expected = format!("r{}m{}", round, i);
                assert_eq!(&buf[..entry.length as usize], expected.as_bytes());
            }

            assert!(ring.is_empty());
        }
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_priority() {
        let mut backing = vec![0u8; 8 * 1024];
        let mut ring = RingBuffer::new(
            RegionHandle::null(),
            8,
            256,
            backing.as_mut_ptr(),
            backing.len(),
        )
        .unwrap();

        ring.enqueue(b"low", MsgPriority::Low).unwrap();
        ring.enqueue(b"high", MsgPriority::High).unwrap();
        ring.enqueue(b"urgent", MsgPriority::Urgent).unwrap();

        let mut buf = [0u8; 256];

        // The ring is strictly FIFO: priority is carried as metadata on each
        // entry, not used to reorder the queue.
        let e1 = ring.dequeue(&mut buf).unwrap();
        assert_eq!(e1.priority(), MsgPriority::Low);

        let e2 = ring.dequeue(&mut buf).unwrap();
        assert_eq!(e2.priority(), MsgPriority::High);

        let e3 = ring.dequeue(&mut buf).unwrap();
        assert_eq!(e3.priority(), MsgPriority::Urgent);
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_ring_buffer_invalid_size() {
        let mut backing = vec![0u8; 1024];

        // Size is not a power of two.
        let result = RingBuffer::new(
            RegionHandle::null(),
            3,
            256,
            backing.as_mut_ptr(),
            backing.len(),
        );
        assert!(matches!(result, Err(KernelError::InvalidArgument)));

        // Size is zero.
        let result = RingBuffer::new(
            RegionHandle::null(),
            0,
            256,
            backing.as_mut_ptr(),
            backing.len(),
        );
        assert!(matches!(result, Err(KernelError::InvalidArgument)));
    }
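
    // Added sketch: checks the `stats` counters after one round trip; only
    // compiled when both the `std` and `stats` features are enabled.
    #[cfg(all(feature = "std", feature = "stats"))]
    #[test]
    fn test_ring_buffer_stats() {
        let mut backing = vec![0u8; 4 * (RingEntry::HEADER_SIZE + 64)];
        let mut ring = RingBuffer::new(
            RegionHandle::null(),
            4,
            64,
            backing.as_mut_ptr(),
            backing.len(),
        )
        .unwrap();

        ring.enqueue(b"abc", MsgPriority::Normal).unwrap();
        let mut buf = [0u8; 64];
        ring.dequeue(&mut buf).unwrap();

        let stats = ring.stats();
        assert_eq!(stats.enqueued, 1);
        assert_eq!(stats.dequeued, 1);
        assert_eq!(stats.bytes_enqueued, 3);
        assert_eq!(stats.bytes_dequeued, 3);
    }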
}