fgumi_lib/unified_pipeline/queue.rs

//! Memory-bounded queue types for pipeline flow control.
//!
//! This module provides queues that enforce memory limits rather than item counts,
//! enabling precise control over pipeline memory usage.
//!
//! # Key Types
//!
//! - [`OrderedQueue`]: A reorder buffer with smart backpressure to prevent deadlock
//! - [`QueueStats`]: Statistics collected per queue for dynamic rebalancing

use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

/// Statistics collected per queue for rebalancing decisions.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Average memory usage over the epoch.
    pub avg_bytes: u64,
    /// Peak memory usage during the epoch.
    pub peak_bytes: u64,
    /// Total time blocked waiting to push (milliseconds).
    pub time_blocked_ms: u64,
}

/// A reorder buffer that outputs items in serial order.
///
/// Uses smart backpressure to prevent deadlock:
/// - When waiting for `next_seq`: MUST accept items (refusing would deadlock)
/// - When we have `next_seq`: CAN refuse items (consumer can drain)
///
/// # Deadlock Prevention
///
/// The key insight is that if we're waiting for serial N, we must accept
/// serials N+1, N+2, etc. because serial N might be produced by another
/// thread that's blocked trying to push to this queue. Only when we have
/// serial N available can we safely apply backpressure, because the consumer
/// can make progress by draining serial N.
///
/// # Example
///
/// ```ignore
/// let queue = OrderedQueue::new(1_000_000); // 1MB limit
/// queue.insert(2, item2, 1000)?; // Accepted (waiting for 0)
/// queue.insert(0, item0, 1000)?; // Accepted (now has 0)
/// let (item, size) = queue.try_pop_next().unwrap(); // Returns item0
/// queue.insert(1, item1, 1000)?; // Accepted (now has 1)
/// ```
pub struct OrderedQueue<T> {
    inner: Mutex<OrderedQueueInner<T>>,
    current_bytes: AtomicU64,
    limit_bytes: AtomicU64,
    next_seq: AtomicU64,  // Cached for lock-free checks
    has_next: AtomicBool, // Cached: do we have next_seq?

    // Stats
    peak_bytes: AtomicU64,
    samples_sum: AtomicU64,
    samples_count: AtomicU64,
    blocked_ns: AtomicU64,
}

struct OrderedQueueInner<T> {
    buffer: HashMap<u64, (T, usize)>,
    next_seq: u64,
}

impl<T> OrderedQueue<T> {
    /// Create a new ordered queue with the given memory limit.
    #[must_use]
    pub fn new(limit_bytes: u64) -> Self {
        Self {
            inner: Mutex::new(OrderedQueueInner { buffer: HashMap::new(), next_seq: 0 }),
            current_bytes: AtomicU64::new(0),
            limit_bytes: AtomicU64::new(limit_bytes),
            next_seq: AtomicU64::new(0),
            has_next: AtomicBool::new(false),
            peak_bytes: AtomicU64::new(0),
            samples_sum: AtomicU64::new(0),
            samples_count: AtomicU64::new(0),
            blocked_ns: AtomicU64::new(0),
        }
    }

    /// Check whether we can accept an item (lock-free fast path).
    ///
    /// Returns `true` if:
    /// - We don't have `next_seq` (must accept to make progress), OR
    /// - We're under the memory limit
    ///
    /// The answer is advisory: it reads cached atomics without taking the
    /// lock, so a `true` may go stale and [`Self::insert`] re-checks under
    /// the lock.
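    ///
    /// # Example
    ///
    /// A sketch of the intended fast-path use (illustrative; `retry_later`
    /// is a hypothetical helper, and a `true` here can still race with
    /// other producers):
    ///
    /// ```ignore
    /// if queue.can_accept(heap_size) {
    ///     match queue.insert(serial, item, heap_size) {
    ///         Ok(()) => {}
    ///         Err((item, _size)) => retry_later(item), // hypothetical helper
    ///     }
    /// }
    /// ```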
    pub fn can_accept(&self, heap_size: usize) -> bool {
        // If we don't have next_seq, we MUST accept (deadlock avoidance)
        if !self.has_next.load(Ordering::Acquire) {
            return true;
        }

        // We have next_seq, so consumer can drain. Apply backpressure.
        let current = self.current_bytes.load(Ordering::Acquire);
        let limit = self.limit_bytes.load(Ordering::Acquire);
        current + heap_size as u64 <= limit
    }

    /// Insert an item into the reorder buffer.
    ///
    /// Acceptance rule:
    /// - If we do NOT have `next_seq`: ACCEPT (must accumulate for progress)
    /// - If we DO have `next_seq`: only accept if under the memory limit
    ///
    /// Serials must be unique: inserting a serial that is already buffered
    /// replaces the stored item without reclaiming its accounted bytes.
    ///
    /// # Errors
    ///
    /// Returns `Err((item, heap_size))` if rejected due to memory
    /// backpressure, handing the item back so the caller can retry.
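    ///
    /// # Example
    ///
    /// A sketch of a producer retry loop (illustrative only):
    ///
    /// ```ignore
    /// let mut pending = (item, heap_size);
    /// loop {
    ///     match queue.insert(serial, pending.0, pending.1) {
    ///         Ok(()) => break,
    ///         Err(rejected) => {
    ///             pending = rejected;
    ///             std::thread::yield_now(); // or park until the consumer drains
    ///         }
    ///     }
    /// }
    /// ```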
    pub fn insert(&self, serial: u64, item: T, heap_size: usize) -> Result<(), (T, usize)> {
        let mut inner = self.inner.lock();

        let has_next = inner.buffer.contains_key(&inner.next_seq);

        if has_next {
            // Consumer can drain - apply backpressure
            let current = self.current_bytes.load(Ordering::Acquire);
            let limit = self.limit_bytes.load(Ordering::Acquire);
            if current + heap_size as u64 > limit {
                return Err((item, heap_size));
            }
        }
        // else: must accept, we need to accumulate until next_seq arrives

        inner.buffer.insert(serial, (item, heap_size));
        let new_current =
            self.current_bytes.fetch_add(heap_size as u64, Ordering::AcqRel) + heap_size as u64;

        // Update cached state
        let new_has_next = inner.buffer.contains_key(&inner.next_seq);
        self.has_next.store(new_has_next, Ordering::Release);

        // Update peak using CAS loop
        let mut peak = self.peak_bytes.load(Ordering::Relaxed);
        while new_current > peak {
            match self.peak_bytes.compare_exchange_weak(
                peak,
                new_current,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(p) => peak = p,
            }
        }

        Ok(())
    }

    /// Try to pop the next item in serial order.
    ///
    /// Returns `Some((item, heap_size))` if `next_seq` is available.
    pub fn try_pop_next(&self) -> Option<(T, usize)> {
        let mut inner = self.inner.lock();

        let next = inner.next_seq;
        if let Some((item, heap_size)) = inner.buffer.remove(&next) {
            inner.next_seq += 1;
            self.current_bytes.fetch_sub(heap_size as u64, Ordering::AcqRel);

            // Update cached state
            self.next_seq.store(inner.next_seq, Ordering::Release);
            let new_has_next = inner.buffer.contains_key(&inner.next_seq);
            self.has_next.store(new_has_next, Ordering::Release);

            Some((item, heap_size))
        } else {
            None
        }
    }

    /// Get the next expected serial number.
    pub fn next_seq(&self) -> u64 {
        self.next_seq.load(Ordering::Acquire)
    }

    /// Check if we have the next expected serial (can make progress).
    pub fn can_pop(&self) -> bool {
        self.has_next.load(Ordering::Acquire)
    }

    /// Current memory usage in bytes.
    pub fn current_bytes(&self) -> u64 {
        self.current_bytes.load(Ordering::Acquire)
    }

    /// Update the memory limit (for dynamic rebalancing).
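    ///
    /// Takes effect on the next acceptance check; items already buffered
    /// are never evicted. A sketch (illustrative; `hot` and `cold` are
    /// hypothetical queues):
    ///
    /// ```ignore
    /// // Shift budget toward a hot stage.
    /// cold.set_limit(cold.limit_bytes() / 2);
    /// hot.set_limit(hot.limit_bytes() * 2);
    /// ```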
    pub fn set_limit(&self, new_limit: u64) {
        self.limit_bytes.store(new_limit, Ordering::Release);
    }

    /// Get current limit.
    pub fn limit_bytes(&self) -> u64 {
        self.limit_bytes.load(Ordering::Acquire)
    }

    /// Number of items in the buffer.
    pub fn len(&self) -> usize {
        self.inner.lock().buffer.len()
    }

    /// Check if buffer is empty.
    pub fn is_empty(&self) -> bool {
        self.inner.lock().buffer.is_empty()
    }

    /// Record a sample for stats.
    pub fn record_sample(&self) {
        let current = self.current_bytes.load(Ordering::Relaxed);
        self.samples_sum.fetch_add(current, Ordering::Relaxed);
        self.samples_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Record blocked time in nanoseconds.
    pub fn record_blocked(&self, ns: u64) {
        self.blocked_ns.fetch_add(ns, Ordering::Relaxed);
    }

    /// Collect and reset stats.
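    ///
    /// Intended to be called once per epoch by a rebalancer; a sketch
    /// (illustrative; `epoch` and `rebalance` are hypothetical):
    ///
    /// ```ignore
    /// loop {
    ///     std::thread::sleep(epoch);
    ///     let stats = queue.collect_stats();
    ///     rebalance(&queue, &stats); // e.g. adjust via queue.set_limit(..)
    /// }
    /// ```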
    pub fn collect_stats(&self) -> QueueStats {
        let peak = self.peak_bytes.swap(0, Ordering::Relaxed);
        let sum = self.samples_sum.swap(0, Ordering::Relaxed);
        let count = self.samples_count.swap(0, Ordering::Relaxed);
        let blocked = self.blocked_ns.swap(0, Ordering::Relaxed);

        QueueStats {
            avg_bytes: if count > 0 { sum / count } else { 0 },
            peak_bytes: peak,
            time_blocked_ms: blocked / 1_000_000,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_ordered_queue_basic() {
        let queue: OrderedQueue<u32> = OrderedQueue::new(1000);

        // Insert out of order
        assert!(queue.insert(2, 200, 10).is_ok());
        assert!(queue.insert(0, 100, 10).is_ok());
        assert!(queue.insert(1, 150, 10).is_ok());

        // Pop in order
        let (val, _) = queue.try_pop_next().unwrap();
        assert_eq!(val, 100);
        let (val, _) = queue.try_pop_next().unwrap();
        assert_eq!(val, 150);
        let (val, _) = queue.try_pop_next().unwrap();
        assert_eq!(val, 200);

        assert!(queue.try_pop_next().is_none());
    }

    #[test]
    fn test_ordered_queue_backpressure_when_has_next() {
        let queue: OrderedQueue<u32> = OrderedQueue::new(100);

        // Insert serial 0 - now we have next_seq
        assert!(queue.insert(0, 100, 50).is_ok());
        assert!(queue.can_pop());

        // We have next_seq, so backpressure applies.
        // Try to insert something that would exceed the limit.
        assert!(queue.insert(1, 200, 60).is_err());

        // But we can still insert if under the limit.
        assert!(queue.insert(1, 200, 40).is_ok());
    }

    #[test]
    fn test_ordered_queue_must_accept_when_waiting() {
        let queue: OrderedQueue<u32> = OrderedQueue::new(100);

        // No next_seq yet - must accept even if over limit
        assert!(queue.insert(5, 500, 200).is_ok()); // Way over the 100-byte limit
        assert!(!queue.can_pop()); // Still waiting for serial 0

        // Still must accept because we don't have serial 0
        assert!(queue.insert(3, 300, 200).is_ok());
        assert!(queue.insert(1, 100, 200).is_ok());

        // Now insert serial 0
        assert!(queue.insert(0, 0, 10).is_ok());
        assert!(queue.can_pop()); // Now we have next_seq

        // NOW backpressure applies - should reject
        assert!(queue.insert(2, 200, 200).is_err());
    }
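
    /// Sketch of dynamic rebalancing: `set_limit` takes effect on the next
    /// acceptance check, so raising the limit admits a just-rejected item.
    #[test]
    fn test_set_limit_rebalancing() {
        let queue: OrderedQueue<u32> = OrderedQueue::new(100);

        // Serial 0 is buffered, so backpressure applies.
        assert!(queue.insert(0, 0, 80).is_ok());
        assert!(queue.insert(1, 1, 50).is_err()); // 80 + 50 > 100

        // Raise this queue's budget; the same item now fits.
        queue.set_limit(200);
        assert_eq!(queue.limit_bytes(), 200);
        assert!(queue.insert(1, 1, 50).is_ok()); // 80 + 50 <= 200
    }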

    /// Test that `OrderedQueue` backpressure respects memory limits.
    #[test]
    #[allow(clippy::cast_possible_truncation)]
    fn test_ordered_queue_backpressure_memory_bound() {
        // Small limit: 500 bytes
        let queue: OrderedQueue<Vec<u8>> = OrderedQueue::new(500);

        // Insert serial 0 first so backpressure can apply
        assert!(queue.insert(0, vec![0u8; 100], 100).is_ok());

        // Now try to insert items
        let mut pushed = 0;
        let mut rejected = 0;

        for i in 1..20 {
            let item = vec![i as u8; 100];
            match queue.insert(i, item, 100) {
                Ok(()) => pushed += 1,
                Err(_) => rejected += 1,
            }
        }

        // Should have some accepted and some rejected
        assert!(pushed > 0, "Should accept some items");
        assert!(rejected > 0, "Should reject items when over limit");

        // Verify we can drain the queue
        let mut count = 0;
        while queue.try_pop_next().is_some() {
            count += 1;
        }
        assert!(count > 0, "Should pop the items we inserted");
    }
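
    /// Sketch of the stats epoch: the average is over recorded samples and
    /// every counter resets once collected.
    #[test]
    fn test_collect_stats_resets_epoch() {
        let queue: OrderedQueue<u32> = OrderedQueue::new(1000);

        assert!(queue.insert(0, 1, 100).is_ok());
        queue.record_sample(); // current = 100
        assert!(queue.insert(1, 2, 300).is_ok());
        queue.record_sample(); // current = 400
        queue.record_blocked(2_000_000); // 2 ms

        let stats = queue.collect_stats();
        assert_eq!(stats.avg_bytes, 250); // (100 + 400) / 2
        assert_eq!(stats.peak_bytes, 400);
        assert_eq!(stats.time_blocked_ms, 2);

        // A second collection starts from a fresh epoch.
        let stats = queue.collect_stats();
        assert_eq!(stats.avg_bytes, 0);
        assert_eq!(stats.peak_bytes, 0);
        assert_eq!(stats.time_blocked_ms, 0);
    }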
}