1use async_stream::stream;
7use futures_core::Stream;
8use futures_util::StreamExt;
9use std::fmt;
10use std::sync::Arc;
11use tokio::sync::{mpsc, Mutex};
12use crate::resource_manager::get_global_resource_manager;
13
/// Failure modes reported by [`Queue`] operations.
///
/// The enum carries no payload, so it also implements `Eq` and values can be
/// compared directly (for example in tests or `matches!` guards).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueueError {
    /// The queue's sender slot is empty, which is the state `Queue::close` leaves behind.
    Closed,
    /// A non-blocking enqueue found the bounded channel at capacity.
    Full,
    /// The underlying channel refused the item (its send call returned an error).
    Disconnected,
}
24
25impl fmt::Display for QueueError {
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 match self {
28 QueueError::Closed => write!(f, "Queue is closed"),
29 QueueError::Full => write!(f, "Queue is full"),
30 QueueError::Disconnected => write!(f, "Queue channel disconnected"),
31 }
32 }
33}
34
35impl std::error::Error for QueueError {}
36
37#[derive(Clone)]
40pub enum Queue<T> {
41 Bounded {
42 sender: Arc<Mutex<Option<mpsc::Sender<T>>>>,
43 receiver: Arc<Mutex<Option<mpsc::Receiver<T>>>>,
44 capacity: usize,
45 },
46 Unbounded {
47 sender: Arc<Mutex<Option<mpsc::UnboundedSender<T>>>>,
48 receiver: Arc<Mutex<Option<mpsc::UnboundedReceiver<T>>>>,
49 },
50}
51
52impl<T> Queue<T>
53where
54 T: Send + 'static,
55{
56 pub fn bounded(capacity: usize) -> Self {
58 let (sender, receiver) = mpsc::channel(capacity);
59 Queue::Bounded {
60 sender: Arc::new(Mutex::new(Some(sender))),
61 receiver: Arc::new(Mutex::new(Some(receiver))),
62 capacity,
63 }
64 }
65
66 pub fn unbounded() -> Self {
68 let (sender, receiver) = mpsc::unbounded_channel();
69 Queue::Unbounded {
70 sender: Arc::new(Mutex::new(Some(sender))),
71 receiver: Arc::new(Mutex::new(Some(receiver))),
72 }
73 }
74
75 pub async fn enqueue(&self, item: T) -> Result<(), QueueError> {
77 let resource_manager = get_global_resource_manager();
78 match self {
79 Queue::Bounded { sender, .. } => {
80 let guard = sender.lock().await;
81 match &*guard {
82 Some(sender) => match sender.send(item).await {
83 Ok(_) => {
84 resource_manager.track_memory_allocation(1).await.ok();
85 Ok(())
86 },
87 Err(_) => Err(QueueError::Disconnected),
88 },
89 None => Err(QueueError::Closed),
90 }
91 }
92 Queue::Unbounded { sender, .. } => {
93 let guard = sender.lock().await;
94 match &*guard {
95 Some(sender) => match sender.send(item) {
96 Ok(_) => {
97 resource_manager.track_memory_allocation(1).await.ok();
98 Ok(())
99 },
100 Err(_) => Err(QueueError::Disconnected),
101 },
102 None => Err(QueueError::Closed),
103 }
104 }
105 }
106 }
107
108 pub async fn try_enqueue(&self, item: T) -> Result<(), QueueError> {
110 let resource_manager = get_global_resource_manager();
111 match self {
112 Queue::Bounded { sender, .. } => {
113 let guard = sender.lock().await;
114 match &*guard {
115 Some(sender) => match sender.try_send(item) {
116 Ok(_) => {
117 resource_manager.track_memory_allocation(1).await.ok();
118 Ok(())
119 },
120 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
121 resource_manager.track_buffer_overflow().await.ok();
122 Err(QueueError::Full)
123 },
124 Err(_) => Err(QueueError::Disconnected),
125 },
126 None => Err(QueueError::Closed),
127 }
128 }
129 Queue::Unbounded { sender, .. } => {
130 let guard = sender.lock().await;
131 match &*guard {
132 Some(sender) => match sender.send(item) {
133 Ok(_) => {
134 resource_manager.track_memory_allocation(1).await.ok();
135 Ok(())
136 },
137 Err(_) => Err(QueueError::Disconnected),
138 },
139 None => Err(QueueError::Closed),
140 }
141 }
142 }
143 }
144
145 pub fn dequeue(&self) -> impl Stream<Item = T> + Send + 'static {
147 let resource_manager = get_global_resource_manager();
148 match self {
149 Queue::Bounded { receiver, .. } => {
150 let receiver = Arc::clone(receiver);
151 let resource_manager = resource_manager.clone();
152 stream! {
153 loop {
154 let item = {
155 let mut guard = receiver.lock().await;
156 if let Some(rx) = &mut *guard {
157 rx.recv().await
158 } else {
159 None
160 }
161 };
162 match item {
163 Some(item) => {
164 resource_manager.track_memory_deallocation(1).await;
165 yield item
166 },
167 None => break,
168 }
169 }
170 }
171 .boxed()
172 }
173 Queue::Unbounded { receiver, .. } => {
174 let receiver = Arc::clone(receiver);
175 let resource_manager = resource_manager.clone();
176 stream! {
177 loop {
178 let item = {
179 let mut guard = receiver.lock().await;
180 if let Some(rx) = &mut *guard {
181 rx.recv().await
182 } else {
183 None
184 }
185 };
186 match item {
187 Some(item) => {
188 resource_manager.track_memory_deallocation(1).await;
189 yield item
190 },
191 None => break,
192 }
193 }
194 }
195 .boxed()
196 }
197 }
198 }
199
200 pub async fn close(&self) {
202 match self {
203 Queue::Bounded { sender, .. } => {
204 let mut guard = sender.lock().await;
205 *guard = None;
206 }
207 Queue::Unbounded { sender, .. } => {
208 let mut guard = sender.lock().await;
209 *guard = None;
210 }
211 }
212 }
213
214 pub fn capacity(&self) -> Option<usize> {
216 match self {
217 Queue::Bounded { capacity, .. } => Some(*capacity),
218 Queue::Unbounded { .. } => None,
219 }
220 }
221
222 pub async fn is_empty(&self) -> bool {
224 self.len().await == 0
225 }
226
227 pub async fn len(&self) -> usize {
229 match self {
230 Queue::Bounded { receiver, .. } => {
231 let guard = receiver.lock().await;
232 if let Some(rx) = &*guard {
233 rx.len()
234 } else {
235 0
236 }
237 }
238 Queue::Unbounded { receiver, .. } => {
239 let guard = receiver.lock().await;
240 if let Some(rx) = &*guard {
241 rx.len()
242 } else {
243 0
244 }
245 }
246 }
247 }
248}
249
250impl<T> fmt::Debug for Queue<T> {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 match self {
253 Queue::Bounded { capacity, .. } => f
254 .debug_struct("Queue::Bounded")
255 .field("capacity", capacity)
256 .finish(),
257 Queue::Unbounded { .. } => f.debug_struct("Queue::Unbounded").finish(),
258 }
259 }
260}