Skip to main content

oximedia_core/
work_queue.rs

1//! A bounded, priority-aware work queue for media pipeline tasks.
2//!
3//! [`WorkQueue`] holds [`WorkItem`]s ordered by priority (higher value = more
4//! urgent).  [`QueueStats`] tracks lifetime throughput counters.
5//!
6//! # Examples
7//!
8//! ```
9//! use oximedia_core::work_queue::{WorkItem, WorkQueue};
10//!
11//! let mut q: WorkQueue<u32> = WorkQueue::new(8);
12//! q.push(WorkItem::new(42_u32, 10)).expect("queue not full");
13//! q.push(WorkItem::new(99_u32, 20)).expect("queue not full");
14//! // Highest-priority item comes out first.
15//! let item = q.pop().expect("queue not empty");
16//! assert_eq!(item.payload, 99_u32);
17//! ```
18
19#![allow(dead_code)]
20#![allow(clippy::module_name_repetitions)]
21
22/// A single work item carrying an arbitrary payload and a scheduling priority.
23///
24/// Higher `priority` values are dequeued first.
25///
26/// # Examples
27///
28/// ```
29/// use oximedia_core::work_queue::WorkItem;
30///
31/// let item = WorkItem::new("transcode frame 7", 5_u32);
32/// assert_eq!(item.priority, 5);
33/// ```
34#[derive(Debug, Clone, PartialEq, Eq)]
35pub struct WorkItem<T> {
36    /// The payload to be processed.
37    pub payload: T,
38    /// Scheduling priority (higher = dequeued sooner).
39    pub priority: u32,
40}
41
42impl<T> WorkItem<T> {
43    /// Creates a new [`WorkItem`] with the given payload and priority.
44    #[must_use]
45    pub const fn new(payload: T, priority: u32) -> Self {
46        Self { payload, priority }
47    }
48}
49
50/// Error returned by queue operations.
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum QueueError {
53    /// The queue has reached its capacity limit.
54    Full,
55    /// A batch request exceeded the queue's capacity.
56    BatchTooLarge,
57}
58
59impl std::fmt::Display for QueueError {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        match self {
62            Self::Full => write!(f, "work queue is full"),
63            Self::BatchTooLarge => write!(f, "batch size exceeds queue capacity"),
64        }
65    }
66}
67
68impl std::error::Error for QueueError {}
69
70/// Lifetime throughput statistics for a [`WorkQueue`].
71///
72/// # Examples
73///
74/// ```
75/// use oximedia_core::work_queue::{WorkItem, WorkQueue};
76///
77/// let mut q: WorkQueue<()> = WorkQueue::new(16);
78/// q.push(WorkItem::new((), 1)).expect("queue not full");
79/// let _ = q.pop();
80/// let stats = q.stats();
81/// assert_eq!(stats.total_pushed, 1);
82/// assert_eq!(stats.total_popped, 1);
83/// ```
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
85pub struct QueueStats {
86    /// Cumulative number of items successfully pushed.
87    pub total_pushed: u64,
88    /// Cumulative number of items removed via pop or `pop_batch`.
89    pub total_popped: u64,
90    /// Cumulative number of push attempts that were rejected due to capacity.
91    pub total_rejected: u64,
92}
93
94impl QueueStats {
95    /// Returns the number of items pushed but not yet popped.
96    ///
97    /// This is a lower-bound estimate; it saturates at 0 to avoid wrapping.
98    #[inline]
99    #[must_use]
100    pub fn in_flight(&self) -> u64 {
101        self.total_pushed.saturating_sub(self.total_popped)
102    }
103}
104
105/// A bounded, priority-ordered work queue.
106///
107/// Items are stored in a `Vec` sorted in ascending priority order so that
108/// the highest-priority item is always at the back and can be removed in O(1).
109/// Pushes maintain sorted order via binary search insertion at O(n) in the
110/// worst case, which is acceptable for the typical small queue sizes used in
111/// media pipeline scheduling.
112///
113/// # Examples
114///
115/// ```
116/// use oximedia_core::work_queue::{WorkItem, WorkQueue};
117///
118/// let mut q: WorkQueue<i32> = WorkQueue::new(4);
119/// q.push(WorkItem::new(1, 5)).expect("queue not full");
120/// q.push(WorkItem::new(2, 1)).expect("queue not full");
121/// q.push(WorkItem::new(3, 9)).expect("queue not full");
122/// assert_eq!(q.pop().expect("queue not empty").payload, 3); // priority 9 first
123/// assert_eq!(q.len(), 2);
124/// ```
125#[derive(Debug)]
126pub struct WorkQueue<T> {
127    /// Items stored in ascending priority order (highest priority at the back).
128    items: Vec<WorkItem<T>>,
129    capacity: usize,
130    stats: QueueStats,
131}
132
133impl<T> WorkQueue<T> {
134    /// Creates a new queue with the given maximum capacity.
135    ///
136    /// # Panics
137    ///
138    /// Panics if `capacity` is zero.
139    #[must_use]
140    pub fn new(capacity: usize) -> Self {
141        assert!(capacity > 0, "WorkQueue capacity must be > 0");
142        Self {
143            items: Vec::with_capacity(capacity.min(256)),
144            capacity,
145            stats: QueueStats::default(),
146        }
147    }
148
149    /// Returns the maximum number of items this queue can hold.
150    #[inline]
151    #[must_use]
152    pub const fn capacity(&self) -> usize {
153        self.capacity
154    }
155
156    /// Returns the current number of items in the queue.
157    #[inline]
158    #[must_use]
159    pub fn len(&self) -> usize {
160        self.items.len()
161    }
162
163    /// Returns `true` if the queue contains no items.
164    #[inline]
165    #[must_use]
166    pub fn is_empty(&self) -> bool {
167        self.items.is_empty()
168    }
169
170    /// Returns `true` if the queue has reached its capacity.
171    #[inline]
172    #[must_use]
173    pub fn is_full(&self) -> bool {
174        self.items.len() >= self.capacity
175    }
176
177    /// Pushes a work item into the queue in priority order.
178    ///
179    /// # Errors
180    ///
181    /// Returns [`QueueError::Full`] if the queue is at capacity.
182    pub fn push(&mut self, item: WorkItem<T>) -> Result<(), QueueError> {
183        if self.is_full() {
184            self.stats.total_rejected += 1;
185            return Err(QueueError::Full);
186        }
187        // Binary search for the insertion position (ascending by priority).
188        let pos = self
189            .items
190            .partition_point(|existing| existing.priority <= item.priority);
191        self.items.insert(pos, item);
192        self.stats.total_pushed += 1;
193        Ok(())
194    }
195
196    /// Removes and returns the highest-priority item, or `None` if empty.
197    pub fn pop(&mut self) -> Option<WorkItem<T>> {
198        let item = self.items.pop(); // highest-priority is at the back
199        if item.is_some() {
200            self.stats.total_popped += 1;
201        }
202        item
203    }
204
205    /// Removes and returns up to `n` highest-priority items.
206    ///
207    /// # Errors
208    ///
209    /// Returns [`QueueError::BatchTooLarge`] if `n > capacity`.
210    pub fn pop_batch(&mut self, n: usize) -> Result<Vec<WorkItem<T>>, QueueError> {
211        if n > self.capacity {
212            return Err(QueueError::BatchTooLarge);
213        }
214        let take = n.min(self.items.len());
215        let start = self.items.len().saturating_sub(take);
216        let batch: Vec<WorkItem<T>> = self.items.drain(start..).rev().collect();
217        self.stats.total_popped += batch.len() as u64;
218        Ok(batch)
219    }
220
221    /// Peeks at the highest-priority item without removing it.
222    #[must_use]
223    pub fn peek(&self) -> Option<&WorkItem<T>> {
224        self.items.last()
225    }
226
227    /// Removes all items from the queue (statistics are preserved).
228    pub fn clear(&mut self) {
229        self.items.clear();
230    }
231
232    /// Returns a snapshot of the lifetime statistics.
233    #[must_use]
234    pub fn stats(&self) -> QueueStats {
235        self.stats
236    }
237
238    /// Returns an iterator over all items in ascending priority order.
239    pub fn iter(&self) -> impl Iterator<Item = &WorkItem<T>> {
240        self.items.iter()
241    }
242}
243
244// ---------------------------------------------------------------------------
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249
250    #[test]
251    fn new_queue_is_empty() {
252        let q: WorkQueue<u32> = WorkQueue::new(10);
253        assert!(q.is_empty());
254        assert_eq!(q.len(), 0);
255    }
256
257    #[test]
258    fn push_and_pop_single_item() {
259        let mut q: WorkQueue<u32> = WorkQueue::new(4);
260        q.push(WorkItem::new(7_u32, 1))
261            .expect("push should succeed");
262        let item = q.pop().expect("pop should return item");
263        assert_eq!(item.payload, 7);
264        assert!(q.is_empty());
265    }
266
267    #[test]
268    fn pop_respects_priority_order() {
269        let mut q: WorkQueue<u32> = WorkQueue::new(8);
270        q.push(WorkItem::new(1_u32, 5))
271            .expect("push should succeed");
272        q.push(WorkItem::new(2_u32, 1))
273            .expect("push should succeed");
274        q.push(WorkItem::new(3_u32, 9))
275            .expect("push should succeed");
276        assert_eq!(q.pop().expect("pop should return item").payload, 3); // priority 9
277        assert_eq!(q.pop().expect("pop should return item").payload, 1); // priority 5
278        assert_eq!(q.pop().expect("pop should return item").payload, 2); // priority 1
279    }
280
281    #[test]
282    fn pop_empty_returns_none() {
283        let mut q: WorkQueue<()> = WorkQueue::new(4);
284        assert!(q.pop().is_none());
285    }
286
287    #[test]
288    fn push_at_capacity_returns_error() {
289        let mut q: WorkQueue<u32> = WorkQueue::new(2);
290        q.push(WorkItem::new(1_u32, 1))
291            .expect("push should succeed");
292        q.push(WorkItem::new(2_u32, 2))
293            .expect("push should succeed");
294        let err = q.push(WorkItem::new(3_u32, 3));
295        assert_eq!(err, Err(QueueError::Full));
296    }
297
298    #[test]
299    fn is_full_and_capacity() {
300        let mut q: WorkQueue<u32> = WorkQueue::new(1);
301        assert!(!q.is_full());
302        q.push(WorkItem::new(0_u32, 1))
303            .expect("push should succeed");
304        assert!(q.is_full());
305        assert_eq!(q.capacity(), 1);
306    }
307
308    #[test]
309    fn peek_does_not_remove() {
310        let mut q: WorkQueue<u32> = WorkQueue::new(4);
311        q.push(WorkItem::new(42_u32, 10))
312            .expect("push should succeed");
313        assert_eq!(q.peek().expect("peek should return item").payload, 42);
314        assert_eq!(q.len(), 1); // still there
315    }
316
317    #[test]
318    fn clear_empties_queue() {
319        let mut q: WorkQueue<u32> = WorkQueue::new(8);
320        q.push(WorkItem::new(1_u32, 1))
321            .expect("push should succeed");
322        q.push(WorkItem::new(2_u32, 2))
323            .expect("push should succeed");
324        q.clear();
325        assert!(q.is_empty());
326    }
327
328    #[test]
329    fn pop_batch_returns_highest_first() {
330        let mut q: WorkQueue<u32> = WorkQueue::new(8);
331        for i in 0_u32..5 {
332            q.push(WorkItem::new(i, i)).expect("push should succeed");
333        }
334        let batch = q.pop_batch(3).expect("pop_batch should succeed");
335        assert_eq!(batch.len(), 3);
336        assert_eq!(batch[0].priority, 4); // highest
337        assert_eq!(batch[1].priority, 3);
338        assert_eq!(batch[2].priority, 2);
339    }
340
341    #[test]
342    fn pop_batch_too_large_returns_error() {
343        let mut q: WorkQueue<u32> = WorkQueue::new(4);
344        let err = q.pop_batch(5);
345        assert_eq!(err, Err(QueueError::BatchTooLarge));
346    }
347
348    #[test]
349    fn stats_track_push_and_pop() {
350        let mut q: WorkQueue<u32> = WorkQueue::new(8);
351        q.push(WorkItem::new(1_u32, 1))
352            .expect("push should succeed");
353        q.push(WorkItem::new(2_u32, 2))
354            .expect("push should succeed");
355        let _ = q.pop();
356        let s = q.stats();
357        assert_eq!(s.total_pushed, 2);
358        assert_eq!(s.total_popped, 1);
359        assert_eq!(s.in_flight(), 1);
360    }
361
362    #[test]
363    fn stats_count_rejected_pushes() {
364        let mut q: WorkQueue<u32> = WorkQueue::new(1);
365        q.push(WorkItem::new(1_u32, 1))
366            .expect("push should succeed");
367        let _ = q.push(WorkItem::new(2_u32, 2)); // rejected
368        assert_eq!(q.stats().total_rejected, 1);
369    }
370
371    #[test]
372    fn queue_error_display() {
373        assert!(!QueueError::Full.to_string().is_empty());
374        assert!(!QueueError::BatchTooLarge.to_string().is_empty());
375    }
376
377    #[test]
378    fn iter_yields_all_items() {
379        let mut q: WorkQueue<u32> = WorkQueue::new(8);
380        for i in 0_u32..4 {
381            q.push(WorkItem::new(i, i)).expect("push should succeed");
382        }
383        assert_eq!(q.iter().count(), 4);
384    }
385
386    #[test]
387    fn work_item_is_clone() {
388        let a = WorkItem::new(String::from("hello"), 5_u32);
389        let b = a.clone();
390        assert_eq!(a.payload, b.payload);
391        assert_eq!(a.priority, b.priority);
392    }
393}