ragc_core/
memory_bounded_queue.rs

1// Memory-bounded queue with backpressure
2// Matches C++ AGC's CBoundedPQueue behavior
3
4use std::collections::VecDeque;
5use std::sync::{Arc, Condvar, Mutex};
6
7/// A queue bounded by total bytes (not item count)
8///
9/// Key properties:
10/// - `push()` blocks when adding would exceed capacity
11/// - `pull()` blocks when queue is empty (returns None when closed)
12/// - Provides automatic backpressure for constant memory usage
13///
14/// This matches C++ AGC's CBoundedPQueue architecture.
15pub struct MemoryBoundedQueue<T> {
16    inner: Arc<Mutex<QueueInner<T>>>,
17    capacity_bytes: usize,
18    not_full: Arc<Condvar>,
19    not_empty: Arc<Condvar>,
20}
21
/// Mutable queue state; the owning queue keeps it behind a mutex.
struct QueueInner<T> {
    /// Set once the queue has been closed: no more pushes are allowed.
    closed: bool,
    /// Total bytes currently in the queue (sum of the sizes stored in `items`).
    current_size: usize,
    /// Queued entries, each stored as `(item, size_bytes)`.
    items: VecDeque<(T, usize)>,
}
27
28impl<T> MemoryBoundedQueue<T> {
29    /// Create a new memory-bounded queue
30    ///
31    /// # Arguments
32    /// * `capacity_bytes` - Maximum total bytes allowed in queue
33    ///
34    /// # Example
35    /// ```
36    /// use ragc_core::MemoryBoundedQueue;
37    /// let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(2 * 1024 * 1024 * 1024); // 2 GB
38    /// ```
39    pub fn new(capacity_bytes: usize) -> Self {
40        Self {
41            inner: Arc::new(Mutex::new(QueueInner {
42                items: VecDeque::new(),
43                current_size: 0,
44                closed: false,
45            })),
46            capacity_bytes,
47            not_full: Arc::new(Condvar::new()),
48            not_empty: Arc::new(Condvar::new()),
49        }
50    }
51
52    /// Push an item to the queue with its size
53    ///
54    /// **BLOCKS** if adding this item would exceed capacity.
55    /// Returns error if queue is closed.
56    ///
57    /// # Arguments
58    /// * `item` - The item to push
59    /// * `size_bytes` - Size of the item in bytes
60    ///
61    /// # Example
62    /// ```no_run
63    /// # use ragc_core::MemoryBoundedQueue;
64    /// # let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(1024);
65    /// let contig_data = vec![b'A'; 1000];
66    /// queue.push(contig_data.clone(), contig_data.len()).unwrap(); // Blocks if queue is full!
67    /// ```
68    pub fn push(&self, item: T, size_bytes: usize) -> Result<(), PushError> {
69        let mut inner = self.inner.lock().unwrap();
70
71        // Wait while queue would be too full
72        while inner.current_size + size_bytes > self.capacity_bytes && !inner.closed {
73            inner = self.not_full.wait(inner).unwrap();
74        }
75
76        // Check if closed while we were waiting
77        if inner.closed {
78            return Err(PushError::Closed);
79        }
80
81        // Add item
82        inner.items.push_back((item, size_bytes));
83        inner.current_size += size_bytes;
84
85        // Signal that queue is not empty
86        self.not_empty.notify_one();
87
88        Ok(())
89    }
90
91    /// Try to push without blocking
92    ///
93    /// Returns `Err(WouldBlock)` if adding would exceed capacity.
94    pub fn try_push(&self, item: T, size_bytes: usize) -> Result<(), TryPushError> {
95        let mut inner = self.inner.lock().unwrap();
96
97        if inner.closed {
98            return Err(TryPushError::Closed);
99        }
100
101        if inner.current_size + size_bytes > self.capacity_bytes {
102            return Err(TryPushError::WouldBlock);
103        }
104
105        // Add item
106        inner.items.push_back((item, size_bytes));
107        inner.current_size += size_bytes;
108
109        // Signal that queue is not empty
110        self.not_empty.notify_one();
111
112        Ok(())
113    }
114
115    /// Pull an item from the queue
116    ///
117    /// **BLOCKS** if queue is empty.
118    /// Returns `None` if queue is closed and empty.
119    ///
120    /// # Example
121    /// ```no_run
122    /// # use ragc_core::MemoryBoundedQueue;
123    /// # let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(1024);
124    /// while let Some(item) = queue.pull() {
125    ///     // process(item);
126    /// }
127    /// // Queue is closed and empty - we're done!
128    /// ```
129    pub fn pull(&self) -> Option<T> {
130        let mut inner = self.inner.lock().unwrap();
131
132        // Wait while queue is empty and not closed
133        while inner.items.is_empty() && !inner.closed {
134            inner = self.not_empty.wait(inner).unwrap();
135        }
136
137        // If closed and empty, return None
138        if inner.items.is_empty() {
139            return None;
140        }
141
142        // Remove item
143        let (item, size) = inner.items.pop_front().unwrap();
144        inner.current_size -= size;
145
146        // Signal that queue has space
147        self.not_full.notify_one();
148
149        Some(item)
150    }
151
152    /// Try to pull without blocking
153    ///
154    /// Returns `None` if queue is empty (even if not closed).
155    pub fn try_pull(&self) -> Option<T> {
156        let mut inner = self.inner.lock().unwrap();
157
158        if inner.items.is_empty() {
159            return None;
160        }
161
162        // Remove item
163        let (item, size) = inner.items.pop_front().unwrap();
164        inner.current_size -= size;
165
166        // Signal that queue has space
167        self.not_full.notify_one();
168
169        Some(item)
170    }
171
172    /// Close the queue
173    ///
174    /// After closing:
175    /// - No more pushes allowed (returns error)
176    /// - Pulls will drain remaining items, then return None
177    /// - Workers can detect completion via `pull()` returning None
178    pub fn close(&self) {
179        let mut inner = self.inner.lock().unwrap();
180        inner.closed = true;
181
182        // Wake up all waiting threads
183        self.not_full.notify_all();
184        self.not_empty.notify_all();
185    }
186
187    /// Check if queue is closed
188    pub fn is_closed(&self) -> bool {
189        self.inner.lock().unwrap().closed
190    }
191
192    /// Get current size in bytes
193    pub fn current_size(&self) -> usize {
194        self.inner.lock().unwrap().current_size
195    }
196
197    /// Get current item count
198    pub fn len(&self) -> usize {
199        self.inner.lock().unwrap().items.len()
200    }
201
202    /// Check if queue is empty
203    pub fn is_empty(&self) -> bool {
204        self.inner.lock().unwrap().items.is_empty()
205    }
206
207    /// Get capacity in bytes
208    pub fn capacity(&self) -> usize {
209        self.capacity_bytes
210    }
211}
212
213// Make queue cloneable (clones share the same underlying queue)
214impl<T> Clone for MemoryBoundedQueue<T> {
215    fn clone(&self) -> Self {
216        Self {
217            inner: Arc::clone(&self.inner),
218            capacity_bytes: self.capacity_bytes,
219            not_full: Arc::clone(&self.not_full),
220            not_empty: Arc::clone(&self.not_empty),
221        }
222    }
223}
224
/// Error returned by the blocking `push` when the item is not accepted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushError {
    /// The queue is closed, so the item was rejected.
    Closed,
}

impl std::fmt::Display for PushError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            PushError::Closed => "Queue is closed",
        };
        f.write_str(message)
    }
}

impl std::error::Error for PushError {}
239
/// Error returned by the non-blocking `try_push`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryPushError {
    /// The queue is closed, so the item was rejected.
    Closed,
    /// The item did not fit right now; a blocking push would have had to wait.
    WouldBlock,
}

impl std::fmt::Display for TryPushError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            TryPushError::Closed => "Queue is closed",
            TryPushError::WouldBlock => "Queue is full - would block",
        };
        f.write_str(message)
    }
}

impl std::error::Error for TryPushError {}
256
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;
    use std::time::Duration;

    /// A single item goes in and comes back out unchanged.
    #[test]
    fn test_basic_push_pull() {
        let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(1024);
        let payload = vec![0u8; 100];

        queue.push(payload.clone(), 100).unwrap();

        assert_eq!(queue.pull().unwrap(), payload);
    }

    /// A push into a full queue stays blocked until a pull frees some bytes.
    #[test]
    fn test_backpressure() {
        let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(1024);

        // Two 512-byte items use up the whole 1024-byte budget.
        for _ in 0..2 {
            queue.push(vec![0u8; 512], 512).unwrap();
        }

        // The flag is true from just before the push until just after it returns.
        let in_push = Arc::new(AtomicBool::new(false));
        let producer = {
            let in_push = Arc::clone(&in_push);
            let queue = queue.clone();
            thread::spawn(move || {
                in_push.store(true, Ordering::SeqCst);
                queue.push(vec![0u8; 100], 100).unwrap();
                in_push.store(false, Ordering::SeqCst);
            })
        };

        // Give the producer time to reach the push; it must not get past it.
        thread::sleep(Duration::from_millis(100));
        assert!(in_push.load(Ordering::SeqCst), "Push should be blocked!");

        // Freeing one item makes room for the pending push.
        queue.pull().unwrap();

        producer.join().unwrap();
        assert!(
            !in_push.load(Ordering::SeqCst),
            "Push should have completed!"
        );
    }

    /// Closing rejects new pushes but lets the already queued items drain.
    #[test]
    fn test_close_queue() {
        let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(1024);

        for _ in 0..2 {
            queue.push(vec![0u8; 100], 100).unwrap();
        }

        queue.close();

        // Pushing after close fails
        assert!(queue.push(vec![0u8; 100], 100).is_err());

        // The two queued items are still delivered
        for _ in 0..2 {
            assert!(queue.pull().is_some());
        }

        // Closed and drained: None
        assert!(queue.pull().is_none());
    }

    /// The non-blocking variants report "full" and "empty" instead of waiting.
    #[test]
    fn test_try_operations() {
        let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(100);

        // 50 of 100 bytes: accepted
        assert!(queue.try_push(vec![0u8; 50], 50).is_ok());

        // 50 + 60 bytes would exceed the capacity
        let rejected = queue.try_push(vec![0u8; 60], 60);
        assert_eq!(rejected, Err(TryPushError::WouldBlock));

        // One item is available, then the queue is empty
        assert!(queue.try_pull().is_some());
        assert!(queue.try_pull().is_none());
    }

    /// Three producers and two consumers move 300 items through a small queue.
    #[test]
    fn test_multiple_producers_consumers() {
        let queue: MemoryBoundedQueue<usize> = MemoryBoundedQueue::new(1000);

        // Each producer pushes 100 items of 10 bytes.
        let producers: Vec<_> = (0..3usize)
            .map(|producer_id| {
                let q = queue.clone();
                thread::spawn(move || {
                    for offset in 0..100 {
                        q.push(producer_id * 100 + offset, 10).unwrap();
                    }
                })
            })
            .collect();

        // Each consumer stops after 150 items, or earlier if the queue is closed and drained.
        let consumers: Vec<_> = (0..2)
            .map(|_| {
                let q = queue.clone();
                thread::spawn(move || {
                    let mut taken = 0usize;
                    while taken < 150 && q.pull().is_some() {
                        taken += 1;
                    }
                    taken
                })
            })
            .collect();

        producers
            .into_iter()
            .for_each(|producer| producer.join().unwrap());

        queue.close();

        let total: usize = consumers
            .into_iter()
            .map(|consumer| consumer.join().unwrap())
            .sum();

        // Should have consumed all 300 items
        assert_eq!(total, 300);
    }

    /// `current_size` follows the declared sizes of pushed and pulled items.
    #[test]
    fn test_size_tracking() {
        let queue: MemoryBoundedQueue<Vec<u8>> = MemoryBoundedQueue::new(1024);

        assert_eq!(queue.current_size(), 0);

        // (size pushed, expected running total)
        for &(size, expected) in &[(100usize, 100usize), (200, 300)] {
            queue.push(vec![0u8; size], size).unwrap();
            assert_eq!(queue.current_size(), expected);
        }

        // Items leave in insertion order: first 100 bytes, then 200.
        for &expected in &[200usize, 0] {
            queue.pull().unwrap();
            assert_eq!(queue.current_size(), expected);
        }
    }
}