use crate::types::{TaskId, Time};
use crate::util::CachePadded;
use parking_lot::Mutex;
use std::cmp::Ordering as CmpOrdering;
use std::collections::BinaryHeap;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use super::global_queue::{CountReservation, GlobalFifoQueue};
const READY_COMBINER_IN_FLIGHT_THRESHOLD: usize = 4;
const READY_COMBINER_BACKLOG_THRESHOLD: usize = 256;
const READY_COMBINER_MAX_BATCH: usize = 64;
#[derive(Debug, Clone, Copy)]
pub struct PriorityTask {
pub task: TaskId,
pub priority: u8,
}
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct ReadyCombinerSnapshot {
pub direct_injections: usize,
pub deferred_injections: usize,
pub combined_injections: usize,
pub fallback_injections: usize,
pub combiner_claim_failures: usize,
pub mode_entries: usize,
pub mode_exits: usize,
pub flushes: usize,
pub max_batch: usize,
pub max_in_flight: usize,
pub current_in_flight: usize,
pub pending_len: usize,
pub ready_len: usize,
}
#[derive(Debug, Default)]
struct ReadyCombiner {
active: CachePadded<AtomicBool>,
in_flight: CachePadded<AtomicUsize>,
pending: Mutex<Vec<PriorityTask>>,
direct_injections: CachePadded<AtomicUsize>,
deferred_injections: CachePadded<AtomicUsize>,
combined_injections: CachePadded<AtomicUsize>,
fallback_injections: CachePadded<AtomicUsize>,
combiner_claim_failures: CachePadded<AtomicUsize>,
mode_entries: CachePadded<AtomicUsize>,
mode_exits: CachePadded<AtomicUsize>,
flushes: CachePadded<AtomicUsize>,
max_batch: CachePadded<AtomicUsize>,
max_in_flight: CachePadded<AtomicUsize>,
}
impl ReadyCombiner {
#[inline]
fn begin_injection(&self) -> usize {
let observed = self.in_flight.fetch_add(1, Ordering::AcqRel) + 1;
self.max_in_flight.fetch_max(observed, Ordering::Relaxed);
observed
}
#[inline]
fn finish_injection(&self) {
let _ = self.in_flight.fetch_sub(1, Ordering::AcqRel);
}
#[inline]
fn should_combine(in_flight: usize, ready_backlog: usize) -> bool {
in_flight >= READY_COMBINER_IN_FLIGHT_THRESHOLD
|| (in_flight > 1 && ready_backlog >= READY_COMBINER_BACKLOG_THRESHOLD)
}
#[inline]
fn snapshot(&self, ready_len: usize) -> ReadyCombinerSnapshot {
ReadyCombinerSnapshot {
direct_injections: self.direct_injections.load(Ordering::Relaxed),
deferred_injections: self.deferred_injections.load(Ordering::Relaxed),
combined_injections: self.combined_injections.load(Ordering::Relaxed),
fallback_injections: self.fallback_injections.load(Ordering::Relaxed),
combiner_claim_failures: self.combiner_claim_failures.load(Ordering::Relaxed),
mode_entries: self.mode_entries.load(Ordering::Relaxed),
mode_exits: self.mode_exits.load(Ordering::Relaxed),
flushes: self.flushes.load(Ordering::Relaxed),
max_batch: self.max_batch.load(Ordering::Relaxed),
max_in_flight: self.max_in_flight.load(Ordering::Relaxed),
current_in_flight: self.in_flight.load(Ordering::Relaxed),
pending_len: self.pending.lock().len(),
ready_len,
}
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct TimedTask {
pub task: TaskId,
pub deadline: Time,
generation: u64,
}
impl TimedTask {
fn new(task: TaskId, deadline: Time, generation: u64) -> Self {
Self {
task,
deadline,
generation,
}
}
}
impl Ord for TimedTask {
#[inline]
fn cmp(&self, other: &Self) -> CmpOrdering {
other
.deadline
.cmp(&self.deadline)
.then_with(|| {
let diff = other.generation.wrapping_sub(self.generation).cast_signed();
diff.cmp(&0)
})
.then_with(|| other.task.cmp(&self.task))
}
}
impl PartialOrd for TimedTask {
#[inline]
fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
Some(self.cmp(other))
}
}
#[derive(Debug)]
pub struct GlobalInjector {
cancel_queue: GlobalFifoQueue<PriorityTask>,
timed_queue: Mutex<TimedQueue>,
ready_queue: GlobalFifoQueue<PriorityTask>,
ready_combiner: ReadyCombiner,
timed_count: CachePadded<AtomicUsize>,
cached_earliest_deadline: CachePadded<AtomicU64>,
}
#[derive(Debug, Default)]
struct TimedQueue {
heap: BinaryHeap<TimedTask>,
next_generation: u64,
}
impl Default for GlobalInjector {
fn default() -> Self {
Self {
cancel_queue: GlobalFifoQueue::default(),
timed_queue: Mutex::new(TimedQueue::default()),
ready_queue: GlobalFifoQueue::default(),
ready_combiner: ReadyCombiner::default(),
timed_count: CachePadded::new(AtomicUsize::new(0)),
cached_earliest_deadline: CachePadded::new(AtomicU64::new(u64::MAX)),
}
}
}
impl GlobalInjector {
#[inline]
fn saturating_decrement(counter: &AtomicUsize) {
let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| {
count.checked_sub(1)
});
}
#[inline]
fn decrement_timed_count(&self) {
Self::saturating_decrement(&self.timed_count);
}
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn inject_cancel(&self, task: TaskId, priority: u8) {
self.cancel_queue.push(PriorityTask { task, priority });
}
#[inline]
pub fn inject_timed(&self, task: TaskId, deadline: Time) {
self.timed_count.fetch_add(1, Ordering::Relaxed);
let mut queue = self.timed_queue.lock();
let generation = queue.next_generation;
queue.next_generation += 1;
queue.heap.push(TimedTask::new(task, deadline, generation));
let earliest = queue
.heap
.peek()
.map_or(u64::MAX, |t| t.deadline.as_nanos());
self.cached_earliest_deadline
.store(earliest, Ordering::Relaxed);
drop(queue);
}
#[inline]
pub fn inject_ready(&self, task: TaskId, priority: u8) {
let entry = PriorityTask { task, priority };
let in_flight = self.ready_combiner.begin_injection();
let ready_backlog = self.ready_queue.len();
if ReadyCombiner::should_combine(in_flight, ready_backlog) {
self.inject_ready_contentious(entry);
} else {
self.ready_queue.push(entry);
self.ready_combiner
.direct_injections
.fetch_add(1, Ordering::Relaxed);
}
self.ready_combiner.finish_injection();
}
fn inject_ready_contentious(&self, entry: PriorityTask) {
if self
.ready_combiner
.active
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
self.ready_combiner
.mode_entries
.fetch_add(1, Ordering::Relaxed);
self.flush_ready_combiner_with(entry);
return;
}
self.ready_combiner
.combiner_claim_failures
.fetch_add(1, Ordering::Relaxed);
if let Some(mut pending) = self.ready_combiner.pending.try_lock() {
if self.ready_combiner.active.load(Ordering::Acquire) {
pending.push(entry);
if self.ready_combiner.active.load(Ordering::Acquire) {
self.ready_combiner
.deferred_injections
.fetch_add(1, Ordering::Relaxed);
return;
}
if pending
.last()
.is_some_and(|last| last.task == entry.task && last.priority == entry.priority)
{
pending.pop();
}
}
}
self.ready_queue.push(entry);
self.ready_combiner
.fallback_injections
.fetch_add(1, Ordering::Relaxed);
}
fn flush_ready_combiner_with(&self, first: PriorityTask) {
let mut batch = Vec::with_capacity(READY_COMBINER_MAX_BATCH);
batch.push(first);
loop {
{
let mut pending = self.ready_combiner.pending.lock();
let take = READY_COMBINER_MAX_BATCH.saturating_sub(batch.len());
if take > 0 {
let drain_len = take.min(pending.len());
batch.extend(pending.drain(..drain_len));
}
}
self.publish_ready_batch(&mut batch);
let mut pending = self.ready_combiner.pending.lock();
if pending.is_empty() {
self.ready_combiner.active.store(false, Ordering::Release);
self.ready_combiner
.mode_exits
.fetch_add(1, Ordering::Relaxed);
break;
}
let take = READY_COMBINER_MAX_BATCH.min(pending.len());
batch.extend(pending.drain(..take));
}
}
fn publish_ready_batch(&self, batch: &mut Vec<PriorityTask>) {
let count = batch.len();
if count == 0 {
return;
}
self.ready_combiner
.max_batch
.fetch_max(count, Ordering::Relaxed);
self.ready_combiner.flushes.fetch_add(1, Ordering::Relaxed);
self.ready_combiner
.combined_injections
.fetch_add(count, Ordering::Relaxed);
let mut reservation = self.ready_queue.reserve_count(count);
for entry in batch.drain(..) {
self.ready_queue.push_uncounted(entry);
reservation.publish_one();
}
}
#[inline]
pub(crate) fn inject_ready_uncounted(&self, task: TaskId, priority: u8) {
self.ready_queue
.push_uncounted(PriorityTask { task, priority });
}
#[inline]
pub(crate) fn reserve_ready_count(&self, count: usize) -> CountReservation<'_, PriorityTask> {
self.ready_queue.reserve_count(count)
}
#[inline]
#[must_use]
pub fn pop_cancel(&self) -> Option<PriorityTask> {
self.cancel_queue.pop()
}
#[inline]
#[must_use]
pub fn pop_timed(&self) -> Option<TimedTask> {
if self.timed_count.load(Ordering::Relaxed) == 0 {
return None;
}
let mut queue = self.timed_queue.lock();
let result = queue.heap.pop();
let earliest = queue
.heap
.peek()
.map_or(u64::MAX, |t| t.deadline.as_nanos());
self.cached_earliest_deadline
.store(earliest, Ordering::Relaxed);
drop(queue);
if result.is_some() {
self.decrement_timed_count();
}
result
}
#[inline]
#[must_use]
pub fn peek_earliest_deadline(&self) -> Option<Time> {
if self.timed_count.load(Ordering::Relaxed) == 0 {
return None;
}
let cached = self.cached_earliest_deadline.load(Ordering::Relaxed);
if cached == u64::MAX {
None
} else {
Some(Time::from_nanos(cached))
}
}
#[inline]
#[must_use]
pub fn pop_timed_if_due(&self, now: Time) -> Option<TimedTask> {
if self.timed_count.load(Ordering::Relaxed) == 0 {
return None;
}
let cached = self.cached_earliest_deadline.load(Ordering::Relaxed);
if cached != u64::MAX && Time::from_nanos(cached) > now {
return None;
}
let mut queue = self.timed_queue.lock();
if let Some(entry) = queue.heap.peek() {
if entry.deadline <= now {
let result = queue.heap.pop();
let earliest = queue
.heap
.peek()
.map_or(u64::MAX, |t| t.deadline.as_nanos());
self.cached_earliest_deadline
.store(earliest, Ordering::Relaxed);
drop(queue);
if result.is_some() {
self.decrement_timed_count();
}
return result;
}
}
None
}
#[inline]
#[must_use]
pub fn pop_ready(&self) -> Option<PriorityTask> {
self.ready_queue.pop()
}
#[inline]
pub(crate) fn pop_ready_batch_into(&self, max: usize, out: &mut Vec<PriorityTask>) -> usize {
self.ready_queue.pop_batch_into(max, out)
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.cancel_queue.is_empty()
&& self.timed_count.load(Ordering::Relaxed) == 0
&& self.ready_queue.is_empty()
}
#[inline]
#[must_use]
pub fn has_runnable_work(&self, now: Time) -> bool {
if !self.cancel_queue.is_empty() || !self.ready_queue.is_empty() {
return true;
}
if self.timed_count.load(Ordering::Relaxed) == 0 {
return false;
}
let earliest = self.cached_earliest_deadline.load(Ordering::Relaxed);
earliest != u64::MAX && Time::from_nanos(earliest) <= now
}
#[inline]
#[must_use]
pub fn len(&self) -> usize {
self.cancel_queue.len() + self.timed_count.load(Ordering::Relaxed) + self.ready_queue.len()
}
#[inline]
#[must_use]
pub fn has_cancel_work(&self) -> bool {
!self.cancel_queue.is_empty()
}
#[inline]
#[must_use]
pub fn cancel_count(&self) -> usize {
self.cancel_queue.len()
}
#[inline]
#[must_use]
pub fn has_timed_work(&self) -> bool {
self.timed_count.load(Ordering::Relaxed) > 0
}
#[inline]
#[must_use]
pub fn has_ready_work(&self) -> bool {
!self.ready_queue.is_empty()
}
#[inline]
#[must_use]
pub fn ready_count(&self) -> usize {
self.ready_queue.len()
}
#[inline]
#[must_use]
pub fn ready_combiner_snapshot(&self) -> ReadyCombinerSnapshot {
self.ready_combiner.snapshot(self.ready_queue.len())
}
#[doc(hidden)]
#[cfg(any(test, feature = "test-internals"))]
pub fn seed_ready_combiner_pressure_for_test(
&self,
max_in_flight: usize,
combiner_claim_failures: usize,
) {
self.ready_combiner.active.store(false, Ordering::Release);
self.ready_combiner.in_flight.store(0, Ordering::Release);
self.ready_combiner
.max_in_flight
.store(max_in_flight, Ordering::Relaxed);
self.ready_combiner
.combiner_claim_failures
.store(combiner_claim_failures, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use std::collections::HashSet;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Instant;
fn task(id: u32) -> TaskId {
TaskId::new_for_test(1, id)
}
fn contention_task(producer: usize, offset: usize) -> TaskId {
TaskId::new_for_test(
u32::try_from(producer + 10).expect("producer id should fit in u32"),
u32::try_from(offset).expect("task offset should fit in u32"),
)
}
fn run_ready_combiner_contention_case(
producers: usize,
items_per_producer: usize,
) -> ReadyCombinerSnapshot {
let injector = Arc::new(GlobalInjector::new());
let barrier = Arc::new(Barrier::new(producers));
let max_enqueue_tail_ns = Arc::new(AtomicU64::new(0));
let start = Instant::now();
let handles = (0..producers)
.map(|producer| {
let injector = Arc::clone(&injector);
let barrier = Arc::clone(&barrier);
let max_enqueue_tail_ns = Arc::clone(&max_enqueue_tail_ns);
thread::spawn(move || {
barrier.wait();
for offset in 0..items_per_producer {
let enqueue_start = Instant::now();
injector.inject_ready(contention_task(producer, offset), 50);
let elapsed_ns =
u64::try_from(enqueue_start.elapsed().as_nanos()).unwrap_or(u64::MAX);
max_enqueue_tail_ns.fetch_max(elapsed_ns, Ordering::Relaxed);
}
})
})
.collect::<Vec<_>>();
for handle in handles {
handle
.join()
.expect("ready combiner producer should complete");
}
let mut seen = HashSet::new();
while let Some(task) = injector.pop_ready() {
assert!(
seen.insert(task.task),
"ready combiner scenario must not double-enqueue task {:?}",
task.task
);
}
let total_items = producers * items_per_producer;
assert_eq!(
seen.len(),
total_items,
"ready combiner scenario must not lose tasks"
);
let snapshot = injector.ready_combiner_snapshot();
assert_eq!(
snapshot.direct_injections
+ snapshot.fallback_injections
+ snapshot.combined_injections,
total_items,
"direct + fallback + combined publications must account for every injected task"
);
assert_eq!(
snapshot.mode_entries, snapshot.mode_exits,
"combining mode should not remain active after producers finish"
);
assert_eq!(
snapshot.pending_len, 0,
"combiner pending buffer should drain completely"
);
assert_eq!(snapshot.ready_len, 0, "ready queue should drain completely");
assert_eq!(
snapshot.current_in_flight, 0,
"ready injection in-flight count should converge to zero"
);
println!(
"READY_COMBINER_SCENARIO producers={producers} items_per_producer={items_per_producer} total_items={total_items} elapsed_ns={} max_enqueue_tail_ns={} direct_injections={} deferred_injections={} combined_injections={} fallback_injections={} combiner_claim_failures={} mode_entries={} mode_exits={} mode_switches={} flushes={} max_batch={} max_in_flight={}",
u64::try_from(start.elapsed().as_nanos()).unwrap_or(u64::MAX),
max_enqueue_tail_ns.load(Ordering::Relaxed),
snapshot.direct_injections,
snapshot.deferred_injections,
snapshot.combined_injections,
snapshot.fallback_injections,
snapshot.combiner_claim_failures,
snapshot.mode_entries,
snapshot.mode_exits,
snapshot.mode_entries + snapshot.mode_exits,
snapshot.flushes,
snapshot.max_batch,
snapshot.max_in_flight
);
snapshot
}
#[test]
fn inject_and_pop_cancel() {
let injector = GlobalInjector::new();
injector.inject_cancel(task(1), 100);
injector.inject_cancel(task(2), 50);
assert!(!injector.is_empty());
assert!(injector.has_cancel_work());
let first = injector.pop_cancel().unwrap();
assert_eq!(first.task, task(1));
let second = injector.pop_cancel().unwrap();
assert_eq!(second.task, task(2));
assert!(injector.pop_cancel().is_none());
}
#[test]
fn inject_and_pop_timed() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
injector.inject_timed(task(2), Time::from_secs(50));
assert!(injector.has_timed_work());
let first = injector.pop_timed().unwrap();
assert_eq!(first.task, task(2)); assert_eq!(first.deadline, Time::from_secs(50));
let second = injector.pop_timed().unwrap();
assert_eq!(second.task, task(1)); assert_eq!(second.deadline, Time::from_secs(100));
}
#[test]
fn edf_ordering_multiple_tasks() {
let injector = GlobalInjector::new();
injector.inject_timed(task(3), Time::from_secs(75));
injector.inject_timed(task(1), Time::from_secs(25));
injector.inject_timed(task(4), Time::from_secs(100));
injector.inject_timed(task(2), Time::from_secs(50));
assert_eq!(injector.pop_timed().unwrap().task, task(1));
assert_eq!(injector.pop_timed().unwrap().task, task(2));
assert_eq!(injector.pop_timed().unwrap().task, task(3));
assert_eq!(injector.pop_timed().unwrap().task, task(4));
}
#[test]
fn equal_deadlines_fifo_order() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(50));
injector.inject_timed(task(2), Time::from_secs(50));
injector.inject_timed(task(3), Time::from_secs(50));
assert_eq!(injector.pop_timed().unwrap().task, task(1));
assert_eq!(injector.pop_timed().unwrap().task, task(2));
assert_eq!(injector.pop_timed().unwrap().task, task(3));
}
#[test]
fn pop_timed_if_due() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
injector.inject_timed(task(2), Time::from_secs(50));
assert!(injector.pop_timed_if_due(Time::from_secs(25)).is_none());
assert!(injector.has_timed_work());
let due = injector.pop_timed_if_due(Time::from_secs(50)).unwrap();
assert_eq!(due.task, task(2));
assert!(injector.pop_timed_if_due(Time::from_secs(75)).is_none());
let due = injector.pop_timed_if_due(Time::from_secs(100)).unwrap();
assert_eq!(due.task, task(1));
}
#[test]
fn peek_earliest_deadline() {
let injector = GlobalInjector::new();
assert!(injector.peek_earliest_deadline().is_none());
injector.inject_timed(task(1), Time::from_secs(100));
assert_eq!(
injector.peek_earliest_deadline(),
Some(Time::from_secs(100))
);
injector.inject_timed(task(2), Time::from_secs(50));
assert_eq!(injector.peek_earliest_deadline(), Some(Time::from_secs(50)));
assert_eq!(injector.peek_earliest_deadline(), Some(Time::from_secs(50)));
}
#[test]
fn inject_and_pop_ready() {
let injector = GlobalInjector::new();
injector.inject_ready(task(1), 100);
assert!(injector.has_ready_work());
let popped = injector.pop_ready().unwrap();
assert_eq!(popped.task, task(1));
assert_eq!(popped.priority, 100);
}
#[test]
fn pending_count_accuracy() {
let injector = GlobalInjector::new();
assert_eq!(injector.len(), 0);
injector.inject_cancel(task(1), 100);
injector.inject_timed(task(2), Time::from_secs(10));
injector.inject_ready(task(3), 50);
assert_eq!(injector.len(), 3);
let _ = injector.pop_cancel();
assert_eq!(injector.len(), 2);
let _ = injector.pop_timed();
let _ = injector.pop_ready();
assert_eq!(injector.len(), 0);
}
#[test]
fn pop_does_not_underflow_when_counter_lags() {
let injector = GlobalInjector::new();
injector.cancel_queue.push_uncounted(PriorityTask {
task: task(10),
priority: 1,
});
let popped_cancel = injector.pop_cancel().expect("cancel task should pop");
assert_eq!(popped_cancel.task, task(10));
injector.ready_queue.push_uncounted(PriorityTask {
task: task(11),
priority: 2,
});
assert_eq!(injector.ready_queue.len(), 0);
let popped_ready = injector.pop_ready().expect("ready task should pop");
assert_eq!(popped_ready.task, task(11));
assert_eq!(injector.ready_queue.len(), 0);
}
#[test]
fn readiness_checks_use_ready_counter_to_avoid_false_empty_window() {
let injector = GlobalInjector::new();
injector.ready_queue.add_count(1);
assert!(
!injector.is_empty(),
"counter-visible ready work must not report empty"
);
assert!(
injector.has_ready_work(),
"counter-visible ready work must report ready lane activity"
);
assert!(
injector.has_runnable_work(Time::ZERO),
"counter-visible ready work must report runnable work"
);
injector.ready_queue.push_uncounted(PriorityTask {
task: task(14),
priority: 9,
});
let popped_ready = injector.pop_ready().expect("ready task should pop");
assert_eq!(popped_ready.task, task(14));
assert_eq!(injector.ready_queue.len(), 0);
assert!(injector.is_empty(), "injector returns empty after pop");
}
#[test]
fn timed_pop_does_not_underflow_when_counter_lags() {
let injector = GlobalInjector::new();
{
let mut timed = injector.timed_queue.lock();
timed
.heap
.push(TimedTask::new(task(12), Time::from_secs(10), 0));
}
injector.timed_count.fetch_add(1, Ordering::Relaxed);
let popped_timed = injector.pop_timed().expect("timed task should pop");
assert_eq!(popped_timed.task, task(12));
assert_eq!(injector.timed_count.load(Ordering::Relaxed), 0);
}
#[test]
fn timed_pop_if_due_does_not_underflow_when_counter_lags() {
let injector = GlobalInjector::new();
{
let mut timed = injector.timed_queue.lock();
timed
.heap
.push(TimedTask::new(task(13), Time::from_secs(10), 0));
}
injector.timed_count.fetch_add(1, Ordering::Relaxed);
assert!(injector.pop_timed_if_due(Time::from_secs(9)).is_none());
assert_eq!(injector.timed_count.load(Ordering::Relaxed), 1);
let popped_timed = injector
.pop_timed_if_due(Time::from_secs(10))
.expect("timed task should pop when due");
assert_eq!(popped_timed.task, task(13));
assert_eq!(injector.timed_count.load(Ordering::Relaxed), 0);
}
#[test]
fn has_runnable_work_empty_returns_false() {
let injector = GlobalInjector::new();
assert!(
!injector.has_runnable_work(Time::ZERO),
"empty injector has no runnable work"
);
}
#[test]
fn has_runnable_work_cancel_always_runnable() {
let injector = GlobalInjector::new();
injector.inject_cancel(task(1), 100);
assert!(
injector.has_runnable_work(Time::ZERO),
"cancel work is always runnable regardless of time"
);
}
#[test]
fn has_runnable_work_ready_always_runnable() {
let injector = GlobalInjector::new();
injector.inject_ready(task(1), 50);
assert!(
injector.has_runnable_work(Time::ZERO),
"ready work is always runnable regardless of time"
);
}
#[test]
fn has_runnable_work_timed_not_due() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
assert!(
!injector.has_runnable_work(Time::from_secs(50)),
"timed work with future deadline is not runnable"
);
}
#[test]
fn has_runnable_work_timed_exactly_due() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
assert!(
injector.has_runnable_work(Time::from_secs(100)),
"timed work at exactly its deadline is runnable"
);
}
#[test]
fn has_runnable_work_timed_past_due() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
assert!(
injector.has_runnable_work(Time::from_secs(200)),
"timed work past its deadline is runnable"
);
}
#[test]
fn has_runnable_work_only_timed_with_mixed_deadlines() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(100));
injector.inject_timed(task(2), Time::from_secs(50));
assert!(
!injector.has_runnable_work(Time::from_secs(25)),
"no timed work due at t=25"
);
assert!(
injector.has_runnable_work(Time::from_secs(50)),
"earliest timed work (t=50) is due"
);
}
#[test]
fn peek_earliest_deadline_updates_after_pop() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(50));
injector.inject_timed(task(2), Time::from_secs(100));
assert_eq!(injector.peek_earliest_deadline(), Some(Time::from_secs(50)));
let _ = injector.pop_timed();
assert_eq!(
injector.peek_earliest_deadline(),
Some(Time::from_secs(100)),
"peek should reflect next earliest after pop"
);
let _ = injector.pop_timed();
assert_eq!(
injector.peek_earliest_deadline(),
None,
"peek should be None after draining all timed work"
);
}
#[test]
fn peek_earliest_deadline_updates_after_pop_if_due() {
let injector = GlobalInjector::new();
injector.inject_timed(task(1), Time::from_secs(50));
injector.inject_timed(task(2), Time::from_secs(100));
let _ = injector.pop_timed_if_due(Time::from_secs(50));
assert_eq!(
injector.peek_earliest_deadline(),
Some(Time::from_secs(100)),
"peek updated after pop_timed_if_due"
);
}
#[test]
fn concurrent_decrements_saturate_counters_at_zero() {
for _ in 0..2_000 {
let injector = Arc::new(GlobalInjector::new());
injector.ready_queue.add_count(1);
let barrier = Arc::new(Barrier::new(3));
let i1 = Arc::clone(&injector);
let b1 = Arc::clone(&barrier);
let h1 = thread::spawn(move || {
b1.wait();
i1.ready_queue.decrement_count();
});
let i2 = Arc::clone(&injector);
let b2 = Arc::clone(&barrier);
let h2 = thread::spawn(move || {
b2.wait();
i2.ready_queue.decrement_count();
});
barrier.wait();
h1.join().expect("first decrement thread should complete");
h2.join().expect("second decrement thread should complete");
assert_eq!(
injector.ready_queue.len(),
0,
"ready counter must saturate at zero"
);
}
}
#[test]
fn ready_combiner_low_contention_preserves_direct_path() {
let injector = GlobalInjector::new();
injector.inject_ready(task(31), 90);
let snapshot = injector.ready_combiner_snapshot();
assert_eq!(
snapshot.direct_injections, 1,
"single ready injection should stay on the direct fast path"
);
assert_eq!(
snapshot.mode_entries, 0,
"low-contention ready injection must not enter combining mode"
);
assert_eq!(snapshot.ready_len, 1, "direct path should publish one task");
let popped = injector.pop_ready().expect("direct ready task should pop");
assert_eq!(popped.task, task(31));
assert_eq!(popped.priority, 90);
}
#[test]
fn ready_combiner_falls_back_to_direct_queue_when_pending_buffer_is_busy() {
let injector = GlobalInjector::new();
injector
.ready_combiner
.in_flight
.store(READY_COMBINER_IN_FLIGHT_THRESHOLD - 1, Ordering::Release);
injector
.ready_combiner
.active
.store(true, Ordering::Release);
let pending_guard = injector.ready_combiner.pending.lock();
injector.inject_ready(task(32), 10);
drop(pending_guard);
injector
.ready_combiner
.active
.store(false, Ordering::Release);
injector
.ready_combiner
.in_flight
.store(0, Ordering::Release);
let snapshot = injector.ready_combiner_snapshot();
assert_eq!(
snapshot.fallback_injections, 1,
"busy pending buffer should fall back to the baseline queue path"
);
assert_eq!(
snapshot.combiner_claim_failures, 1,
"fallback should record the failed active-combiner CAS"
);
let popped = injector
.pop_ready()
.expect("fallback direct ready task should pop");
assert_eq!(popped.task, task(32));
assert!(
injector.pop_ready().is_none(),
"fallback path must not double-enqueue"
);
}
#[test]
fn ready_combiner_handoff_flushes_deferred_batch_without_loss_or_duplicates() {
let injector = GlobalInjector::new();
injector
.ready_combiner
.active
.store(true, Ordering::Release);
{
let mut pending = injector.ready_combiner.pending.lock();
pending.push(PriorityTask {
task: task(42),
priority: 7,
});
pending.push(PriorityTask {
task: task(43),
priority: 8,
});
}
injector.flush_ready_combiner_with(PriorityTask {
task: task(41),
priority: 6,
});
let snapshot = injector.ready_combiner_snapshot();
assert_eq!(
snapshot.combined_injections, 3,
"combiner should publish the owner task and deferred suffix"
);
assert_eq!(snapshot.flushes, 1, "three tasks fit in one combiner flush");
assert_eq!(
snapshot.max_batch, 3,
"max batch should record handoff size"
);
assert_eq!(
snapshot.pending_len, 0,
"handoff should leave no pending ready tasks"
);
let drained = std::iter::from_fn(|| injector.pop_ready()).collect::<Vec<_>>();
assert_eq!(
drained.len(),
3,
"handoff should publish exactly three tasks"
);
assert_eq!(drained[0].task, task(41));
assert_eq!(drained[1].task, task(42));
assert_eq!(drained[2].task, task(43));
assert!(
injector.pop_ready().is_none(),
"combiner handoff must not leave duplicate tasks behind"
);
}
#[test]
fn ready_combiner_contention_scenario_logs_required_producer_counts() {
let one = run_ready_combiner_contention_case(1, 128);
assert_eq!(
one.mode_entries, 0,
"single-producer run should preserve the low-contention direct path"
);
assert_eq!(
one.direct_injections, 128,
"single-producer run should account for all tasks as direct injections"
);
for producers in [8, 32, 64] {
let snapshot = run_ready_combiner_contention_case(producers, 128);
assert!(
snapshot.max_in_flight >= 1,
"scenario should report an in-flight pressure metric"
);
}
}
#[test]
fn priority_task_debug_clone_copy() {
let pt = PriorityTask {
task: task(1),
priority: 5,
};
let dbg = format!("{pt:?}");
assert!(dbg.contains("PriorityTask"), "{dbg}");
let copied = pt;
let cloned = pt;
assert_eq!(copied.task, cloned.task);
assert_eq!(copied.priority, cloned.priority);
}
#[test]
fn timed_task_debug_clone_copy_eq() {
let tt = TimedTask::new(task(1), Time::from_nanos(1000), 0);
let dbg = format!("{tt:?}");
assert!(dbg.contains("TimedTask"), "{dbg}");
let copied = tt;
let cloned = tt;
assert_eq!(copied, cloned);
}
}