rs2_stream/
queue.rs

1//! Queue implementation for RStream
2//!
3//! This module provides a concurrent queue with Stream interface for dequeuing
4//! and async methods for enqueuing.
5
6use async_stream::stream;
7use futures_core::Stream;
8use futures_util::StreamExt;
9use std::fmt;
10use std::sync::Arc;
11use tokio::sync::{mpsc, Mutex};
12use crate::resource_manager::get_global_resource_manager;
13
14/// Error types for Queue operations
15#[derive(Debug, Clone, PartialEq)]
16pub enum QueueError {
17    /// Queue has been closed
18    Closed,
19    /// Queue is full (for bounded queues with try_enqueue)
20    Full,
21    /// Channel disconnected
22    Disconnected,
23}
24
25impl fmt::Display for QueueError {
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        match self {
28            QueueError::Closed => write!(f, "Queue is closed"),
29            QueueError::Full => write!(f, "Queue is full"),
30            QueueError::Disconnected => write!(f, "Queue channel disconnected"),
31        }
32    }
33}
34
35impl std::error::Error for QueueError {}
36
37/// A Queue represents a concurrent queue with a Stream interface for dequeuing
38/// and async methods for enqueuing.
39#[derive(Clone)]
40pub enum Queue<T> {
41    Bounded {
42        sender: Arc<Mutex<Option<mpsc::Sender<T>>>>,
43        receiver: Arc<Mutex<Option<mpsc::Receiver<T>>>>,
44        capacity: usize,
45    },
46    Unbounded {
47        sender: Arc<Mutex<Option<mpsc::UnboundedSender<T>>>>,
48        receiver: Arc<Mutex<Option<mpsc::UnboundedReceiver<T>>>>,
49    },
50}
51
52impl<T> Queue<T>
53where
54    T: Send + 'static,
55{
56    /// Create a new bounded queue with the given capacity
57    pub fn bounded(capacity: usize) -> Self {
58        let (sender, receiver) = mpsc::channel(capacity);
59        Queue::Bounded {
60            sender: Arc::new(Mutex::new(Some(sender))),
61            receiver: Arc::new(Mutex::new(Some(receiver))),
62            capacity,
63        }
64    }
65
66    /// Create a new unbounded queue
67    pub fn unbounded() -> Self {
68        let (sender, receiver) = mpsc::unbounded_channel();
69        Queue::Unbounded {
70            sender: Arc::new(Mutex::new(Some(sender))),
71            receiver: Arc::new(Mutex::new(Some(receiver))),
72        }
73    }
74
75    /// Enqueue an item into the queue
76    pub async fn enqueue(&self, item: T) -> Result<(), QueueError> {
77        let resource_manager = get_global_resource_manager();
78        match self {
79            Queue::Bounded { sender, .. } => {
80                let guard = sender.lock().await;
81                match &*guard {
82                    Some(sender) => match sender.send(item).await {
83                        Ok(_) => {
84                            resource_manager.track_memory_allocation(1).await.ok();
85                            Ok(())
86                        },
87                        Err(_) => Err(QueueError::Disconnected),
88                    },
89                    None => Err(QueueError::Closed),
90                }
91            }
92            Queue::Unbounded { sender, .. } => {
93                let guard = sender.lock().await;
94                match &*guard {
95                    Some(sender) => match sender.send(item) {
96                        Ok(_) => {
97                            resource_manager.track_memory_allocation(1).await.ok();
98                            Ok(())
99                        },
100                        Err(_) => Err(QueueError::Disconnected),
101                    },
102                    None => Err(QueueError::Closed),
103                }
104            }
105        }
106    }
107
108    /// Try to enqueue an item without blocking
109    pub async fn try_enqueue(&self, item: T) -> Result<(), QueueError> {
110        let resource_manager = get_global_resource_manager();
111        match self {
112            Queue::Bounded { sender, .. } => {
113                let guard = sender.lock().await;
114                match &*guard {
115                    Some(sender) => match sender.try_send(item) {
116                        Ok(_) => {
117                            resource_manager.track_memory_allocation(1).await.ok();
118                            Ok(())
119                        },
120                        Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
121                            resource_manager.track_buffer_overflow().await.ok();
122                            Err(QueueError::Full)
123                        },
124                        Err(_) => Err(QueueError::Disconnected),
125                    },
126                    None => Err(QueueError::Closed),
127                }
128            }
129            Queue::Unbounded { sender, .. } => {
130                let guard = sender.lock().await;
131                match &*guard {
132                    Some(sender) => match sender.send(item) {
133                        Ok(_) => {
134                            resource_manager.track_memory_allocation(1).await.ok();
135                            Ok(())
136                        },
137                        Err(_) => Err(QueueError::Disconnected),
138                    },
139                    None => Err(QueueError::Closed),
140                }
141            }
142        }
143    }
144
145    /// Get a rs2_stream for dequeuing items
146    pub fn dequeue(&self) -> impl Stream<Item = T> + Send + 'static {
147        let resource_manager = get_global_resource_manager();
148        match self {
149            Queue::Bounded { receiver, .. } => {
150                let receiver = Arc::clone(receiver);
151                let resource_manager = resource_manager.clone();
152                stream! {
153                    loop {
154                        let item = {
155                            let mut guard = receiver.lock().await;
156                            if let Some(rx) = &mut *guard {
157                                rx.recv().await
158                            } else {
159                                None
160                            }
161                        };
162                        match item {
163                            Some(item) => {
164                                resource_manager.track_memory_deallocation(1).await;
165                                yield item
166                            },
167                            None => break,
168                        }
169                    }
170                }
171                .boxed()
172            }
173            Queue::Unbounded { receiver, .. } => {
174                let receiver = Arc::clone(receiver);
175                let resource_manager = resource_manager.clone();
176                stream! {
177                    loop {
178                        let item = {
179                            let mut guard = receiver.lock().await;
180                            if let Some(rx) = &mut *guard {
181                                rx.recv().await
182                            } else {
183                                None
184                            }
185                        };
186                        match item {
187                            Some(item) => {
188                                resource_manager.track_memory_deallocation(1).await;
189                                yield item
190                            },
191                            None => break,
192                        }
193                    }
194                }
195                .boxed()
196            }
197        }
198    }
199
200    /// Close the queue, preventing further enqueues
201    pub async fn close(&self) {
202        match self {
203            Queue::Bounded { sender, .. } => {
204                let mut guard = sender.lock().await;
205                *guard = None;
206            }
207            Queue::Unbounded { sender, .. } => {
208                let mut guard = sender.lock().await;
209                *guard = None;
210            }
211        }
212    }
213
214    /// Get the capacity of the queue (None for unbounded)
215    pub fn capacity(&self) -> Option<usize> {
216        match self {
217            Queue::Bounded { capacity, .. } => Some(*capacity),
218            Queue::Unbounded { .. } => None,
219        }
220    }
221
222    /// Check if the queue is empty
223    pub async fn is_empty(&self) -> bool {
224        self.len().await == 0
225    }
226
227    /// Get the current number of items in the queue
228    pub async fn len(&self) -> usize {
229        match self {
230            Queue::Bounded { receiver, .. } => {
231                let guard = receiver.lock().await;
232                if let Some(rx) = &*guard {
233                    rx.len()
234                } else {
235                    0
236                }
237            }
238            Queue::Unbounded { receiver, .. } => {
239                let guard = receiver.lock().await;
240                if let Some(rx) = &*guard {
241                    rx.len()
242                } else {
243                    0
244                }
245            }
246        }
247    }
248}
249
250impl<T> fmt::Debug for Queue<T> {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        match self {
253            Queue::Bounded { capacity, .. } => f
254                .debug_struct("Queue::Bounded")
255                .field("capacity", capacity)
256                .finish(),
257            Queue::Unbounded { .. } => f.debug_struct("Queue::Unbounded").finish(),
258        }
259    }
260}