use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::time::Instant;
use crate::task::{Task, TaskPriority};
/// FIFO queue of tasks that all share one [`TaskPriority`].
///
/// The queue carries an `enabled` flag: while it is `false`, `pop` and `front`
/// return `None`, but stored tasks are kept and `len` / `is_empty` still
/// report them.
pub struct TaskQueue {
/// Pending tasks; pushed at the back, popped from the front.
tasks: VecDeque<Task>,
/// When `false`, `pop` / `front` yield nothing even if tasks are stored.
enabled: bool,
/// Priority this queue was created with; returned by `priority()`.
priority: TaskPriority,
}
impl TaskQueue {
    /// Builds an empty queue tagged with `priority`. New queues start enabled.
    #[inline]
    #[must_use]
    pub fn new(priority: TaskPriority) -> Self {
        Self {
            priority,
            enabled: true,
            tasks: VecDeque::new(),
        }
    }

    /// Appends `task` at the back. The enabled flag is not consulted.
    #[inline]
    pub fn push(&mut self, task: Task) {
        self.tasks.push_back(task);
    }

    /// Removes and returns the oldest task.
    ///
    /// Returns `None` when the queue is disabled or holds no tasks; a disabled
    /// queue keeps its contents untouched.
    #[inline]
    pub fn pop(&mut self) -> Option<Task> {
        if self.enabled {
            self.tasks.pop_front()
        } else {
            None
        }
    }

    /// Borrows the oldest task without removing it.
    ///
    /// Returns `None` when the queue is disabled or holds no tasks.
    #[inline]
    #[must_use]
    pub fn front(&self) -> Option<&Task> {
        self.tasks.front().filter(|_| self.enabled)
    }

    /// Number of stored tasks, counted even while the queue is disabled.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        self.tasks.len()
    }

    /// `true` when no tasks are stored, regardless of the enabled flag.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.tasks.is_empty()
    }

    /// `true` when a call to `pop` would currently yield a task, i.e. the
    /// queue is enabled and non-empty.
    #[inline]
    #[must_use]
    pub fn has_ready(&self) -> bool {
        !self.tasks.is_empty() && self.enabled
    }

    /// Reports the current enabled flag.
    #[inline]
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        self.enabled
    }

    /// Sets the enabled flag. Stored tasks are neither dropped nor reordered.
    #[inline]
    pub fn set_enabled(&mut self, enabled: bool) {
        self.enabled = enabled;
    }

    /// The priority this queue was constructed with.
    #[inline]
    #[must_use]
    pub fn priority(&self) -> TaskPriority {
        self.priority
    }
}
/// A task parked on the manager's delayed heap until `deadline` is reached.
struct DelayedEntry {
/// The task to move into its ready queue once the deadline has passed.
task: Task,
/// Instant at or after which the task may be promoted; the only field used for ordering.
deadline: Instant,
}
/// Two entries are equal when their deadlines are equal; the task is not compared.
impl PartialEq for DelayedEntry {
    fn eq(&self, rhs: &Self) -> bool {
        self.deadline.eq(&rhs.deadline)
    }
}
// Marker impl: equality is plain `Instant` equality on the deadline, and `Ord` requires `Eq`.
impl Eq for DelayedEntry {}
/// Partial ordering always defers to the total order defined by `Ord`.
impl PartialOrd for DelayedEntry {
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}
/// Orders entries by deadline, reversed: the entry with the earliest deadline
/// compares greatest, so a max-oriented `BinaryHeap` surfaces it first.
impl Ord for DelayedEntry {
    fn cmp(&self, rhs: &Self) -> Ordering {
        self.deadline.cmp(&rhs.deadline).reverse()
    }
}
/// Number of consecutive picks from the `Input` / `UserBlocking` queues after which
/// `TaskQueueManager::pick` first tries the lowest-priority non-empty queue instead.
const STARVATION_THRESHOLD: u32 = 64;
/// Owns one [`TaskQueue`] per priority level plus a heap of delayed tasks that are not yet due.
pub struct TaskQueueManager {
/// Ready queues, indexed by `TaskPriority::as_index()`; `pick` scans them from index 0 upward.
queues: [TaskQueue; TaskPriority::COUNT],
/// Delayed tasks; the heap's top is the entry with the earliest deadline.
delayed: BinaryHeap<DelayedEntry>,
/// Consecutive picks served from slots up to `UserBlocking`; compared against
/// `STARVATION_THRESHOLD` at the start of every `pick`.
consecutive_high: u32,
}
impl TaskQueueManager {
    /// Creates a manager with one empty, enabled queue per priority and no delayed tasks.
    ///
    /// Every lookup indexes the queue array with `TaskPriority::as_index()`, so each
    /// queue must sit in the slot that matches its own priority. That coupling is
    /// checked in debug builds.
    #[must_use]
    pub fn new() -> Self {
        let queues = [
            TaskQueue::new(TaskPriority::Input),
            TaskQueue::new(TaskPriority::UserBlocking),
            TaskQueue::new(TaskPriority::Normal),
            TaskQueue::new(TaskPriority::Timer),
            TaskQueue::new(TaskPriority::BestEffort),
            TaskQueue::new(TaskPriority::Idle),
        ];
        debug_assert!(
            queues
                .iter()
                .enumerate()
                .all(|(slot, queue)| queue.priority().as_index() == slot),
            "queue array order must match TaskPriority::as_index()"
        );
        Self {
            queues,
            delayed: BinaryHeap::new(),
            consecutive_high: 0,
        }
    }

    /// Accepts a task for scheduling.
    ///
    /// A task whose `run_at()` is `Some` and which is not yet `is_ready()` is parked
    /// on the delayed heap under that deadline. Every other task goes straight to
    /// the back of the ready queue for its priority.
    pub fn push(&mut self, task: Task) {
        match task.run_at() {
            Some(deadline) if !task.is_ready() => {
                self.delayed.push(DelayedEntry { task, deadline });
            }
            _ => {
                let slot = task.priority().as_index();
                self.queues[slot].push(task);
            }
        }
    }

    /// Removes and returns the next task to run, or `None` when no enabled queue
    /// has a task.
    ///
    /// Queues are scanned from index 0 upward and the first task found wins. Once
    /// `STARVATION_THRESHOLD` consecutive picks have come from the `Input` /
    /// `UserBlocking` slots, the counter is reset and this call serves the
    /// lowest-priority non-empty enabled queue instead.
    pub fn pick(&mut self) -> Option<Task> {
        if self.consecutive_high >= STARVATION_THRESHOLD {
            // Reset unconditionally so the relief pick happens at most once per threshold window.
            self.consecutive_high = 0;
            if let Some(task) = self.pick_lowest_nonempty() {
                return Some(task);
            }
        }

        let high_band_end = TaskPriority::UserBlocking.as_index();
        for (slot, queue) in self.queues.iter_mut().enumerate() {
            if let Some(task) = queue.pop() {
                if slot <= high_band_end {
                    self.consecutive_high += 1;
                } else {
                    self.consecutive_high = 0;
                }
                return Some(task);
            }
        }
        None
    }

    /// Moves every delayed task whose deadline is at or before `Instant::now()`
    /// into the ready queue for its priority, earliest deadline first.
    pub fn promote_delayed(&mut self) {
        let now = Instant::now();
        while let Some(top) = self.delayed.peek_mut() {
            if top.deadline > now {
                // The heap top is the earliest deadline, so nothing else is due either.
                break;
            }
            // `PeekMut::pop` removes the entry just inspected, so no second lookup
            // (and no `unwrap`) is needed.
            let DelayedEntry { task, .. } = std::collections::binary_heap::PeekMut::pop(top);
            let slot = task.priority().as_index();
            self.queues[slot].push(task);
        }
    }

    /// Time until the earliest delayed task becomes due.
    ///
    /// Returns `None` when there are no delayed tasks, and a zero duration when
    /// the earliest deadline has already passed.
    #[must_use]
    pub fn next_delayed_ready_in(&self) -> Option<std::time::Duration> {
        self.delayed
            .peek()
            .map(|entry| entry.deadline.saturating_duration_since(Instant::now()))
    }

    /// Borrows the ready queue for `priority`.
    #[inline]
    #[must_use]
    pub fn queue(&self, priority: TaskPriority) -> &TaskQueue {
        &self.queues[priority.as_index()]
    }

    /// Mutably borrows the ready queue for `priority`, e.g. to enable or disable it.
    #[inline]
    pub fn queue_mut(&mut self, priority: TaskPriority) -> &mut TaskQueue {
        &mut self.queues[priority.as_index()]
    }

    /// Total number of tasks stored in enabled ready queues. Disabled queues and
    /// delayed tasks are not counted.
    #[must_use]
    pub fn ready_count(&self) -> usize {
        self.queues
            .iter()
            .filter(|queue| queue.is_enabled())
            .map(TaskQueue::len)
            .sum()
    }

    /// Number of tasks currently parked on the delayed heap.
    #[inline]
    #[must_use]
    pub fn delayed_count(&self) -> usize {
        self.delayed.len()
    }

    /// `true` when at least one enabled queue holds a task.
    #[must_use]
    pub fn has_ready(&self) -> bool {
        self.queues.iter().any(TaskQueue::has_ready)
    }

    /// `true` when there is neither ready work nor any delayed task.
    ///
    /// Tasks stored in a disabled queue do not count as ready work, so this can
    /// return `true` while a disabled queue still holds tasks.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        !self.has_ready() && self.delayed.is_empty()
    }

    /// `true` when at least one task is parked on the delayed heap.
    #[inline]
    #[must_use]
    pub fn has_delayed(&self) -> bool {
        !self.delayed.is_empty()
    }

    /// Pops from the lowest-priority queue that yields a task, scanning from the
    /// last slot back to the first. Disabled queues yield nothing and are skipped.
    fn pick_lowest_nonempty(&mut self) -> Option<Task> {
        self.queues.iter_mut().rev().find_map(|queue| queue.pop())
    }
}
impl Default for TaskQueueManager {
fn default() -> Self {
Self::new()
}
}
// Unit tests for `TaskQueue` (the `queue_*` tests) and `TaskQueueManager` (the rest).
#[cfg(test)]
mod tests {
use super::*;
use std::cell::Cell;
use std::rc::Rc;
use std::time::Duration;
// A single queue hands tasks back in the order they were pushed.
#[test]
fn queue_fifo_order() {
let log = Rc::new(std::cell::RefCell::new(Vec::new()));
let mut q = TaskQueue::new(TaskPriority::Normal);
for i in 0..3 {
let l = log.clone();
q.push(Task::new(TaskPriority::Normal, move || {
l.borrow_mut().push(i)
}));
}
assert_eq!(q.len(), 3);
while let Some(task) = q.pop() {
task.run();
}
assert_eq!(*log.borrow(), vec![0, 1, 2]);
}
// Disabling hides stored tasks from `pop` / `has_ready` but keeps them (`len` stays 1);
// re-enabling makes them available again.
#[test]
fn queue_disabled_returns_none() {
let mut q = TaskQueue::new(TaskPriority::Normal);
q.push(Task::new(TaskPriority::Normal, || {}));
q.set_enabled(false);
assert!(!q.is_enabled());
assert!(q.pop().is_none());
assert!(!q.has_ready());
assert_eq!(q.len(), 1);
q.set_enabled(true);
assert!(q.pop().is_some());
}
// `front` peeks without removing: the length is unchanged afterwards.
#[test]
fn queue_front_peek() {
let mut q = TaskQueue::new(TaskPriority::Input);
assert!(q.front().is_none());
q.push(Task::new(TaskPriority::Input, || {}));
assert!(q.front().is_some());
assert_eq!(q.len(), 1); }
// `front` honours the enabled flag just like `pop`.
#[test]
fn queue_front_none_when_disabled() {
let mut q = TaskQueue::new(TaskPriority::Normal);
q.push(Task::new(TaskPriority::Normal, || {}));
q.set_enabled(false);
assert!(q.front().is_none());
}
// A fresh manager has nothing to pick and reports itself empty.
#[test]
fn set_empty_returns_none() {
let mut mgr = TaskQueueManager::new();
assert!(mgr.pick().is_none());
assert!(mgr.is_empty());
}
// Tasks pushed in the order idle, input, normal are picked as input, normal, idle.
#[test]
fn set_higher_priority_first() {
let log = Rc::new(std::cell::RefCell::new(Vec::new()));
let mut mgr = TaskQueueManager::new();
let l = log.clone();
mgr.push(Task::new(TaskPriority::Idle, move || {
l.borrow_mut().push("idle")
}));
let l = log.clone();
mgr.push(Task::new(TaskPriority::Input, move || {
l.borrow_mut().push("input")
}));
let l = log.clone();
mgr.push(Task::new(TaskPriority::Normal, move || {
l.borrow_mut().push("normal")
}));
while let Some(task) = mgr.pick() {
task.run();
}
assert_eq!(*log.borrow(), vec!["input", "normal", "idle"]);
}
// Within one priority the manager preserves push order.
#[test]
fn set_fifo_within_priority() {
let log = Rc::new(std::cell::RefCell::new(Vec::new()));
let mut mgr = TaskQueueManager::new();
for i in 0..3 {
let l = log.clone();
mgr.push(Task::new(TaskPriority::Normal, move || {
l.borrow_mut().push(i)
}));
}
while let Some(task) = mgr.pick() {
task.run();
}
assert_eq!(*log.borrow(), vec![0, 1, 2]);
}
// `pick` skips a disabled queue and serves it again once re-enabled.
#[test]
fn set_disabled_queue_skipped() {
let mut mgr = TaskQueueManager::new();
mgr.push(Task::new(TaskPriority::Normal, || {}));
mgr.queue_mut(TaskPriority::Normal).set_enabled(false);
assert!(mgr.pick().is_none());
mgr.queue_mut(TaskPriority::Normal).set_enabled(true);
assert!(mgr.pick().is_some());
}
// After STARVATION_THRESHOLD consecutive Input picks, the next pick (log index
// STARVATION_THRESHOLD) must be the Idle task even though Input still has work.
#[test]
fn set_anti_starvation() {
let log = Rc::new(std::cell::RefCell::new(Vec::<&str>::new()));
let mut mgr = TaskQueueManager::new();
for _ in 0..STARVATION_THRESHOLD + 1 {
let l = log.clone();
mgr.push(Task::new(TaskPriority::Input, move || {
l.borrow_mut().push("input")
}));
}
let l = log.clone();
mgr.push(Task::new(TaskPriority::Idle, move || {
l.borrow_mut().push("idle")
}));
for _ in 0..STARVATION_THRESHOLD + 1 {
mgr.pick().unwrap().run();
}
let entries = log.borrow();
assert_eq!(entries[STARVATION_THRESHOLD as usize], "idle");
}
// With only Input work queued, hitting the threshold repeatedly must never make
// `pick` return `None` while tasks remain.
#[test]
fn anti_starvation_resets_when_no_low_priority() {
let mut mgr = TaskQueueManager::new();
for _ in 0..STARVATION_THRESHOLD * 3 {
mgr.push(Task::new(TaskPriority::Input, || {}));
}
for _ in 0..STARVATION_THRESHOLD * 3 {
assert!(mgr.pick().is_some());
}
}
// A task delayed by 60s is held on the delayed heap and is not pickable.
#[test]
fn delayed_task_not_immediately_ready() {
let mut mgr = TaskQueueManager::new();
mgr.push(Task::delayed(
TaskPriority::Timer,
Duration::from_secs(60),
|| {},
));
assert_eq!(mgr.ready_count(), 0);
assert_eq!(mgr.delayed_count(), 1);
assert!(mgr.pick().is_none());
assert!(mgr.has_delayed());
}
// A zero-delay task is expected to land in the ready queue right away and be runnable.
#[test]
fn delayed_task_with_zero_delay_goes_to_queue() {
let counter = Rc::new(Cell::new(0u32));
let mut mgr = TaskQueueManager::new();
let c = counter.clone();
mgr.push(Task::delayed(
TaskPriority::Timer,
Duration::ZERO,
move || c.set(1),
));
assert_eq!(mgr.ready_count(), 1);
mgr.pick().unwrap().run();
assert_eq!(counter.get(), 1);
}
// With delays of 60s, 30s and 90s nothing is due yet, so promotion moves nothing.
// The reported wait must correspond to the 30s task, within a 28s..=31s window.
#[test]
fn promote_delayed_uses_heap() {
let mut mgr = TaskQueueManager::new();
mgr.push(Task::delayed(
TaskPriority::Timer,
Duration::from_secs(60),
|| {},
));
mgr.push(Task::delayed(
TaskPriority::Timer,
Duration::from_secs(30),
|| {},
));
mgr.push(Task::delayed(
TaskPriority::Timer,
Duration::from_secs(90),
|| {},
));
assert_eq!(mgr.delayed_count(), 3);
mgr.promote_delayed();
assert_eq!(mgr.ready_count(), 0);
let next = mgr.next_delayed_ready_in().unwrap();
assert!(next <= Duration::from_secs(31));
assert!(next >= Duration::from_secs(28));
}
// No delayed tasks means there is no wait time to report.
#[test]
fn next_delayed_ready_in_none_when_empty() {
let mgr = TaskQueueManager::new();
assert!(mgr.next_delayed_ready_in().is_none());
}
// `queue(priority)` returns the queue that was built for that same priority.
#[test]
fn set_queue_access() {
let mgr = TaskQueueManager::new();
assert_eq!(
mgr.queue(TaskPriority::Input).priority(),
TaskPriority::Input
);
assert_eq!(mgr.queue(TaskPriority::Idle).priority(), TaskPriority::Idle);
}
// Tasks sitting in a disabled queue are left out of `ready_count`.
#[test]
fn ready_count_excludes_disabled() {
let mut mgr = TaskQueueManager::new();
mgr.push(Task::new(TaskPriority::Input, || {}));
mgr.push(Task::new(TaskPriority::Normal, || {}));
assert_eq!(mgr.ready_count(), 2);
mgr.queue_mut(TaskPriority::Input).set_enabled(false);
assert_eq!(mgr.ready_count(), 1); }
// Tasks still on the delayed heap are left out of `ready_count`.
#[test]
fn ready_count_excludes_delayed() {
let mut mgr = TaskQueueManager::new();
mgr.push(Task::new(TaskPriority::Input, || {}));
mgr.push(Task::delayed(
TaskPriority::Timer,
Duration::from_secs(60),
|| {},
));
assert_eq!(mgr.ready_count(), 1);
assert!(mgr.has_ready());
}
}