1use core::sync::atomic::{AtomicU32, Ordering};
7
8use ruvix_types::{KernelError, MsgPriority, RegionHandle, RegionPolicy};
9
10use crate::descriptor::{DescriptorValidator, MessageDescriptor};
11use crate::ring::{RingBuffer, RingEntry};
12use crate::{Duration, Result};
13
/// Configuration for a [`KernelQueue`]: ring geometry plus a schema tag.
#[derive(Debug, Clone, Copy)]
pub struct QueueConfig {
    /// Number of ring slots; must be a non-zero power of two (enforced by `validate`).
    pub ring_size: u32,
    /// Maximum size in bytes of one message payload; `validate` caps this at 1 MiB.
    pub max_msg_size: u32,
    /// Application-defined schema identifier (0 by default).
    /// NOTE(review): interpretation of this tag is not visible in this file — confirm with users.
    pub schema: u32,
}
24
25impl QueueConfig {
26 #[inline]
33 pub const fn new(ring_size: u32, max_msg_size: u32) -> Self {
34 Self {
35 ring_size,
36 max_msg_size,
37 schema: 0,
38 }
39 }
40
41 #[inline]
43 pub const fn with_schema(ring_size: u32, max_msg_size: u32, schema: u32) -> Self {
44 Self {
45 ring_size,
46 max_msg_size,
47 schema,
48 }
49 }
50
51 pub fn validate(&self) -> Result<()> {
53 if self.ring_size == 0 || (self.ring_size & (self.ring_size - 1)) != 0 {
55 return Err(KernelError::InvalidArgument);
56 }
57
58 if self.max_msg_size == 0 || self.max_msg_size > 1024 * 1024 {
60 return Err(KernelError::InvalidArgument);
61 }
62
63 Ok(())
64 }
65
66 pub fn required_memory(&self) -> usize {
68 let entry_size = RingEntry::HEADER_SIZE + self.max_msg_size as usize;
69 (self.ring_size as usize) * entry_size
70 }
71}
72
73impl Default for QueueConfig {
74 fn default() -> Self {
75 Self {
76 ring_size: 64,
77 max_msg_size: 4096,
78 schema: 0,
79 }
80 }
81}
82
/// A message queue backed by a [`RingBuffer`], with descriptor validation
/// and send/receive counters.
#[cfg(feature = "std")]
pub struct KernelQueue {
    /// Backing ring storage holding the queued messages.
    ring: RingBuffer,
    /// Configuration the queue was created with.
    config: QueueConfig,
    /// Validates out-of-line message descriptors before they are enqueued.
    validator: DescriptorValidator,
    /// Count of successful sends (relaxed atomic; statistics only).
    send_count: AtomicU32,
    /// Count of successful receives (relaxed atomic; statistics only).
    recv_count: AtomicU32,
}
114
#[cfg(feature = "std")]
impl KernelQueue {
    /// Builds a queue over caller-provided backing storage.
    ///
    /// The ring keeps the raw `buffer` pointer, so the memory behind it must
    /// remain valid (and unmoved) for the queue's lifetime — the caller is
    /// responsible for that.
    ///
    /// # Errors
    /// Propagates `InvalidArgument` from [`QueueConfig::validate`] and any
    /// error reported by `RingBuffer::new` (presumably for unusable or
    /// undersized storage — TODO confirm against `RingBuffer`).
    pub fn new(
        config: QueueConfig,
        region: RegionHandle,
        buffer: *mut u8,
        buffer_len: usize,
    ) -> Result<Self> {
        config.validate()?;

        let ring = RingBuffer::new(
            region,
            config.ring_size,
            config.max_msg_size,
            buffer,
            buffer_len,
        )?;

        Ok(Self {
            ring,
            config,
            validator: DescriptorValidator::new(),
            send_count: AtomicU32::new(0),
            recv_count: AtomicU32::new(0),
        })
    }

    /// Convenience constructor that heap-allocates exactly
    /// `config.required_memory()` bytes of backing storage.
    ///
    /// Returns the queue together with the owning `Vec`. The caller must keep
    /// the `Vec` alive (and must not grow it, which could reallocate) while
    /// the queue exists, because the ring stores a raw pointer into it.
    pub fn new_heap(config: QueueConfig) -> Result<(Self, Vec<u8>)> {
        config.validate()?;

        let required_size = config.required_memory();
        let mut buffer = vec![0u8; required_size];

        // Heap-backed queue: no shared-memory region, hence the null handle.
        let ring = RingBuffer::new(
            RegionHandle::null(),
            config.ring_size,
            config.max_msg_size,
            buffer.as_mut_ptr(),
            buffer.len(),
        )?;

        let queue = Self {
            ring,
            config,
            validator: DescriptorValidator::new(),
            send_count: AtomicU32::new(0),
            recv_count: AtomicU32::new(0),
        };

        Ok((queue, buffer))
    }

    /// Enqueues an inline message with the given priority.
    ///
    /// # Errors
    /// - `MessageTooLarge` if `msg` exceeds the configured `max_msg_size`.
    /// - Whatever `RingBuffer::enqueue` reports (e.g. `QueueFull`).
    pub fn send(&mut self, msg: &[u8], priority: MsgPriority) -> Result<()> {
        if msg.len() > self.config.max_msg_size as usize {
            return Err(KernelError::MessageTooLarge);
        }

        self.ring.enqueue(msg, priority)?;
        // Counter is bumped only after a successful enqueue.
        self.send_count.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Enqueues an out-of-line message descriptor after validating it against
    /// the target region's policy and size.
    ///
    /// # Errors
    /// Propagates validation errors from `DescriptorValidator::validate`
    /// (e.g. region policy or bounds violations) and enqueue errors from the
    /// ring (e.g. `QueueFull`).
    pub fn send_descriptor(
        &mut self,
        descriptor: &MessageDescriptor,
        region_policy: &RegionPolicy,
        region_size: usize,
        priority: MsgPriority,
    ) -> Result<()> {
        self.validator
            .validate(descriptor, region_policy, region_size)?;

        self.ring.enqueue_descriptor(descriptor, priority)?;
        self.send_count.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Dequeues the next message into `buf`, returning its length in bytes.
    ///
    /// # Errors
    /// Propagates dequeue errors from the ring (e.g. `QueueEmpty`).
    pub fn recv(&mut self, buf: &mut [u8]) -> Result<usize> {
        let entry = self.ring.dequeue(buf)?;
        self.recv_count.fetch_add(1, Ordering::Relaxed);
        Ok(entry.length as usize)
    }

    /// Like [`Self::recv`], but spin-polls until a message arrives or
    /// `timeout` elapses.
    ///
    /// NOTE(review): this busy-waits (with `spin_loop` hints) rather than
    /// blocking; it will burn a core for the whole timeout when the queue
    /// stays empty.
    ///
    /// # Errors
    /// `Timeout` when the deadline passes with the queue still empty; any
    /// non-`QueueEmpty` receive error is returned immediately.
    pub fn recv_timeout(&mut self, buf: &mut [u8], timeout: Duration) -> Result<usize> {
        let start = std::time::Instant::now();

        loop {
            match self.recv(buf) {
                Ok(len) => return Ok(len),
                Err(KernelError::QueueEmpty) => {
                    if start.elapsed() >= timeout {
                        return Err(KernelError::Timeout);
                    }
                    std::hint::spin_loop();
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Dequeues the next message and classifies it as inline payload or
    /// out-of-line descriptor.
    ///
    /// For descriptor entries the descriptor is re-parsed from the bytes the
    /// ring copied into `buf`; a parse failure maps to `InternalError`, since
    /// a well-formed ring entry should always carry a valid descriptor.
    pub fn recv_typed(&mut self, buf: &mut [u8]) -> Result<ReceivedMessage> {
        let entry = self.ring.dequeue(buf)?;
        self.recv_count.fetch_add(1, Ordering::Relaxed);

        if entry.is_descriptor() {
            let descriptor =
                MessageDescriptor::from_bytes(buf).ok_or(KernelError::InternalError)?;
            Ok(ReceivedMessage::Descriptor {
                descriptor,
                priority: entry.priority(),
            })
        } else {
            Ok(ReceivedMessage::Inline {
                length: entry.length as usize,
                priority: entry.priority(),
            })
        }
    }

    /// Returns the head entry without removing it, or `None` when empty.
    pub fn peek(&self) -> Option<RingEntry> {
        self.ring.peek()
    }

    /// True when no messages are queued.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.ring.is_empty()
    }

    /// True when the ring has no free slots.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.ring.is_full()
    }

    /// Number of messages currently queued.
    #[inline]
    pub fn len(&self) -> u32 {
        self.ring.len()
    }

    /// Number of free slots remaining in the ring.
    #[inline]
    pub fn available(&self) -> u32 {
        self.ring.available()
    }

    /// The configuration this queue was created with.
    #[inline]
    pub fn config(&self) -> &QueueConfig {
        &self.config
    }

    /// Handle of the backing memory region (null for heap-backed queues).
    #[inline]
    pub fn region(&self) -> RegionHandle {
        self.ring.region()
    }

    /// Total number of successful sends since creation.
    #[inline]
    pub fn send_count(&self) -> u32 {
        self.send_count.load(Ordering::Relaxed)
    }

    /// Total number of successful receives since creation.
    #[inline]
    pub fn recv_count(&self) -> u32 {
        self.recv_count.load(Ordering::Relaxed)
    }

    /// Ring-level statistics (only with the `stats` feature enabled).
    #[cfg(feature = "stats")]
    pub fn stats(&self) -> &crate::ring::RingStats {
        self.ring.stats()
    }
}
381
/// A message as returned by [`KernelQueue::recv_typed`]: either an inline
/// payload (already copied into the caller's buffer) or an out-of-line
/// descriptor referencing data in a shared region.
#[derive(Debug, Clone)]
pub enum ReceivedMessage {
    /// Payload bytes were copied into the caller's receive buffer.
    Inline {
        /// Number of valid bytes at the start of the caller's buffer.
        length: usize,
        /// Priority the message was sent with.
        priority: MsgPriority,
    },
    /// The message carries a descriptor pointing at out-of-line data.
    Descriptor {
        /// Parsed descriptor identifying the out-of-line payload.
        descriptor: MessageDescriptor,
        /// Priority the message was sent with.
        priority: MsgPriority,
    },
}
400
401impl ReceivedMessage {
402 #[inline]
404 pub fn is_descriptor(&self) -> bool {
405 matches!(self, ReceivedMessage::Descriptor { .. })
406 }
407
408 #[inline]
410 pub fn priority(&self) -> MsgPriority {
411 match self {
412 ReceivedMessage::Inline { priority, .. } => *priority,
413 ReceivedMessage::Descriptor { priority, .. } => *priority,
414 }
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::*;

    // Both valid and invalid ring sizes / message sizes exercise `validate`.
    #[cfg(feature = "std")]
    #[test]
    fn test_queue_config_validate() {
        assert!(QueueConfig::new(64, 4096).validate().is_ok());
        assert!(QueueConfig::new(128, 1024).validate().is_ok());
        // Non-power-of-two and zero ring sizes are rejected.
        assert!(QueueConfig::new(63, 4096).validate().is_err());
        assert!(QueueConfig::new(0, 4096).validate().is_err());
        // Zero and over-limit (> 1 MiB) message sizes are rejected.
        assert!(QueueConfig::new(64, 0).validate().is_err());
        assert!(QueueConfig::new(64, 2 * 1024 * 1024).validate().is_err());
    }

    // Round-trips a single message and checks emptiness/length bookkeeping.
    #[cfg(feature = "std")]
    #[test]
    fn test_queue_basic() {
        let config = QueueConfig::new(16, 256);
        let (mut queue, _buffer) = KernelQueue::new_heap(config).unwrap();

        assert!(queue.is_empty());
        assert!(!queue.is_full());

        queue.send(b"hello world", MsgPriority::Normal).unwrap();
        assert!(!queue.is_empty());
        assert_eq!(queue.len(), 1);

        let mut buf = [0u8; 256];
        let len = queue.recv(&mut buf).unwrap();
        assert_eq!(len, 11);
        assert_eq!(&buf[..len], b"hello world");
        assert!(queue.is_empty());
    }

    // Send/receive counters advance only on successful operations.
    #[cfg(feature = "std")]
    #[test]
    fn test_queue_send_count() {
        let config = QueueConfig::new(16, 256);
        let (mut queue, _buffer) = KernelQueue::new_heap(config).unwrap();

        for i in 0..5 {
            let msg = format!("msg{}", i);
            queue.send(msg.as_bytes(), MsgPriority::Normal).unwrap();
        }

        assert_eq!(queue.send_count(), 5);
        assert_eq!(queue.recv_count(), 0);

        let mut buf = [0u8; 256];
        for _ in 0..3 {
            queue.recv(&mut buf).unwrap();
        }

        assert_eq!(queue.recv_count(), 3);
    }

    // Filling all 4 slots flips `is_full`; one more send is QueueFull.
    #[cfg(feature = "std")]
    #[test]
    fn test_queue_full() {
        let config = QueueConfig::new(4, 256);
        let (mut queue, _buffer) = KernelQueue::new_heap(config).unwrap();

        for i in 0..4 {
            let msg = format!("msg{}", i);
            queue.send(msg.as_bytes(), MsgPriority::Normal).unwrap();
        }

        assert!(queue.is_full());

        let result = queue.send(b"overflow", MsgPriority::Normal);
        assert!(matches!(result, Err(KernelError::QueueFull)));
    }

    // Payload larger than max_msg_size is rejected before enqueueing.
    #[cfg(feature = "std")]
    #[test]
    fn test_queue_message_too_large() {
        let config = QueueConfig::new(16, 64);
        let (mut queue, _buffer) = KernelQueue::new_heap(config).unwrap();

        let large_msg = vec![0u8; 128];
        let result = queue.send(&large_msg, MsgPriority::Normal);
        assert!(matches!(result, Err(KernelError::MessageTooLarge)));
    }

    // Receiving from a fresh queue reports QueueEmpty.
    #[cfg(feature = "std")]
    #[test]
    fn test_queue_recv_empty() {
        let config = QueueConfig::new(16, 256);
        let (mut queue, _buffer) = KernelQueue::new_heap(config).unwrap();

        let mut buf = [0u8; 256];
        let result = queue.recv(&mut buf);
        assert!(matches!(result, Err(KernelError::QueueEmpty)));
    }

    // recv_timeout on an empty queue spins for at least the timeout, then
    // reports Timeout.
    #[cfg(feature = "std")]
    #[test]
    fn test_queue_recv_timeout() {
        let config = QueueConfig::new(16, 256);
        let (mut queue, _buffer) = KernelQueue::new_heap(config).unwrap();

        let mut buf = [0u8; 256];
        let start = std::time::Instant::now();
        let result = queue.recv_timeout(&mut buf, Duration::from_millis(10));
        let elapsed = start.elapsed();

        assert!(matches!(result, Err(KernelError::Timeout)));
        assert!(elapsed >= Duration::from_millis(10));
    }

    // Slab-policy regions must not accept descriptor sends.
    #[cfg(feature = "std")]
    #[test]
    fn test_queue_descriptor_slab_rejected() {
        let config = QueueConfig::new(16, 256);
        let (mut queue, _buffer) = KernelQueue::new_heap(config).unwrap();

        use ruvix_types::Handle;
        let desc = MessageDescriptor::new(RegionHandle(Handle::new(1, 0)), 0, 100);

        let result = queue.send_descriptor(
            &desc,
            &RegionPolicy::Slab {
                slot_size: 64,
                slot_count: 16,
            },
            1024,
            MsgPriority::Normal,
        );

        assert!(matches!(result, Err(KernelError::InvalidDescriptorRegion)));
    }

    // Immutable-policy regions accept an in-bounds descriptor.
    #[cfg(feature = "std")]
    #[test]
    fn test_queue_descriptor_immutable_ok() {
        let config = QueueConfig::new(16, 256);
        let (mut queue, _buffer) = KernelQueue::new_heap(config).unwrap();

        use ruvix_types::Handle;
        let desc = MessageDescriptor::new(RegionHandle(Handle::new(1, 0)), 0, 100);

        let result =
            queue.send_descriptor(&desc, &RegionPolicy::Immutable, 1024, MsgPriority::Normal);

        assert!(result.is_ok());
    }

    // Descriptor range 900..1100 overruns a 1000-byte region → InvalidArgument.
    #[cfg(feature = "std")]
    #[test]
    fn test_queue_descriptor_out_of_bounds() {
        let config = QueueConfig::new(16, 256);
        let (mut queue, _buffer) = KernelQueue::new_heap(config).unwrap();

        use ruvix_types::Handle;
        let desc = MessageDescriptor::new(RegionHandle(Handle::new(1, 0)), 900, 200);

        let result =
            queue.send_descriptor(&desc, &RegionPolicy::Immutable, 1000, MsgPriority::Normal);

        assert!(matches!(result, Err(KernelError::InvalidArgument)));
    }

    // recv_typed classifies an inline send as ReceivedMessage::Inline and
    // preserves length, priority, and payload bytes.
    #[cfg(feature = "std")]
    #[test]
    fn test_received_message_types() {
        let config = QueueConfig::new(16, 256);
        let (mut queue, _buffer) = KernelQueue::new_heap(config).unwrap();

        queue.send(b"inline", MsgPriority::High).unwrap();

        let mut buf = [0u8; 256];
        let msg = queue.recv_typed(&mut buf).unwrap();

        match msg {
            ReceivedMessage::Inline { length, priority } => {
                assert_eq!(length, 6);
                assert_eq!(priority, MsgPriority::High);
                assert_eq!(&buf[..length], b"inline");
            }
            _ => panic!("Expected inline message"),
        }
    }
}