Skip to main content

rill_core/queues/
rt_queue.rs

1//! # Main RT-safe queue for dual-thread architecture
2//!
3//! [`RtQueue`] — the main queue for communication between
4//! the control thread and the signal thread. Combines the functionality
5//! of SPSC and MPSC queues with a convenient API.
6
7use super::spsc::SpscQueue;
8use super::{QueueResult, QueueStatsSnapshot};
9
10/// Queue type
11#[derive(Debug, Clone, Copy)]
12pub enum QueueType {
13    /// One producer, one consumer (maximum throughput)
14    SingleProducer,
15    /// Multiple producers, one consumer
16    MultiProducer,
17}
18
19/// Main RT-safe queue
20///
21/// # Example
22/// ```
23/// use rill_core::queues::RtQueue;
24///
25/// // Create a queue for commands
26/// let queue = RtQueue::<i32>::new(1024);
27///
28/// // Control thread (soft RT)
29/// queue.push(42).unwrap();
30///
31/// // Signal thread (hard RT)
32/// if let Some(cmd) = queue.pop() {
33///     println!("Got command: {}", cmd);
34/// }
35/// ```
36pub struct RtQueue<T: Copy> {
37    /// Inner implementation
38    inner: RtQueueInner<T>,
39}
40
41enum RtQueueInner<T: Copy> {
42    Spsc(SpscQueue<T, 1024>),        // For single producer
43    Mpsc(super::mpsc::MpscQueue<T>), // For multiple producers
44}
45
46impl<T: Copy + Default + Send + 'static> RtQueue<T> {
47    /// Create a new queue with a fixed size
48    pub fn new(capacity: usize) -> Self {
49        // By default use SPSC for maximum performance
50        if capacity <= 1024 {
51            Self {
52                inner: RtQueueInner::Spsc(SpscQueue::new()),
53            }
54        } else {
55            Self {
56                inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(capacity)),
57            }
58        }
59    }
60
61    /// Create a queue for a single producer
62    pub fn new_spsc() -> Self {
63        Self {
64            inner: RtQueueInner::Spsc(SpscQueue::new()),
65        }
66    }
67
68    /// Create a queue for multiple producers
69    pub fn new_mpsc(capacity: usize) -> Self {
70        Self {
71            inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(capacity)),
72        }
73    }
74
75    /// Push an element (from the control thread)
76    pub fn push(&self, value: T) -> QueueResult<()> {
77        match &self.inner {
78            RtQueueInner::Spsc(q) => q.push(value),
79            RtQueueInner::Mpsc(q) => q.push(value),
80        }
81    }
82
83    /// Pop an element (from the signal thread)
84    pub fn pop(&self) -> Option<T> {
85        match &self.inner {
86            RtQueueInner::Spsc(q) => q.pop(),
87            RtQueueInner::Mpsc(q) => q.pop(),
88        }
89    }
90
91    /// Current size
92    pub fn len(&self) -> usize {
93        match &self.inner {
94            RtQueueInner::Spsc(q) => q.len(),
95            RtQueueInner::Mpsc(q) => q.size(),
96        }
97    }
98
99    /// Capacity
100    pub fn capacity(&self) -> usize {
101        match &self.inner {
102            RtQueueInner::Spsc(q) => q.capacity(),
103            RtQueueInner::Mpsc(q) => q.capacity(),
104        }
105    }
106
107    /// Check if the queue is empty
108    pub fn is_empty(&self) -> bool {
109        self.len() == 0
110    }
111
112    /// Get statistics
113    pub fn stats(&self) -> QueueStatsSnapshot {
114        match &self.inner {
115            RtQueueInner::Spsc(q) => q.stats(),
116            RtQueueInner::Mpsc(_q) => {
117                // Stub for MPSC
118                QueueStatsSnapshot {
119                    pushes: 0,
120                    pops: 0,
121                    overflows: 0,
122                    underflows: 0,
123                    max_size: 0,
124                }
125            }
126        }
127    }
128}
129
130impl<T: Copy> Clone for RtQueue<T> {
131    fn clone(&self) -> Self {
132        // Only for MPSC queues, SPSC cannot be cloned
133        match &self.inner {
134            RtQueueInner::Spsc(_) => panic!("Cannot clone SPSC queue"),
135            RtQueueInner::Mpsc(q) => Self {
136                inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(q.capacity())),
137            },
138        }
139    }
140}
141
142#[allow(unsafe_code)]
143unsafe impl<T: Copy + Send> Send for RtQueue<T> {}
144#[allow(unsafe_code)]
145unsafe impl<T: Copy + Sync> Sync for RtQueue<T> {}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150
151    #[test]
152    fn test_rt_queue_spsc() {
153        let queue = RtQueue::<i32>::new_spsc();
154
155        queue.push(42).unwrap();
156        assert_eq!(queue.pop(), Some(42));
157        assert_eq!(queue.pop(), None);
158    }
159
160    #[test]
161    fn test_rt_queue_mpsc() {
162        let queue = RtQueue::<i32>::new_mpsc(16);
163
164        queue.push(1).unwrap();
165        queue.push(2).unwrap();
166        queue.push(3).unwrap();
167
168        assert_eq!(queue.pop(), Some(1));
169        assert_eq!(queue.pop(), Some(2));
170        assert_eq!(queue.pop(), Some(3));
171    }
172}