1#[cfg(test)]
2extern crate env_logger;
3
4use std::cmp::Ordering;
5use std::fmt::Debug;
6use std::sync::Arc;
7
8use anyhow::Result;
9use thiserror::Error;
10
11pub use blocking_queue::*;
12pub use queue_mpsc::*;
13pub use queue_vec::*;
14
15mod blocking_queue;
16mod queue_mpsc;
17mod queue_vec;
18
/// Marker trait for values usable as queue elements.
///
/// It declares no methods; it only bundles the `Debug + Clone + Send + Sync`
/// supertrait bounds under one name so that queue traits in this module
/// (for example `HasPeekBehavior`) can require them with a single bound.
pub trait Element: Debug + Clone + Send + Sync {}

// Implements `Element` for every concrete type in the list.
macro_rules! impl_element {
  ($($ty:ty),+ $(,)?) => {
    $(impl Element for $ty {})+
  };
}

impl_element!(i8, i16, i32, i64, u8, u16, u32, u64, usize, f32, f64, String);

/// A boxed value is an element whenever the boxed type satisfies every
/// `Element` supertrait itself.
impl<T: Debug + Clone + Send + Sync> Element for Box<T> {}

/// A shared pointer is an element whenever its target is `Debug + Send + Sync`.
///
/// `Arc<T>` is `Clone` for every `T` (cloning only bumps the reference count),
/// so `T: Clone` is deliberately not required here. This lets non-clonable
/// shared values be used as elements.
impl<T: Debug + Send + Sync> Element for Arc<T> {}
48
49#[derive(Error, Debug)]
50pub enum QueueError<E> {
51 #[error("Failed to offer an element: {0:?}")]
52 OfferError(E),
53 #[error("Failed to pool an element")]
54 PoolError,
55 #[error("Failed to peek an element")]
56 PeekError,
57}
58
/// A queue size or capacity that is either unbounded or a concrete count.
///
/// Two values are equal when they are the same variant and, for `Limited`,
/// carry the same count. In the ordering, `Limitless` is greater than every
/// `Limited` value, and `Limited` values order by their count.
#[derive(Debug, Clone, PartialEq)]
pub enum QueueSize {
  /// No bound.
  Limitless,
  /// A concrete count.
  Limited(usize),
}

impl QueueSize {
  /// Adds one to a `Limited` count; `Limitless` is left untouched.
  ///
  /// The addition saturates at `usize::MAX` instead of overflowing.
  fn increment(&mut self) {
    if let QueueSize::Limited(count) = self {
      *count = count.saturating_add(1);
    }
  }

  /// Subtracts one from a `Limited` count; `Limitless` is left untouched.
  ///
  /// The subtraction saturates at zero. A plain `-= 1` on `Limited(0)` would
  /// panic in debug builds and wrap around to `usize::MAX` in release builds,
  /// silently reporting an absurdly large size.
  fn decrement(&mut self) {
    if let QueueSize::Limited(count) = self {
      *count = count.saturating_sub(1);
    }
  }
}

impl PartialOrd<QueueSize> for QueueSize {
  /// Orders `Limitless` above every `Limited` value and compares two
  /// `Limited` values by their counts. The result is always `Some`.
  fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
    match (self, other) {
      (QueueSize::Limitless, QueueSize::Limitless) => Some(Ordering::Equal),
      (QueueSize::Limitless, QueueSize::Limited(_)) => Some(Ordering::Greater),
      (QueueSize::Limited(_), QueueSize::Limitless) => Some(Ordering::Less),
      (QueueSize::Limited(l), QueueSize::Limited(r)) => l.partial_cmp(r),
    }
  }
}
105
106pub trait QueueBehavior<E>: Send + Sized {
107 fn is_empty(&self) -> bool {
110 self.len() == QueueSize::Limited(0)
111 }
112
113 fn non_empty(&self) -> bool {
116 !self.is_empty()
117 }
118
119 fn is_full(&self) -> bool {
122 self.capacity() == self.len()
123 }
124
125 fn non_full(&self) -> bool {
128 !self.is_full()
129 }
130
131 fn len(&self) -> QueueSize;
134
135 fn capacity(&self) -> QueueSize;
138
139 fn offer(&mut self, e: E) -> Result<()>;
143
144 fn poll(&mut self) -> Result<Option<E>>;
147}
148
149pub trait HasPeekBehavior<E: Element>: QueueBehavior<E> {
150 fn peek(&self) -> Result<Option<E>>;
153}
154
155pub trait BlockingQueueBehavior<E: Element>: QueueBehavior<E> + Send {
156 fn put(&mut self, e: E) -> Result<()>;
159
160 fn take(&mut self) -> Result<Option<E>>;
163}
164
/// Selects which concrete queue implementation `create_queue` builds.
///
/// The common traits are derived so the selector can be logged, compared and
/// passed around by value like any other plain tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueType {
  /// Build a `QueueVec`.
  Vec,
  /// Build a `QueueMPSC`.
  MPSC,
}
169
170#[derive(Debug, Clone)]
171pub enum Queue<T> {
172 Vec(QueueVec<T>),
173 MPSC(QueueMPSC<T>),
174}
175
176impl<T: Element + 'static> Queue<T> {
177 pub fn with_blocking(self) -> BlockingQueue<T, Queue<T>> {
178 BlockingQueue::new(self)
179 }
180}
181
182impl<T: Element + 'static> QueueBehavior<T> for Queue<T> {
183 fn len(&self) -> QueueSize {
184 match self {
185 Queue::Vec(inner) => inner.len(),
186 Queue::MPSC(inner) => inner.len(),
187 }
188 }
189
190 fn capacity(&self) -> QueueSize {
191 match self {
192 Queue::Vec(inner) => inner.capacity(),
193 Queue::MPSC(inner) => inner.capacity(),
194 }
195 }
196
197 fn offer(&mut self, e: T) -> Result<()> {
198 match self {
199 Queue::Vec(inner) => inner.offer(e),
200 Queue::MPSC(inner) => inner.offer(e),
201 }
202 }
203
204 fn poll(&mut self) -> Result<Option<T>> {
205 match self {
206 Queue::Vec(inner) => inner.poll(),
207 Queue::MPSC(inner) => inner.poll(),
208 }
209 }
210}
211
212pub fn create_queue<T: Element + 'static>(queue_type: QueueType, num_elements: Option<usize>) -> Queue<T> {
213 match (queue_type, num_elements) {
214 (QueueType::Vec, None) => Queue::Vec(QueueVec::<T>::new()),
215 (QueueType::Vec, Some(num)) => Queue::Vec(QueueVec::<T>::with_num_elements(num)),
216 (QueueType::MPSC, None) => Queue::MPSC(QueueMPSC::<T>::new()),
217 (QueueType::MPSC, Some(num)) => Queue::MPSC(QueueMPSC::<T>::with_num_elements(num)),
218 }
219}
220
#[cfg(test)]
mod tests {
  use std::fmt::Debug;
  use std::thread::sleep;
  use std::time::Duration;
  use std::{env, thread};

  use fp_rust::sync::CountDownLatch;

  use crate::queue::BlockingQueueBehavior;
  use crate::queue::{create_queue, QueueBehavior, QueueType};

  /// Forces debug-level logging and installs the logger; the result of
  /// `try_init` is deliberately ignored.
  fn init_logger() {
    env::set_var("RUST_LOG", "debug");
    let _ = env_logger::try_init();
  }

  /// Shared producer/consumer driver for both smoke tests below.
  ///
  /// A consumer thread signals the latch and then calls `consume` five times on
  /// a clone of `queue`, logging each result. Once the latch has been released,
  /// a producer thread sleeps for three seconds and then offers `1..=5` to the
  /// original queue. Both threads are joined, so a panic in either fails the
  /// test; no values are asserted.
  fn run_producer_consumer<Q, F, R>(queue: Q, mut consume: F)
  where
    Q: QueueBehavior<i32> + Clone + 'static,
    F: FnMut(&mut Q) -> R + Send + 'static,
    R: Debug,
  {
    let latch = CountDownLatch::new(1);
    let consumer_latch = latch.clone();

    let mut producer_queue = queue;
    let mut consumer_queue = producer_queue.clone();

    let max = 5;

    let consumer = thread::spawn(move || {
      consumer_latch.countdown();
      for i in 1..=max {
        log::debug!("take: start: {}", i);
        let n = consume(&mut consumer_queue);
        log::debug!("take: finish: {},{:?}", i, n);
      }
    });

    // Do not start the producer before the consumer thread is running.
    latch.wait();

    let producer = thread::spawn(move || {
      sleep(Duration::from_secs(3));

      for i in 1..=max {
        log::debug!("put: start: {}", i);
        producer_queue.offer(i).unwrap();
        log::debug!("put: finish: {}", i);
      }
    });

    consumer.join().unwrap();
    producer.join().unwrap();
  }

  /// Drives a plain queue, consuming with `poll`.
  fn test_queue_vec<Q>(queue: Q)
  where
    Q: QueueBehavior<i32> + Clone + 'static, {
    run_producer_consumer(queue, |q: &mut Q| q.poll());
  }

  /// Drives a blocking queue, consuming with `take` while producing with `offer`.
  fn test_blocking_queue_vec<Q>(queue: Q)
  where
    Q: BlockingQueueBehavior<i32> + Clone + 'static, {
    run_producer_consumer(queue, |q: &mut Q| q.take());
  }

  #[test]
  fn test() {
    init_logger();

    test_queue_vec(create_queue(QueueType::Vec, Some(32)));
    test_blocking_queue_vec(create_queue(QueueType::Vec, Some(32)).with_blocking());
  }
}