use crate::types::{TaskId, Time};
use crate::util::CachePadded;
use crossbeam_queue::SegQueue;
use parking_lot::Mutex;
use std::cmp::Ordering as CmpOrdering;
use std::collections::BinaryHeap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
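/// A task paired with the priority it was injected with.
///
/// The injector's queues are FIFO and do not reorder by priority; the value
/// is carried along for the consumer to act on.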
#[derive(Debug, Clone, Copy)]
pub struct PriorityTask {
pub task: TaskId,
pub priority: u8,
}
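/// A deadline-driven task, ordered earliest-deadline-first (EDF).
///
/// Equal deadlines are broken by a private insertion `generation`, so ties
/// pop in FIFO order.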
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct TimedTask {
pub task: TaskId,
pub deadline: Time,
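    /// Monotonic insertion counter used to break deadline ties in FIFO order.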
generation: u64,
}
impl TimedTask {
fn new(task: TaskId, deadline: Time, generation: u64) -> Self {
Self {
task,
deadline,
generation,
}
}
}
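// `BinaryHeap` is a max-heap, so every comparison below is reversed to make
// it behave as a min-heap over (deadline, generation, task). The ordering is
// consistent with the derived `Eq`: `Equal` only when all fields match.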
impl Ord for TimedTask {
#[inline]
    fn cmp(&self, other: &Self) -> CmpOrdering {
        // Earliest deadline compares greatest, so it pops first.
        other
            .deadline
            .cmp(&self.deadline)
            .then_with(|| {
                // Serial-number comparison on the generation: the earlier
                // insertion compares greater (FIFO for equal deadlines), and
                // `wrapping_sub` keeps this sane even across wraparound.
                let diff = other.generation.wrapping_sub(self.generation).cast_signed();
                diff.cmp(&0)
            })
            // Final tiebreak on the task id keeps the order total.
            .then_with(|| other.task.cmp(&self.task))
    }
}
impl PartialOrd for TimedTask {
#[inline]
fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
Some(self.cmp(other))
}
}
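/// A three-lane global task injector shared across worker threads.
///
/// - **cancel**: lock-free FIFO, always considered runnable;
/// - **timed**: EDF heap behind a mutex, with the earliest deadline mirrored
///   into an atomic so readers can peek without locking;
/// - **ready**: lock-free FIFO with an approximate, saturating counter.
///
/// The counters are advisory: they are bumped before pushes and decremented
/// after pops, so they may briefly over-report but never under-report.
///
/// A minimal usage sketch (not compiled as a doctest; `task_a`, `task_b`,
/// `deadline`, and `now` are placeholders for real values):
///
/// ```ignore
/// let injector = GlobalInjector::new();
/// injector.inject_ready(task_a, 10);        // runnable immediately
/// injector.inject_timed(task_b, deadline);  // runnable once `deadline` passes
/// while let Some(pt) = injector.pop_cancel() { /* handle cancellation */ }
/// if let Some(tt) = injector.pop_timed_if_due(now) { /* deadline reached */ }
/// ```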
#[derive(Debug)]
pub struct GlobalInjector {
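    /// Lock-free FIFO of cancellation requests; this lane has no counter.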
cancel_queue: SegQueue<PriorityTask>,
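    /// EDF heap of deadline-driven tasks, protected by a mutex.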
timed_queue: Mutex<TimedQueue>,
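    /// Lock-free FIFO of immediately runnable tasks.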
ready_queue: SegQueue<PriorityTask>,
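    /// Approximate length of `ready_queue`; cache-padded to avoid false sharing.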
ready_count: CachePadded<AtomicUsize>,
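    /// Approximate length of the timed heap; never under-reports it.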
timed_count: CachePadded<AtomicUsize>,
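    /// Earliest deadline in the heap, in nanoseconds (`u64::MAX` when empty).
    /// Written only under the `timed_queue` lock; read lock-free.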
cached_earliest_deadline: CachePadded<AtomicU64>,
}
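/// Mutex-protected timed state: the EDF heap plus the next tiebreak generation.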
#[derive(Debug, Default)]
struct TimedQueue {
heap: BinaryHeap<TimedTask>,
next_generation: u64,
}
impl Default for GlobalInjector {
fn default() -> Self {
Self {
cancel_queue: SegQueue::new(),
timed_queue: Mutex::new(TimedQueue::default()),
ready_queue: SegQueue::new(),
ready_count: CachePadded::new(AtomicUsize::new(0)),
timed_count: CachePadded::new(AtomicUsize::new(0)),
cached_earliest_deadline: CachePadded::new(AtomicU64::new(u64::MAX)),
}
}
}
impl GlobalInjector {
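    /// Decrements `counter`, saturating at zero. `fetch_update` retries on
    /// contention, so racing decrements can never underflow even when a
    /// counter lags behind its queue (see the underflow tests below).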
#[inline]
fn saturating_decrement(counter: &AtomicUsize) {
let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
count.checked_sub(1)
});
}
#[inline]
fn decrement_ready_count(&self) {
Self::saturating_decrement(&self.ready_count);
}
#[inline]
fn decrement_timed_count(&self) {
Self::saturating_decrement(&self.timed_count);
}
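    /// Creates an empty injector.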
#[must_use]
pub fn new() -> Self {
Self::default()
}
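    /// Queues a cancellation request. Cancel work is always reported runnable.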
#[inline]
pub fn inject_cancel(&self, task: TaskId, priority: u8) {
self.cancel_queue.push(PriorityTask { task, priority });
}
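    /// Queues `task` to become runnable at `deadline`. The counter is bumped
    /// before the lock is taken, so readers may briefly see a count larger
    /// than the heap, never smaller.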
#[inline]
pub fn inject_timed(&self, task: TaskId, deadline: Time) {
self.timed_count.fetch_add(1, Ordering::Relaxed);
let mut queue = self.timed_queue.lock();
let generation = queue.next_generation;
queue.next_generation += 1;
queue.heap.push(TimedTask::new(task, deadline, generation));
let earliest = queue
.heap
.peek()
.map_or(u64::MAX, |t| t.deadline.as_nanos());
self.cached_earliest_deadline
.store(earliest, Ordering::Relaxed);
drop(queue);
}
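    /// Queues `task` as immediately runnable.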
#[inline]
pub fn inject_ready(&self, task: TaskId, priority: u8) {
self.ready_count.fetch_add(1, Ordering::Relaxed);
self.ready_queue.push(PriorityTask { task, priority });
}
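    /// Pushes without touching `ready_count`; intended for batched injection
    /// where the caller settles the count once via [`Self::add_ready_count`].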
#[inline]
pub(crate) fn inject_ready_uncounted(&self, task: TaskId, priority: u8) {
self.ready_queue.push(PriorityTask { task, priority });
}
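    /// Credits `count` tasks previously pushed via
    /// [`Self::inject_ready_uncounted`].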
#[inline]
pub(crate) fn add_ready_count(&self, count: usize) {
if count > 0 {
self.ready_count.fetch_add(count, Ordering::Relaxed);
}
}
#[inline]
#[must_use]
pub fn pop_cancel(&self) -> Option<PriorityTask> {
self.cancel_queue.pop()
}
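    /// Pops the earliest-deadline task, if any. The relaxed fast path is
    /// sound because `timed_count` never under-reports the heap length.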
#[inline]
#[must_use]
pub fn pop_timed(&self) -> Option<TimedTask> {
if self.timed_count.load(Ordering::Relaxed) == 0 {
return None;
}
let mut queue = self.timed_queue.lock();
let result = queue.heap.pop();
let earliest = queue
.heap
.peek()
.map_or(u64::MAX, |t| t.deadline.as_nanos());
self.cached_earliest_deadline
.store(earliest, Ordering::Relaxed);
drop(queue);
if result.is_some() {
self.decrement_timed_count();
}
result
}
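    /// Returns the earliest pending deadline without taking the heap lock.
    /// Best-effort: the cached value can be momentarily stale while another
    /// thread is mutating the heap.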
#[inline]
#[must_use]
pub fn peek_earliest_deadline(&self) -> Option<Time> {
if self.timed_count.load(Ordering::Relaxed) == 0 {
return None;
}
let cached = self.cached_earliest_deadline.load(Ordering::Relaxed);
if cached == u64::MAX {
None
} else {
Some(Time::from_nanos(cached))
}
}
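    /// Pops the earliest task only if its deadline is at or before `now`.
    /// The lock-free cache check may skip a task injected concurrently; such
    /// a task is simply picked up on a later call.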
#[inline]
#[must_use]
pub fn pop_timed_if_due(&self, now: Time) -> Option<TimedTask> {
if self.timed_count.load(Ordering::Relaxed) == 0 {
return None;
}
let cached = self.cached_earliest_deadline.load(Ordering::Relaxed);
if cached != u64::MAX && Time::from_nanos(cached) > now {
return None;
}
let mut queue = self.timed_queue.lock();
if let Some(entry) = queue.heap.peek() {
if entry.deadline <= now {
let result = queue.heap.pop();
let earliest = queue
.heap
.peek()
.map_or(u64::MAX, |t| t.deadline.as_nanos());
self.cached_earliest_deadline
.store(earliest, Ordering::Relaxed);
drop(queue);
if result.is_some() {
self.decrement_timed_count();
}
return result;
}
}
None
}
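    /// Pops the oldest ready task, if any.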
#[inline]
#[must_use]
pub fn pop_ready(&self) -> Option<PriorityTask> {
let result = self.ready_queue.pop();
if result.is_some() {
self.decrement_ready_count();
}
result
}
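    /// True when all three lanes appear empty; approximate under concurrent
    /// mutation, like [`Self::len`].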
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.cancel_queue.is_empty()
&& self.timed_count.load(Ordering::Relaxed) == 0
&& self.ready_count.load(Ordering::Relaxed) == 0
}
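    /// True if anything is runnable at `now`: any cancel or ready work, or
    /// timed work whose earliest deadline has already arrived.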
#[inline]
#[must_use]
pub fn has_runnable_work(&self, now: Time) -> bool {
if !self.cancel_queue.is_empty() || self.ready_count.load(Ordering::Relaxed) > 0 {
return true;
}
if self.timed_count.load(Ordering::Relaxed) == 0 {
return false;
}
let earliest = self.cached_earliest_deadline.load(Ordering::Relaxed);
earliest != u64::MAX && Time::from_nanos(earliest) <= now
}
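    /// Approximate number of pending tasks across all three lanes; exact
    /// only when no other thread is mutating the injector.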
#[inline]
#[must_use]
pub fn len(&self) -> usize {
self.cancel_queue.len()
+ self.timed_count.load(Ordering::Relaxed)
+ self.ready_count.load(Ordering::Relaxed)
}
#[inline]
#[must_use]
pub fn has_cancel_work(&self) -> bool {
!self.cancel_queue.is_empty()
}
#[inline]
#[must_use]
pub fn has_timed_work(&self) -> bool {
self.timed_count.load(Ordering::Relaxed) > 0
}
#[inline]
#[must_use]
pub fn has_ready_work(&self) -> bool {
self.ready_count.load(Ordering::Relaxed) > 0
}
#[inline]
#[must_use]
pub fn ready_count(&self) -> usize {
self.ready_count.load(Ordering::Relaxed)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Barrier};
use std::thread;
fn task(id: u32) -> TaskId {
TaskId::new_for_test(1, id)
}
#[test]
fn inject_and_pop_cancel() {
let injector = GlobalInjector::new();
injector.inject_cancel(task(1), 100);
injector.inject_cancel(task(2), 50);
assert!(!injector.is_empty());
assert!(injector.has_cancel_work());
let first = injector.pop_cancel().unwrap();
assert_eq!(first.task, task(1));
let second = injector.pop_cancel().unwrap();
assert_eq!(second.task, task(2));
assert!(injector.pop_cancel().is_none());
}
#[test]
fn inject_and_pop_timed() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
injector.inject_timed(task(2), Time::from_secs(50));
assert!(injector.has_timed_work());
let first = injector.pop_timed().unwrap();
        assert_eq!(first.task, task(2));
        assert_eq!(first.deadline, Time::from_secs(50));
let second = injector.pop_timed().unwrap();
        assert_eq!(second.task, task(1));
        assert_eq!(second.deadline, Time::from_secs(100));
}
#[test]
fn edf_ordering_multiple_tasks() {
let injector = GlobalInjector::new();
injector.inject_timed(task(3), Time::from_secs(75));
injector.inject_timed(task(1), Time::from_secs(25));
injector.inject_timed(task(4), Time::from_secs(100));
injector.inject_timed(task(2), Time::from_secs(50));
assert_eq!(injector.pop_timed().unwrap().task, task(1));
assert_eq!(injector.pop_timed().unwrap().task, task(2));
assert_eq!(injector.pop_timed().unwrap().task, task(3));
assert_eq!(injector.pop_timed().unwrap().task, task(4));
}
#[test]
fn equal_deadlines_fifo_order() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(50));
injector.inject_timed(task(2), Time::from_secs(50));
injector.inject_timed(task(3), Time::from_secs(50));
assert_eq!(injector.pop_timed().unwrap().task, task(1));
assert_eq!(injector.pop_timed().unwrap().task, task(2));
assert_eq!(injector.pop_timed().unwrap().task, task(3));
}
#[test]
fn pop_timed_if_due() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
injector.inject_timed(task(2), Time::from_secs(50));
assert!(injector.pop_timed_if_due(Time::from_secs(25)).is_none());
assert!(injector.has_timed_work());
let due = injector.pop_timed_if_due(Time::from_secs(50)).unwrap();
assert_eq!(due.task, task(2));
assert!(injector.pop_timed_if_due(Time::from_secs(75)).is_none());
let due = injector.pop_timed_if_due(Time::from_secs(100)).unwrap();
assert_eq!(due.task, task(1));
}
#[test]
fn peek_earliest_deadline() {
let injector = GlobalInjector::new();
assert!(injector.peek_earliest_deadline().is_none());
injector.inject_timed(task(1), Time::from_secs(100));
assert_eq!(
injector.peek_earliest_deadline(),
Some(Time::from_secs(100))
);
injector.inject_timed(task(2), Time::from_secs(50));
assert_eq!(injector.peek_earliest_deadline(), Some(Time::from_secs(50)));
        // Peeking is non-destructive: a second peek returns the same value.
        assert_eq!(injector.peek_earliest_deadline(), Some(Time::from_secs(50)));
}
#[test]
fn inject_and_pop_ready() {
let injector = GlobalInjector::new();
injector.inject_ready(task(1), 100);
assert!(injector.has_ready_work());
let popped = injector.pop_ready().unwrap();
assert_eq!(popped.task, task(1));
assert_eq!(popped.priority, 100);
}
#[test]
fn pending_count_accuracy() {
let injector = GlobalInjector::new();
assert_eq!(injector.len(), 0);
injector.inject_cancel(task(1), 100);
injector.inject_timed(task(2), Time::from_secs(10));
injector.inject_ready(task(3), 50);
assert_eq!(injector.len(), 3);
let _ = injector.pop_cancel();
assert_eq!(injector.len(), 2);
let _ = injector.pop_timed();
let _ = injector.pop_ready();
assert_eq!(injector.len(), 0);
}
#[test]
fn pop_does_not_underflow_when_counter_lags() {
let injector = GlobalInjector::new();
injector.cancel_queue.push(PriorityTask {
task: task(10),
priority: 1,
});
let popped_cancel = injector.pop_cancel().expect("cancel task should pop");
assert_eq!(popped_cancel.task, task(10));
injector.ready_queue.push(PriorityTask {
task: task(11),
priority: 2,
});
assert_eq!(injector.ready_count.load(Ordering::Relaxed), 0);
let popped_ready = injector.pop_ready().expect("ready task should pop");
assert_eq!(popped_ready.task, task(11));
assert_eq!(injector.ready_count.load(Ordering::Relaxed), 0);
}
#[test]
fn readiness_checks_use_ready_counter_to_avoid_false_empty_window() {
let injector = GlobalInjector::new();
injector.ready_count.fetch_add(1, Ordering::Relaxed);
assert!(
!injector.is_empty(),
"counter-visible ready work must not report empty"
);
assert!(
injector.has_ready_work(),
"counter-visible ready work must report ready lane activity"
);
assert!(
injector.has_runnable_work(Time::ZERO),
"counter-visible ready work must report runnable work"
);
injector.ready_queue.push(PriorityTask {
task: task(14),
priority: 9,
});
let popped_ready = injector.pop_ready().expect("ready task should pop");
assert_eq!(popped_ready.task, task(14));
assert_eq!(injector.ready_count.load(Ordering::Relaxed), 0);
assert!(injector.is_empty(), "injector returns empty after pop");
}
#[test]
fn timed_pop_does_not_underflow_when_counter_lags() {
let injector = GlobalInjector::new();
{
let mut timed = injector.timed_queue.lock();
timed
.heap
.push(TimedTask::new(task(12), Time::from_secs(10), 0));
}
injector.timed_count.fetch_add(1, Ordering::Relaxed);
let popped_timed = injector.pop_timed().expect("timed task should pop");
assert_eq!(popped_timed.task, task(12));
assert_eq!(injector.timed_count.load(Ordering::Relaxed), 0);
}
#[test]
fn timed_pop_if_due_does_not_underflow_when_counter_lags() {
let injector = GlobalInjector::new();
{
let mut timed = injector.timed_queue.lock();
timed
.heap
.push(TimedTask::new(task(13), Time::from_secs(10), 0));
}
injector.timed_count.fetch_add(1, Ordering::Relaxed);
assert!(injector.pop_timed_if_due(Time::from_secs(9)).is_none());
assert_eq!(injector.timed_count.load(Ordering::Relaxed), 1);
let popped_timed = injector
.pop_timed_if_due(Time::from_secs(10))
.expect("timed task should pop when due");
assert_eq!(popped_timed.task, task(13));
assert_eq!(injector.timed_count.load(Ordering::Relaxed), 0);
}
#[test]
fn has_runnable_work_empty_returns_false() {
let injector = GlobalInjector::new();
assert!(
!injector.has_runnable_work(Time::ZERO),
"empty injector has no runnable work"
);
}
#[test]
fn has_runnable_work_cancel_always_runnable() {
let injector = GlobalInjector::new();
injector.inject_cancel(task(1), 100);
assert!(
injector.has_runnable_work(Time::ZERO),
"cancel work is always runnable regardless of time"
);
}
#[test]
fn has_runnable_work_ready_always_runnable() {
let injector = GlobalInjector::new();
injector.inject_ready(task(1), 50);
assert!(
injector.has_runnable_work(Time::ZERO),
"ready work is always runnable regardless of time"
);
}
#[test]
fn has_runnable_work_timed_not_due() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
assert!(
!injector.has_runnable_work(Time::from_secs(50)),
"timed work with future deadline is not runnable"
);
}
#[test]
fn has_runnable_work_timed_exactly_due() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
assert!(
injector.has_runnable_work(Time::from_secs(100)),
"timed work at exactly its deadline is runnable"
);
}
#[test]
fn has_runnable_work_timed_past_due() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
assert!(
injector.has_runnable_work(Time::from_secs(200)),
"timed work past its deadline is runnable"
);
}
#[test]
fn has_runnable_work_only_timed_with_mixed_deadlines() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
injector.inject_timed(task(2), Time::from_secs(50));
assert!(
!injector.has_runnable_work(Time::from_secs(25)),
"no timed work due at t=25"
);
assert!(
injector.has_runnable_work(Time::from_secs(50)),
"earliest timed work (t=50) is due"
);
}
#[test]
fn peek_earliest_deadline_updates_after_pop() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(50));
injector.inject_timed(task(2), Time::from_secs(100));
assert_eq!(injector.peek_earliest_deadline(), Some(Time::from_secs(50)));
let _ = injector.pop_timed();
assert_eq!(
injector.peek_earliest_deadline(),
Some(Time::from_secs(100)),
"peek should reflect next earliest after pop"
);
let _ = injector.pop_timed();
assert_eq!(
injector.peek_earliest_deadline(),
None,
"peek should be None after draining all timed work"
);
}
#[test]
fn peek_earliest_deadline_updates_after_pop_if_due() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(50));
injector.inject_timed(task(2), Time::from_secs(100));
let _ = injector.pop_timed_if_due(Time::from_secs(50));
assert_eq!(
injector.peek_earliest_deadline(),
Some(Time::from_secs(100)),
"peek updated after pop_timed_if_due"
);
}
#[test]
fn concurrent_decrements_saturate_counters_at_zero() {
for _ in 0..2_000 {
let injector = Arc::new(GlobalInjector::new());
injector.ready_count.store(1, Ordering::Relaxed);
let barrier = Arc::new(Barrier::new(3));
let i1 = Arc::clone(&injector);
let b1 = Arc::clone(&barrier);
let h1 = thread::spawn(move || {
b1.wait();
i1.decrement_ready_count();
});
let i2 = Arc::clone(&injector);
let b2 = Arc::clone(&barrier);
let h2 = thread::spawn(move || {
b2.wait();
i2.decrement_ready_count();
});
barrier.wait();
h1.join().expect("first decrement thread should complete");
h2.join().expect("second decrement thread should complete");
assert_eq!(
injector.ready_count.load(Ordering::Relaxed),
0,
"ready counter must saturate at zero"
);
}
}
#[test]
fn priority_task_debug_clone_copy() {
let pt = PriorityTask {
task: task(1),
priority: 5,
};
let dbg = format!("{pt:?}");
assert!(dbg.contains("PriorityTask"), "{dbg}");
let copied = pt;
let cloned = pt;
assert_eq!(copied.task, cloned.task);
assert_eq!(copied.priority, cloned.priority);
}
#[test]
fn timed_task_debug_clone_copy_eq() {
let tt = TimedTask::new(task(1), Time::from_nanos(1000), 0);
let dbg = format!("{tt:?}");
assert!(dbg.contains("TimedTask"), "{dbg}");
let copied = tt;
let cloned = tt;
assert_eq!(copied, cloned);
}
}