// fgumi_lib/unified_pipeline/queue.rs
use parking_lot::Mutex;
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14
/// Point-in-time statistics snapshot for an `OrderedQueue`, produced by
/// `collect_stats` (which also resets the underlying counters, so each
/// snapshot covers the interval since the previous collection).
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    // Mean of the byte-usage samples recorded via `record_sample` since the
    // last collection; 0 when no samples were taken.
    pub avg_bytes: u64,
    // Highest buffered-byte count observed since the last collection.
    pub peak_bytes: u64,
    // Producer-reported blocked time (from `record_blocked`), converted from
    // nanoseconds to milliseconds.
    pub time_blocked_ms: u64,
}
25
/// A bounded reordering buffer: producers insert items tagged with a serial
/// number in any order; consumers pop them strictly in ascending serial
/// order. A soft byte limit provides back-pressure, except that inserts are
/// always accepted while the next-needed serial is absent, so the consumer
/// can never be starved of the item it is waiting for.
pub struct OrderedQueue<T> {
    // Buffered items plus the next serial to release, behind a mutex.
    inner: Mutex<OrderedQueueInner<T>>,
    // Sum of `heap_size` for all currently buffered items.
    current_bytes: AtomicU64,
    // Soft byte limit used for back-pressure (changeable via `set_limit`).
    limit_bytes: AtomicU64,
    // Lock-free mirror of `inner.next_seq` for cheap reads,
    // lock-free "is the next-needed item buffered?" flag,
    // and high-water mark of `current_bytes` (reset by `collect_stats`).
    next_seq: AtomicU64, has_next: AtomicBool, peak_bytes: AtomicU64,
    // Accumulators behind the average-usage statistic (reset by `collect_stats`).
    samples_sum: AtomicU64,
    samples_count: AtomicU64,
    // Nanoseconds producers reported as blocked (reset by `collect_stats`).
    blocked_ns: AtomicU64,
}
62
/// Mutex-protected state of an `OrderedQueue`.
struct OrderedQueueInner<T> {
    // Items keyed by serial number, stored with their reported heap size.
    buffer: HashMap<u64, (T, usize)>,
    // Serial number of the next item `try_pop_next` will hand out.
    next_seq: u64,
}
67
68impl<T> OrderedQueue<T> {
69 #[must_use]
71 pub fn new(limit_bytes: u64) -> Self {
72 Self {
73 inner: Mutex::new(OrderedQueueInner { buffer: HashMap::new(), next_seq: 0 }),
74 current_bytes: AtomicU64::new(0),
75 limit_bytes: AtomicU64::new(limit_bytes),
76 next_seq: AtomicU64::new(0),
77 has_next: AtomicBool::new(false),
78 peak_bytes: AtomicU64::new(0),
79 samples_sum: AtomicU64::new(0),
80 samples_count: AtomicU64::new(0),
81 blocked_ns: AtomicU64::new(0),
82 }
83 }
84
85 pub fn can_accept(&self, heap_size: usize) -> bool {
91 if !self.has_next.load(Ordering::Acquire) {
93 return true;
94 }
95
96 let current = self.current_bytes.load(Ordering::Acquire);
98 let limit = self.limit_bytes.load(Ordering::Acquire);
99 current + heap_size as u64 <= limit
100 }
101
102 pub fn insert(&self, serial: u64, item: T, heap_size: usize) -> Result<(), (T, usize)> {
114 let mut inner = self.inner.lock();
115
116 let has_next = inner.buffer.contains_key(&inner.next_seq);
117
118 if has_next {
119 let current = self.current_bytes.load(Ordering::Acquire);
121 let limit = self.limit_bytes.load(Ordering::Acquire);
122 if current + heap_size as u64 > limit {
123 return Err((item, heap_size));
124 }
125 }
126 inner.buffer.insert(serial, (item, heap_size));
129 let new_current =
130 self.current_bytes.fetch_add(heap_size as u64, Ordering::AcqRel) + heap_size as u64;
131
132 let new_has_next = inner.buffer.contains_key(&inner.next_seq);
134 self.has_next.store(new_has_next, Ordering::Release);
135
136 let mut peak = self.peak_bytes.load(Ordering::Relaxed);
138 while new_current > peak {
139 match self.peak_bytes.compare_exchange_weak(
140 peak,
141 new_current,
142 Ordering::Relaxed,
143 Ordering::Relaxed,
144 ) {
145 Ok(_) => break,
146 Err(p) => peak = p,
147 }
148 }
149
150 Ok(())
151 }
152
153 pub fn try_pop_next(&self) -> Option<(T, usize)> {
157 let mut inner = self.inner.lock();
158
159 let next = inner.next_seq;
160 if let Some((item, heap_size)) = inner.buffer.remove(&next) {
161 inner.next_seq += 1;
162 self.current_bytes.fetch_sub(heap_size as u64, Ordering::AcqRel);
163
164 self.next_seq.store(inner.next_seq, Ordering::Release);
166 let new_has_next = inner.buffer.contains_key(&inner.next_seq);
167 self.has_next.store(new_has_next, Ordering::Release);
168
169 Some((item, heap_size))
170 } else {
171 None
172 }
173 }
174
175 pub fn next_seq(&self) -> u64 {
177 self.next_seq.load(Ordering::Acquire)
178 }
179
180 pub fn can_pop(&self) -> bool {
182 self.has_next.load(Ordering::Acquire)
183 }
184
185 pub fn current_bytes(&self) -> u64 {
187 self.current_bytes.load(Ordering::Acquire)
188 }
189
190 pub fn set_limit(&self, new_limit: u64) {
192 self.limit_bytes.store(new_limit, Ordering::Release);
193 }
194
195 pub fn limit_bytes(&self) -> u64 {
197 self.limit_bytes.load(Ordering::Acquire)
198 }
199
200 pub fn len(&self) -> usize {
202 self.inner.lock().buffer.len()
203 }
204
205 pub fn is_empty(&self) -> bool {
207 self.inner.lock().buffer.is_empty()
208 }
209
210 pub fn record_sample(&self) {
212 let current = self.current_bytes.load(Ordering::Relaxed);
213 self.samples_sum.fetch_add(current, Ordering::Relaxed);
214 self.samples_count.fetch_add(1, Ordering::Relaxed);
215 }
216
217 pub fn record_blocked(&self, ns: u64) {
219 self.blocked_ns.fetch_add(ns, Ordering::Relaxed);
220 }
221
222 pub fn collect_stats(&self) -> QueueStats {
224 let peak = self.peak_bytes.swap(0, Ordering::Relaxed);
225 let sum = self.samples_sum.swap(0, Ordering::Relaxed);
226 let count = self.samples_count.swap(0, Ordering::Relaxed);
227 let blocked = self.blocked_ns.swap(0, Ordering::Relaxed);
228
229 QueueStats {
230 avg_bytes: if count > 0 { sum / count } else { 0 },
231 peak_bytes: peak,
232 time_blocked_ms: blocked / 1_000_000,
233 }
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;

    /// Items inserted out of order come back in ascending serial order.
    #[test]
    fn test_ordered_queue_basic() {
        let queue: OrderedQueue<u32> = OrderedQueue::new(1000);

        // Serial 0 is absent, so every insert is accepted regardless of order.
        assert!(queue.insert(2, 200, 10).is_ok());
        assert!(queue.insert(0, 100, 10).is_ok());
        assert!(queue.insert(1, 150, 10).is_ok());

        // Pops come out strictly by serial: 0, then 1, then 2.
        let (val, _) = queue.try_pop_next().unwrap();
        assert_eq!(val, 100);
        let (val, _) = queue.try_pop_next().unwrap();
        assert_eq!(val, 150);
        let (val, _) = queue.try_pop_next().unwrap();
        assert_eq!(val, 200);

        assert!(queue.try_pop_next().is_none());
    }

    /// Back-pressure applies once the next-needed item is buffered.
    #[test]
    fn test_ordered_queue_backpressure_when_has_next() {
        let queue: OrderedQueue<u32> = OrderedQueue::new(100);

        // Serial 0 is next-needed: always accepted (uses 50 of 100 bytes).
        assert!(queue.insert(0, 100, 50).is_ok());
        assert!(queue.can_pop());

        // 50 + 60 exceeds the 100-byte limit -> rejected, item handed back.
        assert!(queue.insert(1, 200, 60).is_err());

        // 50 + 40 fits within the limit -> accepted.
        assert!(queue.insert(1, 200, 40).is_ok());
    }

    /// While the next-needed serial is missing, inserts must be accepted even
    /// far beyond the byte limit, so the consumer can eventually progress.
    #[test]
    fn test_ordered_queue_must_accept_when_waiting() {
        let queue: OrderedQueue<u32> = OrderedQueue::new(100);

        // next_seq (0) is absent: oversized inserts are still accepted.
        assert!(queue.insert(5, 500, 200).is_ok());
        assert!(!queue.can_pop());
        assert!(queue.insert(3, 300, 200).is_ok());
        assert!(queue.insert(1, 100, 200).is_ok());

        // Serial 0 itself is accepted despite the queue being over limit.
        assert!(queue.insert(0, 0, 10).is_ok());
        // Now the consumer can drain, so back-pressure kicks back in.
        assert!(queue.can_pop());
        assert!(queue.insert(2, 200, 200).is_err());
    }

    /// Mixed accept/reject behaviour around the byte limit, then a full drain.
    #[test]
    #[allow(clippy::cast_possible_truncation)]
    fn test_ordered_queue_backpressure_memory_bound() {
        let queue: OrderedQueue<Vec<u8>> = OrderedQueue::new(500);

        // Seed with serial 0 so back-pressure is active for later inserts.
        assert!(queue.insert(0, vec![0u8; 100], 100).is_ok());

        let mut pushed = 0;
        let mut rejected = 0;

        for i in 1..20 {
            let item = vec![i as u8; 100];
            match queue.insert(i, item, 100) {
                Ok(()) => pushed += 1,
                Err(_) => rejected += 1,
            }
        }

        // Some inserts fit under the 500-byte limit, the rest are rejected.
        assert!(pushed > 0, "Should accept some items");
        assert!(rejected > 0, "Should reject items when over limit");

        // Serials 0..=pushed are contiguous, so everything buffered drains.
        let mut count = 0;
        while queue.try_pop_next().is_some() {
            count += 1;
        }
        assert!(count > 0, "Should pop the items we inserted");
    }
}