use std::{
collections::VecDeque,
sync::atomic::{AtomicUsize, Ordering},
};
use crate::arch::sync::Mutex;
use super::task::Task;
/// Task limit used by `WorkQueue::new`.
pub const DEFAULT_CAPACITY: usize = 1024;
/// Upper bound on the task limit; `WorkQueue::with_capacity` clamps larger requests to this.
pub const MAX_CAPACITY: usize = 4096;
/// Bounded first-in, first-out queue of [`Task`]s shared through `&self`.
///
/// Tasks are appended at the back and removed from the front. Once the queue
/// holds `capacity` tasks, further pushes are rejected and tallied in `dropped`.
pub struct WorkQueue {
/// Pending tasks, guarded by the crate's `Mutex`.
queue: Mutex<VecDeque<Task>>,
/// Maximum number of tasks allowed in `queue` at once.
capacity: usize,
/// Number of tasks rejected because the queue was full.
dropped: AtomicUsize,
}
impl WorkQueue {
    /// Creates an empty queue that accepts up to [`DEFAULT_CAPACITY`] tasks.
    #[must_use]
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_CAPACITY)
    }

    /// Creates an empty queue with the requested task limit.
    ///
    /// Requests larger than [`MAX_CAPACITY`] are clamped to it without any
    /// error. Backing storage for the clamped limit is reserved up front.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        let limit = std::cmp::min(capacity, MAX_CAPACITY);
        let storage = VecDeque::with_capacity(limit);
        Self {
            queue: Mutex::new(storage),
            capacity: limit,
            dropped: AtomicUsize::new(0),
        }
    }

    /// Appends `task` to the back of the queue.
    ///
    /// Returns `true` when the task was enqueued. When the queue already holds
    /// `capacity` tasks, the task is discarded, the dropped counter is
    /// incremented, and `false` is returned.
    pub fn push(&self, task: Task) -> bool {
        let mut pending = self.queue.lock();
        if pending.len() < self.capacity {
            pending.push_back(task);
            return true;
        }
        // Full: record the rejection while the lock is still held.
        self.dropped.fetch_add(1, Ordering::Relaxed);
        false
    }

    /// Removes and returns the oldest task, or `None` when the queue is empty.
    #[must_use]
    pub fn try_pop(&self) -> Option<Task> {
        let mut pending = self.queue.lock();
        pending.pop_front()
    }

    /// Removes every queued task and returns them oldest-first.
    #[must_use]
    pub fn drain(&self) -> Vec<Task> {
        let mut pending = self.queue.lock();
        let taken: Vec<Task> = pending.drain(..).collect();
        taken
    }

    /// Pops and executes queued tasks until `limit` tasks have run or the
    /// queue is empty, returning how many were executed.
    ///
    /// Each task is popped through [`try_pop`](Self::try_pop), so the queue
    /// lock is not held while the task runs. The value returned by
    /// `Task::execute` is discarded.
    pub fn process_pending(&self, limit: usize) -> usize {
        let mut completed = 0;
        for _ in 0..limit {
            match self.try_pop() {
                Some(mut task) => {
                    let _ = task.execute();
                    completed += 1;
                }
                None => break,
            }
        }
        completed
    }

    /// Returns the number of tasks currently queued.
    #[must_use]
    pub fn len(&self) -> usize {
        let pending = self.queue.lock();
        pending.len()
    }

    /// Returns `true` when no tasks are queued.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let pending = self.queue.lock();
        pending.is_empty()
    }

    /// Returns the maximum number of tasks the queue accepts.
    #[inline]
    #[must_use]
    pub const fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns how many pushes have been rejected because the queue was full.
    #[must_use]
    pub fn dropped_count(&self) -> usize {
        self.dropped.load(Ordering::Relaxed)
    }

    /// Discards every queued task and returns how many were removed.
    pub fn clear(&self) -> usize {
        let mut pending = self.queue.lock();
        let removed = pending.len();
        pending.clear();
        removed
    }
}
/// The default queue is the one produced by `WorkQueue::new`.
impl Default for WorkQueue {
// Delegates to `new` so both construction paths stay in agreement.
fn default() -> Self {
Self::new()
}
}
/// Debug output reports the current length, the capacity, and the dropped
/// counter; the queued tasks themselves are not printed.
impl std::fmt::Debug for WorkQueue {
    /// Formats a snapshot of the queue's counters.
    ///
    /// Reading the length goes through `len()`, which takes the queue lock.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("WorkQueue");

        let queued = self.len();
        builder.field("len", &queued);

        builder.field("capacity", &self.capacity);

        let rejected = self.dropped_count();
        builder.field("dropped", &rejected);

        // The task list is intentionally left out, hence the non-exhaustive marker.
        builder.finish_non_exhaustive()
    }
}