#![allow(dead_code)]
#![allow(clippy::module_name_repetitions)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkItem<T> {
pub payload: T,
pub priority: u32,
}
impl<T> WorkItem<T> {
#[must_use]
pub const fn new(payload: T, priority: u32) -> Self {
Self { payload, priority }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueError {
Full,
BatchTooLarge,
}
impl std::fmt::Display for QueueError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Full => write!(f, "work queue is full"),
Self::BatchTooLarge => write!(f, "batch size exceeds queue capacity"),
}
}
}
impl std::error::Error for QueueError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct QueueStats {
pub total_pushed: u64,
pub total_popped: u64,
pub total_rejected: u64,
}
impl QueueStats {
#[inline]
#[must_use]
pub fn in_flight(&self) -> u64 {
self.total_pushed.saturating_sub(self.total_popped)
}
}
#[derive(Debug)]
pub struct WorkQueue<T> {
items: Vec<WorkItem<T>>,
capacity: usize,
stats: QueueStats,
}
impl<T> WorkQueue<T> {
#[must_use]
pub fn new(capacity: usize) -> Self {
assert!(capacity > 0, "WorkQueue capacity must be > 0");
Self {
items: Vec::with_capacity(capacity.min(256)),
capacity,
stats: QueueStats::default(),
}
}
#[inline]
#[must_use]
pub const fn capacity(&self) -> usize {
self.capacity
}
#[inline]
#[must_use]
pub fn len(&self) -> usize {
self.items.len()
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
#[inline]
#[must_use]
pub fn is_full(&self) -> bool {
self.items.len() >= self.capacity
}
pub fn push(&mut self, item: WorkItem<T>) -> Result<(), QueueError> {
if self.is_full() {
self.stats.total_rejected += 1;
return Err(QueueError::Full);
}
let pos = self
.items
.partition_point(|existing| existing.priority <= item.priority);
self.items.insert(pos, item);
self.stats.total_pushed += 1;
Ok(())
}
pub fn pop(&mut self) -> Option<WorkItem<T>> {
let item = self.items.pop(); if item.is_some() {
self.stats.total_popped += 1;
}
item
}
pub fn pop_batch(&mut self, n: usize) -> Result<Vec<WorkItem<T>>, QueueError> {
if n > self.capacity {
return Err(QueueError::BatchTooLarge);
}
let take = n.min(self.items.len());
let start = self.items.len().saturating_sub(take);
let batch: Vec<WorkItem<T>> = self.items.drain(start..).rev().collect();
self.stats.total_popped += batch.len() as u64;
Ok(batch)
}
#[must_use]
pub fn peek(&self) -> Option<&WorkItem<T>> {
self.items.last()
}
pub fn clear(&mut self) {
self.items.clear();
}
#[must_use]
pub fn stats(&self) -> QueueStats {
self.stats
}
pub fn iter(&self) -> impl Iterator<Item = &WorkItem<T>> {
self.items.iter()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn new_queue_is_empty() {
let q: WorkQueue<u32> = WorkQueue::new(10);
assert!(q.is_empty());
assert_eq!(q.len(), 0);
}
#[test]
fn push_and_pop_single_item() {
let mut q: WorkQueue<u32> = WorkQueue::new(4);
q.push(WorkItem::new(7_u32, 1))
.expect("push should succeed");
let item = q.pop().expect("pop should return item");
assert_eq!(item.payload, 7);
assert!(q.is_empty());
}
#[test]
fn pop_respects_priority_order() {
let mut q: WorkQueue<u32> = WorkQueue::new(8);
q.push(WorkItem::new(1_u32, 5))
.expect("push should succeed");
q.push(WorkItem::new(2_u32, 1))
.expect("push should succeed");
q.push(WorkItem::new(3_u32, 9))
.expect("push should succeed");
assert_eq!(q.pop().expect("pop should return item").payload, 3); assert_eq!(q.pop().expect("pop should return item").payload, 1); assert_eq!(q.pop().expect("pop should return item").payload, 2); }
#[test]
fn pop_empty_returns_none() {
let mut q: WorkQueue<()> = WorkQueue::new(4);
assert!(q.pop().is_none());
}
#[test]
fn push_at_capacity_returns_error() {
let mut q: WorkQueue<u32> = WorkQueue::new(2);
q.push(WorkItem::new(1_u32, 1))
.expect("push should succeed");
q.push(WorkItem::new(2_u32, 2))
.expect("push should succeed");
let err = q.push(WorkItem::new(3_u32, 3));
assert_eq!(err, Err(QueueError::Full));
}
#[test]
fn is_full_and_capacity() {
let mut q: WorkQueue<u32> = WorkQueue::new(1);
assert!(!q.is_full());
q.push(WorkItem::new(0_u32, 1))
.expect("push should succeed");
assert!(q.is_full());
assert_eq!(q.capacity(), 1);
}
#[test]
fn peek_does_not_remove() {
let mut q: WorkQueue<u32> = WorkQueue::new(4);
q.push(WorkItem::new(42_u32, 10))
.expect("push should succeed");
assert_eq!(q.peek().expect("peek should return item").payload, 42);
assert_eq!(q.len(), 1); }
#[test]
fn clear_empties_queue() {
let mut q: WorkQueue<u32> = WorkQueue::new(8);
q.push(WorkItem::new(1_u32, 1))
.expect("push should succeed");
q.push(WorkItem::new(2_u32, 2))
.expect("push should succeed");
q.clear();
assert!(q.is_empty());
}
#[test]
fn pop_batch_returns_highest_first() {
let mut q: WorkQueue<u32> = WorkQueue::new(8);
for i in 0_u32..5 {
q.push(WorkItem::new(i, i)).expect("push should succeed");
}
let batch = q.pop_batch(3).expect("pop_batch should succeed");
assert_eq!(batch.len(), 3);
assert_eq!(batch[0].priority, 4); assert_eq!(batch[1].priority, 3);
assert_eq!(batch[2].priority, 2);
}
#[test]
fn pop_batch_too_large_returns_error() {
let mut q: WorkQueue<u32> = WorkQueue::new(4);
let err = q.pop_batch(5);
assert_eq!(err, Err(QueueError::BatchTooLarge));
}
#[test]
fn stats_track_push_and_pop() {
let mut q: WorkQueue<u32> = WorkQueue::new(8);
q.push(WorkItem::new(1_u32, 1))
.expect("push should succeed");
q.push(WorkItem::new(2_u32, 2))
.expect("push should succeed");
let _ = q.pop();
let s = q.stats();
assert_eq!(s.total_pushed, 2);
assert_eq!(s.total_popped, 1);
assert_eq!(s.in_flight(), 1);
}
#[test]
fn stats_count_rejected_pushes() {
let mut q: WorkQueue<u32> = WorkQueue::new(1);
q.push(WorkItem::new(1_u32, 1))
.expect("push should succeed");
let _ = q.push(WorkItem::new(2_u32, 2)); assert_eq!(q.stats().total_rejected, 1);
}
#[test]
fn queue_error_display() {
assert!(!QueueError::Full.to_string().is_empty());
assert!(!QueueError::BatchTooLarge.to_string().is_empty());
}
#[test]
fn iter_yields_all_items() {
let mut q: WorkQueue<u32> = WorkQueue::new(8);
for i in 0_u32..4 {
q.push(WorkItem::new(i, i)).expect("push should succeed");
}
assert_eq!(q.iter().count(), 4);
}
#[test]
fn work_item_is_clone() {
let a = WorkItem::new(String::from("hello"), 5_u32);
let b = a.clone();
assert_eq!(a.payload, b.payload);
assert_eq!(a.priority, b.priority);
}
}