rs2_stream/
queue.rs

1
//! Queue implementation for RStream
//!
//! This module provides a concurrent queue with a `Stream` interface for dequeuing
//! and async methods for enqueuing.
6
7use async_stream::stream;
8use futures_core::Stream;
9use futures_util::StreamExt;
10use std::fmt;
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex};
13
/// Error types for Queue operations.
///
/// All variants are field-less, so the type supports full equality (`Eq`) in
/// addition to `PartialEq`, which lets it be used wherever a total equivalence
/// relation is required (e.g. as part of a key or in `assert_eq!`-style checks
/// on generic code bounded by `Eq`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueueError {
    /// Queue has been closed
    Closed,
    /// Queue is full (for bounded queues with try_enqueue)
    Full,
    /// Channel disconnected
    Disconnected,
}
24
25impl fmt::Display for QueueError {
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        match self {
28            QueueError::Closed => write!(f, "Queue is closed"),
29            QueueError::Full => write!(f, "Queue is full"),
30            QueueError::Disconnected => write!(f, "Queue channel disconnected"),
31        }
32    }
33}
34
// Marks `QueueError` as a standard error type. No trait methods are overridden,
// so the default implementations provided by `std::error::Error` apply.
impl std::error::Error for QueueError {}
36
37/// A Queue represents a concurrent queue with a Stream interface for dequeuing
38/// and async methods for enqueuing.
39#[derive(Clone)]
40pub enum Queue<T> {
41    Bounded {
42        sender: Arc<Mutex<Option<mpsc::Sender<T>>>>,
43        receiver: Arc<Mutex<Option<mpsc::Receiver<T>>>>,
44        capacity: usize,
45    },
46    Unbounded {
47        sender: Arc<Mutex<Option<mpsc::UnboundedSender<T>>>>,
48        receiver: Arc<Mutex<Option<mpsc::UnboundedReceiver<T>>>>,
49    },
50}
51
52impl<T> Queue<T>
53where
54    T: Send + 'static,
55{
56    /// Create a new bounded queue with the given capacity
57    pub fn bounded(capacity: usize) -> Self {
58        let (sender, receiver) = mpsc::channel(capacity);
59        Queue::Bounded {
60            sender: Arc::new(Mutex::new(Some(sender))),
61            receiver: Arc::new(Mutex::new(Some(receiver))),
62            capacity,
63        }
64    }
65
66    /// Create a new unbounded queue
67    pub fn unbounded() -> Self {
68        let (sender, receiver) = mpsc::unbounded_channel();
69        Queue::Unbounded {
70            sender: Arc::new(Mutex::new(Some(sender))),
71            receiver: Arc::new(Mutex::new(Some(receiver))),
72        }
73    }
74
75    /// Enqueue an item into the queue
76    pub async fn enqueue(&self, item: T) -> Result<(), QueueError> {
77        match self {
78            Queue::Bounded { sender, .. } => {
79                let guard = sender.lock().await;
80                match &*guard {
81                    Some(sender) => {
82                        match sender.send(item).await {
83                            Ok(_) => Ok(()),
84                            Err(_) => Err(QueueError::Disconnected),
85                        }
86                    }
87                    None => Err(QueueError::Closed),
88                }
89            }
90            Queue::Unbounded { sender, .. } => {
91                let guard = sender.lock().await;
92                match &*guard {
93                    Some(sender) => {
94                        match sender.send(item) {
95                            Ok(_) => Ok(()),
96                            Err(_) => Err(QueueError::Disconnected),
97                        }
98                    }
99                    None => Err(QueueError::Closed),
100                }
101            }
102        }
103    }
104
105    /// Try to enqueue an item without blocking
106    pub async fn try_enqueue(&self, item: T) -> Result<(), QueueError> {
107        match self {
108            Queue::Bounded { sender, .. } => {
109                let guard = sender.lock().await;
110                match &*guard {
111                    Some(sender) => {
112                        match sender.try_send(item) {
113                            Ok(_) => Ok(()),
114                            Err(mpsc::error::TrySendError::Full(_)) => Err(QueueError::Full),
115                            Err(mpsc::error::TrySendError::Closed(_)) => Err(QueueError::Disconnected),
116                        }
117                    }
118                    None => Err(QueueError::Closed),
119                }
120            }
121            Queue::Unbounded { sender, .. } => {
122                let guard = sender.lock().await;
123                match &*guard {
124                    Some(sender) => {
125                        match sender.send(item) {
126                            Ok(_) => Ok(()),
127                            Err(_) => Err(QueueError::Disconnected),
128                        }
129                    }
130                    None => Err(QueueError::Closed),
131                }
132            }
133        }
134    }
135
136    /// Get a rs2_stream for dequeuing items
137    pub fn dequeue(&self) -> impl Stream<Item = T> + Send + 'static {
138        match self {
139            Queue::Bounded { receiver, .. } => {
140                let receiver = Arc::clone(receiver);
141
142                stream! {
143                    loop {
144                        let item = {
145                            let mut guard = receiver.lock().await;
146                            if let Some(rx) = &mut *guard {
147                                rx.recv().await
148                            } else {
149                                None
150                            }
151                        };
152                        
153                        match item {
154                            Some(item) => yield item,
155                            None => break,
156                        }
157                    }
158                }.boxed()
159            }
160            Queue::Unbounded { receiver, .. } => {
161                let receiver = Arc::clone(receiver);
162
163                stream! {
164                    loop {
165                        let item = {
166                            let mut guard = receiver.lock().await;
167                            if let Some(rx) = &mut *guard {
168                                rx.recv().await
169                            } else {
170                                None
171                            }
172                        };
173                        
174                        match item {
175                            Some(item) => yield item,
176                            None => break,
177                        }
178                    }
179                }.boxed()
180            }
181        }
182    }
183
184    /// Close the queue, preventing further enqueues
185    pub async fn close(&self) {
186        match self {
187            Queue::Bounded { sender, .. } => {
188                let mut guard = sender.lock().await;
189                *guard = None;
190            }
191            Queue::Unbounded { sender, .. } => {
192                let mut guard = sender.lock().await;
193                *guard = None;
194            }
195        }
196    }
197
198    /// Get the capacity of the queue (None for unbounded)
199    pub fn capacity(&self) -> Option<usize> {
200        match self {
201            Queue::Bounded { capacity, .. } => Some(*capacity),
202            Queue::Unbounded { .. } => None,
203        }
204    }
205
206    /// Check if the queue is empty
207    pub async fn is_empty(&self) -> bool {
208        self.len().await == 0
209    }
210
211    /// Get the current number of items in the queue
212    pub async fn len(&self) -> usize {
213        match self {
214            Queue::Bounded { receiver, .. } => {
215                let guard = receiver.lock().await;
216                if let Some(rx) = &*guard {
217                    rx.len()
218                } else {
219                    0
220                }
221            }
222            Queue::Unbounded { receiver, .. } => {
223                let guard = receiver.lock().await;
224                if let Some(rx) = &*guard {
225                    rx.len()
226                } else {
227                    0
228                }
229            }
230        }
231    }
232}
233
234impl<T> fmt::Debug for Queue<T> {
235    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236        match self {
237            Queue::Bounded { capacity, .. } => {
238                f.debug_struct("Queue::Bounded")
239                    .field("capacity", capacity)
240                    .finish()
241            }
242            Queue::Unbounded { .. } => {
243                f.debug_struct("Queue::Unbounded").finish()
244            }
245        }
246    }
247}