oximedia_core/
work_queue.rs1#![allow(dead_code)]
20#![allow(clippy::module_name_repetitions)]
21
22#[derive(Debug, Clone, PartialEq, Eq)]
35pub struct WorkItem<T> {
36 pub payload: T,
38 pub priority: u32,
40}
41
42impl<T> WorkItem<T> {
43 #[must_use]
45 pub const fn new(payload: T, priority: u32) -> Self {
46 Self { payload, priority }
47 }
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum QueueError {
53 Full,
55 BatchTooLarge,
57}
58
59impl std::fmt::Display for QueueError {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 match self {
62 Self::Full => write!(f, "work queue is full"),
63 Self::BatchTooLarge => write!(f, "batch size exceeds queue capacity"),
64 }
65 }
66}
67
68impl std::error::Error for QueueError {}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
85pub struct QueueStats {
86 pub total_pushed: u64,
88 pub total_popped: u64,
90 pub total_rejected: u64,
92}
93
94impl QueueStats {
95 #[inline]
99 #[must_use]
100 pub fn in_flight(&self) -> u64 {
101 self.total_pushed.saturating_sub(self.total_popped)
102 }
103}
104
105#[derive(Debug)]
126pub struct WorkQueue<T> {
127 items: Vec<WorkItem<T>>,
129 capacity: usize,
130 stats: QueueStats,
131}
132
133impl<T> WorkQueue<T> {
134 #[must_use]
140 pub fn new(capacity: usize) -> Self {
141 assert!(capacity > 0, "WorkQueue capacity must be > 0");
142 Self {
143 items: Vec::with_capacity(capacity.min(256)),
144 capacity,
145 stats: QueueStats::default(),
146 }
147 }
148
149 #[inline]
151 #[must_use]
152 pub const fn capacity(&self) -> usize {
153 self.capacity
154 }
155
156 #[inline]
158 #[must_use]
159 pub fn len(&self) -> usize {
160 self.items.len()
161 }
162
163 #[inline]
165 #[must_use]
166 pub fn is_empty(&self) -> bool {
167 self.items.is_empty()
168 }
169
170 #[inline]
172 #[must_use]
173 pub fn is_full(&self) -> bool {
174 self.items.len() >= self.capacity
175 }
176
177 pub fn push(&mut self, item: WorkItem<T>) -> Result<(), QueueError> {
183 if self.is_full() {
184 self.stats.total_rejected += 1;
185 return Err(QueueError::Full);
186 }
187 let pos = self
189 .items
190 .partition_point(|existing| existing.priority <= item.priority);
191 self.items.insert(pos, item);
192 self.stats.total_pushed += 1;
193 Ok(())
194 }
195
196 pub fn pop(&mut self) -> Option<WorkItem<T>> {
198 let item = self.items.pop(); if item.is_some() {
200 self.stats.total_popped += 1;
201 }
202 item
203 }
204
205 pub fn pop_batch(&mut self, n: usize) -> Result<Vec<WorkItem<T>>, QueueError> {
211 if n > self.capacity {
212 return Err(QueueError::BatchTooLarge);
213 }
214 let take = n.min(self.items.len());
215 let start = self.items.len().saturating_sub(take);
216 let batch: Vec<WorkItem<T>> = self.items.drain(start..).rev().collect();
217 self.stats.total_popped += batch.len() as u64;
218 Ok(batch)
219 }
220
221 #[must_use]
223 pub fn peek(&self) -> Option<&WorkItem<T>> {
224 self.items.last()
225 }
226
227 pub fn clear(&mut self) {
229 self.items.clear();
230 }
231
232 #[must_use]
234 pub fn stats(&self) -> QueueStats {
235 self.stats
236 }
237
238 pub fn iter(&self) -> impl Iterator<Item = &WorkItem<T>> {
240 self.items.iter()
241 }
242}
243
244#[cfg(test)]
247mod tests {
248 use super::*;
249
250 #[test]
251 fn new_queue_is_empty() {
252 let q: WorkQueue<u32> = WorkQueue::new(10);
253 assert!(q.is_empty());
254 assert_eq!(q.len(), 0);
255 }
256
257 #[test]
258 fn push_and_pop_single_item() {
259 let mut q: WorkQueue<u32> = WorkQueue::new(4);
260 q.push(WorkItem::new(7_u32, 1))
261 .expect("push should succeed");
262 let item = q.pop().expect("pop should return item");
263 assert_eq!(item.payload, 7);
264 assert!(q.is_empty());
265 }
266
267 #[test]
268 fn pop_respects_priority_order() {
269 let mut q: WorkQueue<u32> = WorkQueue::new(8);
270 q.push(WorkItem::new(1_u32, 5))
271 .expect("push should succeed");
272 q.push(WorkItem::new(2_u32, 1))
273 .expect("push should succeed");
274 q.push(WorkItem::new(3_u32, 9))
275 .expect("push should succeed");
276 assert_eq!(q.pop().expect("pop should return item").payload, 3); assert_eq!(q.pop().expect("pop should return item").payload, 1); assert_eq!(q.pop().expect("pop should return item").payload, 2); }
280
281 #[test]
282 fn pop_empty_returns_none() {
283 let mut q: WorkQueue<()> = WorkQueue::new(4);
284 assert!(q.pop().is_none());
285 }
286
287 #[test]
288 fn push_at_capacity_returns_error() {
289 let mut q: WorkQueue<u32> = WorkQueue::new(2);
290 q.push(WorkItem::new(1_u32, 1))
291 .expect("push should succeed");
292 q.push(WorkItem::new(2_u32, 2))
293 .expect("push should succeed");
294 let err = q.push(WorkItem::new(3_u32, 3));
295 assert_eq!(err, Err(QueueError::Full));
296 }
297
298 #[test]
299 fn is_full_and_capacity() {
300 let mut q: WorkQueue<u32> = WorkQueue::new(1);
301 assert!(!q.is_full());
302 q.push(WorkItem::new(0_u32, 1))
303 .expect("push should succeed");
304 assert!(q.is_full());
305 assert_eq!(q.capacity(), 1);
306 }
307
308 #[test]
309 fn peek_does_not_remove() {
310 let mut q: WorkQueue<u32> = WorkQueue::new(4);
311 q.push(WorkItem::new(42_u32, 10))
312 .expect("push should succeed");
313 assert_eq!(q.peek().expect("peek should return item").payload, 42);
314 assert_eq!(q.len(), 1); }
316
317 #[test]
318 fn clear_empties_queue() {
319 let mut q: WorkQueue<u32> = WorkQueue::new(8);
320 q.push(WorkItem::new(1_u32, 1))
321 .expect("push should succeed");
322 q.push(WorkItem::new(2_u32, 2))
323 .expect("push should succeed");
324 q.clear();
325 assert!(q.is_empty());
326 }
327
328 #[test]
329 fn pop_batch_returns_highest_first() {
330 let mut q: WorkQueue<u32> = WorkQueue::new(8);
331 for i in 0_u32..5 {
332 q.push(WorkItem::new(i, i)).expect("push should succeed");
333 }
334 let batch = q.pop_batch(3).expect("pop_batch should succeed");
335 assert_eq!(batch.len(), 3);
336 assert_eq!(batch[0].priority, 4); assert_eq!(batch[1].priority, 3);
338 assert_eq!(batch[2].priority, 2);
339 }
340
341 #[test]
342 fn pop_batch_too_large_returns_error() {
343 let mut q: WorkQueue<u32> = WorkQueue::new(4);
344 let err = q.pop_batch(5);
345 assert_eq!(err, Err(QueueError::BatchTooLarge));
346 }
347
348 #[test]
349 fn stats_track_push_and_pop() {
350 let mut q: WorkQueue<u32> = WorkQueue::new(8);
351 q.push(WorkItem::new(1_u32, 1))
352 .expect("push should succeed");
353 q.push(WorkItem::new(2_u32, 2))
354 .expect("push should succeed");
355 let _ = q.pop();
356 let s = q.stats();
357 assert_eq!(s.total_pushed, 2);
358 assert_eq!(s.total_popped, 1);
359 assert_eq!(s.in_flight(), 1);
360 }
361
362 #[test]
363 fn stats_count_rejected_pushes() {
364 let mut q: WorkQueue<u32> = WorkQueue::new(1);
365 q.push(WorkItem::new(1_u32, 1))
366 .expect("push should succeed");
367 let _ = q.push(WorkItem::new(2_u32, 2)); assert_eq!(q.stats().total_rejected, 1);
369 }
370
371 #[test]
372 fn queue_error_display() {
373 assert!(!QueueError::Full.to_string().is_empty());
374 assert!(!QueueError::BatchTooLarge.to_string().is_empty());
375 }
376
377 #[test]
378 fn iter_yields_all_items() {
379 let mut q: WorkQueue<u32> = WorkQueue::new(8);
380 for i in 0_u32..4 {
381 q.push(WorkItem::new(i, i)).expect("push should succeed");
382 }
383 assert_eq!(q.iter().count(), 4);
384 }
385
386 #[test]
387 fn work_item_is_clone() {
388 let a = WorkItem::new(String::from("hello"), 5_u32);
389 let b = a.clone();
390 assert_eq!(a.payload, b.payload);
391 assert_eq!(a.priority, b.priority);
392 }
393}