1
2use async_stream::stream;
8use futures_core::Stream;
9use futures_util::StreamExt;
10use std::fmt;
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex};
13
14#[derive(Debug, Clone, PartialEq)]
16pub enum QueueError {
17 Closed,
19 Full,
21 Disconnected,
23}
24
25impl fmt::Display for QueueError {
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 match self {
28 QueueError::Closed => write!(f, "Queue is closed"),
29 QueueError::Full => write!(f, "Queue is full"),
30 QueueError::Disconnected => write!(f, "Queue channel disconnected"),
31 }
32 }
33}
34
35impl std::error::Error for QueueError {}
36
37#[derive(Clone)]
40pub enum Queue<T> {
41 Bounded {
42 sender: Arc<Mutex<Option<mpsc::Sender<T>>>>,
43 receiver: Arc<Mutex<Option<mpsc::Receiver<T>>>>,
44 capacity: usize,
45 },
46 Unbounded {
47 sender: Arc<Mutex<Option<mpsc::UnboundedSender<T>>>>,
48 receiver: Arc<Mutex<Option<mpsc::UnboundedReceiver<T>>>>,
49 },
50}
51
52impl<T> Queue<T>
53where
54 T: Send + 'static,
55{
56 pub fn bounded(capacity: usize) -> Self {
58 let (sender, receiver) = mpsc::channel(capacity);
59 Queue::Bounded {
60 sender: Arc::new(Mutex::new(Some(sender))),
61 receiver: Arc::new(Mutex::new(Some(receiver))),
62 capacity,
63 }
64 }
65
66 pub fn unbounded() -> Self {
68 let (sender, receiver) = mpsc::unbounded_channel();
69 Queue::Unbounded {
70 sender: Arc::new(Mutex::new(Some(sender))),
71 receiver: Arc::new(Mutex::new(Some(receiver))),
72 }
73 }
74
75 pub async fn enqueue(&self, item: T) -> Result<(), QueueError> {
77 match self {
78 Queue::Bounded { sender, .. } => {
79 let guard = sender.lock().await;
80 match &*guard {
81 Some(sender) => {
82 match sender.send(item).await {
83 Ok(_) => Ok(()),
84 Err(_) => Err(QueueError::Disconnected),
85 }
86 }
87 None => Err(QueueError::Closed),
88 }
89 }
90 Queue::Unbounded { sender, .. } => {
91 let guard = sender.lock().await;
92 match &*guard {
93 Some(sender) => {
94 match sender.send(item) {
95 Ok(_) => Ok(()),
96 Err(_) => Err(QueueError::Disconnected),
97 }
98 }
99 None => Err(QueueError::Closed),
100 }
101 }
102 }
103 }
104
105 pub async fn try_enqueue(&self, item: T) -> Result<(), QueueError> {
107 match self {
108 Queue::Bounded { sender, .. } => {
109 let guard = sender.lock().await;
110 match &*guard {
111 Some(sender) => {
112 match sender.try_send(item) {
113 Ok(_) => Ok(()),
114 Err(mpsc::error::TrySendError::Full(_)) => Err(QueueError::Full),
115 Err(mpsc::error::TrySendError::Closed(_)) => Err(QueueError::Disconnected),
116 }
117 }
118 None => Err(QueueError::Closed),
119 }
120 }
121 Queue::Unbounded { sender, .. } => {
122 let guard = sender.lock().await;
123 match &*guard {
124 Some(sender) => {
125 match sender.send(item) {
126 Ok(_) => Ok(()),
127 Err(_) => Err(QueueError::Disconnected),
128 }
129 }
130 None => Err(QueueError::Closed),
131 }
132 }
133 }
134 }
135
136 pub fn dequeue(&self) -> impl Stream<Item = T> + Send + 'static {
138 match self {
139 Queue::Bounded { receiver, .. } => {
140 let receiver = Arc::clone(receiver);
141
142 stream! {
143 loop {
144 let item = {
145 let mut guard = receiver.lock().await;
146 if let Some(rx) = &mut *guard {
147 rx.recv().await
148 } else {
149 None
150 }
151 };
152
153 match item {
154 Some(item) => yield item,
155 None => break,
156 }
157 }
158 }.boxed()
159 }
160 Queue::Unbounded { receiver, .. } => {
161 let receiver = Arc::clone(receiver);
162
163 stream! {
164 loop {
165 let item = {
166 let mut guard = receiver.lock().await;
167 if let Some(rx) = &mut *guard {
168 rx.recv().await
169 } else {
170 None
171 }
172 };
173
174 match item {
175 Some(item) => yield item,
176 None => break,
177 }
178 }
179 }.boxed()
180 }
181 }
182 }
183
184 pub async fn close(&self) {
186 match self {
187 Queue::Bounded { sender, .. } => {
188 let mut guard = sender.lock().await;
189 *guard = None;
190 }
191 Queue::Unbounded { sender, .. } => {
192 let mut guard = sender.lock().await;
193 *guard = None;
194 }
195 }
196 }
197
198 pub fn capacity(&self) -> Option<usize> {
200 match self {
201 Queue::Bounded { capacity, .. } => Some(*capacity),
202 Queue::Unbounded { .. } => None,
203 }
204 }
205
206 pub async fn is_empty(&self) -> bool {
208 self.len().await == 0
209 }
210
211 pub async fn len(&self) -> usize {
213 match self {
214 Queue::Bounded { receiver, .. } => {
215 let guard = receiver.lock().await;
216 if let Some(rx) = &*guard {
217 rx.len()
218 } else {
219 0
220 }
221 }
222 Queue::Unbounded { receiver, .. } => {
223 let guard = receiver.lock().await;
224 if let Some(rx) = &*guard {
225 rx.len()
226 } else {
227 0
228 }
229 }
230 }
231 }
232}
233
234impl<T> fmt::Debug for Queue<T> {
235 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236 match self {
237 Queue::Bounded { capacity, .. } => {
238 f.debug_struct("Queue::Bounded")
239 .field("capacity", capacity)
240 .finish()
241 }
242 Queue::Unbounded { .. } => {
243 f.debug_struct("Queue::Unbounded").finish()
244 }
245 }
246 }
247}