// ragc_core/memory_bounded_queue.rs
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
6
7pub struct MemoryBoundedQueue<T> {
16 inner: Arc<Mutex<QueueInner<T>>>,
17 capacity_bytes: usize,
18 not_full: Arc<Condvar>,
19 not_empty: Arc<Condvar>,
20}
21
/// Mutable queue state; only ever accessed while holding the queue's mutex.
struct QueueInner<T> {
    /// Pending items in FIFO order, each paired with the byte size it was
    /// pushed with.
    items: VecDeque<(T, usize)>,
    /// Sum of the recorded sizes of everything currently in `items`.
    current_size: usize,
    /// `true` once the queue has been closed.
    closed: bool,
}
27
28impl<T> MemoryBoundedQueue<T> {
29 pub fn new(capacity_bytes: usize) -> Self {
40 Self {
41 inner: Arc::new(Mutex::new(QueueInner {
42 items: VecDeque::new(),
43 current_size: 0,
44 closed: false,
45 })),
46 capacity_bytes,
47 not_full: Arc::new(Condvar::new()),
48 not_empty: Arc::new(Condvar::new()),
49 }
50 }
51
52 pub fn push(&self, item: T, size_bytes: usize) -> Result<(), PushError> {
69 let mut inner = self.inner.lock().unwrap();
70
71 while inner.current_size + size_bytes > self.capacity_bytes && !inner.closed {
73 inner = self.not_full.wait(inner).unwrap();
74 }
75
76 if inner.closed {
78 return Err(PushError::Closed);
79 }
80
81 inner.items.push_back((item, size_bytes));
83 inner.current_size += size_bytes;
84
85 self.not_empty.notify_one();
87
88 Ok(())
89 }
90
91 pub fn try_push(&self, item: T, size_bytes: usize) -> Result<(), TryPushError> {
95 let mut inner = self.inner.lock().unwrap();
96
97 if inner.closed {
98 return Err(TryPushError::Closed);
99 }
100
101 if inner.current_size + size_bytes > self.capacity_bytes {
102 return Err(TryPushError::WouldBlock);
103 }
104
105 inner.items.push_back((item, size_bytes));
107 inner.current_size += size_bytes;
108
109 self.not_empty.notify_one();
111
112 Ok(())
113 }
114
115 pub fn pull(&self) -> Option<T> {
130 let mut inner = self.inner.lock().unwrap();
131
132 while inner.items.is_empty() && !inner.closed {
134 inner = self.not_empty.wait(inner).unwrap();
135 }
136
137 if inner.items.is_empty() {
139 return None;
140 }
141
142 let (item, size) = inner.items.pop_front().unwrap();
144 inner.current_size -= size;
145
146 self.not_full.notify_one();
148
149 Some(item)
150 }
151
152 pub fn try_pull(&self) -> Option<T> {
156 let mut inner = self.inner.lock().unwrap();
157
158 if inner.items.is_empty() {
159 return None;
160 }
161
162 let (item, size) = inner.items.pop_front().unwrap();
164 inner.current_size -= size;
165
166 self.not_full.notify_one();
168
169 Some(item)
170 }
171
172 pub fn close(&self) {
179 let mut inner = self.inner.lock().unwrap();
180 inner.closed = true;
181
182 self.not_full.notify_all();
184 self.not_empty.notify_all();
185 }
186
187 pub fn is_closed(&self) -> bool {
189 self.inner.lock().unwrap().closed
190 }
191
192 pub fn current_size(&self) -> usize {
194 self.inner.lock().unwrap().current_size
195 }
196
197 pub fn len(&self) -> usize {
199 self.inner.lock().unwrap().items.len()
200 }
201
202 pub fn is_empty(&self) -> bool {
204 self.inner.lock().unwrap().items.is_empty()
205 }
206
207 pub fn capacity(&self) -> usize {
209 self.capacity_bytes
210 }
211}
212
213impl<T> Clone for MemoryBoundedQueue<T> {
215 fn clone(&self) -> Self {
216 Self {
217 inner: Arc::clone(&self.inner),
218 capacity_bytes: self.capacity_bytes,
219 not_full: Arc::clone(&self.not_full),
220 not_empty: Arc::clone(&self.not_empty),
221 }
222 }
223}
224
/// Error returned by the blocking `push` operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushError {
    /// The queue was closed, so the item was not enqueued.
    Closed,
}

impl std::fmt::Display for PushError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive match so that adding a variant forces a message for it.
        match self {
            PushError::Closed => f.write_str("Queue is closed"),
        }
    }
}

impl std::error::Error for PushError {}
239
/// Error returned by the non-blocking `try_push` operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryPushError {
    /// The queue was closed, so the item was not enqueued.
    Closed,
    /// The item does not fit right now; a blocking push would have to wait.
    WouldBlock,
}

impl std::fmt::Display for TryPushError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive match so that adding a variant forces a message for it.
        let message = match self {
            TryPushError::Closed => "Queue is closed",
            TryPushError::WouldBlock => "Queue is full - would block",
        };
        f.write_str(message)
    }
}

impl std::error::Error for TryPushError {}
256
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;
    use std::time::Duration;

    /// A pushed item comes back unchanged from `pull`.
    #[test]
    fn test_basic_push_pull() {
        let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(1024);

        let data = vec![0u8; 100];
        queue.push(data.clone(), 100).unwrap();

        let pulled = queue.pull().unwrap();
        assert_eq!(pulled, data);
    }

    /// A push into a full queue blocks until a consumer frees space.
    #[test]
    fn test_backpressure() {
        let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(1024);

        // Fill the queue exactly to its budget.
        queue.push(vec![0u8; 512], 512).unwrap();
        queue.push(vec![0u8; 512], 512).unwrap();

        // The flag is `true` for as long as the producer thread is inside `push`.
        let blocked = Arc::new(AtomicBool::new(false));
        let blocked_clone = Arc::clone(&blocked);
        let queue_clone = queue.clone();

        let handle = thread::spawn(move || {
            blocked_clone.store(true, Ordering::SeqCst);
            queue_clone.push(vec![0u8; 100], 100).unwrap();
            blocked_clone.store(false, Ordering::SeqCst);
        });

        // Give the producer time to reach the blocking push.
        thread::sleep(Duration::from_millis(100));
        assert!(blocked.load(Ordering::SeqCst), "Push should be blocked!");

        // Freeing 512 bytes lets the 100-byte push through.
        queue.pull().unwrap();

        handle.join().unwrap();
        assert!(
            !blocked.load(Ordering::SeqCst),
            "Push should have completed!"
        );
    }

    /// After `close`, pushes fail, queued items can still be pulled, and then
    /// `pull` reports exhaustion.
    #[test]
    fn test_close_queue() {
        let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(1024);

        queue.push(vec![0u8; 100], 100).unwrap();
        queue.push(vec![0u8; 100], 100).unwrap();

        queue.close();

        assert!(queue.push(vec![0u8; 100], 100).is_err());

        assert!(queue.pull().is_some());
        assert!(queue.pull().is_some());

        assert!(queue.pull().is_none());
    }

    /// The non-blocking variants report "full" and "empty" instead of waiting.
    #[test]
    fn test_try_operations() {
        let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(100);

        assert!(queue.try_push(vec![0u8; 50], 50).is_ok());

        // 50 + 60 exceeds the 100-byte budget.
        assert_eq!(
            queue.try_push(vec![0u8; 60], 60),
            Err(TryPushError::WouldBlock)
        );

        assert!(queue.try_pull().is_some());

        assert!(queue.try_pull().is_none());
    }

    /// Three producers and two consumers move 300 items through the queue
    /// without losing or duplicating any.
    #[test]
    fn test_multiple_producers_consumers() {
        let queue: MemoryBoundedQueue<usize> = MemoryBoundedQueue::new(1000);

        let producers: Vec<_> = (0..3)
            .map(|i| {
                let q = queue.clone();
                thread::spawn(move || {
                    for j in 0..100 {
                        q.push(i * 100 + j, 10).unwrap();
                    }
                })
            })
            .collect();

        let consumers: Vec<_> = (0..2)
            .map(|_| {
                let q = queue.clone();
                thread::spawn(move || {
                    let mut count = 0usize;
                    while q.pull().is_some() {
                        count += 1;
                        // Each consumer stops after its half of the items.
                        if count == 150 {
                            break;
                        }
                    }
                    count
                })
            })
            .collect();

        for p in producers {
            p.join().unwrap();
        }

        // Wakes any consumer still waiting on an empty queue.
        queue.close();

        let total: usize = consumers.into_iter().map(|c| c.join().unwrap()).sum();

        assert_eq!(total, 300);
    }

    /// `current_size` follows the sizes passed to `push` as items come and go.
    #[test]
    fn test_size_tracking() {
        let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(1024);

        assert_eq!(queue.current_size(), 0);

        queue.push(vec![0u8; 100], 100).unwrap();
        assert_eq!(queue.current_size(), 100);

        queue.push(vec![0u8; 200], 200).unwrap();
        assert_eq!(queue.current_size(), 300);

        queue.pull().unwrap();
        assert_eq!(queue.current_size(), 200);

        queue.pull().unwrap();
        assert_eq!(queue.current_size(), 0);
    }
}