rill_core/queues/
rt_queue.rs1use super::spsc::SpscQueue;
8use super::{QueueResult, QueueStatsSnapshot};
9
/// Names the two queue flavours: single-producer and multi-producer.
///
/// The type is a plain field-less tag, so it is cheap to copy, compare, and use as a
/// hash-map key.
///
/// NOTE(review): nothing in this file consumes `QueueType`; presumably callers elsewhere
/// use it to pick a constructor — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueueType {
    /// A queue fed by a single producer.
    SingleProducer,
    /// A queue fed by multiple producers.
    MultiProducer,
}
18
19pub struct RtQueue<T: Copy> {
37 inner: RtQueueInner<T>,
39}
40
41enum RtQueueInner<T: Copy> {
42 Spsc(SpscQueue<T, 1024>), Mpsc(super::mpsc::MpscQueue<T>), }
45
46impl<T: Copy + Default + Send + 'static> RtQueue<T> {
47 pub fn new(capacity: usize) -> Self {
49 if capacity <= 1024 {
51 Self {
52 inner: RtQueueInner::Spsc(SpscQueue::new()),
53 }
54 } else {
55 Self {
56 inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(capacity)),
57 }
58 }
59 }
60
61 pub fn new_spsc() -> Self {
63 Self {
64 inner: RtQueueInner::Spsc(SpscQueue::new()),
65 }
66 }
67
68 pub fn new_mpsc(capacity: usize) -> Self {
70 Self {
71 inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(capacity)),
72 }
73 }
74
75 pub fn push(&self, value: T) -> QueueResult<()> {
77 match &self.inner {
78 RtQueueInner::Spsc(q) => q.push(value),
79 RtQueueInner::Mpsc(q) => q.push(value),
80 }
81 }
82
83 pub fn pop(&self) -> Option<T> {
85 match &self.inner {
86 RtQueueInner::Spsc(q) => q.pop(),
87 RtQueueInner::Mpsc(q) => q.pop(),
88 }
89 }
90
91 pub fn len(&self) -> usize {
93 match &self.inner {
94 RtQueueInner::Spsc(q) => q.len(),
95 RtQueueInner::Mpsc(q) => q.size(),
96 }
97 }
98
99 pub fn capacity(&self) -> usize {
101 match &self.inner {
102 RtQueueInner::Spsc(q) => q.capacity(),
103 RtQueueInner::Mpsc(q) => q.capacity(),
104 }
105 }
106
107 pub fn is_empty(&self) -> bool {
109 self.len() == 0
110 }
111
112 pub fn stats(&self) -> QueueStatsSnapshot {
114 match &self.inner {
115 RtQueueInner::Spsc(q) => q.stats(),
116 RtQueueInner::Mpsc(_q) => {
117 QueueStatsSnapshot {
119 pushes: 0,
120 pops: 0,
121 overflows: 0,
122 underflows: 0,
123 max_size: 0,
124 }
125 }
126 }
127 }
128}
129
130impl<T: Copy> Clone for RtQueue<T> {
131 fn clone(&self) -> Self {
132 match &self.inner {
134 RtQueueInner::Spsc(_) => panic!("Cannot clone SPSC queue"),
135 RtQueueInner::Mpsc(q) => Self {
136 inner: RtQueueInner::Mpsc(super::mpsc::MpscQueue::with_capacity(q.capacity())),
137 },
138 }
139 }
140}
141
142#[allow(unsafe_code)]
143unsafe impl<T: Copy + Send> Send for RtQueue<T> {}
144#[allow(unsafe_code)]
145unsafe impl<T: Copy + Sync> Sync for RtQueue<T> {}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// A value pushed into an SPSC-backed queue pops back out, after which `pop` yields `None`.
    #[test]
    fn test_rt_queue_spsc() {
        let spsc: RtQueue<i32> = RtQueue::new_spsc();

        spsc.push(42).unwrap();

        assert_eq!(spsc.pop(), Some(42));
        assert_eq!(spsc.pop(), None);
    }

    /// Three values pushed into an MPSC-backed queue pop back out in insertion order.
    #[test]
    fn test_rt_queue_mpsc() {
        let mpsc: RtQueue<i32> = RtQueue::new_mpsc(16);

        for value in 1..=3 {
            mpsc.push(value).unwrap();
        }

        for expected in 1..=3 {
            assert_eq!(mpsc.pop(), Some(expected));
        }
    }
}