1use eyre::{eyre, Result};
2use std::collections::VecDeque;
3use std::sync::mpsc::{Receiver, Sender};
4use std::sync::{mpsc, Arc, Mutex};
5
/// Initial capacity reserved for the shared buffer when a reader is created.
pub const QUEUE_SIZE: usize = 64;

/// Largest number of distinct items a single `QReader::poll` call may gather
/// before it gives up and reports an error.
pub const MAX_QUEUE_SIZE: usize = 256;

/// FIFO buffer shared between one reader and any number of writers.
pub type Queue<T> = Arc<Mutex<VecDeque<T>>>;

/// Consuming end of the queue.
///
/// Fields, in order: the shared buffer, a sender (used for the reader's own
/// `push` and cloned into every writer), and the receiver that is notified
/// once per queued item.
pub struct QReader<T>(Queue<T>, Sender<()>, Receiver<()>);

/// Producing end of the queue: the shared buffer plus a sender used to notify
/// the reader about each pushed item.
pub struct QWriter<T>(Queue<T>, Sender<()>);
12
13impl<T> QReader<T> {
14 pub fn new() -> Self {
15 let queue = Arc::new(Mutex::new(VecDeque::with_capacity(QUEUE_SIZE)));
16 let (tx, rx) = mpsc::channel();
17 Self(queue, tx, rx)
18 }
19
20 #[inline(always)]
21 pub fn push(&mut self, msg: impl Into<T>) {
22 let mut queue = self.0.lock().unwrap();
23 if self.1.send(()).is_ok() {
24 queue.push_back(msg.into());
25 }
26 }
27
28 #[inline(always)]
29 pub fn pop(&mut self) -> Option<T> {
30 if self.2.recv().is_ok() {
31 Some(self.0.lock().unwrap().pop_front().unwrap())
32 } else {
33 None
34 }
35 }
36
37 #[inline(always)]
38 pub fn try_pop(&mut self) -> Option<T> {
39 if let Ok(()) = self.2.try_recv() {
40 self.0.lock().unwrap().pop_front()
41 } else {
42 None
43 }
44 }
45
46 #[inline(always)]
47 pub fn poll(&mut self) -> Result<Vec<T>>
48 where
49 T: Eq,
50 {
51 let mut signals = Vec::with_capacity(16);
52 if self.2.recv().is_ok() {
53 let mut queue = self.0.lock().unwrap();
54 loop {
55 let signal = queue.pop_front().unwrap();
56 if !signals.contains(&signal) {
57 signals.push(signal);
58 }
59 if self.2.try_recv().is_err() {
60 break;
61 }
62 if signals.len() > MAX_QUEUE_SIZE {
63 return Err(eyre!("Signal queue exceeded MAX_QUEUE_SIZE."));
64 }
65 }
66
67 Ok(signals)
68 } else {
69 Err(eyre!("Failed to poll. Ending sync queue."))
70 }
71 }
72
73 #[inline(always)]
74 pub fn clear(&mut self) {
75 self.0.lock().unwrap().clear();
76 }
77
78 #[inline(always)]
79 pub fn writer(&self) -> QWriter<T> {
80 QWriter(self.0.clone(), self.1.clone())
81 }
82}
83
84impl<T> Default for QReader<T> {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
90impl<T> QWriter<T> {
91 #[inline(always)]
92 pub fn push(&mut self, msg: impl Into<T>) {
93 let mut queue = self.0.lock().unwrap();
94 if self.1.send(()).is_ok() {
95 queue.push_back(msg.into());
96 }
97 }
98}
99
100impl<T> Clone for QWriter<T> {
101 fn clone(&self) -> Self {
102 Self(self.0.clone(), self.1.clone())
103 }
104}