// ruvix_queue/kernel_queue.rs

//! Kernel queue implementation.
//!
//! This module provides the high-level `KernelQueue` struct that implements
//! the ADR-087 queue IPC primitive.

use core::sync::atomic::{AtomicU32, Ordering};

use ruvix_types::{KernelError, MsgPriority, RegionHandle, RegionPolicy};

use crate::descriptor::{DescriptorValidator, MessageDescriptor};
use crate::ring::{RingBuffer, RingEntry};
use crate::{Duration, Result};

/// Configuration parameters for a kernel queue.
#[derive(Debug, Clone, Copy)]
pub struct QueueConfig {
    /// Number of entries in the ring buffer; must be a power of two.
    pub ring_size: u32,
    /// Upper bound on a single message payload, in bytes.
    pub max_msg_size: u32,
    /// WIT type ID used for message schema validation (0 disables validation).
    pub schema: u32,
}

25impl QueueConfig {
26    /// Create a new queue configuration.
27    ///
28    /// # Arguments
29    ///
30    /// * `ring_size` - Number of ring entries (must be power of 2)
31    /// * `max_msg_size` - Maximum message size in bytes
32    #[inline]
33    pub const fn new(ring_size: u32, max_msg_size: u32) -> Self {
34        Self {
35            ring_size,
36            max_msg_size,
37            schema: 0,
38        }
39    }
40
41    /// Create a configuration with schema validation.
42    #[inline]
43    pub const fn with_schema(ring_size: u32, max_msg_size: u32, schema: u32) -> Self {
44        Self {
45            ring_size,
46            max_msg_size,
47            schema,
48        }
49    }
50
51    /// Validate the configuration.
52    pub fn validate(&self) -> Result<()> {
53        // Ring size must be power of 2
54        if self.ring_size == 0 || (self.ring_size & (self.ring_size - 1)) != 0 {
55            return Err(KernelError::InvalidArgument);
56        }
57
58        // Max message size must be reasonable
59        if self.max_msg_size == 0 || self.max_msg_size > 1024 * 1024 {
60            return Err(KernelError::InvalidArgument);
61        }
62
63        Ok(())
64    }
65
66    /// Calculate the required backing memory size.
67    pub fn required_memory(&self) -> usize {
68        let entry_size = RingEntry::HEADER_SIZE + self.max_msg_size as usize;
69        (self.ring_size as usize) * entry_size
70    }
71}
72
73impl Default for QueueConfig {
74    fn default() -> Self {
75        Self {
76            ring_size: 64,
77            max_msg_size: 4096,
78            schema: 0,
79        }
80    }
81}
82
/// A kernel queue for inter-task communication.
///
/// Implements io_uring-style ring buffer IPC as specified in ADR-087 Section 7.
/// Features:
///
/// - Lock-free send/recv using atomic head/tail pointers
/// - Zero-copy descriptor-based messaging for shared regions
/// - Priority support (messages can have Low, Normal, High, Urgent priority)
/// - Optional WIT schema validation
///
/// # Thread Safety
///
/// KernelQueue is `Send` and `Sync`. Multiple tasks may send to the same queue
/// concurrently; receiving is typically done by a single consumer task.
#[cfg(feature = "std")]
pub struct KernelQueue {
    /// Underlying ring buffer (storage plus head/tail indices).
    ring: RingBuffer,

    /// Configuration the queue was created with.
    config: QueueConfig,

    /// Validator for zero-copy message descriptors.
    validator: DescriptorValidator,

    /// Running count of successful send operations.
    send_count: AtomicU32,

    /// Running count of successful recv operations.
    recv_count: AtomicU32,
}

#[cfg(feature = "std")]
impl KernelQueue {
    /// Create a new kernel queue over caller-provided backing memory.
    ///
    /// # Arguments
    ///
    /// * `config` - Queue configuration
    /// * `region` - Handle to the backing region
    /// * `buffer` - Pointer to the backing memory
    /// * `buffer_len` - Length of the backing memory
    ///
    /// # Errors
    ///
    /// Returns `InvalidArgument` if the configuration is invalid.
    /// Propagates any error from `RingBuffer::new` (e.g. when the buffer is
    /// too small for the configured ring).
    pub fn new(
        config: QueueConfig,
        region: RegionHandle,
        buffer: *mut u8,
        buffer_len: usize,
    ) -> Result<Self> {
        config.validate()?;

        let ring = RingBuffer::new(
            region,
            config.ring_size,
            config.max_msg_size,
            buffer,
            buffer_len,
        )?;

        Ok(Self::from_ring(ring, config))
    }

    /// Create a queue with heap-allocated backing memory.
    ///
    /// This is a convenience method for testing and std environments. The
    /// returned `Vec<u8>` owns the backing storage and must outlive the queue.
    pub fn new_heap(config: QueueConfig) -> Result<(Self, Vec<u8>)> {
        config.validate()?;

        let mut buffer = vec![0u8; config.required_memory()];

        let ring = RingBuffer::new(
            RegionHandle::null(),
            config.ring_size,
            config.max_msg_size,
            buffer.as_mut_ptr(),
            buffer.len(),
        )?;

        Ok((Self::from_ring(ring, config), buffer))
    }

    /// Assemble a queue around an already-constructed ring buffer.
    ///
    /// Shared tail of `new` and `new_heap`; counters start at zero.
    fn from_ring(ring: RingBuffer, config: QueueConfig) -> Self {
        Self {
            ring,
            config,
            validator: DescriptorValidator::new(),
            send_count: AtomicU32::new(0),
            recv_count: AtomicU32::new(0),
        }
    }

    /// Send a message to the queue.
    ///
    /// # Arguments
    ///
    /// * `msg` - Message payload bytes
    /// * `priority` - Message priority
    ///
    /// # Errors
    ///
    /// Returns `QueueFull` if the queue is full.
    /// Returns `MessageTooLarge` if the message exceeds `max_msg_size`.
    pub fn send(&mut self, msg: &[u8], priority: MsgPriority) -> Result<()> {
        // Reject oversized payloads before touching the ring.
        if msg.len() > self.config.max_msg_size as usize {
            return Err(KernelError::MessageTooLarge);
        }

        self.ring.enqueue(msg, priority)?;
        // Relaxed: the counter is purely statistical, no ordering required.
        self.send_count.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Send a zero-copy message via descriptor.
    ///
    /// Instead of copying data, this sends a reference to data in a shared region.
    /// The receiver must read the data from the shared region using the descriptor.
    ///
    /// # Arguments
    ///
    /// * `descriptor` - Reference to data in a shared region
    /// * `region_policy` - Policy of the region referenced by the descriptor
    /// * `region_size` - Size of the region
    /// * `priority` - Message priority
    ///
    /// # Errors
    ///
    /// Returns `QueueFull` if the queue is full.
    /// Returns `InvalidDescriptorRegion` if the region policy doesn't allow descriptors.
    /// Returns `InvalidArgument` if the descriptor references out-of-bounds memory.
    ///
    /// # TOCTOU Protection
    ///
    /// Only Immutable or AppendOnly regions are allowed. This prevents the sender
    /// from modifying the data after sending but before the receiver processes it.
    pub fn send_descriptor(
        &mut self,
        descriptor: &MessageDescriptor,
        region_policy: &RegionPolicy,
        region_size: usize,
        priority: MsgPriority,
    ) -> Result<()> {
        // Validate the descriptor against the region's policy and bounds
        // before it becomes visible to the receiver.
        self.validator
            .validate(descriptor, region_policy, region_size)?;

        self.ring.enqueue_descriptor(descriptor, priority)?;
        self.send_count.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Receive a message from the queue (non-blocking).
    ///
    /// # Arguments
    ///
    /// * `buf` - Buffer to receive the message data
    ///
    /// # Returns
    ///
    /// On success, returns the number of bytes received. For descriptor messages,
    /// the descriptor is written to `buf` and you should use `MessageDescriptor::from_bytes`
    /// to parse it.
    ///
    /// # Errors
    ///
    /// Returns `QueueEmpty` if no messages are available.
    pub fn recv(&mut self, buf: &mut [u8]) -> Result<usize> {
        let entry = self.ring.dequeue(buf)?;
        self.recv_count.fetch_add(1, Ordering::Relaxed);
        Ok(entry.length as usize)
    }

    /// Receive a message from the queue with timeout.
    ///
    /// Busy-polls `recv` with a spin hint between attempts until a message
    /// arrives or the timeout elapses.
    ///
    /// # Arguments
    ///
    /// * `buf` - Buffer to receive the message data
    /// * `timeout` - Maximum time to wait for a message
    ///
    /// # Returns
    ///
    /// On success, returns the number of bytes received.
    ///
    /// # Errors
    ///
    /// Returns `Timeout` if no message arrived within the timeout period.
    pub fn recv_timeout(&mut self, buf: &mut [u8], timeout: Duration) -> Result<usize> {
        let start = std::time::Instant::now();

        loop {
            match self.recv(buf) {
                Ok(len) => return Ok(len),
                Err(KernelError::QueueEmpty) => {
                    if start.elapsed() >= timeout {
                        return Err(KernelError::Timeout);
                    }
                    // Spin briefly before retrying.
                    std::hint::spin_loop();
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Receive a message, distinguishing between inline data and descriptors.
    ///
    /// # Returns
    ///
    /// Returns `ReceivedMessage` which indicates whether the data is inline
    /// or a descriptor reference.
    ///
    /// # Errors
    ///
    /// Returns `QueueEmpty` if no messages are available, or `InternalError`
    /// if a descriptor entry fails to parse from the received bytes.
    pub fn recv_typed(&mut self, buf: &mut [u8]) -> Result<ReceivedMessage> {
        let entry = self.ring.dequeue(buf)?;
        self.recv_count.fetch_add(1, Ordering::Relaxed);

        if entry.is_descriptor() {
            // Descriptor payloads carry a serialized MessageDescriptor;
            // a parse failure here means the ring entry was corrupted.
            let descriptor =
                MessageDescriptor::from_bytes(buf).ok_or(KernelError::InternalError)?;
            Ok(ReceivedMessage::Descriptor {
                descriptor,
                priority: entry.priority(),
            })
        } else {
            Ok(ReceivedMessage::Inline {
                length: entry.length as usize,
                priority: entry.priority(),
            })
        }
    }

    /// Peek at the next message without removing it.
    pub fn peek(&self) -> Option<RingEntry> {
        self.ring.peek()
    }

    /// Check if the queue is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.ring.is_empty()
    }

    /// Check if the queue is full.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.ring.is_full()
    }

    /// Get the number of messages currently in the queue.
    #[inline]
    pub fn len(&self) -> u32 {
        self.ring.len()
    }

    /// Get the number of available slots.
    #[inline]
    pub fn available(&self) -> u32 {
        self.ring.available()
    }

    /// Get the queue configuration.
    #[inline]
    pub fn config(&self) -> &QueueConfig {
        &self.config
    }

    /// Get the backing region handle.
    #[inline]
    pub fn region(&self) -> RegionHandle {
        self.ring.region()
    }

    /// Get the total number of send operations.
    #[inline]
    pub fn send_count(&self) -> u32 {
        self.send_count.load(Ordering::Relaxed)
    }

    /// Get the total number of recv operations.
    #[inline]
    pub fn recv_count(&self) -> u32 {
        self.recv_count.load(Ordering::Relaxed)
    }

    /// Get ring buffer statistics.
    #[cfg(feature = "stats")]
    pub fn stats(&self) -> &crate::ring::RingStats {
        self.ring.stats()
    }
}

382/// Result of a typed receive operation.
383#[derive(Debug, Clone)]
384pub enum ReceivedMessage {
385    /// Inline data was received and is in the buffer.
386    Inline {
387        /// Length of the data in bytes.
388        length: usize,
389        /// Message priority.
390        priority: MsgPriority,
391    },
392    /// A descriptor was received (zero-copy reference).
393    Descriptor {
394        /// The message descriptor.
395        descriptor: MessageDescriptor,
396        /// Message priority.
397        priority: MsgPriority,
398    },
399}
400
401impl ReceivedMessage {
402    /// Check if this is a descriptor message.
403    #[inline]
404    pub fn is_descriptor(&self) -> bool {
405        matches!(self, ReceivedMessage::Descriptor { .. })
406    }
407
408    /// Get the message priority.
409    #[inline]
410    pub fn priority(&self) -> MsgPriority {
411        match self {
412            ReceivedMessage::Inline { priority, .. } => *priority,
413            ReceivedMessage::Descriptor { priority, .. } => *priority,
414        }
415    }
416}
417
#[cfg(test)]
mod tests {
    use super::*;

    /// Convenience helper: build a heap-backed queue with the given geometry.
    #[cfg(feature = "std")]
    fn heap_queue(ring_size: u32, max_msg_size: u32) -> (KernelQueue, Vec<u8>) {
        KernelQueue::new_heap(QueueConfig::new(ring_size, max_msg_size)).unwrap()
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_queue_config_validate() {
        // Well-formed configurations pass.
        assert!(QueueConfig::new(64, 4096).validate().is_ok());
        assert!(QueueConfig::new(128, 1024).validate().is_ok());

        // Ring size must be a non-zero power of two.
        assert!(QueueConfig::new(63, 4096).validate().is_err());
        assert!(QueueConfig::new(0, 4096).validate().is_err());

        // Message size must be non-zero and within the cap.
        assert!(QueueConfig::new(64, 0).validate().is_err());
        assert!(QueueConfig::new(64, 2 * 1024 * 1024).validate().is_err());
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_queue_basic() {
        let (mut queue, _backing) = heap_queue(16, 256);

        assert!(queue.is_empty());
        assert!(!queue.is_full());

        // One send makes the queue non-empty.
        queue.send(b"hello world", MsgPriority::Normal).unwrap();
        assert!(!queue.is_empty());
        assert_eq!(queue.len(), 1);

        // Receiving it returns the exact payload and drains the queue.
        let mut out = [0u8; 256];
        let n = queue.recv(&mut out).unwrap();
        assert_eq!(n, 11);
        assert_eq!(&out[..n], b"hello world");
        assert!(queue.is_empty());
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_queue_send_count() {
        let (mut queue, _backing) = heap_queue(16, 256);

        for i in 0..5 {
            let msg = format!("msg{}", i);
            queue.send(msg.as_bytes(), MsgPriority::Normal).unwrap();
        }

        assert_eq!(queue.send_count(), 5);
        assert_eq!(queue.recv_count(), 0);

        // Partially drain and confirm the recv counter tracks it.
        let mut out = [0u8; 256];
        for _ in 0..3 {
            queue.recv(&mut out).unwrap();
        }

        assert_eq!(queue.recv_count(), 3);
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_queue_full() {
        let (mut queue, _backing) = heap_queue(4, 256);

        // Fill every slot.
        for i in 0..4 {
            let msg = format!("msg{}", i);
            queue.send(msg.as_bytes(), MsgPriority::Normal).unwrap();
        }

        assert!(queue.is_full());

        // The next send must be rejected.
        let result = queue.send(b"overflow", MsgPriority::Normal);
        assert!(matches!(result, Err(KernelError::QueueFull)));
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_queue_message_too_large() {
        let (mut queue, _backing) = heap_queue(16, 64);

        // Payload is twice the configured maximum.
        let oversized = vec![0u8; 128];
        let result = queue.send(&oversized, MsgPriority::Normal);
        assert!(matches!(result, Err(KernelError::MessageTooLarge)));
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_queue_recv_empty() {
        let (mut queue, _backing) = heap_queue(16, 256);

        let mut out = [0u8; 256];
        let result = queue.recv(&mut out);
        assert!(matches!(result, Err(KernelError::QueueEmpty)));
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_queue_recv_timeout() {
        let (mut queue, _backing) = heap_queue(16, 256);

        let mut out = [0u8; 256];
        let started = std::time::Instant::now();
        let result = queue.recv_timeout(&mut out, Duration::from_millis(10));
        let waited = started.elapsed();

        // An empty queue must time out, and must wait at least the full window.
        assert!(matches!(result, Err(KernelError::Timeout)));
        assert!(waited >= Duration::from_millis(10));
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_queue_descriptor_slab_rejected() {
        let (mut queue, _backing) = heap_queue(16, 256);

        use ruvix_types::Handle;
        let desc = MessageDescriptor::new(RegionHandle(Handle::new(1, 0)), 0, 100);

        // Slab regions are mutable, so descriptors into them are rejected.
        let result = queue.send_descriptor(
            &desc,
            &RegionPolicy::Slab {
                slot_size: 64,
                slot_count: 16,
            },
            1024,
            MsgPriority::Normal,
        );

        assert!(matches!(result, Err(KernelError::InvalidDescriptorRegion)));
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_queue_descriptor_immutable_ok() {
        let (mut queue, _backing) = heap_queue(16, 256);

        use ruvix_types::Handle;
        let desc = MessageDescriptor::new(RegionHandle(Handle::new(1, 0)), 0, 100);

        // Immutable regions satisfy the TOCTOU policy, so this succeeds.
        let result =
            queue.send_descriptor(&desc, &RegionPolicy::Immutable, 1024, MsgPriority::Normal);

        assert!(result.is_ok());
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_queue_descriptor_out_of_bounds() {
        let (mut queue, _backing) = heap_queue(16, 256);

        use ruvix_types::Handle;
        let desc = MessageDescriptor::new(RegionHandle(Handle::new(1, 0)), 900, 200);

        // offset (900) + length (200) exceeds the 1000-byte region.
        let result =
            queue.send_descriptor(&desc, &RegionPolicy::Immutable, 1000, MsgPriority::Normal);

        assert!(matches!(result, Err(KernelError::InvalidArgument)));
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_received_message_types() {
        let (mut queue, _backing) = heap_queue(16, 256);

        // An inline send must come back as ReceivedMessage::Inline.
        queue.send(b"inline", MsgPriority::High).unwrap();

        let mut out = [0u8; 256];
        let msg = queue.recv_typed(&mut out).unwrap();

        match msg {
            ReceivedMessage::Inline { length, priority } => {
                assert_eq!(length, 6);
                assert_eq!(priority, MsgPriority::High);
                assert_eq!(&out[..length], b"inline");
            }
            _ => panic!("Expected inline message"),
        }
    }
}