use noxu_sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
/// A bounded, mutex-protected FIFO buffer of outgoing byte messages.
///
/// Messages are appended with [`OutputQueue::enqueue`] and removed in arrival
/// order with [`OutputQueue::dequeue_batch`]. After [`OutputQueue::shutdown`]
/// has been called new messages are refused, but messages that are already
/// queued can still be drained or cleared.
pub struct OutputQueue {
    /// Pending messages, oldest at the front. A `VecDeque` is used so that
    /// draining a batch from the front does not have to shift the messages
    /// that stay behind (which a `Vec` would do on every partial drain).
    queue: Mutex<std::collections::VecDeque<Vec<u8>>>,
    /// Shutdown flag; this type only ever sets it to `true`.
    shutdown: AtomicBool,
    /// Upper bound on the number of messages held at any one time.
    max_queue_size: usize,
}

impl OutputQueue {
    /// Creates an empty queue that holds at most `max_queue_size` messages.
    ///
    /// A `max_queue_size` of `0` yields a queue that rejects every message.
    pub fn new(max_queue_size: usize) -> Self {
        Self {
            queue: Mutex::new(std::collections::VecDeque::new()),
            shutdown: AtomicBool::new(false),
            max_queue_size,
        }
    }

    /// Appends `message` to the back of the queue.
    ///
    /// Returns `true` if the message was queued, or `false` if the queue has
    /// been shut down or already holds `max_queue_size` messages (in which
    /// case `message` is dropped).
    ///
    /// The shutdown flag is read before the lock is taken and `shutdown` does
    /// not take the lock, so the check is best-effort: an `enqueue` racing
    /// with a concurrent `shutdown` may still be accepted.
    pub fn enqueue(&self, message: Vec<u8>) -> bool {
        if self.is_shutdown() {
            return false;
        }
        let mut queue = self.queue.lock();
        // The capacity check and the push happen under the same lock
        // acquisition, so concurrent producers cannot overshoot the bound.
        if queue.len() >= self.max_queue_size {
            return false;
        }
        queue.push_back(message);
        true
    }

    /// Removes and returns up to `max` messages from the front of the queue,
    /// oldest first.
    ///
    /// Returns an empty vector when the queue is empty or `max` is `0`.
    /// This works regardless of the shutdown flag.
    pub fn dequeue_batch(&self, max: usize) -> Vec<Vec<u8>> {
        let mut queue = self.queue.lock();
        // Clamp to the current length so the drain range is always in bounds.
        let count = max.min(queue.len());
        queue.drain(..count).collect()
    }

    /// Returns the number of messages currently queued.
    pub fn len(&self) -> usize {
        self.queue.lock().len()
    }

    /// Returns `true` if no messages are currently queued.
    pub fn is_empty(&self) -> bool {
        self.queue.lock().is_empty()
    }

    /// Marks the queue as shut down so that later `enqueue` calls are refused.
    ///
    /// Messages that are already queued are left in place.
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
    }

    /// Returns `true` once `shutdown` has been called.
    pub fn is_shutdown(&self) -> bool {
        self.shutdown.load(Ordering::Acquire)
    }

    /// Discards every queued message. The shutdown flag is not touched.
    pub fn clear(&self) {
        self.queue.lock().clear();
    }
}
/// Diagnostic formatting: reports the current length, the configured
/// capacity and the shutdown flag rather than the queued payloads.
impl std::fmt::Debug for OutputQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("OutputQueue");

        // `len` is read through the accessor, so this is a point-in-time
        // value taken while formatting.
        let queued = self.len();
        builder.field("len", &queued);

        builder.field("max_queue_size", &self.max_queue_size);

        let stopped = self.is_shutdown();
        builder.field("shutdown", &stopped);

        builder.finish()
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for `OutputQueue`: construction, FIFO ordering, the capacity
    // bound, shutdown behaviour, clearing, concurrent producers and the
    // `Debug` output.
    use super::*;

    /// A freshly constructed queue is empty and not shut down.
    #[test]
    fn test_new_queue() {
        let q = OutputQueue::new(10);
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
        assert!(!q.is_shutdown());
    }

    /// Messages come back out in the order they were enqueued.
    #[test]
    fn test_enqueue_dequeue() {
        let q = OutputQueue::new(10);
        assert!(q.enqueue(vec![1, 2, 3]));
        assert!(q.enqueue(vec![4, 5]));
        assert_eq!(q.len(), 2);
        assert!(!q.is_empty());

        let batch = q.dequeue_batch(10);
        assert_eq!(batch.len(), 2);
        assert_eq!(batch[0], vec![1, 2, 3]);
        assert_eq!(batch[1], vec![4, 5]);
        assert!(q.is_empty());
    }

    /// A batch smaller than the queue takes only the oldest messages and
    /// leaves the rest in place.
    #[test]
    fn test_batch_dequeue_partial() {
        let q = OutputQueue::new(100);
        for i in 0..10 {
            // Capacity is 100, so every setup enqueue must be accepted.
            assert!(q.enqueue(vec![i]));
        }
        assert_eq!(q.len(), 10);

        let batch = q.dequeue_batch(3);
        assert_eq!(batch.len(), 3);
        assert_eq!(batch[0], vec![0]);
        assert_eq!(batch[1], vec![1]);
        assert_eq!(batch[2], vec![2]);
        assert_eq!(q.len(), 7);

        let batch2 = q.dequeue_batch(5);
        assert_eq!(batch2.len(), 5);
        assert_eq!(q.len(), 2);
    }

    /// Enqueue is refused at capacity and accepted again once space frees up.
    #[test]
    fn test_capacity_limit() {
        let q = OutputQueue::new(3);
        assert!(q.enqueue(vec![1]));
        assert!(q.enqueue(vec![2]));
        assert!(q.enqueue(vec![3]));
        assert!(!q.enqueue(vec![4]));
        assert_eq!(q.len(), 3);

        q.dequeue_batch(1);
        assert!(q.enqueue(vec![4]));
        assert_eq!(q.len(), 3);
    }

    /// After shutdown new messages are refused but queued ones can be drained.
    #[test]
    fn test_shutdown_rejects_enqueue() {
        let q = OutputQueue::new(10);
        assert!(q.enqueue(vec![1]));
        q.shutdown();
        assert!(q.is_shutdown());
        assert!(!q.enqueue(vec![2]));

        let batch = q.dequeue_batch(10);
        assert_eq!(batch.len(), 1);
    }

    /// `clear` discards everything that was queued.
    #[test]
    fn test_clear() {
        let q = OutputQueue::new(10);
        assert!(q.enqueue(vec![1]));
        assert!(q.enqueue(vec![2]));
        assert!(q.enqueue(vec![3]));
        assert_eq!(q.len(), 3);

        q.clear();
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
    }

    /// Dequeuing from an empty queue yields an empty batch.
    #[test]
    fn test_dequeue_empty() {
        let q = OutputQueue::new(10);
        let batch = q.dequeue_batch(5);
        assert!(batch.is_empty());
    }

    /// A batch size of zero removes nothing.
    #[test]
    fn test_dequeue_batch_zero() {
        let q = OutputQueue::new(10);
        assert!(q.enqueue(vec![1]));
        let batch = q.dequeue_batch(0);
        assert!(batch.is_empty());
        assert_eq!(q.len(), 1);
    }

    /// Clearing after shutdown empties the queue and leaves the flag set.
    #[test]
    fn test_shutdown_then_clear() {
        let q = OutputQueue::new(10);
        assert!(q.enqueue(vec![1]));
        assert!(q.enqueue(vec![2]));
        q.shutdown();
        q.clear();
        assert!(q.is_empty());
        assert!(q.is_shutdown());
    }

    /// Four producers each enqueue 100 messages into a queue of capacity 1000.
    #[test]
    fn test_concurrent_enqueue() {
        use std::sync::Arc;
        use std::thread;

        let q = Arc::new(OutputQueue::new(1000));
        let mut handles = vec![];
        for t in 0..4 {
            let queue = Arc::clone(&q);
            handles.push(thread::spawn(move || {
                let mut count = 0;
                for i in 0..100 {
                    if queue.enqueue(vec![t, i]) {
                        count += 1;
                    }
                }
                count
            }));
        }

        let total: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
        assert_eq!(q.len(), total);
        // 400 attempts against a capacity of 1000 with no shutdown: none may
        // be rejected. (`total <= 400` would hold trivially.)
        assert_eq!(total, 400);
    }

    /// The `Debug` output names the type and reports all three fields.
    #[test]
    fn test_debug_format() {
        let q = OutputQueue::new(42);
        assert!(q.enqueue(vec![1]));
        let debug = format!("{:?}", q);
        assert!(debug.contains("OutputQueue"));
        assert!(debug.contains("len: 1"));
        assert!(debug.contains("max_queue_size: 42"));
        assert!(debug.contains("shutdown: false"));
    }
}