use crate::cancel::progress_certificate::{DrainPhase, ProgressCertificate};
use crate::obligation::lyapunov::{
LyapunovGovernor, PotentialWeights, SchedulingSuggestion, StateSnapshot,
};
use crate::observability::spectral_health::{SpectralHealthMonitor, SpectralThresholds};
use crate::runtime::io_driver::IoDriverHandle;
use crate::runtime::scheduler::global_injector::GlobalInjector;
use crate::runtime::scheduler::local_queue::{self, LocalQueue};
use crate::runtime::scheduler::priority::Scheduler as PriorityScheduler;
use crate::runtime::scheduler::worker::Parker;
use crate::runtime::stored_task::AnyStoredTask;
use crate::runtime::{RuntimeState, TaskTable};
use crate::sync::ContendedMutex;
use crate::time::TimerDriverHandle;
use crate::tracing_compat::{error, trace};
use crate::types::{CxInner, TaskId, Time};
use crate::util::{CachePadded, DetHasher, DetRng};
use parking_lot::Mutex;
use parking_lot::RwLock;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
pub type WorkerId = usize;
const DEFAULT_CANCEL_STREAK_LIMIT: usize = 16;
const DEFAULT_BROWSER_READY_HANDOFF_LIMIT: usize = 0;
const DEFAULT_STEAL_BATCH_SIZE: usize = 4;
const DEFAULT_ENABLE_PARKING: bool = true;
const LOCAL_SCHEDULER_BURST_BUDGET: usize = 2048;
const LOCAL_SCHEDULER_MIN_CAPACITY: usize = 128;
const LOCAL_SCHEDULER_MAX_CAPACITY: usize = 1024;
const ADAPTIVE_STREAK_ARMS: [usize; 5] = [4, 8, 16, 32, 64];
const ADAPTIVE_EXP3_GAMMA: f64 = 0.07;
const ADAPTIVE_EPROCESS_LAMBDA: f64 = 0.5;
const SPIN_LIMIT: u32 = 8;
const YIELD_LIMIT: u32 = 2;
const SHORT_WAIT_LE_5MS_NANOS: u64 = 5_000_000;
type LocalReadyQueue = Mutex<VecDeque<TaskId>>;
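/// Outcome of one IO-driver turn (see `ThreeLaneWorker::drive_io_phase`):
/// `Progress` means events were delivered or time was usefully spent
/// blocking, `Follower` means this worker could not drive the reactor
/// (typically because another worker holds it), and `NoProgress` means the
/// turn yielded nothing.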
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum IoPhaseOutcome {
Progress,
Follower,
NoProgress,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BackoffTimeoutDecision {
ParkTimeout { nanos: u64 },
DeadlineDue,
}
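/// Picks the deadline to park against during backoff. A `Follower` worker
/// honors only its local deadline: shared timer and global deadlines are
/// expected to be covered by the worker currently driving IO, so parking
/// against them here would only produce redundant wakeups.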
#[inline]
fn select_backoff_deadline(
io_phase: IoPhaseOutcome,
timer_deadline: Option<Time>,
local_deadline: Option<Time>,
global_deadline: Option<Time>,
) -> Option<Time> {
if matches!(io_phase, IoPhaseOutcome::Follower) {
local_deadline
} else {
[timer_deadline, local_deadline, global_deadline]
.into_iter()
.flatten()
.min()
}
}
#[inline]
fn record_backoff_deadline_selection(
metrics: &mut PreemptionMetrics,
io_phase: IoPhaseOutcome,
timer_deadline: Option<Time>,
global_deadline: Option<Time>,
) {
if matches!(io_phase, IoPhaseOutcome::Follower)
&& (timer_deadline.is_some() || global_deadline.is_some())
{
metrics.follower_shared_deadline_ignored += 1;
}
}
#[inline]
fn record_backoff_timeout_park(
metrics: &mut PreemptionMetrics,
io_phase: IoPhaseOutcome,
nanos: u64,
) {
metrics.backoff_parks_total += 1;
metrics.backoff_timeout_parks_total += 1;
metrics.backoff_timeout_nanos_total = metrics.backoff_timeout_nanos_total.saturating_add(nanos);
if nanos <= SHORT_WAIT_LE_5MS_NANOS {
metrics.short_wait_le_5ms += 1;
}
if matches!(io_phase, IoPhaseOutcome::Follower) {
metrics.follower_timeout_parks += 1;
}
}
#[inline]
fn classify_backoff_timeout_decision(
_io_phase: IoPhaseOutcome,
next_deadline: Time,
now: Time,
) -> BackoffTimeoutDecision {
if next_deadline <= now {
BackoffTimeoutDecision::DeadlineDue
} else {
let nanos = next_deadline.duration_since(now);
BackoffTimeoutDecision::ParkTimeout { nanos }
}
}
#[inline]
fn record_backoff_indefinite_park(metrics: &mut PreemptionMetrics, io_phase: IoPhaseOutcome) {
metrics.backoff_parks_total += 1;
metrics.backoff_indefinite_parks += 1;
if matches!(io_phase, IoPhaseOutcome::Follower) {
metrics.follower_indefinite_parks += 1;
}
}
#[inline]
#[allow(clippy::cast_precision_loss)]
fn usize_to_f64(value: usize) -> f64 {
value as f64
}
#[inline]
#[allow(clippy::cast_precision_loss)]
fn u64_to_f64(value: u64) -> f64 {
value as f64
}
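/// Shannon entropy of a probability vector, normalized by `ln(k)` so the
/// result lies in `[0, 1]`: 0 for a point mass, 1 for the uniform
/// distribution (e.g. `normalized_entropy(&[0.25; 4])` is 1.0 up to
/// rounding).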
#[inline]
#[allow(clippy::cast_precision_loss)]
fn normalized_entropy(probs: &[f64]) -> f64 {
if probs.len() <= 1 {
return 0.0;
}
let mut entropy = 0.0_f64;
for &p in probs {
if p > f64::EPSILON {
entropy = p.mul_add(-p.ln(), entropy);
}
}
let max_entropy = (probs.len() as f64).ln();
if max_entropy <= f64::EPSILON {
0.0
} else {
(entropy / max_entropy).clamp(0.0, 1.0)
}
}
#[derive(Debug, Clone, Copy)]
struct AdaptiveEpochSnapshot {
potential: f64,
deadline_pressure: f64,
base_limit_exceedances: u64,
effective_limit_exceedances: u64,
fallback_cancel_dispatches: u64,
}
impl AdaptiveEpochSnapshot {
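    /// Reward in `[0, 1]` for the epoch spanning `self..end`. The base term
    /// rewards a drop in Lyapunov potential; penalties subtract for growth
    /// in deadline pressure, for fairness-limit exceedances (effective-limit
    /// breaches weighted double), and for fallback cancel dispatches, each
    /// normalized by the epoch length.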
fn reward_against(self, end: Self, epoch_steps: u32) -> f64 {
let denom = self.potential.abs() + 1.0;
let normalized_drop = ((self.potential - end.potential) / denom).clamp(-1.0, 1.0);
let deadline_penalty = ((end.deadline_pressure - self.deadline_pressure).max(0.0)
/ (self.deadline_pressure.abs() + 1.0))
.clamp(0.0, 1.0);
let eps = f64::from(epoch_steps.max(1));
let base_exceedances = u64_to_f64(
end.base_limit_exceedances
.saturating_sub(self.base_limit_exceedances),
);
let effective_exceedances = u64_to_f64(
end.effective_limit_exceedances
.saturating_sub(self.effective_limit_exceedances),
);
let fairness_penalty = 2.0f64.mul_add(effective_exceedances, base_exceedances) / eps;
let fallback_penalty = u64_to_f64(
end.fallback_cancel_dispatches
.saturating_sub(self.fallback_cancel_dispatches),
) / eps;
let reward = 0.5f64.mul_add(normalized_drop, 0.5);
let reward = (-0.2f64).mul_add(deadline_penalty, reward);
let reward = (-0.2f64).mul_add(fairness_penalty.clamp(0.0, 1.0), reward);
let reward = (-0.1f64).mul_add(fallback_penalty.clamp(0.0, 1.0), reward);
reward.clamp(0.0, 1.0)
}
}
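/// EXP3 adversarial bandit over the candidate cancel-streak limits in
/// `ADAPTIVE_STREAK_ARMS`. Each epoch, the selected arm's weight is updated
/// with the importance-weighted reward, and an e-process accumulates
/// `lambda * (reward - 0.5) - lambda^2 / 8` in log space as an
/// anytime-valid check that rewards stay near the 0.5 baseline.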
#[derive(Debug, Clone)]
struct AdaptiveCancelStreakPolicy {
arms: [usize; ADAPTIVE_STREAK_ARMS.len()],
weights: [f64; ADAPTIVE_STREAK_ARMS.len()],
probs: [f64; ADAPTIVE_STREAK_ARMS.len()],
pulls: [u64; ADAPTIVE_STREAK_ARMS.len()],
selected_arm: usize,
epoch_steps: u32,
steps_in_epoch: u32,
epoch_count: u64,
reward_ema: f64,
e_process_log: f64,
epoch_start: Option<AdaptiveEpochSnapshot>,
}
impl AdaptiveCancelStreakPolicy {
fn new(epoch_steps: u32) -> Self {
let arms = ADAPTIVE_STREAK_ARMS;
let mut policy = Self {
arms,
weights: [1.0; ADAPTIVE_STREAK_ARMS.len()],
probs: [0.0; ADAPTIVE_STREAK_ARMS.len()],
pulls: [0; ADAPTIVE_STREAK_ARMS.len()],
            selected_arm: 2, // arms[2] == 16, matching DEFAULT_CANCEL_STREAK_LIMIT
            epoch_steps: epoch_steps.max(1),
steps_in_epoch: 0,
epoch_count: 0,
reward_ema: 0.5,
e_process_log: 0.0,
epoch_start: None,
};
policy.refresh_probs();
policy
}
fn set_epoch_steps(&mut self, epoch_steps: u32) {
self.epoch_steps = epoch_steps.max(1);
}
fn current_limit(&self) -> usize {
self.arms[self.selected_arm]
}
fn refresh_probs(&mut self) {
let sum_w: f64 = self.weights.iter().sum();
let k = usize_to_f64(self.weights.len());
let uniform = 1.0 / k;
if sum_w <= f64::EPSILON {
self.probs.fill(uniform);
return;
}
for i in 0..self.weights.len() {
let exploit = self.weights[i] / sum_w;
self.probs[i] =
(1.0 - ADAPTIVE_EXP3_GAMMA).mul_add(exploit, ADAPTIVE_EXP3_GAMMA * uniform);
}
let sum_p: f64 = self.probs.iter().sum();
if sum_p > f64::EPSILON {
for p in &mut self.probs {
*p /= sum_p;
}
}
}
fn begin_epoch(&mut self, snapshot: AdaptiveEpochSnapshot) {
self.epoch_start = Some(snapshot);
}
fn on_dispatch(&mut self) -> bool {
self.steps_in_epoch = self.steps_in_epoch.saturating_add(1);
self.steps_in_epoch >= self.epoch_steps
}
fn sample_arm_from_u64(&self, sample: u64) -> usize {
#[allow(clippy::cast_precision_loss)]
let u = (sample as f64) / ((u64::MAX as f64) + 1.0);
let mut cdf = 0.0_f64;
for (idx, p) in self.probs.iter().enumerate() {
cdf += *p;
if u <= cdf || idx == self.probs.len() - 1 {
return idx;
}
}
self.probs.len() - 1
}
fn complete_epoch(&mut self, end: AdaptiveEpochSnapshot, sample: u64) -> Option<f64> {
let start = self.epoch_start?;
let reward = start.reward_against(end, self.epoch_steps);
let chosen = self.selected_arm;
let p = self.probs[chosen].clamp(1e-9, 1.0);
let k = usize_to_f64(self.weights.len());
let reward_hat = reward / p;
let exponent = (ADAPTIVE_EXP3_GAMMA * reward_hat / k).clamp(-20.0, 20.0);
self.weights[chosen] *= exponent.exp();
self.weights[chosen] = self.weights[chosen].clamp(1e-30, 1e30);
self.e_process_log += ADAPTIVE_EPROCESS_LAMBDA
.mul_add(reward - 0.5, -(ADAPTIVE_EPROCESS_LAMBDA.powi(2) / 8.0));
self.reward_ema = 0.9f64.mul_add(self.reward_ema, 0.1 * reward);
self.pulls[chosen] = self.pulls[chosen].saturating_add(1);
self.epoch_count = self.epoch_count.saturating_add(1);
self.steps_in_epoch = 0;
self.refresh_probs();
self.selected_arm = self.sample_arm_from_u64(sample);
self.epoch_start = Some(end);
Some(reward)
}
fn e_value(&self) -> f64 {
self.e_process_log.clamp(-60.0, 60.0).exp()
}
}
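/// Wakes parked workers. `wake_one`/`wake_many` rotate through the parkers
/// with an atomic counter (bit-mask fast path when the worker count is a
/// power of two), and every wake also pokes the IO driver so a worker
/// blocked in the reactor notices the new work.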
#[derive(Debug)]
pub(crate) struct WorkerCoordinator {
parkers: Vec<Parker>,
next_wake: CachePadded<AtomicUsize>,
mask: Option<usize>,
io_driver: Option<IoDriverHandle>,
}
impl WorkerCoordinator {
pub(crate) fn new(parkers: Vec<Parker>, io_driver: Option<IoDriverHandle>) -> Self {
let count = parkers.len();
let mask = if count > 0 && count.is_power_of_two() {
Some(count - 1)
} else {
None
};
Self {
parkers,
next_wake: CachePadded::new(AtomicUsize::new(0)),
mask,
io_driver,
}
}
#[inline]
pub(crate) fn wake_one(&self) {
let count = self.parkers.len();
if count == 0 {
return;
}
let idx = self.next_wake.fetch_add(1, Ordering::Relaxed);
let slot = self.mask.map_or_else(|| idx % count, |mask| idx & mask);
self.parkers[slot].unpark();
if let Some(io) = &self.io_driver {
let _ = io.wake();
}
}
#[inline]
pub(crate) fn wake_many(&self, num_wakes: usize) {
let count = self.parkers.len();
if count == 0 || num_wakes == 0 {
return;
}
if num_wakes >= count {
self.wake_all();
return;
}
let start_idx = self.next_wake.fetch_add(num_wakes, Ordering::Relaxed);
for i in 0..num_wakes {
let idx = start_idx.wrapping_add(i);
let slot = self.mask.map_or_else(|| idx % count, |mask| idx & mask);
self.parkers[slot].unpark();
}
if let Some(io) = &self.io_driver {
let _ = io.wake();
}
}
#[inline]
pub(crate) fn wake_worker(&self, worker_id: WorkerId) {
if let Some(parker) = self.parkers.get(worker_id) {
parker.unpark();
}
if let Some(io) = &self.io_driver {
let _ = io.wake();
}
}
#[inline]
pub(crate) fn wake_all(&self) {
for parker in &self.parkers {
parker.unpark();
}
if let Some(io) = &self.io_driver {
let _ = io.wake();
}
}
}
thread_local! {
static CURRENT_LOCAL: RefCell<Option<Arc<Mutex<PriorityScheduler>>>> =
const { RefCell::new(None) };
static CURRENT_LOCAL_READY: RefCell<Option<Arc<LocalReadyQueue>>> =
const { RefCell::new(None) };
static CURRENT_WORKER_ID: RefCell<Option<WorkerId>> = const { RefCell::new(None) };
}
#[derive(Debug)]
pub(crate) struct ScopedLocalScheduler {
prev: Option<Arc<Mutex<PriorityScheduler>>>,
}
impl ScopedLocalScheduler {
pub(crate) fn new(local: Arc<Mutex<PriorityScheduler>>) -> Self {
let prev = CURRENT_LOCAL.with(|cell| cell.replace(Some(local)));
Self { prev }
}
}
impl Drop for ScopedLocalScheduler {
fn drop(&mut self) {
let prev = self.prev.take();
CURRENT_LOCAL.with(|cell| {
*cell.borrow_mut() = prev;
});
}
}
pub(crate) struct ScopedWorkerId {
prev: Option<WorkerId>,
}
impl ScopedWorkerId {
pub(crate) fn new(id: WorkerId) -> Self {
let prev = CURRENT_WORKER_ID.with(|cell| cell.replace(Some(id)));
Self { prev }
}
}
impl Drop for ScopedWorkerId {
fn drop(&mut self) {
let prev = self.prev.take();
let _ = CURRENT_WORKER_ID.try_with(|cell| {
*cell.borrow_mut() = prev;
});
}
}
pub(crate) struct ScopedLocalReady {
prev: Option<Arc<LocalReadyQueue>>,
}
impl ScopedLocalReady {
pub(crate) fn new(queue: Arc<LocalReadyQueue>) -> Self {
let prev = CURRENT_LOCAL_READY.with(|cell| cell.replace(Some(queue)));
Self { prev }
}
}
impl Drop for ScopedLocalReady {
fn drop(&mut self) {
CURRENT_LOCAL_READY.with(|cell| {
*cell.borrow_mut() = self.prev.take();
});
}
}
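/// Pushes `task` onto the current worker's local ready queue. Returns
/// `false` when called outside a worker context.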
#[inline]
pub(crate) fn schedule_local_task(task: TaskId) -> bool {
CURRENT_LOCAL_READY.with(|cell| {
cell.borrow().as_ref().is_some_and(|queue| {
queue.lock().push_back(task);
true
})
})
}
#[inline]
pub(crate) fn current_worker_id() -> Option<WorkerId> {
CURRENT_WORKER_ID.with(|cell| *cell.borrow())
}
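/// Detects a "trapped" cycle with Tarjan's SCC algorithm: a strongly
/// connected component that is cyclic (more than one node, or a self-loop)
/// and has no edge leaving the component. A trapped component cannot be
/// unblocked from outside, so it is reported as a wait-cycle signal.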
fn has_trapped_scc(adjacency: &[Vec<usize>]) -> bool {
struct Tarjan<'a> {
adjacency: &'a [Vec<usize>],
index: usize,
stack: Vec<usize>,
on_stack: Vec<bool>,
indices: Vec<Option<usize>>,
lowlink: Vec<usize>,
trapped: bool,
}
impl Tarjan<'_> {
fn strongconnect(&mut self, v: usize) {
self.indices[v] = Some(self.index);
self.lowlink[v] = self.index;
self.index += 1;
self.stack.push(v);
self.on_stack[v] = true;
for &w in &self.adjacency[v] {
if self.indices[w].is_none() {
self.strongconnect(w);
self.lowlink[v] = self.lowlink[v].min(self.lowlink[w]);
} else if self.on_stack[w] {
self.lowlink[v] = self.lowlink[v].min(self.indices[w].unwrap_or(usize::MAX));
}
}
if self.lowlink[v] == self.indices[v].unwrap_or(usize::MAX) {
let mut component = Vec::new();
while let Some(w) = self.stack.pop() {
self.on_stack[w] = false;
component.push(w);
if w == v {
break;
}
}
let cyclic = component.len() > 1
|| component
.first()
.is_some_and(|n| self.adjacency[*n].contains(n));
if cyclic {
let component_set: BTreeSet<usize> = component.iter().copied().collect();
let mut has_egress = false;
for &u in &component {
if self.adjacency[u].iter().any(|v| !component_set.contains(v)) {
has_egress = true;
break;
}
}
if !has_egress {
self.trapped = true;
}
}
}
}
}
let n = adjacency.len();
let mut tarjan = Tarjan {
adjacency,
index: 0,
stack: Vec::new(),
on_stack: vec![false; n],
indices: vec![None; n],
lowlink: vec![0; n],
trapped: false,
};
for v in 0..n {
if tarjan.indices[v].is_none() {
tarjan.strongconnect(v);
if tarjan.trapped {
return true;
}
}
}
false
}
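/// Builds the waiter graph over non-terminal tasks and returns
/// `(task_count, undirected_edges, trapped_cycle)`. A directed edge runs
/// from each waiter to the task it waits on; `trapped_cycle` is true when
/// `has_trapped_scc` finds a cyclic component with no egress.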
fn wait_graph_signals_from_state(state: &RuntimeState) -> (usize, Vec<(usize, usize)>, bool) {
let mut tasks: Vec<TaskId> = state
.tasks_iter()
.filter_map(|(_, task)| (!task.state.is_terminal()).then_some(task.id))
.collect();
tasks.sort();
let index_by_task: BTreeMap<TaskId, usize> = tasks
.iter()
.enumerate()
.map(|(idx, id)| (*id, idx))
.collect();
let mut undirected_edges: BTreeSet<(usize, usize)> = BTreeSet::new();
let mut adjacency = vec![Vec::new(); tasks.len()];
for (_, task) in state.tasks_iter() {
if task.state.is_terminal() {
continue;
}
let Some(&task_idx) = index_by_task.get(&task.id) else {
continue;
};
for waiter in &task.waiters {
if let Some(&waiter_idx) = index_by_task.get(waiter) {
adjacency[waiter_idx].push(task_idx);
if waiter_idx == task_idx {
continue;
}
undirected_edges.insert(if waiter_idx < task_idx {
(waiter_idx, task_idx)
} else {
(task_idx, waiter_idx)
});
}
}
}
for edges in &mut adjacency {
edges.sort_unstable();
edges.dedup();
}
let trapped_cycle = has_trapped_scc(&adjacency);
(
tasks.len(),
undirected_edges.into_iter().collect(),
trapped_cycle,
)
}
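/// Schedules `task` on the current worker: the fast local queue first, then
/// the thread's priority scheduler. Returns `false` outside a worker
/// context.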
#[inline]
pub(crate) fn schedule_on_current_local(task: TaskId, priority: u8) -> bool {
if LocalQueue::schedule_local(task) {
return true;
}
CURRENT_LOCAL.with(|cell| {
if let Some(local) = cell.borrow().as_ref() {
local.lock().schedule(task, priority);
return true;
}
false
})
}
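/// Moves `task` into the current worker's cancel lane, first removing any
/// pending entry for it from the local ready queue so the task is not
/// dispatched twice. Returns `false` outside a worker context.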
#[inline]
pub(crate) fn schedule_cancel_on_current_local(task: TaskId, priority: u8) -> bool {
CURRENT_LOCAL.with(|cell| {
let borrow = cell.borrow();
let Some(local) = borrow.as_ref() else {
return false;
};
CURRENT_LOCAL_READY.with(|lr_cell| {
if let Some(queue) = lr_cell.borrow().as_ref() {
let mut local_ready_guard = queue.lock();
if let Some(pos) = local_ready_guard.iter().position(|t| *t == task) {
local_ready_guard.remove(pos);
}
drop(local_ready_guard);
local.lock().move_to_cancel_lane(task, priority);
} else {
local.lock().move_to_cancel_lane(task, priority);
}
});
true
})
}
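/// Work-stealing scheduler with three lanes: cancel (preferred, but capped
/// by a cancel-streak limit for fairness), timed (deadline-driven), and
/// ready. Thread-pinned ("local") tasks are never injected globally; they
/// are routed to their owner worker's `local_ready` queue instead.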
#[derive(Debug)]
pub struct ThreeLaneScheduler {
global: Arc<GlobalInjector>,
local_schedulers: Vec<Arc<Mutex<PriorityScheduler>>>,
local_ready: Vec<Arc<LocalReadyQueue>>,
parkers: Vec<Parker>,
workers: Vec<ThreeLaneWorker>,
shutdown: Arc<AtomicBool>,
coordinator: Arc<WorkerCoordinator>,
browser_ready_handoff_limit: usize,
steal_batch_size: usize,
enable_parking: bool,
    #[allow(dead_code)]
    timer_driver: Option<TimerDriverHandle>,
state: Arc<ContendedMutex<RuntimeState>>,
task_table: Option<Arc<ContendedMutex<TaskTable>>>,
global_queue_limit: usize,
}
impl ThreeLaneScheduler {
#[inline]
fn initial_local_scheduler_capacity(worker_count: usize) -> usize {
let workers = worker_count.max(1);
let per_worker = LOCAL_SCHEDULER_BURST_BUDGET.div_ceil(workers);
per_worker.clamp(LOCAL_SCHEDULER_MIN_CAPACITY, LOCAL_SCHEDULER_MAX_CAPACITY)
}
pub fn new(worker_count: usize, state: &Arc<ContendedMutex<RuntimeState>>) -> Self {
Self::new_with_options(worker_count, state, DEFAULT_CANCEL_STREAK_LIMIT, false, 32)
}
pub fn new_with_cancel_limit(
worker_count: usize,
state: &Arc<ContendedMutex<RuntimeState>>,
cancel_streak_limit: usize,
) -> Self {
Self::new_with_options(worker_count, state, cancel_streak_limit, false, 32)
}
pub fn new_with_options(
worker_count: usize,
state: &Arc<ContendedMutex<RuntimeState>>,
cancel_streak_limit: usize,
enable_governor: bool,
governor_interval: u32,
) -> Self {
Self::new_with_options_and_task_table(
worker_count,
state,
None,
cancel_streak_limit,
enable_governor,
governor_interval,
)
}
#[allow(clippy::too_many_lines)]
pub fn new_with_options_and_task_table(
worker_count: usize,
state: &Arc<ContendedMutex<RuntimeState>>,
task_table: Option<Arc<ContendedMutex<TaskTable>>>,
cancel_streak_limit: usize,
enable_governor: bool,
governor_interval: u32,
) -> Self {
let cancel_streak_limit = cancel_streak_limit.max(1);
let browser_ready_handoff_limit = DEFAULT_BROWSER_READY_HANDOFF_LIMIT;
let governor_interval = governor_interval.max(1);
let steal_batch_size = DEFAULT_STEAL_BATCH_SIZE;
let enable_parking = DEFAULT_ENABLE_PARKING;
let global = Arc::new(GlobalInjector::new());
let shutdown = Arc::new(AtomicBool::new(false));
let mut workers = Vec::with_capacity(worker_count);
let mut parkers = Vec::with_capacity(worker_count);
let mut local_schedulers: Vec<Arc<Mutex<PriorityScheduler>>> =
Vec::with_capacity(worker_count);
let mut local_ready: Vec<Arc<LocalReadyQueue>> = Vec::with_capacity(worker_count);
let local_scheduler_capacity = Self::initial_local_scheduler_capacity(worker_count);
let (io_driver, timer_driver) = {
let guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
(guard.io_driver_handle(), guard.timer_driver_handle())
};
for _ in 0..worker_count {
local_schedulers.push(Arc::new(Mutex::new(PriorityScheduler::with_capacity(
local_scheduler_capacity,
))));
}
for _ in 0..worker_count {
local_ready.push(Arc::new(LocalReadyQueue::new(VecDeque::with_capacity(32))));
}
for _ in 0..worker_count {
parkers.push(Parker::new());
}
let coordinator = Arc::new(WorkerCoordinator::new(parkers.clone(), io_driver.clone()));
let fast_queues: Vec<LocalQueue> = (0..worker_count)
.map(|_| {
task_table.as_ref().map_or_else(
|| LocalQueue::new(Arc::clone(state)),
|tt| LocalQueue::new_with_task_table(Arc::clone(tt)),
)
})
.collect();
for id in 0..worker_count {
let parker = parkers[id].clone();
let stealers: Vec<_> = local_schedulers
.iter()
.enumerate()
.filter(|(i, _)| *i != id)
.map(|(_, sched)| Arc::clone(sched))
.collect();
let fast_stealers: Vec<_> = fast_queues
.iter()
.enumerate()
.filter(|(i, _)| *i != id)
.map(|(_, q)| q.stealer())
.collect();
workers.push(ThreeLaneWorker {
id,
local: Arc::clone(&local_schedulers[id]),
stealers,
fast_queue: fast_queues[id].clone(),
fast_stealers,
local_ready: Arc::clone(&local_ready[id]),
all_local_ready: local_ready.clone(),
global: Arc::clone(&global),
state: Arc::clone(state),
task_table: task_table.clone(),
parker,
coordinator: Arc::clone(&coordinator),
rng: DetRng::new(id as u64),
shutdown: Arc::clone(&shutdown),
io_driver: io_driver.clone(),
timer_driver: timer_driver.clone(),
steal_buffer: Vec::with_capacity(steal_batch_size.max(1)),
steal_batch_size,
enable_parking,
cancel_streak: 0,
ready_dispatch_streak: 0,
browser_ready_handoff_limit,
cancel_streak_limit,
governor: if enable_governor {
Some(LyapunovGovernor::with_defaults())
} else {
None
},
cached_suggestion: SchedulingSuggestion::NoPreference,
steps_since_snapshot: 0,
governor_interval,
preemption_metrics: PreemptionMetrics {
adaptive_current_limit: cancel_streak_limit,
adaptive_e_value: 1.0,
..PreemptionMetrics::default()
},
evidence_sink: None,
decision_contract: if enable_governor {
Some(super::decision_contract::SchedulerDecisionContract::new())
} else {
None
},
decision_posterior: if enable_governor {
Some(franken_decision::Posterior::uniform(
super::decision_contract::state::COUNT,
))
} else {
None
},
adaptive_cancel_policy: None,
spectral_monitor: if enable_governor {
Some(SpectralHealthMonitor::new(SpectralThresholds::default()))
} else {
None
},
drain_certificate: if enable_governor {
Some(ProgressCertificate::with_defaults())
} else {
None
},
decision_sequence: 0,
fairness_monitor: Mutex::new(FairnessMonitor::with_defaults()),
invariant_monitor: Mutex::new(
super::invariant_monitor::SchedulerInvariantMonitor::with_defaults(),
),
});
}
Self {
global,
local_schedulers,
local_ready,
parkers,
workers,
shutdown,
coordinator,
timer_driver,
state: Arc::clone(state),
task_table,
browser_ready_handoff_limit,
steal_batch_size,
enable_parking,
global_queue_limit: 0,
}
}
pub fn set_steal_batch_size(&mut self, size: usize) {
let size = size.max(1);
self.steal_batch_size = size;
for worker in &mut self.workers {
worker.steal_batch_size = size;
if worker.steal_buffer.capacity() < size {
worker
.steal_buffer
.reserve(size - worker.steal_buffer.capacity());
}
}
}
pub fn set_enable_parking(&mut self, enable: bool) {
self.enable_parking = enable;
for worker in &mut self.workers {
worker.enable_parking = enable;
}
}
pub fn set_browser_ready_handoff_limit(&mut self, limit: usize) {
self.browser_ready_handoff_limit = limit;
for worker in &mut self.workers {
worker.browser_ready_handoff_limit = limit;
if limit == 0 {
worker.ready_dispatch_streak = 0;
}
}
}
pub fn set_adaptive_cancel_streak(&mut self, enable: bool, epoch_steps: u32) {
let epoch_steps = epoch_steps.max(1);
for worker in &mut self.workers {
if enable {
if let Some(policy) = worker.adaptive_cancel_policy.as_mut() {
policy.set_epoch_steps(epoch_steps);
} else {
worker.adaptive_cancel_policy =
Some(AdaptiveCancelStreakPolicy::new(epoch_steps));
}
if let Some(policy) = worker.adaptive_cancel_policy.as_ref() {
worker.preemption_metrics.adaptive_current_limit = policy.current_limit();
worker.preemption_metrics.adaptive_reward_ema = policy.reward_ema;
worker.preemption_metrics.adaptive_e_value = policy.e_value();
}
} else {
worker.adaptive_cancel_policy = None;
worker.preemption_metrics.adaptive_current_limit = worker.cancel_streak_limit;
worker.preemption_metrics.adaptive_reward_ema = 0.0;
worker.preemption_metrics.adaptive_e_value = 1.0;
}
}
}
pub fn set_global_queue_limit(&mut self, limit: usize) {
self.global_queue_limit = limit;
}
#[must_use]
pub fn global_injector(&self) -> Arc<GlobalInjector> {
self.global.clone()
}
#[inline]
fn with_task_table_ref<R, F: FnOnce(&TaskTable) -> R>(&self, f: F) -> R {
if let Some(tt) = &self.task_table {
let guard = tt.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
f(&guard)
} else {
let state = self
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
f(&state.tasks)
}
}
pub fn inject_cancel(&self, task: TaskId, priority: u8) {
let (is_local, pinned_worker) = self.with_task_table_ref(|tt| {
tt.task(task).map_or((false, None), |record| {
if record.is_local() {
record.wake_state.notify();
}
(record.is_local(), record.pinned_worker())
})
});
if is_local {
if let Some(worker_id) = pinned_worker {
if let Some(local) = self.local_schedulers.get(worker_id) {
if let Some(local_ready) = self.local_ready.get(worker_id) {
let mut local_ready_guard = local_ready.lock();
if let Some(pos) = local_ready_guard.iter().position(|t| *t == task) {
local_ready_guard.remove(pos);
}
drop(local_ready_guard);
local.lock().move_to_cancel_lane(task, priority);
} else {
local.lock().move_to_cancel_lane(task, priority);
}
if let Some(parker) = self.parkers.get(worker_id) {
parker.unpark();
}
return;
}
}
if schedule_cancel_on_current_local(task, priority) {
return;
}
debug_assert!(
false,
"Attempted to inject_cancel local task {task:?} without owner worker"
);
error!(
?task,
"inject_cancel: cannot route local task to owner worker, cancel skipped"
);
return;
}
self.global.inject_cancel(task, priority);
self.wake_one();
}
pub fn inject_timed(&self, task: TaskId, deadline: Time) {
let should_schedule = self.with_task_table_ref(|tt| {
tt.task(task)
.is_none_or(|record| record.wake_state.notify())
});
if should_schedule {
self.global.inject_timed(task, deadline);
self.wake_one();
}
}
#[inline]
fn inject_global_ready_checked(&self, task: TaskId, priority: u8) {
if self.global_queue_limit > 0 && self.global.ready_count() >= self.global_queue_limit {
crate::tracing_compat::warn!(
?task,
priority,
limit = self.global_queue_limit,
current = self.global.ready_count(),
"inject_ready: global ready queue at capacity, scheduling anyway"
);
}
self.global.inject_ready(task, priority);
self.wake_one();
}
pub fn inject_ready(&self, task: TaskId, priority: u8) {
let (should_schedule, is_local) = self.with_task_table_ref(|tt| {
tt.task(task).map_or((true, false), |record| {
(record.wake_state.notify(), record.is_local())
})
});
debug_assert!(
!is_local,
"Attempted to globally inject local task {task:?}. Local tasks must be scheduled on their owner thread."
);
if is_local {
error!(
?task,
"inject_ready: refusing to globally inject local task, scheduling skipped"
);
return;
}
if should_schedule {
self.inject_global_ready_checked(task, priority);
trace!(
?task,
priority, "inject_ready: task injected into global ready queue"
);
} else {
trace!(
?task,
priority, "inject_ready: task NOT scheduled (should_schedule=false)"
);
}
}
pub fn spawn(&self, task: TaskId, priority: u8) {
let (should_schedule, is_local, pinned_worker) = self.with_task_table_ref(|tt| {
tt.task(task).map_or((true, false, None), |record| {
(
record.wake_state.notify(),
record.is_local(),
record.pinned_worker(),
)
})
});
if !should_schedule {
return;
}
if is_local {
let current_worker = current_worker_id();
let is_pinned_here = match (pinned_worker, current_worker) {
(Some(pw), Some(cw)) => pw == cw,
(None, Some(_)) => true,
_ => false,
};
if is_pinned_here && schedule_local_task(task) {
return;
}
if let Some(worker_id) = pinned_worker {
if let Some(queue) = self.local_ready.get(worker_id) {
queue.lock().push_back(task);
self.coordinator.wake_worker(worker_id);
return;
}
}
debug_assert!(
false,
"Attempted to spawn local task {task:?} from non-owner thread or outside worker context"
);
error!(
?task,
"spawn: local task cannot be scheduled from non-owner thread, spawn skipped"
);
return;
}
if schedule_on_current_local(task, priority) {
return;
}
self.inject_global_ready_checked(task, priority);
}
pub fn wake(&self, task: TaskId, priority: u8) {
let (should_schedule, is_local, pinned_worker) = self.with_task_table_ref(|tt| {
tt.task(task).map_or((true, false, None), |record| {
(
record.wake_state.notify(),
record.is_local(),
record.pinned_worker(),
)
})
});
if !should_schedule {
return;
}
if is_local {
let current_worker = current_worker_id();
let is_pinned_here = match (pinned_worker, current_worker) {
(Some(pw), Some(cw)) => pw == cw,
(None, Some(_)) => true,
_ => false,
};
if is_pinned_here && schedule_local_task(task) {
return;
}
if let Some(worker_id) = pinned_worker {
if let Some(queue) = self.local_ready.get(worker_id) {
queue.lock().push_back(task);
self.coordinator.wake_worker(worker_id);
return;
}
}
debug_assert!(
false,
"Attempted to wake local task {task:?} via scheduler from non-owner thread. Use Waker instead."
);
error!(
?task,
"wake: local task cannot be woken from non-owner thread, wake skipped"
);
return;
}
if schedule_on_current_local(task, priority) {
return;
}
self.inject_global_ready_checked(task, priority);
}
#[inline]
fn wake_one(&self) {
self.coordinator.wake_one();
}
pub fn wake_all(&self) {
self.coordinator.wake_all();
}
pub fn take_workers(&mut self) -> Vec<ThreeLaneWorker> {
std::mem::take(&mut self.workers)
}
pub fn shutdown(&self) {
self.shutdown.store(true, Ordering::Release);
self.wake_all();
}
#[must_use]
pub fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::Acquire)
}
}
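/// One worker thread of the `ThreeLaneScheduler`: owns a priority scheduler
/// with cancel/timed/ready lanes, a fast local queue with stealers into its
/// peers, and the per-worker `local_ready` queue for pinned tasks.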
#[derive(Debug)]
pub struct ThreeLaneWorker {
pub id: WorkerId,
pub local: Arc<Mutex<PriorityScheduler>>,
pub stealers: Vec<Arc<Mutex<PriorityScheduler>>>,
pub fast_queue: LocalQueue,
fast_stealers: Vec<local_queue::Stealer>,
local_ready: Arc<LocalReadyQueue>,
all_local_ready: Vec<Arc<LocalReadyQueue>>,
pub global: Arc<GlobalInjector>,
pub state: Arc<ContendedMutex<RuntimeState>>,
pub task_table: Option<Arc<ContendedMutex<TaskTable>>>,
pub parker: Parker,
pub(crate) coordinator: Arc<WorkerCoordinator>,
pub rng: DetRng,
pub shutdown: Arc<AtomicBool>,
pub io_driver: Option<IoDriverHandle>,
pub timer_driver: Option<TimerDriverHandle>,
steal_buffer: Vec<(TaskId, u8)>,
steal_batch_size: usize,
enable_parking: bool,
cancel_streak: usize,
ready_dispatch_streak: usize,
browser_ready_handoff_limit: usize,
cancel_streak_limit: usize,
governor: Option<LyapunovGovernor>,
cached_suggestion: SchedulingSuggestion,
steps_since_snapshot: u32,
governor_interval: u32,
preemption_metrics: PreemptionMetrics,
evidence_sink: Option<Arc<dyn crate::evidence_sink::EvidenceSink>>,
decision_contract: Option<super::decision_contract::SchedulerDecisionContract>,
decision_posterior: Option<franken_decision::Posterior>,
adaptive_cancel_policy: Option<AdaptiveCancelStreakPolicy>,
spectral_monitor: Option<SpectralHealthMonitor>,
drain_certificate: Option<ProgressCertificate>,
decision_sequence: u64,
fairness_monitor: Mutex<FairnessMonitor>,
invariant_monitor: Mutex<super::invariant_monitor::SchedulerInvariantMonitor>,
}
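/// Per-worker counters for preemption, fairness, and park/backoff behavior.
/// Ratio accessors report basis points (1/10_000).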
#[derive(Debug, Clone, Default)]
pub struct PreemptionMetrics {
pub cancel_dispatches: u64,
pub timed_dispatches: u64,
pub ready_dispatches: u64,
pub browser_ready_handoff_yields: u64,
pub fairness_yields: u64,
pub max_ready_dispatch_stall: usize,
pub max_timed_dispatch_stall: usize,
pub ready_priority_inversions: u64,
pub max_ready_priority_inversion_gap: u8,
pub max_cancel_streak: usize,
pub fallback_cancel_dispatches: u64,
pub base_limit_exceedances: u64,
pub effective_limit_exceedances: u64,
pub max_effective_limit_observed: usize,
pub adaptive_epochs: u64,
pub adaptive_current_limit: usize,
pub adaptive_reward_ema: f64,
pub adaptive_e_value: f64,
pub backoff_parks_total: u64,
pub backoff_timeout_parks_total: u64,
pub backoff_indefinite_parks: u64,
pub backoff_timeout_nanos_total: u64,
pub short_wait_le_5ms: u64,
pub follower_shared_deadline_ignored: u64,
pub follower_timeout_parks: u64,
pub follower_indefinite_parks: u64,
pub follower_short_wait_skip_le_5ms: u64,
}
impl PreemptionMetrics {
const RATIO_BPS_SCALE: u64 = 10_000;
#[inline]
fn ratio_bps(numerator: u64, denominator: u64) -> u16 {
if denominator == 0 {
return 0;
}
let raw = numerator
.saturating_mul(Self::RATIO_BPS_SCALE)
.saturating_div(denominator)
.min(Self::RATIO_BPS_SCALE);
raw as u16
}
#[must_use]
pub fn avg_timeout_park_nanos(&self) -> u64 {
if self.backoff_timeout_parks_total == 0 {
return 0;
}
self.backoff_timeout_nanos_total
.saturating_div(self.backoff_timeout_parks_total)
}
#[must_use]
pub fn short_wait_ratio_bps(&self) -> u16 {
Self::ratio_bps(self.short_wait_le_5ms, self.backoff_timeout_parks_total)
}
#[must_use]
pub fn follower_short_wait_avoidance_bps(&self) -> u16 {
let opportunities = self
.follower_short_wait_skip_le_5ms
.saturating_add(self.follower_timeout_parks);
Self::ratio_bps(self.follower_short_wait_skip_le_5ms, opportunities)
}
#[must_use]
pub fn max_non_cancel_dispatch_stall(&self) -> usize {
self.max_ready_dispatch_stall
.max(self.max_timed_dispatch_stall)
}
}
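/// Tuning knobs for `FairnessMonitor`: the wait time that counts as
/// starvation, the size of the sliding analysis window, the priority gap
/// threshold, and bounds on per-task tracking.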
#[derive(Debug, Clone)]
pub struct FairnessConfig {
pub starvation_threshold_ns: u64,
pub analysis_window_size: usize,
pub priority_inversion_threshold: u8,
pub max_tracked_tasks: usize,
pub enable_per_task_tracking: bool,
}
impl Default for FairnessConfig {
fn default() -> Self {
Self {
            starvation_threshold_ns: 100_000_000, // 100 ms
            analysis_window_size: 1000,
priority_inversion_threshold: 5,
max_tracked_tasks: 10_000,
enable_per_task_tracking: true,
}
}
}
#[derive(Debug, Clone)]
struct TaskStarvationInfo {
task_id: TaskId,
priority: u8,
enqueue_time_ns: u64,
skip_count: u32,
last_skip_time_ns: u64,
current_lane: u8,
total_wait_time_ns: u64,
}
impl TaskStarvationInfo {
fn new(task_id: TaskId, priority: u8, current_time_ns: u64, lane: u8) -> Self {
Self {
task_id,
priority,
enqueue_time_ns: current_time_ns,
skip_count: 0,
last_skip_time_ns: 0,
current_lane: lane,
total_wait_time_ns: 0,
}
}
fn record_skip(&mut self, current_time_ns: u64) {
self.skip_count = self.skip_count.saturating_add(1);
self.last_skip_time_ns = current_time_ns;
self.total_wait_time_ns = self.current_wait_time_ns(current_time_ns);
}
fn current_wait_time_ns(&self, current_time_ns: u64) -> u64 {
current_time_ns.saturating_sub(self.enqueue_time_ns)
}
fn is_starved(&self, threshold_ns: u64, current_time_ns: u64) -> bool {
self.current_wait_time_ns(current_time_ns) >= threshold_ns
}
}
#[derive(Debug, Clone)]
struct PriorityInversionEvent {
blocked_task_id: TaskId,
blocked_priority: u8,
executing_task_id: TaskId,
executing_priority: u8,
timestamp_ns: u64,
duration_ns: u64,
}
#[derive(Debug, Clone)]
struct StarvationAnalysisWindow {
events: Vec<u64>,
write_pos: usize,
total_events: u64,
size: usize,
}
impl StarvationAnalysisWindow {
fn new(size: usize) -> Self {
Self {
events: vec![0; size.max(1)],
write_pos: 0,
size: size.max(1),
total_events: 0,
}
}
fn record_event(&mut self, timestamp_ns: u64) {
self.events[self.write_pos] = timestamp_ns;
self.write_pos = (self.write_pos + 1) % self.size;
self.total_events = self.total_events.saturating_add(1);
}
fn events_in_window(&self, window_duration_ns: u64, current_time_ns: u64) -> u32 {
let threshold_time = current_time_ns.saturating_sub(window_duration_ns);
let mut count = 0;
let recorded_events = usize::try_from(self.total_events)
.unwrap_or(usize::MAX)
.min(self.size);
for &event_time in self.events.iter().take(recorded_events) {
if event_time >= threshold_time && event_time <= current_time_ns {
count += 1;
}
}
count
}
fn is_pattern_detected(
&self,
min_events: u32,
window_duration_ns: u64,
current_time_ns: u64,
) -> bool {
self.events_in_window(window_duration_ns, current_time_ns) >= min_events
}
}
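/// Tracks per-task wait times to surface starvation (wait beyond
/// `starvation_threshold_ns`), priority inversions (a higher-priority task
/// skipped in favor of a lower-priority one), and burst patterns via a
/// sliding event window.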
#[derive(Debug)]
pub struct FairnessMonitor {
config: FairnessConfig,
tracked_tasks: std::collections::HashMap<TaskId, TaskStarvationInfo>,
priority_inversions: Vec<PriorityInversionEvent>,
starvation_window: StarvationAnalysisWindow,
total_starvation_events: u64,
total_priority_inversions: u64,
max_task_wait_time_ns: u64,
last_cleanup_time_ns: u64,
}
impl FairnessMonitor {
#[must_use]
pub fn new(config: FairnessConfig) -> Self {
let window_size = config.analysis_window_size;
Self {
config,
tracked_tasks: std::collections::HashMap::new(),
priority_inversions: Vec::new(),
starvation_window: StarvationAnalysisWindow::new(window_size),
total_starvation_events: 0,
total_priority_inversions: 0,
max_task_wait_time_ns: 0,
last_cleanup_time_ns: 0,
}
}
#[must_use]
pub fn with_defaults() -> Self {
Self::new(FairnessConfig::default())
}
pub fn record_task_enqueue(
&mut self,
task_id: TaskId,
priority: u8,
current_time_ns: u64,
lane: u8,
) {
if !self.config.enable_per_task_tracking {
return;
}
self.cleanup_if_needed(current_time_ns);
if self.tracked_tasks.len() >= self.config.max_tracked_tasks {
if let Some((oldest_task_id, _)) = self
.tracked_tasks
.iter()
.min_by_key(|(_, info)| info.enqueue_time_ns)
.map(|(id, info)| (*id, info.clone()))
{
self.tracked_tasks.remove(&oldest_task_id);
}
}
let info = TaskStarvationInfo::new(task_id, priority, current_time_ns, lane);
self.tracked_tasks.insert(task_id, info);
}
pub fn record_task_dispatch(&mut self, task_id: TaskId, current_time_ns: u64) -> Option<u64> {
if let Some(info) = self.tracked_tasks.remove(&task_id) {
let wait_time = info.current_wait_time_ns(current_time_ns);
if wait_time > self.max_task_wait_time_ns {
self.max_task_wait_time_ns = wait_time;
}
Some(wait_time)
} else {
None
}
}
pub fn record_task_skip(
&mut self,
skipped_task_id: TaskId,
executing_task_id: TaskId,
executing_priority: u8,
current_time_ns: u64,
) {
let (should_record_starvation, should_record_inversion, blocked_priority) = {
if let Some(info) = self.tracked_tasks.get_mut(&skipped_task_id) {
info.record_skip(current_time_ns);
let is_starved =
info.is_starved(self.config.starvation_threshold_ns, current_time_ns);
let is_inversion = info.priority > executing_priority;
let priority = info.priority;
(is_starved, is_inversion, priority)
} else {
(false, false, 0)
}
};
if should_record_starvation {
self.record_starvation_event(current_time_ns);
}
if should_record_inversion {
self.record_priority_inversion(
skipped_task_id,
blocked_priority,
executing_task_id,
executing_priority,
current_time_ns,
);
}
}
fn record_starvation_event(&mut self, timestamp_ns: u64) {
self.total_starvation_events = self.total_starvation_events.saturating_add(1);
self.starvation_window.record_event(timestamp_ns);
}
fn record_priority_inversion(
&mut self,
blocked_task: TaskId,
blocked_priority: u8,
executing_task: TaskId,
executing_priority: u8,
timestamp_ns: u64,
) {
self.total_priority_inversions = self.total_priority_inversions.saturating_add(1);
let inversion = PriorityInversionEvent {
blocked_task_id: blocked_task,
blocked_priority,
executing_task_id: executing_task,
executing_priority,
timestamp_ns,
            duration_ns: 0, // not measured at detection time
        };
self.priority_inversions.push(inversion);
const MAX_TRACKED_INVERSIONS: usize = 1000;
if self.priority_inversions.len() > MAX_TRACKED_INVERSIONS {
self.priority_inversions
.drain(0..self.priority_inversions.len() - MAX_TRACKED_INVERSIONS);
}
}
#[must_use]
pub fn detect_starvation_pattern(&self, current_time_ns: u64) -> bool {
        const PATTERN_WINDOW_NS: u64 = 1_000_000_000; // 1 second
        const MIN_EVENTS_FOR_PATTERN: u32 = 10;
self.starvation_window.is_pattern_detected(
MIN_EVENTS_FOR_PATTERN,
PATTERN_WINDOW_NS,
current_time_ns,
)
}
#[must_use]
pub fn count_starved_tasks(&self, current_time_ns: u64) -> u32 {
self.tracked_tasks
.values()
.filter(|info| info.is_starved(self.config.starvation_threshold_ns, current_time_ns))
.count() as u32
}
#[must_use]
pub fn starvation_stats(&self, current_time_ns: u64) -> StarvationStats {
let currently_starved = self.count_starved_tasks(current_time_ns);
let total_tracked_wait_time_ns = self
.tracked_tasks
.values()
.map(|info| {
info.total_wait_time_ns
.max(info.current_wait_time_ns(current_time_ns))
})
.sum::<u64>();
let avg_wait_time_ns = if self.tracked_tasks.is_empty() {
0
} else {
total_tracked_wait_time_ns / self.tracked_tasks.len() as u64
};
let oldest_tracked_task = self
.tracked_tasks
.values()
.max_by_key(|info| info.current_wait_time_ns(current_time_ns))
.map(|info| StarvedTaskSummary {
task_id: info.task_id,
priority: info.priority,
current_lane: info.current_lane,
skip_count: info.skip_count,
wait_time_ns: info.current_wait_time_ns(current_time_ns),
total_wait_time_ns: info
.total_wait_time_ns
.max(info.current_wait_time_ns(current_time_ns)),
});
let latest_priority_inversion =
self.priority_inversions
.last()
.map(|event| PriorityInversionSummary {
blocked_task_id: event.blocked_task_id,
blocked_priority: event.blocked_priority,
executing_task_id: event.executing_task_id,
executing_priority: event.executing_priority,
priority_gap: event
.blocked_priority
.saturating_sub(event.executing_priority),
timestamp_ns: event.timestamp_ns,
duration_ns: event.duration_ns,
});
let max_priority_inversion_gap = self
.priority_inversions
.iter()
.map(|event| {
event
.blocked_priority
.saturating_sub(event.executing_priority)
})
.max()
.unwrap_or(0);
StarvationStats {
total_starvation_events: self.total_starvation_events,
currently_starved_tasks: currently_starved,
max_task_wait_time_ns: self.max_task_wait_time_ns,
avg_task_wait_time_ns: avg_wait_time_ns,
total_priority_inversions: self.total_priority_inversions,
tracked_tasks_count: self.tracked_tasks.len() as u32,
pattern_detected: self.detect_starvation_pattern(current_time_ns),
total_tracked_wait_time_ns,
oldest_tracked_task,
max_priority_inversion_gap,
latest_priority_inversion,
}
}
fn cleanup_if_needed(&mut self, current_time_ns: u64) {
        const CLEANUP_INTERVAL_NS: u64 = 60_000_000_000; // 60 seconds
        const MAX_TASK_AGE_NS: u64 = 300_000_000_000; // 5 minutes
if current_time_ns.saturating_sub(self.last_cleanup_time_ns) < CLEANUP_INTERVAL_NS {
return;
}
self.last_cleanup_time_ns = current_time_ns;
let cutoff_time = current_time_ns.saturating_sub(MAX_TASK_AGE_NS);
self.tracked_tasks
.retain(|_, info| info.enqueue_time_ns >= cutoff_time);
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StarvedTaskSummary {
pub task_id: TaskId,
pub priority: u8,
pub current_lane: u8,
pub skip_count: u32,
pub wait_time_ns: u64,
pub total_wait_time_ns: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PriorityInversionSummary {
pub blocked_task_id: TaskId,
pub blocked_priority: u8,
pub executing_task_id: TaskId,
pub executing_priority: u8,
pub priority_gap: u8,
pub timestamp_ns: u64,
pub duration_ns: u64,
}
#[derive(Debug, Clone, Default)]
pub struct StarvationStats {
pub total_starvation_events: u64,
pub currently_starved_tasks: u32,
pub max_task_wait_time_ns: u64,
pub avg_task_wait_time_ns: u64,
pub total_priority_inversions: u64,
pub tracked_tasks_count: u32,
pub pattern_detected: bool,
pub total_tracked_wait_time_ns: u64,
pub oldest_tracked_task: Option<StarvedTaskSummary>,
pub max_priority_inversion_gap: u8,
pub latest_priority_inversion: Option<PriorityInversionSummary>,
}
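/// Witnessable summary of preemption fairness for one worker. With a
/// cancel-streak limit of `effective_limit`, a ready or timed task waits at
/// most `effective_limit + 1` dispatch steps behind cancel work
/// (`ready_stall_bound_steps`); `invariant_holds` checks that the observed
/// counters respect that bound and that no ready-lane priority inversions
/// occurred.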
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PreemptionFairnessCertificate {
pub base_limit: usize,
pub effective_limit: usize,
pub observed_max_cancel_streak: usize,
pub cancel_dispatches: u64,
pub timed_dispatches: u64,
pub ready_dispatches: u64,
pub fairness_yields: u64,
pub observed_max_ready_stall_steps: usize,
pub observed_max_timed_stall_steps: usize,
pub ready_priority_inversions: u64,
pub max_ready_priority_inversion_gap: u8,
pub fallback_cancel_dispatches: u64,
pub base_limit_exceedances: u64,
pub effective_limit_exceedances: u64,
pub adaptive_enabled: bool,
pub adaptive_current_limit: usize,
}
impl PreemptionFairnessCertificate {
#[must_use]
pub fn ready_stall_bound_steps(&self) -> usize {
self.effective_limit.saturating_add(1)
}
#[must_use]
pub fn observed_non_cancel_stall_steps(&self) -> usize {
self.observed_max_ready_stall_steps
.max(self.observed_max_timed_stall_steps)
}
#[must_use]
pub fn invariant_holds(&self) -> bool {
self.effective_limit_exceedances == 0
&& self.observed_max_cancel_streak <= self.effective_limit
&& self.ready_priority_inversions == 0
}
#[must_use]
pub fn witness_hash(&self) -> u64 {
use std::hash::{Hash, Hasher};
let mut h = DetHasher::default();
self.base_limit.hash(&mut h);
self.effective_limit.hash(&mut h);
self.observed_max_cancel_streak.hash(&mut h);
self.cancel_dispatches.hash(&mut h);
self.timed_dispatches.hash(&mut h);
self.ready_dispatches.hash(&mut h);
self.fairness_yields.hash(&mut h);
self.observed_max_ready_stall_steps.hash(&mut h);
self.observed_max_timed_stall_steps.hash(&mut h);
self.ready_priority_inversions.hash(&mut h);
self.max_ready_priority_inversion_gap.hash(&mut h);
self.fallback_cancel_dispatches.hash(&mut h);
self.base_limit_exceedances.hash(&mut h);
self.effective_limit_exceedances.hash(&mut h);
self.adaptive_enabled.hash(&mut h);
self.adaptive_current_limit.hash(&mut h);
h.finish()
}
}
impl ThreeLaneWorker {
#[inline]
fn current_time_ns(&self) -> u64 {
self.timer_driver
.as_ref()
.map_or(0, |timer| timer.now().as_nanos())
}
pub fn with_fairness_monitor<T>(&self, f: impl FnOnce(&FairnessMonitor) -> T) -> T {
f(&self.fairness_monitor.lock())
}
#[must_use]
pub fn starvation_stats(&self) -> StarvationStats {
let current_time = self.current_time_ns();
self.fairness_monitor.lock().starvation_stats(current_time)
}
#[must_use]
pub fn invariant_stats(&self) -> super::invariant_monitor::InvariantStats {
self.invariant_monitor.lock().stats()
}
#[must_use]
pub fn invariant_violations(
&self,
) -> std::collections::VecDeque<super::invariant_monitor::InvariantViolation> {
self.invariant_monitor.lock().violations().clone()
}
pub fn verify_scheduler_invariants(&mut self) {
if !self.invariant_monitor.lock().is_enabled() {
return;
}
let current_time = Time::from_nanos(self.current_time_ns());
{
let local_ready_guard = self.local_ready.lock();
let local_ready_tasks: Vec<_> = local_ready_guard.iter().copied().collect();
let ready_snapshot = super::invariant_monitor::QueueSnapshot {
name: "local_ready_queue".to_string(),
reported_depth: local_ready_tasks.len(),
actual_tasks: local_ready_tasks,
priority_range: if local_ready_guard.is_empty() {
None
} else {
                    Some((0, 255)) // full u8 range; per-task priority is not tracked here
                },
                time_range: Some((current_time, current_time)),
            };
drop(local_ready_guard);
self.invariant_monitor
.lock()
.verify_queue_consistency(&ready_snapshot, current_time);
}
let fast_queue_tasks = self.fast_queue.snapshot_tasks();
let fast_snapshot = super::invariant_monitor::QueueSnapshot {
name: "fast_queue".to_string(),
reported_depth: fast_queue_tasks.len(),
actual_tasks: fast_queue_tasks,
priority_range: None,
time_range: Some((current_time, current_time)),
};
self.invariant_monitor
.lock()
.verify_queue_consistency(&fast_snapshot, current_time);
}
pub fn record_task_completion(&mut self, task: TaskId) {
if !self.invariant_monitor.lock().is_enabled() {
return;
}
let current_time = Time::from_nanos(self.current_time_ns());
self.invariant_monitor
.lock()
.record_task_complete(task, self.id, current_time);
}
pub fn record_task_cancellation(&mut self, task: TaskId) {
if !self.invariant_monitor.lock().is_enabled() {
return;
}
let current_time = Time::from_nanos(self.current_time_ns());
self.invariant_monitor
.lock()
.record_task_cancel(task, current_time);
}
#[inline]
fn with_task_table<R, F: FnOnce(&mut TaskTable) -> R>(&self, f: F) -> R {
if let Some(tt) = &self.task_table {
let mut guard = tt.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
f(&mut guard)
} else {
let mut state = self
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
f(&mut state.tasks)
}
}
#[inline]
fn with_task_table_ref<R, F: FnOnce(&TaskTable) -> R>(&self, f: F) -> R {
if let Some(tt) = &self.task_table {
let guard = tt.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
f(&guard)
} else {
let state = self
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
f(&state.tasks)
}
}
#[must_use]
pub fn preemption_metrics(&self) -> &PreemptionMetrics {
&self.preemption_metrics
}
#[must_use]
pub fn preemption_fairness_certificate(&self) -> PreemptionFairnessCertificate {
let adaptive_current_limit = self.adaptive_cancel_policy.as_ref().map_or(
self.cancel_streak_limit,
AdaptiveCancelStreakPolicy::current_limit,
);
let effective_limit = self
.preemption_metrics
.max_effective_limit_observed
.max(adaptive_current_limit)
.max(1);
PreemptionFairnessCertificate {
base_limit: adaptive_current_limit,
effective_limit,
observed_max_cancel_streak: self.preemption_metrics.max_cancel_streak,
cancel_dispatches: self.preemption_metrics.cancel_dispatches,
timed_dispatches: self.preemption_metrics.timed_dispatches,
ready_dispatches: self.preemption_metrics.ready_dispatches,
fairness_yields: self.preemption_metrics.fairness_yields,
observed_max_ready_stall_steps: self.preemption_metrics.max_ready_dispatch_stall,
observed_max_timed_stall_steps: self.preemption_metrics.max_timed_dispatch_stall,
ready_priority_inversions: self.preemption_metrics.ready_priority_inversions,
max_ready_priority_inversion_gap: self
.preemption_metrics
.max_ready_priority_inversion_gap,
fallback_cancel_dispatches: self.preemption_metrics.fallback_cancel_dispatches,
base_limit_exceedances: self.preemption_metrics.base_limit_exceedances,
effective_limit_exceedances: self.preemption_metrics.effective_limit_exceedances,
adaptive_enabled: self.adaptive_cancel_policy.is_some(),
adaptive_current_limit,
}
}
pub fn set_evidence_sink(&mut self, sink: Arc<dyn crate::evidence_sink::EvidenceSink>) {
self.evidence_sink = Some(sink);
}
#[cfg(any(test, feature = "test-internals"))]
pub fn set_cached_suggestion(&mut self, suggestion: SchedulingSuggestion) {
self.cached_suggestion = suggestion;
}
#[inline]
fn current_base_cancel_limit(&self) -> usize {
self.adaptive_cancel_policy
.as_ref()
.map_or(
self.cancel_streak_limit,
AdaptiveCancelStreakPolicy::current_limit,
)
.max(1)
}
fn potential_from_snapshot(snapshot: &StateSnapshot) -> f64 {
let w = PotentialWeights::default();
let task_component = w.w_tasks * f64::from(snapshot.live_tasks);
#[allow(clippy::cast_precision_loss)]
let obligation_age_seconds = snapshot.obligation_age_sum_ns as f64 / 1_000_000_000.0;
let obligation_component = w.w_obligation_age * obligation_age_seconds;
let region_component = w.w_draining_regions * f64::from(snapshot.draining_regions);
let deadline_component = w.w_deadline_pressure * snapshot.deadline_pressure;
task_component + obligation_component + region_component + deadline_component
}
fn capture_adaptive_snapshot(&self) -> AdaptiveEpochSnapshot {
let snapshot = {
let state = self
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
StateSnapshot::from_runtime_state(&state)
};
AdaptiveEpochSnapshot {
potential: Self::potential_from_snapshot(&snapshot),
deadline_pressure: snapshot.deadline_pressure,
base_limit_exceedances: self.preemption_metrics.base_limit_exceedances,
effective_limit_exceedances: self.preemption_metrics.effective_limit_exceedances,
fallback_cancel_dispatches: self.preemption_metrics.fallback_cancel_dispatches,
}
}
fn ensure_adaptive_epoch_started(&mut self) {
if self
.adaptive_cancel_policy
.as_ref()
.is_none_or(|p| p.epoch_start.is_some())
{
return;
}
let snap = self.capture_adaptive_snapshot();
if let Some(policy) = self.adaptive_cancel_policy.as_mut() {
policy.begin_epoch(snap);
}
}
fn adaptive_on_dispatch(&mut self) {
self.ensure_adaptive_epoch_started();
let should_close_epoch = self
.adaptive_cancel_policy
.as_mut()
.is_some_and(AdaptiveCancelStreakPolicy::on_dispatch);
if !should_close_epoch {
return;
}
let snapshot_end = self.capture_adaptive_snapshot();
let sample = self.rng.next_u64();
let reward = self
.adaptive_cancel_policy
.as_mut()
.and_then(|p| p.complete_epoch(snapshot_end, sample));
if let Some(policy) = self.adaptive_cancel_policy.as_ref() {
self.preemption_metrics.adaptive_epochs = policy.epoch_count;
self.preemption_metrics.adaptive_current_limit = policy.current_limit();
self.preemption_metrics.adaptive_reward_ema = policy.reward_ema;
self.preemption_metrics.adaptive_e_value = policy.e_value();
}
        if let Some(reward) = reward {
            trace!(
                worker_id = self.id,
                reward,
adaptive_limit = self.preemption_metrics.adaptive_current_limit,
adaptive_epochs = self.preemption_metrics.adaptive_epochs,
adaptive_e_value = self.preemption_metrics.adaptive_e_value,
"adaptive cancel-streak epoch update"
);
}
}
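    /// Runs one IO-driver turn, non-blocking when the fast queue still has
    /// work, otherwise blocking until the earliest known deadline. Returns
    /// `Progress` when the turn delivered events (or usefully blocked),
    /// `Follower` when this worker could not drive the reactor, and
    /// `NoProgress` otherwise.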
fn drive_io_phase(&self) -> IoPhaseOutcome {
let Some(io) = &self.io_driver else {
return IoPhaseOutcome::NoProgress;
};
let now = self
.timer_driver
.as_ref()
.map_or(Time::ZERO, TimerDriverHandle::now);
let local_deadline = self.local.lock().next_deadline();
let timer_deadline = self
.timer_driver
.as_ref()
.and_then(TimerDriverHandle::next_deadline);
let global_deadline = self.global.peek_earliest_deadline();
let next_deadline = [timer_deadline, local_deadline, global_deadline]
.into_iter()
.flatten()
.min();
let timeout = next_deadline.map(|deadline| {
if deadline > now {
Duration::from_nanos(deadline.duration_since(now))
} else {
Duration::ZERO
}
});
let io_timeout = if self.fast_queue.is_empty() {
timeout
} else {
Some(Duration::ZERO)
};
match io.try_turn_with(io_timeout, |_, _| {}) {
Ok(Some(n)) => {
if n > 0 || io_timeout != Some(Duration::ZERO) {
IoPhaseOutcome::Progress
} else {
IoPhaseOutcome::NoProgress
}
}
Ok(None) | Err(_) => {
IoPhaseOutcome::Follower
}
}
}
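    /// Worker main loop: dispatch a task, schedule ready finalizers, drive
    /// IO, then back off. Backoff spins up to `SPIN_LIMIT`, yields up to
    /// `YIELD_LIMIT`, and finally parks, either with a timeout derived from
    /// the earliest relevant deadline or indefinitely when none exists.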
pub fn run_loop(&mut self) {
let _guard = ScopedLocalScheduler::new(Arc::clone(&self.local));
let _queue_guard = LocalQueue::set_current(self.fast_queue.clone());
let _local_ready_guard = ScopedLocalReady::new(Arc::clone(&self.local_ready));
let _worker_guard = ScopedWorkerId::new(self.id);
while !self.shutdown.load(Ordering::Relaxed) {
if let Some(task) = self.next_task() {
self.execute(task);
continue;
}
if self.schedule_ready_finalizers() {
continue;
}
let io_phase = self.drive_io_phase();
if matches!(io_phase, IoPhaseOutcome::Progress) {
continue;
}
let mut backoff = 0;
if !self.fast_queue.is_empty() {
continue;
}
loop {
if self.shutdown.load(Ordering::Relaxed) {
break;
}
let now = self
.timer_driver
.as_ref()
.map_or(Time::ZERO, TimerDriverHandle::now);
if self.global.has_runnable_work(now) || !self.fast_queue.is_empty() {
break;
}
if backoff < SPIN_LIMIT {
std::hint::spin_loop();
backoff += 1;
} else if backoff < SPIN_LIMIT + YIELD_LIMIT {
std::thread::yield_now();
backoff += 1;
} else if self.enable_parking {
let (local_has_runnable, local_deadline) = {
let local = self.local.lock();
(local.has_runnable_work(now), local.next_deadline())
};
let local_ready_has_work = !self.local_ready.lock().is_empty();
if local_has_runnable || local_ready_has_work {
break;
}
let timer_deadline = self
.timer_driver
.as_ref()
.and_then(TimerDriverHandle::next_deadline);
let global_deadline = self.global.peek_earliest_deadline();
record_backoff_deadline_selection(
&mut self.preemption_metrics,
io_phase,
timer_deadline,
global_deadline,
);
let next_deadline = select_backoff_deadline(
io_phase,
timer_deadline,
local_deadline,
global_deadline,
);
if let Some(next_deadline) = next_deadline {
let now = self
.timer_driver
.as_ref()
.map_or(Time::ZERO, TimerDriverHandle::now);
match classify_backoff_timeout_decision(io_phase, next_deadline, now) {
BackoffTimeoutDecision::ParkTimeout { nanos } => {
record_backoff_timeout_park(
&mut self.preemption_metrics,
io_phase,
nanos,
);
self.parker.park_timeout(Duration::from_nanos(nanos));
}
BackoffTimeoutDecision::DeadlineDue => {
break;
}
}
} else {
record_backoff_indefinite_park(&mut self.preemption_metrics, io_phase);
self.parker.park();
}
backoff = 0;
} else {
break;
}
}
self.cancel_streak = 0;
self.ready_dispatch_streak = 0;
}
}
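    /// Lane-prioritized dispatch. Cancel work is preferred until
    /// `cancel_streak` reaches the effective limit (doubled while the
    /// governor suggests draining); past the limit, timed and ready work
    /// must be served first, with one fallback cancel dispatch allowed when
    /// nothing else is runnable so cancels cannot be starved either. Under
    /// `MeetDeadlines`, due timed work is consulted before cancels.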
#[allow(clippy::too_many_lines)]
pub fn next_task(&mut self) -> Option<TaskId> {
if let Some(timer) = &self.timer_driver {
let _ = timer.process_timers();
}
self.ensure_adaptive_epoch_started();
let suggestion = self.governor_suggest();
let base_limit = self.current_base_cancel_limit();
self.preemption_metrics.adaptive_current_limit = base_limit;
let effective_limit = match suggestion {
SchedulingSuggestion::DrainObligations | SchedulingSuggestion::DrainRegions => {
base_limit.saturating_mul(2)
}
_ => base_limit,
};
if effective_limit > self.preemption_metrics.max_effective_limit_observed {
self.preemption_metrics.max_effective_limit_observed = effective_limit;
}
let check_cancel = self.cancel_streak < effective_limit;
if !check_cancel {
self.preemption_metrics.fairness_yields += 1;
}
let now = self
.timer_driver
.as_ref()
.map_or(Time::ZERO, TimerDriverHandle::now);
if suggestion == SchedulingSuggestion::MeetDeadlines {
if let Some(tt) = self.global.pop_timed_if_due(now) {
self.record_timed_dispatch();
return Some(self.finish_dispatch(tt.task));
}
} else {
if check_cancel {
if let Some(pt) = self.global.pop_cancel() {
self.cancel_streak += 1;
self.ready_dispatch_streak = 0;
self.record_cancel_dispatch(base_limit, effective_limit);
return Some(self.finish_dispatch(pt.task));
}
}
}
let mut local = self.local.lock();
let rng_hint = self.rng.next_u64();
if suggestion == SchedulingSuggestion::MeetDeadlines {
if let Some(task) = local.pop_timed_only_with_hint(rng_hint, now) {
drop(local);
self.record_timed_dispatch();
return Some(self.finish_dispatch(task));
}
if check_cancel {
if let Some(pt) = self.global.pop_cancel() {
drop(local);
self.cancel_streak += 1;
self.ready_dispatch_streak = 0;
self.record_cancel_dispatch(base_limit, effective_limit);
return Some(self.finish_dispatch(pt.task));
}
if let Some(task) = local.pop_cancel_only_with_hint(rng_hint) {
drop(local);
self.cancel_streak += 1;
self.ready_dispatch_streak = 0;
self.record_cancel_dispatch(base_limit, effective_limit);
return Some(self.finish_dispatch(task));
}
}
} else {
if check_cancel {
if let Some(task) = local.pop_cancel_only_with_hint(rng_hint) {
drop(local);
self.cancel_streak += 1;
self.ready_dispatch_streak = 0;
self.record_cancel_dispatch(base_limit, effective_limit);
return Some(self.finish_dispatch(task));
}
}
if let Some(tt) = self.global.pop_timed_if_due(now) {
drop(local);
self.record_timed_dispatch();
return Some(self.finish_dispatch(tt.task));
}
if let Some(task) = local.pop_timed_only_with_hint(rng_hint, now) {
drop(local);
self.record_timed_dispatch();
return Some(self.finish_dispatch(task));
}
}
drop(local);
if self.should_force_ready_handoff() {
self.preemption_metrics.browser_ready_handoff_yields += 1;
self.cancel_streak = 0;
self.ready_dispatch_streak = 0;
return None;
}
let local_ready_task = self.local_ready.lock().pop_front();
if let Some(task) = local_ready_task {
self.record_ready_dispatch();
return Some(self.finish_dispatch(task));
}
if let Some(task) = self.fast_queue.pop() {
let blocked_local_task = {
let local = self.local.lock();
local.peek_ready_task()
};
let dispatched_priority = self.task_sched_priority(task);
self.record_ready_priority_inversion(blocked_local_task, task, dispatched_priority);
self.record_ready_dispatch();
return Some(self.finish_dispatch(task));
}
if let Some(pt) = self.global.pop_ready() {
let blocked_local_task = {
let local = self.local.lock();
local.peek_ready_task()
};
self.record_ready_priority_inversion(blocked_local_task, pt.task, Some(pt.priority));
self.record_ready_dispatch();
return Some(self.finish_dispatch(pt.task));
}
let rng_hint = self.rng.next_u64();
let local_task = {
let mut local = self.local.lock();
local.pop_ready_only_with_hint(rng_hint)
};
if let Some(task) = local_task {
self.record_ready_dispatch();
return Some(self.finish_dispatch(task));
}
if let Some(task) = self.try_steal() {
self.record_ready_dispatch();
return Some(self.finish_dispatch(task));
}
if !check_cancel {
if let Some(task) = self.try_cancel_work() {
self.preemption_metrics.fallback_cancel_dispatches += 1;
self.cancel_streak = 1;
self.ready_dispatch_streak = 0;
self.record_cancel_dispatch(base_limit, effective_limit);
return Some(self.finish_dispatch(task));
}
self.cancel_streak = 0;
}
self.ready_dispatch_streak = 0;
None
}
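/// Browser-oriented fairness valve: once the ready-dispatch streak
/// reaches `browser_ready_handoff_limit`, report `true` if any ready work
/// remains so the caller yields back to the host instead of monopolizing
/// it. A limit of zero (the default) disables the handoff.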
#[inline]
fn should_force_ready_handoff(&self) -> bool {
let limit = self.browser_ready_handoff_limit;
if limit == 0 || self.ready_dispatch_streak < limit {
return false;
}
if !self.fast_queue.is_empty() || self.global.has_ready_work() {
return true;
}
if self
.local_ready
.try_lock()
.is_some_and(|queue| !queue.is_empty())
{
return true;
}
self.local.lock().has_ready_work()
}
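/// Book-keeping for a cancel-lane dispatch: tracks the running streak
/// against both the base and the governor-boosted effective limit.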
#[inline]
fn record_cancel_dispatch(&mut self, base_limit: usize, effective_limit: usize) {
self.preemption_metrics.cancel_dispatches += 1;
if self.cancel_streak > self.preemption_metrics.max_cancel_streak {
self.preemption_metrics.max_cancel_streak = self.cancel_streak;
}
if self.cancel_streak > base_limit {
self.preemption_metrics.base_limit_exceedances += 1;
}
if self.cancel_streak > effective_limit {
self.preemption_metrics.effective_limit_exceedances += 1;
}
}
#[inline]
fn record_timed_dispatch(&mut self) {
if self.cancel_streak > self.preemption_metrics.max_timed_dispatch_stall {
self.preemption_metrics.max_timed_dispatch_stall = self.cancel_streak;
}
self.cancel_streak = 0;
self.ready_dispatch_streak = 0;
self.preemption_metrics.timed_dispatches += 1;
}
#[inline]
fn record_ready_dispatch(&mut self) {
if self.cancel_streak > self.preemption_metrics.max_ready_dispatch_stall {
self.preemption_metrics.max_ready_dispatch_stall = self.cancel_streak;
}
self.cancel_streak = 0;
self.ready_dispatch_streak = self.ready_dispatch_streak.saturating_add(1);
self.preemption_metrics.ready_dispatches += 1;
}
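/// Record a priority inversion: a higher-priority task sat blocked in the
/// local heap while a lower-priority task was dispatched from a shared
/// queue. No-op unless both priorities are known and actually inverted.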
fn record_ready_priority_inversion(
&mut self,
blocked_task: Option<(TaskId, u8)>,
executing_task: TaskId,
executing_priority: Option<u8>,
) {
let Some((blocked_task, blocked_priority)) = blocked_task else {
return;
};
let Some(executing_priority) = executing_priority else {
return;
};
if blocked_priority <= executing_priority {
return;
}
let timestamp = Time::from_nanos(self.current_time_ns());
self.preemption_metrics.ready_priority_inversions += 1;
let gap = blocked_priority.saturating_sub(executing_priority);
if gap > self.preemption_metrics.max_ready_priority_inversion_gap {
self.preemption_metrics.max_ready_priority_inversion_gap = gap;
}
self.invariant_monitor.lock().record_task_requeue(
blocked_task,
"local_ready_heap",
blocked_priority,
timestamp,
);
self.invariant_monitor.lock().verify_priority_ordering(
executing_task,
executing_priority,
blocked_task,
blocked_priority,
timestamp,
);
self.fairness_monitor.lock().record_priority_inversion(
blocked_task,
blocked_priority,
executing_task,
executing_priority,
timestamp.as_nanos(),
);
}
#[inline]
fn task_sched_priority(&self, task: TaskId) -> Option<u8> {
self.with_task_table_ref(|tt| tt.task(task).map(|record| record.sched_priority))
}
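/// Common tail of every dispatch: advance the adaptive policy and stamp
/// the dispatch into the fairness and invariant monitors.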
#[inline]
fn finish_dispatch(&mut self, task: TaskId) -> TaskId {
self.adaptive_on_dispatch();
let current_time = self.current_time_ns();
self.fairness_monitor
.lock()
.record_task_dispatch(task, current_time);
self.invariant_monitor
.lock()
.record_task_dispatch(task, Time::from_nanos(current_time));
task
}
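/// Compute (or return the cached) scheduling suggestion. Every
/// `governor_interval` steps this snapshots runtime state, consults the
/// Lyapunov governor, optionally folds in the decision contract's
/// Bayesian posterior and the spectral-health report, and lets drain
/// certificate verdicts and trapped wait cycles override the result.
/// Changes in suggestion are emitted to the evidence sink.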
#[allow(clippy::too_many_lines)]
fn governor_suggest(&mut self) -> SchedulingSuggestion {
let Some(governor) = &self.governor else {
return SchedulingSuggestion::NoPreference;
};
self.steps_since_snapshot += 1;
if self.steps_since_snapshot < self.governor_interval {
return self.cached_suggestion;
}
self.steps_since_snapshot = 0;
let state = self
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let snapshot = StateSnapshot::from_runtime_state(&state);
let (wait_graph_nodes, wait_graph_edges, trapped_wait_cycle) =
if self.spectral_monitor.is_some() {
wait_graph_signals_from_state(&state)
} else {
(0, Vec::new(), false)
};
drop(state);
let queue_depth = self.local.lock().len();
#[allow(clippy::cast_possible_truncation)]
let snapshot = snapshot.with_ready_queue_depth(queue_depth as u32);
let lyapunov_suggestion = governor.suggest(&snapshot);
let drain_verdict = self.drain_certificate.as_mut().and_then(|cert| {
let is_drain_phase = matches!(
lyapunov_suggestion,
SchedulingSuggestion::DrainObligations | SchedulingSuggestion::DrainRegions
);
if is_drain_phase {
cert.observe(governor.compute_record(&snapshot).total);
Some(cert.verdict())
} else {
if !cert.is_empty() {
cert.reset();
}
None
}
});
let mut spectral_report = None;
if let Some(monitor) = self.spectral_monitor.as_mut() {
if trapped_wait_cycle || wait_graph_nodes > 1 {
spectral_report = Some(monitor.analyze_with_trapped_cycle(
wait_graph_nodes,
&wait_graph_edges,
trapped_wait_cycle,
));
}
}
let mut suggestion = if let (Some(contract), Some(posterior)) =
(&self.decision_contract, &mut self.decision_posterior)
{
let likelihoods =
super::decision_contract::SchedulerDecisionContract::snapshot_likelihoods(
&snapshot,
);
posterior.bayesian_update(&likelihoods);
let probs = posterior.probs();
#[allow(clippy::cast_precision_loss)]
let uniform = 1.0 / probs.len().max(1) as f64;
let max_prob = probs
.iter()
.copied()
.fold(0.0_f64, f64::max)
.clamp(0.0, 1.0);
let concentration = if probs.len() > 1 {
((max_prob - uniform) / (1.0 - uniform)).clamp(0.0, 1.0)
} else {
1.0
};
let entropy = normalized_entropy(probs);
let conformal_hit = spectral_report
.as_ref()
.and_then(|report| {
report.bifurcation.as_ref().and_then(|bw| {
bw.conformal_lower_bound_next
.map(|lb| u8::from(report.decomposition.fiedler_value >= lb))
})
})
.map_or(1.0, f64::from);
let uncertainty_penalty = 0.35f64.mul_add(1.0 - concentration, 0.15 * entropy);
let conformal_penalty = 0.5 * (1.0 - conformal_hit);
let calibration_score = (1.0 - uncertainty_penalty - conformal_penalty).clamp(0.0, 1.0);
let ci_width = 0.5f64
.mul_add(1.0 - concentration, 0.25 * entropy)
.clamp(0.0, 1.0);
let adaptive_e = self.preemption_metrics.adaptive_e_value.max(1.0);
let spectral_e = spectral_report
.as_ref()
.and_then(|report| {
report
.bifurcation
.as_ref()
.map(|bw| bw.deterioration_e_value.max(1.0))
})
.unwrap_or(1.0);
let e_process = adaptive_e.max(spectral_e);
let seq = self.decision_sequence;
self.decision_sequence = self.decision_sequence.saturating_add(1);
let now_ms = self
.timer_driver
.as_ref()
.map_or(seq, |td| td.now().as_millis());
let random_bits = ((self.id as u128) << 64) | u128::from(seq);
let ctx = franken_decision::EvalContext {
calibration_score,
e_process,
ci_width,
decision_id: franken_kernel::DecisionId::from_parts(now_ms, random_bits),
trace_id: franken_kernel::TraceId::from_parts(
now_ms,
random_bits ^ 0xA5A5_A5A5_A5A5_A5A5_A5A5,
),
ts_unix_ms: now_ms,
};
let outcome = franken_decision::evaluate(contract, posterior, &ctx);
if let Some(ref sink) = self.evidence_sink {
let evidence = outcome.audit_entry.to_evidence_ledger();
sink.emit(&evidence);
}
match outcome.action_index {
super::decision_contract::action::AGGRESSIVE => SchedulingSuggestion::NoPreference,
super::decision_contract::action::CONSERVATIVE => {
SchedulingSuggestion::MeetDeadlines
}
_ => lyapunov_suggestion,
}
} else {
lyapunov_suggestion
};
if let Some(report) = spectral_report.as_ref() {
let override_suggestion = match report.classification {
crate::observability::spectral_health::HealthClassification::Deadlocked => {
Some(SchedulingSuggestion::DrainObligations)
}
crate::observability::spectral_health::HealthClassification::Critical {
approaching_disconnect: true,
..
} => Some(SchedulingSuggestion::DrainObligations),
_ => report.bifurcation.as_ref().and_then(|bw| {
(bw.trend
== crate::observability::spectral_health::SpectralTrend::Deteriorating
&& (bw.confidence >= 0.6 || bw.deterioration_e_value >= 2.0))
.then_some(SchedulingSuggestion::DrainRegions)
}),
};
if let Some(ovr) = override_suggestion {
suggestion = ovr;
}
}
if trapped_wait_cycle {
suggestion = SchedulingSuggestion::DrainObligations;
}
if !trapped_wait_cycle {
if let Some(ref verdict) = drain_verdict {
match verdict.drain_phase {
DrainPhase::Stalled if verdict.stall_detected => {
suggestion = SchedulingSuggestion::DrainObligations;
}
DrainPhase::Quiescent => {
suggestion = SchedulingSuggestion::NoPreference;
}
_ => {}
}
}
}
if suggestion != self.cached_suggestion {
if let Some(ref sink) = self.evidence_sink {
let suggestion_str = match suggestion {
SchedulingSuggestion::MeetDeadlines => "meet_deadlines",
SchedulingSuggestion::DrainObligations => "drain_obligations",
SchedulingSuggestion::DrainRegions => "drain_regions",
SchedulingSuggestion::NoPreference => "no_preference",
};
let cancel_depth = snapshot.cancel_requested_tasks
+ snapshot.cancelling_tasks
+ snapshot.finalizing_tasks;
crate::evidence_sink::emit_scheduler_evidence(
sink.as_ref(),
suggestion_str,
cancel_depth,
snapshot.draining_regions,
snapshot.ready_queue_depth,
self.decision_contract
.as_ref()
.is_some_and(|_| self.decision_posterior.is_some()),
);
}
}
self.cached_suggestion = suggestion;
suggestion
}
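/// Dispatch and execute a single task. Returns `false` once shutdown is
/// flagged or when no work was available anywhere.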
pub fn run_once(&mut self) -> bool {
if self.shutdown.load(Ordering::Relaxed) {
return false;
}
if let Some(task) = self.next_task() {
self.execute(task);
return true;
}
false
}
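/// Pop cancel-lane work only: global injector first, then the local
/// scheduler's cancel lane.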
pub(crate) fn try_cancel_work(&mut self) -> Option<TaskId> {
if let Some(pt) = self.global.pop_cancel() {
return Some(pt.task);
}
let mut local = self.local.lock();
let rng_hint = self.rng.next_u64();
local.pop_cancel_only_with_hint(rng_hint)
}
#[allow(dead_code)]
pub(crate) fn try_timed_work(&mut self) -> Option<TaskId> {
let now = self
.timer_driver
.as_ref()
.map_or(Time::ZERO, TimerDriverHandle::now);
if let Some(tt) = self.global.pop_timed_if_due(now) {
return Some(tt.task);
}
let mut local = self.local.lock();
let rng_hint = self.rng.next_u64();
local.pop_timed_only_with_hint(rng_hint, now)
}
#[cfg(any(test, feature = "test-internals"))]
pub fn ready_count(&self) -> usize {
let local_ready = self.local_ready.try_lock().map_or(0, |q| q.len());
let fast = self.fast_queue.len();
let global = self.global.ready_count();
local_ready + fast + global
}
#[allow(dead_code)]
pub(crate) fn try_ready_work(&mut self) -> Option<TaskId> {
if let Some(task) = self.local_ready.lock().pop_front() {
return Some(task);
}
if let Some(task) = self.fast_queue.pop() {
return Some(task);
}
if let Some(pt) = self.global.pop_ready() {
return Some(pt.task);
}
let mut local = self.local.lock();
let rng_hint = self.rng.next_u64();
local.pop_ready_only_with_hint(rng_hint)
}
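/// Work stealing in two passes: first raid other workers' `fast_queue`s,
/// then batch-steal from their ready lanes, dispatching the first stolen
/// task and parking the remainder on our own fast queue. Debug builds
/// assert that no `!Send` local task ever crosses workers.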
pub(crate) fn try_steal(&mut self) -> Option<TaskId> {
if !self.fast_stealers.is_empty() {
let len = self.fast_stealers.len();
let start = self.rng.next_usize(len);
for i in 0..len {
let idx = (start + i) % len;
if let Some(task) = self.fast_stealers[idx].steal() {
debug_assert!(
!self.with_task_table_ref(|tt| {
tt.task(task)
.is_some_and(crate::record::task::TaskRecord::is_local)
}),
"BUG: stole a local (!Send) task {task:?} from another worker's fast_queue"
);
self.invariant_monitor
.lock()
.record_task_dispatch(task, Time::from_nanos(self.current_time_ns()));
return Some(task);
}
}
}
if self.stealers.is_empty() {
return None;
}
let len = self.stealers.len();
let start = self.rng.next_usize(len);
for i in 0..len {
let idx = (start + i) % len;
let stealer = &self.stealers[idx];
if let Some(mut victim) = stealer.try_lock() {
let stolen_count =
victim.steal_ready_batch_into(self.steal_batch_size, &mut self.steal_buffer);
if stolen_count > 0 {
#[cfg(debug_assertions)]
{
for &(task, _) in &self.steal_buffer[..stolen_count] {
let is_local = self.with_task_table_ref(|tt| {
tt.task(task)
.is_some_and(crate::record::task::TaskRecord::is_local)
});
debug_assert!(
!is_local,
"BUG: stole a local (!Send) task {task:?} from PriorityScheduler"
);
}
}
let (first_task, _) = self.steal_buffer[0];
self.invariant_monitor
.lock()
.record_task_dispatch(first_task, Time::from_nanos(self.current_time_ns()));
if stolen_count > 1 {
for &(task, _priority) in self.steal_buffer[1..].iter().rev() {
self.fast_queue.push(task);
self.invariant_monitor.lock().record_task_requeue(
task,
"fast_queue_stolen",
_priority,
Time::from_nanos(self.current_time_ns()),
);
}
}
return Some(first_task);
}
}
}
None
}
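/// Enqueue a ready task on this worker's local priority heap. Refuses
/// `!Send` local tasks (those are woken through `local_ready` instead)
/// and skips the enqueue when `wake_state.notify()` reports no wake is
/// needed.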
pub fn schedule_local(&self, task: TaskId, priority: u8) {
let should_schedule = self.with_task_table_ref(|tt| {
tt.task(task).is_none_or(|record| {
if record.is_local() {
error!(
?task,
"schedule_local: refusing to enqueue local task into PriorityScheduler"
);
return false;
}
record.wake_state.notify()
})
});
if should_schedule {
let mut local = self.local.lock();
local.schedule(task, priority);
let current_time = self.current_time_ns();
self.fairness_monitor.lock().record_task_enqueue(
task,
priority,
current_time,
2, // lane tag: the cancel path below passes 0 and the timed path passes 1
);
self.invariant_monitor.lock().record_task_enqueue(
task,
"local_ready_heap",
priority,
Time::from_nanos(current_time),
);
}
}
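/// Move a task into this worker's cancel lane: purge any stale entry from
/// `local_ready`, reprioritize it in the local scheduler, record the
/// requeue with both monitors, and unpark the worker.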
pub fn schedule_local_cancel(&self, task: TaskId, priority: u8) {
self.with_task_table_ref(|tt| {
if let Some(record) = tt.task(task) {
record.wake_state.notify();
}
});
{
let mut local_ready_guard = self.local_ready.lock();
if let Some(pos) = local_ready_guard.iter().position(|t| *t == task) {
local_ready_guard.remove(pos);
}
drop(local_ready_guard);
let mut local = self.local.lock();
local.move_to_cancel_lane(task, priority);
let current_time = self.current_time_ns();
self.fairness_monitor.lock().record_task_enqueue(
task,
priority,
current_time,
0,
);
self.invariant_monitor.lock().record_task_requeue(
task,
"local_cancel_queue",
priority,
Time::from_nanos(current_time),
);
}
self.parker.unpark();
}
pub fn schedule_local_timed(&self, task: TaskId, deadline: Time) {
let should_schedule = self.with_task_table_ref(|tt| {
tt.task(task).is_none_or(|record| {
if record.is_local() {
error!(
?task,
"schedule_local_timed: refusing to enqueue local task into timed lane"
);
return false;
}
record.wake_state.notify()
})
});
if should_schedule {
let mut local = self.local.lock();
local.schedule_timed(task, deadline);
let current_time = self.current_time_ns();
self.fairness_monitor.lock().record_task_enqueue(
task,
0,
current_time,
1,
);
self.invariant_monitor.lock().record_task_enqueue(
task,
"local_timed_queue",
0,
Time::from_nanos(current_time),
);
}
}
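/// Wake every waiter of a completed task while the runtime state lock is
/// held. Pinned `!Send` waiters go to their owning worker's `local_ready`
/// queue; the rest are batched into the global injector with a single
/// ready-count bump so the coordinator wakes the right number of workers.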
fn wake_dependents_locked(
&self,
state: &RuntimeState,
waiters: impl IntoIterator<Item = TaskId>,
) {
let mut global_tasks = smallvec::SmallVec::<[(TaskId, u8); 16]>::new();
for waiter in waiters {
if let Some(record) = state.task(waiter) {
let waiter_priority = record.sched_priority;
if record.wake_state.notify() {
if record.is_local() {
if let Some(worker_id) = record.pinned_worker() {
if let Some(queue) = self.all_local_ready.get(worker_id) {
queue.lock().push_back(waiter);
self.coordinator.wake_worker(worker_id);
} else {
debug_assert!(
false,
"Pinned local waiter {waiter:?} has invalid worker id {worker_id}"
);
error!(
?waiter,
worker_id,
"execute: pinned local waiter has invalid worker id, wake skipped"
);
}
} else {
self.local_ready.lock().push_back(waiter);
self.parker.unpark();
}
} else {
global_tasks.push((waiter, waiter_priority));
}
}
}
}
let global_wakes = global_tasks.len();
if global_wakes > 0 {
self.global.add_ready_count(global_wakes);
for (task, priority) in global_tasks {
self.global.inject_ready_uncounted(task, priority);
}
self.coordinator.wake_many(global_wakes);
}
}
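/// Poll a task once. The drop guard converts a panic that unwinds past
/// this frame into a `Panicked` outcome and still wakes dependents and
/// finalizers. Wakers are reused from the task record when cached; on
/// `Pending` the future is re-stored, and a wake that raced the poll
/// re-enqueues the task into the appropriate lane.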
#[allow(clippy::too_many_lines)]
pub(crate) fn execute(&self, task_id: TaskId) {
struct TaskExecutionGuard<'a> {
worker: &'a ThreeLaneWorker,
task_id: TaskId,
completed: bool,
}
impl Drop for TaskExecutionGuard<'_> {
#[allow(clippy::significant_drop_tightening)]
fn drop(&mut self) {
if !self.completed && std::thread::panicking() {
self.worker.with_task_table(|tt| {
if let Some(record) = tt.task_mut(self.task_id) {
if !record.state.is_terminal() {
record.complete(crate::types::Outcome::Panicked(
crate::types::outcome::PanicPayload::new(
"task panicked during poll",
),
));
}
}
});
let mut state = self
.worker
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let waiters = state.task_completed(self.task_id);
let finalizers = state.drain_ready_async_finalizers();
self.worker.wake_dependents_locked(&state, waiters);
let finalizer_wakes = finalizers.len();
if finalizer_wakes > 0 {
self.worker.global.add_ready_count(finalizer_wakes);
for (finalizer_task, priority) in finalizers {
self.worker
.global
.inject_ready_uncounted(finalizer_task, priority);
}
self.worker.coordinator.wake_many(finalizer_wakes);
}
}
}
}
trace!(task_id = ?task_id, worker_id = self.id, "executing task");
let (
mut stored,
wake_state,
priority,
task_cx,
cx_inner,
cached_waker,
cached_cancel_waker,
) = {
let merged = self.with_task_table(|tt| {
let global_stored = tt.remove_stored_future(task_id)?;
let record = tt.task_mut(task_id)?;
record.start_running();
record.wake_state.begin_poll();
let priority = record.sched_priority;
let wake_state = Arc::clone(&record.wake_state);
let task_cx = record.cx.clone();
let cached_waker = record.cached_waker.take();
let cached_cancel_waker = record.cached_cancel_waker.take();
let both_cached = cached_waker.is_some()
&& cached_cancel_waker
.as_ref()
.is_some_and(|(_, p)| *p == priority);
let cx_inner = if both_cached {
None
} else {
record.cx_inner.clone()
};
Some((
AnyStoredTask::Global(global_stored),
wake_state,
priority,
task_cx,
cx_inner,
cached_waker,
cached_cancel_waker,
))
});
if let Some(result) = merged {
result
} else {
let local = crate::runtime::local::remove_local_task(task_id);
let Some(local) = local else {
return;
};
let record_info = self.with_task_table(|tt| {
let record = tt.task_mut(task_id)?;
record.start_running();
record.wake_state.begin_poll();
let priority = record.sched_priority;
let wake_state = Arc::clone(&record.wake_state);
let task_cx = record.cx.clone();
let cached_waker = record.cached_waker.take();
let cached_cancel_waker = record.cached_cancel_waker.take();
let both_cached = cached_waker.is_some()
&& cached_cancel_waker
.as_ref()
.is_some_and(|(_, p)| *p == priority);
let cx_inner = if both_cached {
None
} else {
record.cx_inner.clone()
};
Some((
wake_state,
priority,
task_cx,
cx_inner,
cached_waker,
cached_cancel_waker,
))
});
let Some((
wake_state,
priority,
task_cx,
cx_inner,
cached_waker,
cached_cancel_waker,
)) = record_info
else {
return;
};
(
AnyStoredTask::Local(local),
wake_state,
priority,
task_cx,
cx_inner,
cached_waker,
cached_cancel_waker,
)
}
};
let is_local = stored.is_local();
let waker = if let Some((w, _)) = cached_waker {
w
} else {
let inner = cx_inner.as_ref().expect("cx_inner missing");
let fast_cancel = Arc::clone(&inner.read().fast_cancel);
let weak_inner = Arc::downgrade(inner);
if is_local {
Waker::from(Arc::new(ThreeLaneLocalWaker {
task_id,
priority,
wake_state: Arc::clone(&wake_state),
local: Arc::clone(&self.local),
local_ready: Arc::clone(&self.local_ready),
parker: self.parker.clone(),
fast_cancel,
cx_inner: weak_inner,
}))
} else {
Waker::from(Arc::new(ThreeLaneWaker {
task_id,
wake_state: Arc::clone(&wake_state),
global: Arc::clone(&self.global),
coordinator: Arc::clone(&self.coordinator),
priority,
fast_cancel,
cx_inner: weak_inner,
}))
}
};
let cancel_waker_for_cache = if cached_cancel_waker
.as_ref()
.is_some_and(|(_, p)| *p == priority)
{
cached_cancel_waker.map(|(w, _)| (w, priority))
} else {
cx_inner.as_ref().map(|inner| {
let w = if is_local {
Waker::from(Arc::new(ThreeLaneLocalCancelWaker {
task_id,
default_priority: priority,
wake_state: Arc::clone(&wake_state),
local: Arc::clone(&self.local),
local_ready: Arc::clone(&self.local_ready),
parker: self.parker.clone(),
cx_inner: Arc::downgrade(inner),
}))
} else {
Waker::from(Arc::new(CancelLaneWaker {
task_id,
default_priority: priority,
wake_state: Arc::clone(&wake_state),
global: Arc::clone(&self.global),
coordinator: Arc::clone(&self.coordinator),
cx_inner: Arc::downgrade(inner),
}))
};
{
let mut guard = inner.write();
let needs_update = !guard
.cancel_waker
.as_ref()
.is_some_and(|existing| existing.will_wake(&w));
if needs_update {
guard.cancel_waker = Some(w.clone());
}
}
(w, priority)
})
};
let _cx_guard = crate::cx::Cx::set_current(task_cx);
let mut guard = TaskExecutionGuard {
worker: self,
task_id,
completed: false,
};
let poll_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut cx = Context::from_waker(&waker);
stored.poll(&mut cx)
}));
match poll_result {
Ok(Poll::Ready(outcome)) => {
let task_outcome = outcome
.map_err(|()| crate::error::Error::new(crate::error::ErrorKind::Internal));
let mut state = self
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let cancel_ack = Self::consume_cancel_ack_locked(&mut state, task_id);
if let Some(record) = state.task_mut(task_id) {
if !record.state.is_terminal() {
let mut completed_via_cancel = false;
if matches!(task_outcome, crate::types::Outcome::Ok(())) {
let should_cancel = matches!(
record.state,
crate::record::task::TaskState::Cancelling { .. }
| crate::record::task::TaskState::Finalizing { .. }
) || (cancel_ack
&& matches!(
record.state,
crate::record::task::TaskState::CancelRequested { .. }
));
if should_cancel {
if matches!(
record.state,
crate::record::task::TaskState::CancelRequested { .. }
) {
let _ = record.acknowledge_cancel();
}
if matches!(
record.state,
crate::record::task::TaskState::Cancelling { .. }
) {
record.cleanup_done();
}
if matches!(
record.state,
crate::record::task::TaskState::Finalizing { .. }
) {
record.finalize_done();
}
completed_via_cancel = matches!(
record.state,
crate::record::task::TaskState::Completed(
crate::types::Outcome::Cancelled(_)
)
);
}
}
if !completed_via_cancel {
record.complete(task_outcome);
}
}
}
let waiters = state.task_completed(task_id);
let finalizers = state.drain_ready_async_finalizers();
self.wake_dependents_locked(&state, waiters);
let finalizer_wakes = finalizers.len();
if finalizer_wakes > 0 {
self.global.add_ready_count(finalizer_wakes);
for (finalizer_task, priority) in finalizers {
self.global.inject_ready_uncounted(finalizer_task, priority);
}
self.coordinator.wake_many(finalizer_wakes);
}
drop(state);
guard.completed = true;
wake_state.clear();
}
Ok(Poll::Pending) => {
match stored {
AnyStoredTask::Global(t) => {
self.with_task_table(move |tt| {
tt.store_spawned_task(task_id, t);
if let Some(record) = tt.task_mut(task_id) {
record.cached_waker = Some((waker, priority));
record.cached_cancel_waker = cancel_waker_for_cache;
if let Some(inner) = record.cx_inner.as_ref() {
let needs_ack = inner.read().cancel_acknowledged;
if needs_ack {
let mut g = inner.write();
if g.cancel_acknowledged {
g.cancel_acknowledged = false;
drop(g);
let _ = record.acknowledge_cancel();
}
}
}
}
});
}
AnyStoredTask::Local(t) => {
crate::runtime::local::store_local_task(task_id, t);
self.with_task_table(move |tt| {
if let Some(record) = tt.task_mut(task_id) {
record.cached_waker = Some((waker, priority));
record.cached_cancel_waker = cancel_waker_for_cache;
if let Some(inner) = record.cx_inner.as_ref() {
let needs_ack = inner.read().cancel_acknowledged;
if needs_ack {
let mut g = inner.write();
if g.cancel_acknowledged {
g.cancel_acknowledged = false;
drop(g);
let _ = record.acknowledge_cancel();
}
}
}
}
});
}
}
if wake_state.finish_poll() {
let mut cancel_priority = priority;
let mut schedule_cancel = false;
let cx_inner_for_finish = if cx_inner.is_some() {
cx_inner
} else {
self.with_task_table(|tt| tt.task(task_id).and_then(|r| r.cx_inner.clone()))
};
if let Some(inner) = cx_inner_for_finish.as_ref() {
let guard = inner.read();
if guard.cancel_requested {
schedule_cancel = true;
if let Some(reason) = guard.cancel_reason.as_ref() {
cancel_priority = reason.cleanup_budget().priority;
}
}
}
if is_local {
if schedule_cancel {
let mut local_ready_guard = self.local_ready.lock();
if let Some(pos) = local_ready_guard.iter().position(|t| *t == task_id)
{
local_ready_guard.remove(pos);
}
drop(local_ready_guard);
let mut local = self.local.lock();
local.schedule_cancel(task_id, cancel_priority);
} else {
self.local_ready.lock().push_back(task_id);
}
self.parker.unpark();
} else {
if schedule_cancel {
self.global.inject_cancel(task_id, cancel_priority);
} else {
self.global.inject_ready(task_id, priority);
}
self.coordinator.wake_one();
}
}
guard.completed = true;
}
Err(payload) => {
let panic_payload = crate::types::outcome::PanicPayload::new(
crate::cx::scope::payload_to_string(&payload),
);
let mut state = self
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let _cancel_ack = Self::consume_cancel_ack_locked(&mut state, task_id);
if let Some(record) = state.task_mut(task_id) {
if !record.state.is_terminal() {
record.complete(crate::types::Outcome::Panicked(panic_payload));
}
}
let waiters = state.task_completed(task_id);
let finalizers = state.drain_ready_async_finalizers();
self.wake_dependents_locked(&state, waiters);
let finalizer_wakes = finalizers.len();
if finalizer_wakes > 0 {
self.global.add_ready_count(finalizer_wakes);
for (finalizer_task, priority) in finalizers {
self.global.inject_ready_uncounted(finalizer_task, priority);
}
self.coordinator.wake_many(finalizer_wakes);
}
drop(state);
guard.completed = true;
wake_state.clear();
}
}
let _ = guard.completed;
}
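/// Drain async finalizers that became ready outside a task-completion
/// path and inject them as global ready work. Returns `true` if any were
/// scheduled.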
fn schedule_ready_finalizers(&self) -> bool {
let tasks = {
let mut state = self
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
state.drain_ready_async_finalizers()
};
if tasks.is_empty() {
return false;
}
let finalizer_wakes = tasks.len();
if finalizer_wakes > 0 {
self.global.add_ready_count(finalizer_wakes);
for (task_id, priority) in tasks {
self.global.inject_ready_uncounted(task_id, priority);
}
self.coordinator.wake_many(finalizer_wakes);
}
true
}
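/// Consume a pending cancel acknowledgement from the task's shared
/// `CxInner`, double-checked under the write lock so a single ack is
/// observed exactly once before being mirrored into the task record.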
#[allow(dead_code)]
fn consume_cancel_ack(&self, task_id: TaskId) -> bool {
self.with_task_table(|tt| Self::consume_cancel_ack_from_table(tt, task_id))
}
fn consume_cancel_ack_locked(state: &mut RuntimeState, task_id: TaskId) -> bool {
Self::consume_cancel_ack_from_table(&mut state.tasks, task_id)
}
fn consume_cancel_ack_from_table(tt: &mut TaskTable, task_id: TaskId) -> bool {
let Some(record) = tt.task_mut(task_id) else {
return false;
};
let Some(inner) = record.cx_inner.as_ref() else {
return false;
};
if !inner.read().cancel_acknowledged {
return false;
}
let mut guard = inner.write();
if guard.cancel_acknowledged {
guard.cancel_acknowledged = false;
drop(guard);
let _ = record.acknowledge_cancel();
return true;
}
false
}
}
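/// Waker for ordinary `Send` tasks: wakes into the global injector,
/// upgrading to the cancel lane (at the cancel reason's cleanup-budget
/// priority) when the task's `fast_cancel` flag is set.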
struct ThreeLaneWaker {
task_id: TaskId,
wake_state: Arc<crate::record::task::TaskWakeState>,
global: Arc<GlobalInjector>,
coordinator: Arc<WorkerCoordinator>,
priority: u8,
fast_cancel: std::sync::Arc<std::sync::atomic::AtomicBool>,
cx_inner: Weak<RwLock<CxInner>>,
}
impl ThreeLaneWaker {
#[inline]
fn schedule(&self) {
if self.wake_state.notify() {
let mut priority = self.priority;
let is_cancelling = self.fast_cancel.load(Ordering::Acquire);
if is_cancelling {
if let Some(inner) = self.cx_inner.upgrade() {
let guard = inner.read();
if let Some(reason) = &guard.cancel_reason {
priority = reason.cleanup_budget().priority;
}
}
}
if is_cancelling {
self.global.inject_cancel(self.task_id, priority);
} else {
self.global.inject_ready(self.task_id, priority);
}
self.coordinator.wake_one();
}
}
}
use std::task::Wake;
impl Wake for ThreeLaneWaker {
#[inline]
fn wake(self: Arc<Self>) {
self.schedule();
}
#[inline]
fn wake_by_ref(self: &Arc<Self>) {
self.schedule();
}
}
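/// Waker for pinned `!Send` tasks: ready wakes push onto the owning
/// worker's `local_ready` FIFO, while cancel wakes migrate the task into
/// the local scheduler's cancel lane before unparking.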
struct ThreeLaneLocalWaker {
task_id: TaskId,
priority: u8,
wake_state: Arc<crate::record::task::TaskWakeState>,
local: Arc<Mutex<PriorityScheduler>>,
local_ready: Arc<LocalReadyQueue>,
parker: Parker,
fast_cancel: std::sync::Arc<std::sync::atomic::AtomicBool>,
cx_inner: Weak<RwLock<CxInner>>,
}
impl ThreeLaneLocalWaker {
#[inline]
fn schedule(&self) {
if self.wake_state.notify() {
let is_cancelling = self.fast_cancel.load(Ordering::Acquire);
if is_cancelling {
let mut priority = self.priority;
if let Some(inner) = self.cx_inner.upgrade() {
let guard = inner.read();
if let Some(reason) = &guard.cancel_reason {
priority = reason.cleanup_budget().priority;
}
}
let mut local_ready_guard = self.local_ready.lock();
if let Some(pos) = local_ready_guard.iter().position(|t| *t == self.task_id) {
local_ready_guard.remove(pos);
}
drop(local_ready_guard);
let mut local = self.local.lock();
local.move_to_cancel_lane(self.task_id, priority);
} else {
self.local_ready.lock().push_back(self.task_id);
}
self.parker.unpark();
}
}
}
impl Wake for ThreeLaneLocalWaker {
#[inline]
fn wake(self: Arc<Self>) {
self.schedule();
}
#[inline]
fn wake_by_ref(self: &Arc<Self>) {
self.schedule();
}
}
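/// Cancel-side waker cached on the task record: fires only while
/// `cancel_requested` is set, injecting straight into the global cancel
/// lane at the cleanup budget's priority.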
struct CancelLaneWaker {
task_id: TaskId,
default_priority: u8,
wake_state: Arc<crate::record::task::TaskWakeState>,
global: Arc<GlobalInjector>,
coordinator: Arc<WorkerCoordinator>,
cx_inner: Weak<RwLock<CxInner>>,
}
impl CancelLaneWaker {
#[inline]
fn schedule(&self) {
let Some(inner) = self.cx_inner.upgrade() else {
return;
};
let (cancel_requested, priority) = {
let guard = inner.read();
let priority = guard
.cancel_reason
.as_ref()
.map_or(self.default_priority, |reason| {
reason.cleanup_budget().priority
});
(guard.cancel_requested, priority)
};
if !cancel_requested {
return;
}
self.wake_state.notify();
self.global.inject_cancel(self.task_id, priority);
self.coordinator.wake_one();
}
}
impl Wake for CancelLaneWaker {
#[inline]
fn wake(self: Arc<Self>) {
self.schedule();
}
#[inline]
fn wake_by_ref(self: &Arc<Self>) {
self.schedule();
}
}
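/// Local (`!Send`) counterpart of `CancelLaneWaker`: removes the task
/// from `local_ready` if present, moves it to the local cancel lane, and
/// unparks the owning worker.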
struct ThreeLaneLocalCancelWaker {
task_id: TaskId,
default_priority: u8,
wake_state: Arc<crate::record::task::TaskWakeState>,
local: Arc<Mutex<PriorityScheduler>>,
local_ready: Arc<LocalReadyQueue>,
parker: Parker,
cx_inner: Weak<RwLock<CxInner>>,
}
impl ThreeLaneLocalCancelWaker {
#[inline]
fn schedule(&self) {
let Some(inner) = self.cx_inner.upgrade() else {
return;
};
let (cancel_requested, priority) = {
let guard = inner.read();
let priority = guard
.cancel_reason
.as_ref()
.map_or(self.default_priority, |reason| {
reason.cleanup_budget().priority
});
(guard.cancel_requested, priority)
};
if !cancel_requested {
return;
}
self.wake_state.notify();
{
let mut local_ready_guard = self.local_ready.lock();
if let Some(pos) = local_ready_guard.iter().position(|t| *t == self.task_id) {
local_ready_guard.remove(pos);
}
drop(local_ready_guard);
let mut local = self.local.lock();
local.move_to_cancel_lane(self.task_id, priority);
}
self.parker.unpark();
}
}
impl Wake for ThreeLaneLocalCancelWaker {
#[inline]
fn wake(self: Arc<Self>) {
self.schedule();
}
#[inline]
fn wake_by_ref(self: &Arc<Self>) {
self.schedule();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::record::task::TaskWakeState;
use crate::runtime::scheduler::invariant_monitor;
use crate::runtime::scheduler::{InvariantCategory, SchedulerInvariant};
use crate::time::{TimerDriverHandle, VirtualClock};
use crate::types::{Budget, CancelKind, CancelReason, CxInner, RegionId, TaskId};
use parking_lot::RwLock;
use serde_json::{Value, json};
use std::time::Duration;
#[derive(Default)]
struct TaskIdScrubber {
labels: BTreeMap<TaskId, String>,
next: usize,
}
impl TaskIdScrubber {
fn label(&mut self, task_id: TaskId) -> String {
if let Some(label) = self.labels.get(&task_id) {
return label.clone();
}
let label = format!("[TASK_{}]", self.next);
self.next += 1;
self.labels.insert(task_id, label.clone());
label
}
}
fn scrubbed_tracked_tasks(
scrubber: &mut TaskIdScrubber,
tracked_tasks: Vec<invariant_monitor::TrackedTaskSnapshot>,
) -> Vec<Value> {
let mut tracked_tasks = tracked_tasks;
tracked_tasks.sort_by_key(|snapshot| snapshot.task_id);
tracked_tasks
.into_iter()
.map(|snapshot| {
json!({
"task_id": scrubber.label(snapshot.task_id),
"queues": snapshot.queues,
"priority": snapshot.priority,
"enqueue_time_ns": snapshot.enqueue_time.as_nanos(),
"last_update_ns": snapshot.last_update.as_nanos(),
"lifecycle_state": snapshot.lifecycle_state,
"owner_worker": snapshot.owner_worker,
"is_cancelled": snapshot.is_cancelled,
})
})
.collect()
}
fn worker_state_dump_scrubbed(
scenario: &str,
worker: &ThreeLaneWorker,
dispatch_sequence: &[TaskId],
) -> Value {
let mut scrubber = TaskIdScrubber::default();
let local_ready_tasks: Vec<_> = worker.local_ready.lock().iter().copied().collect();
let local_scheduler_depth = worker.local.lock().len();
let tracked_tasks = worker.invariant_monitor.lock().tracked_tasks();
let invariant_stats = worker.invariant_stats();
let starvation_stats = worker.starvation_stats();
let certificate = worker.preemption_fairness_certificate();
let metrics = worker.preemption_metrics();
let adaptive_policy = worker.adaptive_cancel_policy.as_ref().map(|policy| {
json!({
"selected_arm": policy.selected_arm,
"current_limit": policy.current_limit(),
"epoch_steps": policy.epoch_steps,
"steps_in_epoch": policy.steps_in_epoch,
"epoch_count": policy.epoch_count,
"reward_ema": policy.reward_ema,
"e_process_log": policy.e_process_log,
"weights": policy.weights,
"probs": policy.probs,
"pulls": policy.pulls,
})
});
json!({
"scenario": scenario,
"worker_id": worker.id,
"cancel_streak": worker.cancel_streak,
"cancel_streak_limit": worker.cancel_streak_limit,
"ready_count": worker.ready_count(),
"lane_depths": {
"local_priority_scheduler": local_scheduler_depth,
"local_ready": local_ready_tasks.len(),
"fast_queue": worker.fast_queue.len(),
"global_pending": worker.global.len(),
"global_ready": worker.global.ready_count(),
"global_has_cancel": worker.global.has_cancel_work(),
"global_has_timed": worker.global.has_timed_work(),
"global_has_ready": worker.global.has_ready_work(),
},
"local_ready_tasks": local_ready_tasks
.into_iter()
.map(|task_id| scrubber.label(task_id))
.collect::<Vec<_>>(),
"dispatch_sequence": dispatch_sequence
.iter()
.copied()
.map(|task_id| scrubber.label(task_id))
.collect::<Vec<_>>(),
"tracked_tasks": scrubbed_tracked_tasks(&mut scrubber, tracked_tasks),
"fairness_certificate": {
"base_limit": certificate.base_limit,
"effective_limit": certificate.effective_limit,
"observed_max_cancel_streak": certificate.observed_max_cancel_streak,
"cancel_dispatches": certificate.cancel_dispatches,
"timed_dispatches": certificate.timed_dispatches,
"ready_dispatches": certificate.ready_dispatches,
"fairness_yields": certificate.fairness_yields,
"observed_max_ready_stall_steps": certificate.observed_max_ready_stall_steps,
"observed_max_timed_stall_steps": certificate.observed_max_timed_stall_steps,
"ready_priority_inversions": certificate.ready_priority_inversions,
"max_ready_priority_inversion_gap": certificate.max_ready_priority_inversion_gap,
"fallback_cancel_dispatches": certificate.fallback_cancel_dispatches,
"base_limit_exceedances": certificate.base_limit_exceedances,
"effective_limit_exceedances": certificate.effective_limit_exceedances,
"adaptive_enabled": certificate.adaptive_enabled,
"adaptive_current_limit": certificate.adaptive_current_limit,
"ready_stall_bound_steps": certificate.ready_stall_bound_steps(),
"observed_non_cancel_stall_steps": certificate.observed_non_cancel_stall_steps(),
"invariant_holds": certificate.invariant_holds(),
"witness_hash": certificate.witness_hash(),
},
"preemption_metrics": {
"cancel_dispatches": metrics.cancel_dispatches,
"timed_dispatches": metrics.timed_dispatches,
"ready_dispatches": metrics.ready_dispatches,
"fairness_yields": metrics.fairness_yields,
"max_cancel_streak": metrics.max_cancel_streak,
"max_ready_dispatch_stall": metrics.max_ready_dispatch_stall,
"max_timed_dispatch_stall": metrics.max_timed_dispatch_stall,
"fallback_cancel_dispatches": metrics.fallback_cancel_dispatches,
"base_limit_exceedances": metrics.base_limit_exceedances,
"effective_limit_exceedances": metrics.effective_limit_exceedances,
"adaptive_epochs": metrics.adaptive_epochs,
"adaptive_current_limit": metrics.adaptive_current_limit,
"adaptive_reward_ema": metrics.adaptive_reward_ema,
"adaptive_e_value": metrics.adaptive_e_value,
},
"invariant_stats": {
"operations_monitored": invariant_stats.operations_monitored,
"avg_monitoring_overhead_ns": invariant_stats.avg_monitoring_overhead_ns,
"monitored_workers": invariant_stats.monitored_workers,
"violations_by_severity": invariant_stats.violations_by_severity,
},
"starvation_stats": {
"total_starvation_events": starvation_stats.total_starvation_events,
"currently_starved_tasks": starvation_stats.currently_starved_tasks,
"max_task_wait_time_ns": starvation_stats.max_task_wait_time_ns,
"avg_task_wait_time_ns": starvation_stats.avg_task_wait_time_ns,
"total_priority_inversions": starvation_stats.total_priority_inversions,
"tracked_tasks_count": starvation_stats.tracked_tasks_count,
"pattern_detected": starvation_stats.pattern_detected,
"total_tracked_wait_time_ns": starvation_stats.total_tracked_wait_time_ns,
"max_priority_inversion_gap": starvation_stats.max_priority_inversion_gap,
},
"adaptive_policy": adaptive_policy,
})
}
fn empty_scheduler_state_dump() -> Value {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = &mut scheduler.workers[0];
worker.verify_scheduler_invariants();
worker_state_dump_scrubbed("empty", worker, &[])
}
fn loaded_scheduler_state_dump() -> Value {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = &mut scheduler.workers[0];
worker.schedule_local(TaskId::new_for_test(100, 1), 40);
worker.schedule_local_cancel(TaskId::new_for_test(101, 1), 90);
worker.schedule_local_timed(TaskId::new_for_test(102, 1), Time::from_nanos(5_000));
worker.verify_scheduler_invariants();
worker_state_dump_scrubbed("loaded", worker, &[])
}
fn cancel_streak_scheduler_state_dump() -> Value {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, 2);
let worker = &mut scheduler.workers[0];
worker.schedule_local(TaskId::new_for_test(200, 1), 25);
for task_index in 0..5 {
worker.schedule_local_cancel(TaskId::new_for_test(210 + task_index, 1), 100);
}
let mut dispatch_sequence = Vec::new();
for _ in 0..6 {
if let Some(task_id) = worker.next_task() {
dispatch_sequence.push(task_id);
}
}
worker.verify_scheduler_invariants();
worker_state_dump_scrubbed("cancel_streak", worker, &dispatch_sequence)
}
#[test]
fn test_three_lane_scheduler_creation() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let scheduler = ThreeLaneScheduler::new(2, &state);
assert!(!scheduler.is_shutdown());
assert_eq!(scheduler.workers.len(), 2);
}
#[test]
fn test_initial_local_scheduler_capacity_scales_with_worker_count() {
assert_eq!(
ThreeLaneScheduler::initial_local_scheduler_capacity(0),
1024
);
assert_eq!(
ThreeLaneScheduler::initial_local_scheduler_capacity(1),
1024
);
assert_eq!(
ThreeLaneScheduler::initial_local_scheduler_capacity(2),
1024
);
assert_eq!(ThreeLaneScheduler::initial_local_scheduler_capacity(4), 512);
assert_eq!(ThreeLaneScheduler::initial_local_scheduler_capacity(8), 256);
assert_eq!(
ThreeLaneScheduler::initial_local_scheduler_capacity(64),
128
);
}
#[test]
fn select_backoff_deadline_follower_uses_local_only() {
let timer_deadline = Some(Time::from_nanos(100));
let local_deadline = Some(Time::from_nanos(400));
let global_deadline = Some(Time::from_nanos(200));
let selected = select_backoff_deadline(
IoPhaseOutcome::Follower,
timer_deadline,
local_deadline,
global_deadline,
);
assert_eq!(
selected, local_deadline,
"follower must ignore shared deadlines and honor only local deadline"
);
}
#[test]
fn select_backoff_deadline_follower_without_local_deadline_stays_none() {
let selected = select_backoff_deadline(
IoPhaseOutcome::Follower,
Some(Time::from_nanos(100)),
None,
Some(Time::from_nanos(200)),
);
assert_eq!(
selected, None,
"follower should not arm timeout wakeups for non-local deadlines"
);
}
#[test]
fn select_backoff_deadline_non_follower_uses_earliest_deadline() {
let timer_deadline = Some(Time::from_nanos(500));
let local_deadline = Some(Time::from_nanos(300));
let global_deadline = Some(Time::from_nanos(100));
let selected = select_backoff_deadline(
IoPhaseOutcome::NoProgress,
timer_deadline,
local_deadline,
global_deadline,
);
assert_eq!(
selected, global_deadline,
"leader/no-io path should continue using earliest deadline across all sources"
);
}
#[test]
fn backoff_metrics_count_follower_shared_deadline_ignores() {
let mut metrics = PreemptionMetrics::default();
record_backoff_deadline_selection(
&mut metrics,
IoPhaseOutcome::Follower,
Some(Time::from_nanos(100)),
Some(Time::from_nanos(200)),
);
assert_eq!(metrics.follower_shared_deadline_ignored, 1);
record_backoff_deadline_selection(
&mut metrics,
IoPhaseOutcome::NoProgress,
Some(Time::from_nanos(100)),
Some(Time::from_nanos(200)),
);
assert_eq!(metrics.follower_shared_deadline_ignored, 1);
}
#[test]
fn backoff_metrics_count_follower_without_shared_deadlines_is_noop() {
let mut metrics = PreemptionMetrics::default();
record_backoff_deadline_selection(&mut metrics, IoPhaseOutcome::Follower, None, None);
assert_eq!(
metrics.follower_shared_deadline_ignored, 0,
"follower should only count suppressions when a shared deadline was present"
);
}
#[test]
fn backoff_metrics_count_short_waits_and_follower_timeout_parks() {
let mut metrics = PreemptionMetrics::default();
record_backoff_timeout_park(&mut metrics, IoPhaseOutcome::Follower, 4_000_000);
record_backoff_timeout_park(&mut metrics, IoPhaseOutcome::NoProgress, 6_000_000);
assert_eq!(metrics.backoff_parks_total, 2);
assert_eq!(metrics.backoff_timeout_parks_total, 2);
assert_eq!(metrics.backoff_timeout_nanos_total, 10_000_000);
assert_eq!(metrics.short_wait_le_5ms, 1);
assert_eq!(metrics.follower_timeout_parks, 1);
}
#[test]
fn backoff_metrics_count_short_wait_threshold_is_inclusive() {
let mut metrics = PreemptionMetrics::default();
record_backoff_timeout_park(
&mut metrics,
IoPhaseOutcome::Follower,
SHORT_WAIT_LE_5MS_NANOS,
);
assert_eq!(
metrics.short_wait_le_5ms, 1,
"<= 5ms threshold should include exactly 5ms"
);
}
#[test]
fn classify_backoff_timeout_decision_handles_due_short_and_long_waits() {
let now = Time::from_nanos(1_000);
let due = classify_backoff_timeout_decision(IoPhaseOutcome::Follower, now, now);
assert_eq!(due, BackoffTimeoutDecision::DeadlineDue);
let short_follower = classify_backoff_timeout_decision(
IoPhaseOutcome::Follower,
Time::from_nanos(1_000 + 4_000_000),
now,
);
assert_eq!(
short_follower,
BackoffTimeoutDecision::ParkTimeout { nanos: 4_000_000 }
);
let threshold_follower = classify_backoff_timeout_decision(
IoPhaseOutcome::Follower,
Time::from_nanos(1_000 + SHORT_WAIT_LE_5MS_NANOS),
now,
);
assert_eq!(
threshold_follower,
BackoffTimeoutDecision::ParkTimeout {
nanos: SHORT_WAIT_LE_5MS_NANOS
}
);
let long_follower = classify_backoff_timeout_decision(
IoPhaseOutcome::Follower,
Time::from_nanos(1_000 + 6_000_000),
now,
);
assert_eq!(
long_follower,
BackoffTimeoutDecision::ParkTimeout { nanos: 6_000_000 }
);
let short_leader = classify_backoff_timeout_decision(
IoPhaseOutcome::NoProgress,
Time::from_nanos(1_000 + 4_000_000),
now,
);
assert_eq!(
short_leader,
BackoffTimeoutDecision::ParkTimeout { nanos: 4_000_000 }
);
}
#[test]
fn backoff_metrics_count_indefinite_parks() {
let mut metrics = PreemptionMetrics::default();
record_backoff_indefinite_park(&mut metrics, IoPhaseOutcome::Follower);
record_backoff_indefinite_park(&mut metrics, IoPhaseOutcome::NoProgress);
assert_eq!(metrics.backoff_parks_total, 2);
assert_eq!(metrics.backoff_indefinite_parks, 2);
assert_eq!(metrics.follower_indefinite_parks, 1);
}
#[test]
fn preemption_metrics_backoff_summary_helpers_handle_zero_denominators() {
let metrics = PreemptionMetrics::default();
assert_eq!(metrics.avg_timeout_park_nanos(), 0);
assert_eq!(metrics.short_wait_ratio_bps(), 0);
assert_eq!(metrics.follower_short_wait_avoidance_bps(), 0);
}
#[test]
fn preemption_metrics_backoff_summary_helpers_compute_expected_values() {
let metrics = PreemptionMetrics {
backoff_timeout_parks_total: 4,
backoff_timeout_nanos_total: 20,
short_wait_le_5ms: 2,
follower_short_wait_skip_le_5ms: 3,
follower_timeout_parks: 1,
..PreemptionMetrics::default()
};
assert_eq!(metrics.avg_timeout_park_nanos(), 5);
assert_eq!(metrics.short_wait_ratio_bps(), 5_000);
assert_eq!(metrics.follower_short_wait_avoidance_bps(), 7_500);
}
#[test]
fn test_three_lane_worker_shutdown() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(2, &state);
let workers = scheduler.take_workers();
assert_eq!(workers.len(), 2);
let handles: Vec<_> = workers
.into_iter()
.map(|mut worker| {
std::thread::spawn(move || {
worker.run_loop();
})
})
.collect();
std::thread::sleep(Duration::from_millis(10));
scheduler.shutdown();
for handle in handles {
handle.join().unwrap();
}
}
#[test]
fn test_cancel_priority_over_ready() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
scheduler.inject_ready(TaskId::new_for_test(1, 1), 100);
scheduler.inject_cancel(TaskId::new_for_test(1, 2), 50);
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
let task1 = worker.try_cancel_work();
assert!(task1.is_some());
assert_eq!(task1.unwrap(), TaskId::new_for_test(1, 2));
let task2 = worker.try_ready_work();
assert!(task2.is_some());
assert_eq!(task2.unwrap(), TaskId::new_for_test(1, 1));
}
#[test]
fn test_cancel_lane_fairness_limit() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, 2);
let cancel_tasks = [
TaskId::new_for_test(1, 1),
TaskId::new_for_test(1, 2),
TaskId::new_for_test(1, 3),
];
let ready_task = TaskId::new_for_test(1, 4);
for &task_id in &cancel_tasks {
scheduler.inject_cancel(task_id, 100);
}
scheduler.inject_ready(ready_task, 50);
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
let first = worker.next_task().expect("first dispatch");
let second = worker.next_task().expect("second dispatch");
let third = worker.next_task().expect("third dispatch");
let fourth = worker.next_task().expect("fourth dispatch");
assert!(cancel_tasks.contains(&first));
assert!(cancel_tasks.contains(&second));
assert_eq!(third, ready_task);
assert!(cancel_tasks.contains(&fourth));
}
#[test]
fn test_local_cancel_lane_fairness_limit() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, 2);
let cancel_tasks = [
TaskId::new_for_test(1, 11),
TaskId::new_for_test(1, 12),
TaskId::new_for_test(1, 13),
];
let ready_task = TaskId::new_for_test(1, 14);
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
{
let mut local = worker.local.lock();
for &task_id in &cancel_tasks {
local.schedule_cancel(task_id, 100);
}
local.schedule(ready_task, 50);
}
let first = worker.next_task().expect("first dispatch");
let second = worker.next_task().expect("second dispatch");
let third = worker.next_task().expect("third dispatch");
let fourth = worker.next_task().expect("fourth dispatch");
assert!(cancel_tasks.contains(&first));
assert!(cancel_tasks.contains(&second));
assert_eq!(third, ready_task);
assert!(cancel_tasks.contains(&fourth));
}
#[test]
fn test_stealing_only_from_ready_lane() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(2, &state);
{
let workers = &scheduler.workers;
let mut local0 = workers[0].local.lock();
local0.schedule_cancel(TaskId::new_for_test(1, 1), 100);
local0.schedule(TaskId::new_for_test(1, 2), 50);
local0.schedule(TaskId::new_for_test(1, 3), 50);
}
let mut workers = scheduler.take_workers().into_iter();
let _ = workers.next().unwrap(); // worker 0 is the victim; its queues stay reachable through the thief's stealer handles
let mut thief_worker = workers.next().unwrap();
let stolen = thief_worker.try_steal();
assert!(stolen.is_some());
let stolen_id = stolen.unwrap();
assert!(
stolen_id == TaskId::new_for_test(1, 2) || stolen_id == TaskId::new_for_test(1, 3),
"Expected ready task, got cancel task"
);
}
#[test]
fn execute_completes_task_and_schedules_waiter() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (task_id, _handle) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
task_id
};
let waiter_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (waiter_id, _handle) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
waiter_id
};
{
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(record) = guard.task_mut(task_id) {
record.add_waiter(waiter_id);
}
}
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = scheduler.take_workers().into_iter().next().unwrap();
worker.execute(task_id);
let completed = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.task(task_id)
.is_none();
assert!(completed, "task should be removed after completion");
let scheduled_task = worker.global.pop_ready().map(|pt| pt.task);
assert_eq!(scheduled_task, Some(waiter_id));
}
#[test]
fn test_try_timed_work_checks_deadline() {
use crate::time::{TimerDriverHandle, VirtualClock};
let clock = Arc::new(VirtualClock::new());
let mut state = RuntimeState::new();
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock.clone()));
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let task_id = TaskId::new_for_test(1, 1);
let deadline = Time::from_nanos(1000);
scheduler.inject_timed(task_id, deadline);
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
let result = worker.try_timed_work();
assert!(result.is_none(), "task should not be ready before deadline");
clock.advance(2000);
let result = worker.try_timed_work();
assert_eq!(result, Some(task_id), "task should be ready after deadline");
}
#[test]
fn test_worker_has_timer_driver_from_state() {
use crate::time::{TimerDriverHandle, VirtualClock};
let clock = Arc::new(VirtualClock::new());
let mut state = RuntimeState::new();
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock.clone()));
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
assert!(
worker.timer_driver.is_some(),
"worker should have timer driver from state"
);
let timer = worker.timer_driver.as_ref().unwrap();
assert_eq!(timer.now(), Time::ZERO, "timer should start at zero");
clock.advance(1000);
assert_eq!(
timer.now(),
Time::from_nanos(1000),
"timer should reflect clock advance"
);
}
#[test]
fn test_scheduler_timer_driver_propagates_to_workers() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(2, &state);
let workers = scheduler.take_workers();
assert!(workers[0].timer_driver.is_none());
assert!(workers[1].timer_driver.is_none());
assert!(scheduler.timer_driver.is_none());
}
#[test]
fn test_run_once_processes_timers() {
use crate::time::{TimerDriverHandle, VirtualClock};
use std::sync::atomic::AtomicBool;
use std::task::Waker;
struct TestWaker(AtomicBool);
impl Wake for TestWaker {
fn wake(self: Arc<Self>) {
self.0.store(true, Ordering::SeqCst);
}
}
let clock = Arc::new(VirtualClock::new());
let mut state = RuntimeState::new();
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock.clone()));
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let timer_driver = scheduler.timer_driver.as_ref().unwrap().clone();
let waker_flag = Arc::new(TestWaker(AtomicBool::new(false)));
let waker = Waker::from(waker_flag.clone());
let _handle = timer_driver.register(Time::from_nanos(500), waker);
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
assert!(!waker_flag.0.load(Ordering::SeqCst));
worker.run_once();
assert!(
!waker_flag.0.load(Ordering::SeqCst),
"timer should not fire before deadline"
);
clock.advance(1000);
worker.run_once();
assert!(
waker_flag.0.load(Ordering::SeqCst),
"timer should fire after deadline"
);
}
#[test]
fn test_timed_work_not_due_stays_in_queue() {
use crate::time::{TimerDriverHandle, VirtualClock};
let clock = Arc::new(VirtualClock::new());
let mut state = RuntimeState::new();
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let task_id = TaskId::new_for_test(1, 1);
let deadline = Time::from_nanos(1000);
scheduler.inject_timed(task_id, deadline);
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
let result = worker.try_timed_work();
assert!(result.is_none());
let peeked = worker.global.pop_timed();
assert!(peeked.is_some(), "task should remain in global queue");
assert_eq!(peeked.unwrap().task, task_id);
}
#[test]
fn test_edf_ordering_from_global_queue() {
use crate::time::{TimerDriverHandle, VirtualClock};
let clock = Arc::new(VirtualClock::starting_at(Time::from_nanos(1000)));
let mut state = RuntimeState::new();
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let task1 = TaskId::new_for_test(1, 1);
let task2 = TaskId::new_for_test(1, 2);
let task3 = TaskId::new_for_test(1, 3);
scheduler.inject_timed(task2, Time::from_nanos(500));
scheduler.inject_timed(task3, Time::from_nanos(750));
scheduler.inject_timed(task1, Time::from_nanos(250));
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
let first = worker.try_timed_work();
assert_eq!(
first,
Some(task1),
"earliest deadline (250) should be first"
);
let second = worker.try_timed_work();
assert_eq!(
second,
Some(task2),
"second earliest deadline (500) should be second"
);
let third = worker.try_timed_work();
assert_eq!(
third,
Some(task3),
"third earliest deadline (750) should be third"
);
}
#[test]
fn test_starvation_avoidance_ready_with_timed() {
use crate::time::{TimerDriverHandle, VirtualClock};
let clock = Arc::new(VirtualClock::new());
let mut state = RuntimeState::new();
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let ready_task = TaskId::new_for_test(1, 1);
scheduler.inject_ready(ready_task, 100);
let timed_task = TaskId::new_for_test(1, 2);
scheduler.inject_timed(timed_task, Time::from_nanos(1000));
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
assert!(worker.try_timed_work().is_none());
assert_eq!(worker.try_ready_work(), Some(ready_task));
}
#[test]
fn test_cancel_priority_over_timed() {
use crate::time::{TimerDriverHandle, VirtualClock};
let clock = Arc::new(VirtualClock::starting_at(Time::from_nanos(1000)));
let mut state = RuntimeState::new();
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let timed_task = TaskId::new_for_test(1, 1);
scheduler.inject_timed(timed_task, Time::from_nanos(500));
let cancel_task = TaskId::new_for_test(1, 2);
scheduler.inject_cancel(cancel_task, 50);
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
assert_eq!(worker.try_cancel_work(), Some(cancel_task));
assert_eq!(worker.try_timed_work(), Some(timed_task));
}
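// A cancel-aware waker routes an already-cancelled task into the global cancel lane.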
#[test]
fn cancel_waker_injects_cancel_lane() {
let task_id = TaskId::new_for_test(1, 1);
let cx_inner = Arc::new(RwLock::new(CxInner::new(
RegionId::new_for_test(1, 0),
task_id,
Budget::INFINITE,
)));
{
let mut guard = cx_inner.write();
guard.cancel_requested = true;
guard
.fast_cancel
.store(true, std::sync::atomic::Ordering::Release);
guard.cancel_reason = Some(CancelReason::timeout());
}
let wake_state = Arc::new(crate::record::task::TaskWakeState::new());
let global = Arc::new(GlobalInjector::new());
let parker = Parker::new();
let coordinator = Arc::new(WorkerCoordinator::new(vec![parker], None));
let waker = Waker::from(Arc::new(CancelLaneWaker {
task_id,
default_priority: Budget::INFINITE.priority,
wake_state,
global: Arc::clone(&global),
coordinator,
cx_inner: Arc::downgrade(&cx_inner),
}));
waker.wake_by_ref();
let task = global.pop_cancel().map(|pt| pt.task);
assert_eq!(task, Some(task_id));
}
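// An ordinary waker that observes the fast-cancel flag diverts the task into the
// cancel lane instead of re-enqueueing it as ready.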
#[test]
fn ordinary_waker_observes_fast_cancel_and_injects_cancel_lane() {
let task_id = TaskId::new_for_test(1, 7);
let cx_inner = Arc::new(RwLock::new(CxInner::new(
RegionId::new_for_test(1, 0),
task_id,
Budget::INFINITE,
)));
{
let mut guard = cx_inner.write();
guard.cancel_requested = true;
guard
.fast_cancel
.store(true, std::sync::atomic::Ordering::Release);
guard.cancel_reason = Some(CancelReason::timeout());
}
let wake_state = Arc::new(crate::record::task::TaskWakeState::new());
let global = Arc::new(GlobalInjector::new());
let parker = Parker::new();
let coordinator = Arc::new(WorkerCoordinator::new(vec![parker], None));
let waker = Waker::from(Arc::new(ThreeLaneWaker {
task_id,
wake_state,
global: Arc::clone(&global),
coordinator,
priority: Budget::INFINITE.priority,
fast_cancel: Arc::clone(&cx_inner.read().fast_cancel),
cx_inner: Arc::downgrade(&cx_inner),
}));
waker.wake_by_ref();
let task = global.pop_cancel().map(|pt| pt.task);
assert_eq!(task, Some(task_id));
assert!(
global.pop_ready().is_none(),
"cancelled task should not be re-enqueued in ready lane"
);
}
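// Ready injection is deduplicated through the task's wake state, so a second inject
// before the first pop is a no-op.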
#[test]
fn test_inject_ready_dedup_prevents_double_schedule() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (task_id, _handle) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
task_id
};
let scheduler = ThreeLaneScheduler::new(1, &state);
scheduler.inject_ready(task_id, 100);
assert!(
scheduler.global.has_ready_work(),
"first inject should add to queue"
);
scheduler.inject_ready(task_id, 100);
let first = scheduler.global.pop_ready();
assert!(first.is_some(), "first pop should succeed");
assert_eq!(first.unwrap().task, task_id);
let second = scheduler.global.pop_ready();
assert!(second.is_none(), "second pop should fail (deduplicated)");
}
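// Cancel injection allows duplicates so a cancel wake is never dropped by a stale wake state.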
#[test]
fn test_inject_cancel_allows_duplicates_for_priority() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (task_id, _handle) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
task_id
};
let scheduler = ThreeLaneScheduler::new(1, &state);
scheduler.inject_cancel(task_id, 100);
assert!(scheduler.global.has_cancel_work());
scheduler.inject_cancel(task_id, 100);
let first = scheduler.global.pop_cancel();
assert!(first.is_some());
let second = scheduler.global.pop_cancel();
assert!(second.is_some(), "duplicate cancel inject should also enqueue");
let third = scheduler.global.pop_cancel();
assert!(third.is_none());
}
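// Injecting a cancel for a task already queued as ready promotes it to the cancel lane.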
#[test]
fn test_inject_cancel_promotes_ready_task() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (task_id, _handle) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
task_id
};
let scheduler = ThreeLaneScheduler::new(1, &state);
scheduler.inject_ready(task_id, 50);
assert!(scheduler.global.has_ready_work());
assert!(!scheduler.global.has_cancel_work());
scheduler.inject_cancel(task_id, 100);
assert!(
scheduler.global.has_cancel_work(),
"Task should be promoted to cancel lane"
);
}
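// Tasks placed on a worker's thread-local ready queue are pinned there and must be
// invisible to stealers.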
#[test]
fn test_spawn_local_not_stolen() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(2, &state);
let mut worker_pool = scheduler.take_workers();
let local_ready_0 = Arc::clone(&worker_pool[0].local_ready);
let mut stealer_worker = worker_pool.pop().unwrap();
let task_id = TaskId::new_for_test(1, 0);
{
let _guard = ScopedLocalReady::new(Arc::clone(&local_ready_0));
assert!(
schedule_local_task(task_id),
"schedule_local_task should succeed"
);
}
{
let queue = local_ready_0.lock();
assert_eq!(queue.len(), 1);
assert_eq!(queue[0], task_id);
drop(queue);
}
let stolen = stealer_worker.try_steal();
assert!(stolen.is_none(), "Local task should not be stolen");
}
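// A local cancel wake removes the task from local_ready and promotes it into the
// local cancel lane.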
#[test]
fn test_local_cancel_removes_from_local_ready() {
let task_id = TaskId::new_for_test(1, 0);
let local_ready = Arc::new(LocalReadyQueue::new(VecDeque::from([task_id])));
let local = Arc::new(Mutex::new(PriorityScheduler::new()));
let wake_state = Arc::new(TaskWakeState::new());
let cx_inner = Arc::new(RwLock::new(CxInner::new(
RegionId::new_for_test(1, 0),
task_id,
Budget::INFINITE,
)));
{
let mut guard = cx_inner.write();
guard.cancel_requested = true;
guard
.fast_cancel
.store(true, std::sync::atomic::Ordering::Release);
guard.cancel_reason = Some(CancelReason::new(CancelKind::User));
}
let waker = ThreeLaneLocalCancelWaker {
task_id,
default_priority: 10,
wake_state: Arc::clone(&wake_state),
local: Arc::clone(&local),
local_ready: Arc::clone(&local_ready),
parker: Parker::new(),
cx_inner: Arc::downgrade(&cx_inner),
};
waker.schedule();
let queue = local_ready.lock();
assert!(
!queue.contains(&task_id),
"local_ready should not retain cancelled task"
);
drop(queue);
assert!(
local.lock().is_in_cancel_lane(task_id),
"task should be promoted to cancel lane"
);
}
#[test]
fn ordinary_local_waker_promotes_cancelled_task_out_of_local_ready() {
let task_id = TaskId::new_for_test(1, 3);
let local_ready = Arc::new(LocalReadyQueue::new(VecDeque::from([task_id])));
let local = Arc::new(Mutex::new(PriorityScheduler::new()));
let wake_state = Arc::new(TaskWakeState::new());
let cx_inner = Arc::new(RwLock::new(CxInner::new(
RegionId::new_for_test(1, 0),
task_id,
Budget::INFINITE,
)));
{
let mut guard = cx_inner.write();
guard.cancel_requested = true;
guard
.fast_cancel
.store(true, std::sync::atomic::Ordering::Release);
guard.cancel_reason = Some(CancelReason::new(CancelKind::User));
}
let waker = ThreeLaneLocalWaker {
task_id,
priority: 10,
wake_state,
local: Arc::clone(&local),
local_ready: Arc::clone(&local_ready),
parker: Parker::new(),
fast_cancel: Arc::clone(&cx_inner.read().fast_cancel),
cx_inner: Arc::downgrade(&cx_inner),
};
waker.schedule();
let queue = local_ready.lock();
assert!(
!queue.contains(&task_id),
"cancelled local task should be removed from local_ready"
);
drop(queue);
assert!(
local.lock().is_in_cancel_lane(task_id),
"cancelled local task should be promoted to cancel lane"
);
}
#[test]
fn schedule_cancel_on_current_local_removes_local_ready() {
let task_id = TaskId::new_for_test(1, 0);
let local_ready = Arc::new(LocalReadyQueue::new(VecDeque::from([task_id])));
let local = Arc::new(Mutex::new(PriorityScheduler::new()));
let _local_ready_guard = ScopedLocalReady::new(Arc::clone(&local_ready));
let _local_guard = ScopedLocalScheduler::new(Arc::clone(&local));
let scheduled = schedule_cancel_on_current_local(task_id, 7);
assert!(scheduled, "should schedule via current local scheduler");
let queue = local_ready.lock();
assert!(
!queue.contains(&task_id),
"local_ready should not retain cancelled task"
);
drop(queue);
assert!(
local.lock().is_in_cancel_lane(task_id),
"task should be promoted to cancel lane"
);
}
#[test]
fn test_schedule_local_dedup_prevents_double_schedule() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (task_id, _handle) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
task_id
};
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
worker.schedule_local(task_id, 100);
worker.schedule_local(task_id, 100);
let count = {
let local = worker.local.lock();
local.len()
};
assert_eq!(count, 1, "duplicate schedule_local should collapse to a single entry");
}
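// A task marked local must not be routed through schedule_local's ready lane at all.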
#[test]
fn test_schedule_local_rejects_local_task() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (task_id, _handle) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
let record = guard.task_mut(task_id).expect("task record missing");
record.mark_local();
drop(guard);
task_id
};
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
worker.schedule_local(task_id, 100);
let popped = worker.local.lock().pop_ready_only();
assert!(popped.is_none(), "local task must not enter ready lane");
assert!(
!worker.local_ready.lock().contains(&task_id),
"schedule_local must not route local tasks"
);
}
#[test]
fn test_schedule_local_timed_rejects_local_task() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (task_id, _handle) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
let record = guard.task_mut(task_id).expect("task record missing");
record.mark_local();
drop(guard);
task_id
};
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
worker.schedule_local_timed(task_id, Time::from_nanos(42));
let popped = worker.local.lock().pop_timed_only(Time::from_nanos(100));
assert!(popped.is_none(), "local task must not enter timed lane");
assert!(
!worker.local_ready.lock().contains(&task_id),
"schedule_local_timed must not route local tasks"
);
}
#[test]
fn test_local_then_global_dedup() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (task_id, _handle) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
task_id
};
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
worker.schedule_local(task_id, 100);
scheduler.global.inject_ready(task_id, 100);
let local_len = {
let local = worker.local.lock();
local.len()
};
assert_eq!(local_len, 1);
}
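// Many concurrent wakes of the same task collapse to a single ready-lane entry via
// wake-state dedup.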
#[test]
fn test_multiple_wakes_single_schedule() {
let task_id = TaskId::new_for_test(1, 1);
let wake_state = Arc::new(crate::record::task::TaskWakeState::new());
let global = Arc::new(GlobalInjector::new());
let parker = Parker::new();
let coordinator = Arc::new(WorkerCoordinator::new(vec![parker], None));
let wakers: Vec<_> = (0..10)
.map(|_| {
Waker::from(Arc::new(ThreeLaneWaker {
task_id,
wake_state: Arc::clone(&wake_state),
global: Arc::clone(&global),
coordinator: Arc::clone(&coordinator),
priority: 0,
fast_cancel: Arc::new(std::sync::atomic::AtomicBool::new(false)),
cx_inner: Weak::new(),
}))
})
.collect();
for waker in &wakers {
waker.wake_by_ref();
}
let first = global.pop_ready();
assert!(first.is_some(), "at least one wake should succeed");
let second = global.pop_ready();
assert!(
second.is_none(),
"only one wake should succeed, dedup should prevent duplicates"
);
}
#[test]
fn test_wake_state_cleared_allows_reschedule() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (task_id, _handle) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
task_id
};
let wake_state = {
let guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
guard
.task(task_id)
.map(|r| Arc::clone(&r.wake_state))
.expect("task should exist")
};
let scheduler = ThreeLaneScheduler::new(1, &state);
scheduler.inject_ready(task_id, 100);
let first = scheduler.global.pop_ready();
assert!(first.is_some());
wake_state.clear();
scheduler.inject_ready(task_id, 100);
let second = scheduler.global.pop_ready();
assert!(second.is_some(), "should be able to reschedule after clear");
}
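// Stress tests below are #[ignore]d by default; they hammer parking, injection,
// stealing, and the global queue from many threads.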
#[test]
#[ignore = "stress test; run manually"]
fn stress_test_parker_high_contention() {
use crate::runtime::scheduler::worker::Parker;
use std::sync::atomic::AtomicUsize;
use std::thread;
let parker = Arc::new(Parker::new());
let successful_wakes = Arc::new(AtomicUsize::new(0));
let iterations = 1000;
let thread_count = 50;
let handles: Vec<_> = (0..thread_count)
.map(|i| {
let p = parker.clone();
let wakes = successful_wakes.clone();
thread::spawn(move || {
for j in 0..iterations {
if i % 2 == 0 {
p.park_timeout(Duration::from_millis(10));
wakes.fetch_add(1, Ordering::Relaxed);
} else {
p.unpark();
if j % 10 == 0 {
thread::yield_now();
}
}
}
})
})
.collect();
for h in handles {
h.join().expect("thread should not panic");
}
let total_wakes = successful_wakes.load(Ordering::Relaxed);
assert!(
total_wakes > 0,
"at least some threads should have woken up"
);
}
#[test]
#[ignore = "stress test; run manually"]
fn stress_test_scheduler_inject_while_parking() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let scheduler = Arc::new(ThreeLaneScheduler::new(4, &state));
let injected = Arc::new(AtomicUsize::new(0));
let executed = Arc::new(AtomicUsize::new(0));
let barrier = Arc::new(std::sync::Barrier::new(21));
let inject_handles: Vec<_> = (0..20)
.map(|t| {
let s = scheduler.clone();
let inj = injected.clone();
let b = barrier.clone();
std::thread::spawn(move || {
b.wait();
for i in 0..5000 {
let task = TaskId::new_for_test(t * 10000 + i, 0);
s.inject_ready(task, 50);
inj.fetch_add(1, Ordering::Relaxed);
}
})
})
.collect();
barrier.wait();
std::thread::sleep(Duration::from_millis(100));
while scheduler.global.pop_ready().is_some() {
executed.fetch_add(1, Ordering::Relaxed);
}
for h in inject_handles {
h.join().expect("injector should complete");
}
while scheduler.global.pop_ready().is_some() {
executed.fetch_add(1, Ordering::Relaxed);
}
let total_injected = injected.load(Ordering::Relaxed);
let total_executed = executed.load(Ordering::Relaxed);
assert!(
total_executed > 0,
"should have executed some tasks, got {total_executed}"
);
assert!(
total_injected >= total_executed,
"injected ({total_injected}) should be >= executed ({total_executed})"
);
}
#[test]
#[ignore = "stress test; run manually"]
fn stress_test_work_stealing_fairness() {
use crate::runtime::scheduler::priority::Scheduler as PriorityScheduler;
let producer_queue = Arc::new(Mutex::new(PriorityScheduler::new()));
let stolen_count = Arc::new(AtomicUsize::new(0));
let barrier = Arc::new(std::sync::Barrier::new(12));
{
let mut q = producer_queue.lock();
for i in 0..10000 {
q.schedule(TaskId::new_for_test(i, 0), 50);
}
}
let stealer_handles: Vec<_> = (0..10)
.map(|_| {
let q = producer_queue.clone();
let stolen = stolen_count.clone();
let b = barrier.clone();
std::thread::spawn(move || {
b.wait();
let mut local_stolen = 0;
loop {
let task = {
let Some(mut guard) = q.try_lock() else {
continue;
};
let batch = guard.steal_ready_batch(4);
if batch.is_empty() {
None
} else {
Some(batch.len())
}
};
match task {
Some(count) => {
local_stolen += count;
std::thread::yield_now();
}
None => break,
}
}
stolen.fetch_add(local_stolen, Ordering::Relaxed);
})
})
.collect();
let q = producer_queue.clone();
let b = barrier.clone();
let producer = std::thread::spawn(move || {
b.wait();
for i in 10000..15000 {
let mut guard = q.lock();
guard.schedule(TaskId::new_for_test(i, 0), 50);
drop(guard);
std::thread::yield_now();
}
});
barrier.wait();
producer.join().expect("producer should complete");
for h in stealer_handles {
h.join().expect("stealer should complete");
}
let mut remaining = 0;
{
let mut q = producer_queue.lock();
while q.pop().is_some() {
remaining += 1;
}
}
let total_stolen = stolen_count.load(Ordering::Relaxed);
let total = total_stolen + remaining;
assert!(
total >= 14_000,
"should handle most tasks, got {total}"
);
}
#[test]
#[ignore = "stress test; run manually"]
fn stress_test_global_queue_contention() {
let global = Arc::new(GlobalInjector::new());
let spawned = Arc::new(AtomicUsize::new(0));
let consumed = Arc::new(AtomicUsize::new(0));
let barrier = Arc::new(std::sync::Barrier::new(61));
let spawn_handles: Vec<_> = (0..50)
.map(|t| {
let g = global.clone();
let s = spawned.clone();
let b = barrier.clone();
std::thread::spawn(move || {
b.wait();
for i in 0..2000 {
let task = TaskId::new_for_test(t * 100_000 + i, 0);
g.inject_ready(task, 50);
s.fetch_add(1, Ordering::Relaxed);
}
})
})
.collect();
let consumer_handles: Vec<_> = (0..10)
.map(|_| {
let g = global.clone();
let c = consumed.clone();
let b = barrier.clone();
std::thread::spawn(move || {
b.wait();
let mut local = 0;
let mut empty_streak = 0;
loop {
if g.pop_ready().is_some() {
local += 1;
empty_streak = 0;
} else {
empty_streak += 1;
if empty_streak > 1000 {
break;
}
std::thread::yield_now();
}
}
c.fetch_add(local, Ordering::Relaxed);
})
})
.collect();
barrier.wait();
for h in spawn_handles {
h.join().expect("spawner should complete");
}
std::thread::sleep(Duration::from_millis(100));
for h in consumer_handles {
h.join().expect("consumer should complete");
}
while global.pop_ready().is_some() {
consumed.fetch_add(1, Ordering::Relaxed);
}
let total_spawned = spawned.load(Ordering::Relaxed);
let total_consumed = consumed.load(Ordering::Relaxed);
assert_eq!(total_spawned, 100_000, "should spawn exactly 100k tasks");
assert!(
total_consumed >= 99_000,
"should consume most tasks, got {total_consumed}"
);
}
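// wake_one advances next_wake round-robin so wakeups spread across workers.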
#[test]
fn test_round_robin_wakeup_distribution() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let scheduler = ThreeLaneScheduler::new(4, &state);
let initial = scheduler.coordinator.next_wake.load(Ordering::Relaxed);
assert_eq!(initial, 0, "next_wake should start at 0");
for i in 0..8 {
scheduler.wake_one();
let current = scheduler.coordinator.next_wake.load(Ordering::Relaxed);
assert_eq!(current, i + 1, "next_wake should increment on each wake");
}
let final_val = scheduler.coordinator.next_wake.load(Ordering::Relaxed);
assert_eq!(final_val, 8, "next_wake should be 8 after 8 wakes");
}
#[test]
fn test_coordinator_non_power_of_two_round_robin() {
let parkers: Vec<Parker> = (0..3).map(|_| Parker::new()).collect();
let coordinator = WorkerCoordinator::new(parkers, None);
assert!(
coordinator.mask.is_none(),
"3 workers should use modulo path, not bitmask"
);
for cycle in 0..3 {
for expected_slot in 0..3 {
let idx = coordinator.next_wake.load(Ordering::Relaxed);
let slot = idx % 3;
assert_eq!(
slot, expected_slot,
"cycle {cycle}, idx {idx} should wake slot {expected_slot}"
);
coordinator.wake_one();
}
}
}
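// Power-of-two worker counts take the bitmask fast path (mask = n - 1) instead of modulo.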
#[test]
fn test_coordinator_power_of_two_uses_bitmask() {
let parkers: Vec<Parker> = (0..4).map(|_| Parker::new()).collect();
let coordinator = WorkerCoordinator::new(parkers, None);
assert_eq!(
coordinator.mask,
Some(3),
"4 workers should use bitmask 0b11"
);
for i in 0u64..8 {
let idx = coordinator.next_wake.load(Ordering::Relaxed);
assert_eq!(idx & 3, (i as usize) % 4);
coordinator.wake_one();
}
}
#[test]
fn test_coordinator_single_worker() {
let parkers = vec![Parker::new()];
let coordinator = WorkerCoordinator::new(parkers, None);
assert_eq!(coordinator.mask, Some(0));
for _ in 0..10 {
coordinator.wake_one();
}
}
#[test]
fn test_coordinator_zero_workers_is_noop() {
let coordinator = WorkerCoordinator::new(vec![], None);
assert!(coordinator.mask.is_none());
coordinator.wake_one();
coordinator.wake_all();
}
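// Even under a cancel flood, the default streak limit forces a fairness yield so
// ready work dispatches within limit + 1 steps.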
#[test]
fn test_default_cancel_streak_limit_fairness() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
for i in 0..20 {
scheduler.inject_cancel(TaskId::new_for_test(1, i), 100);
}
let ready_task = TaskId::new_for_test(1, 99);
scheduler.inject_ready(ready_task, 50);
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
let mut dispatch_order = Vec::new();
for _ in 0..21 {
if let Some(task) = worker.next_task() {
dispatch_order.push(task);
}
}
let ready_pos = dispatch_order
.iter()
.position(|t| *t == ready_task)
.expect("ready task must be dispatched");
assert!(
ready_pos <= DEFAULT_CANCEL_STREAK_LIMIT,
"ready task at position {ready_pos} must appear within \
cancel_streak_limit ({DEFAULT_CANCEL_STREAK_LIMIT}) + 1 dispatches"
);
let metrics = worker.preemption_metrics();
assert!(
metrics.fairness_yields > 0,
"should have fairness yields with 20 cancel + 1 ready"
);
assert!(
metrics.max_cancel_streak <= DEFAULT_CANCEL_STREAK_LIMIT,
"max cancel streak {} should not exceed default limit {}",
metrics.max_cancel_streak,
DEFAULT_CANCEL_STREAK_LIMIT
);
}
#[test]
fn test_region_quiescence_all_tasks_complete() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id1 = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (id, _) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
id
};
let task_id2 = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (id, _) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
id
};
assert!(
!state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.is_quiescent(),
"should not be quiescent with live tasks"
);
let mut scheduler = ThreeLaneScheduler::new(1, &state);
scheduler.inject_ready(task_id1, 100);
scheduler.inject_ready(task_id2, 100);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
worker.execute(task_id1);
worker.execute(task_id2);
let guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
assert!(
guard.task(task_id1).is_none(),
"task1 should be removed after completion"
);
assert!(
guard.task(task_id2).is_none(),
"task2 should be removed after completion"
);
drop(guard);
}
#[test]
fn test_governor_disabled_returns_no_preference() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
assert!(worker.governor.is_none(), "default has no governor");
let suggestion = worker.governor_suggest();
assert_eq!(suggestion, SchedulingSuggestion::NoPreference);
}
#[test]
fn test_governor_enabled_quiescent_returns_no_preference() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
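// Options here (inferred from the assertions in these tests): worker count, state,
// cancel streak limit, governor enabled, governor snapshot interval.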
let mut scheduler = ThreeLaneScheduler::new_with_options(1, &state, 16, true, 1);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
assert!(worker.governor.is_some(), "governor enabled");
let suggestion = worker.governor_suggest();
assert_eq!(suggestion, SchedulingSuggestion::NoPreference);
}
#[test]
fn test_governor_independent_live_tasks_do_not_force_drain_obligations() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
let _ = state
.create_task(root, Budget::unlimited(), async {})
.expect("create task");
let _ = state
.create_task(root, Budget::unlimited(), async {})
.expect("create task");
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new_with_options(1, &state, 16, true, 1);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let suggestion = worker.governor_suggest();
assert_eq!(
suggestion,
SchedulingSuggestion::NoPreference,
"independent live tasks should not be treated as a trapped wait deadlock"
);
}
#[test]
fn test_governor_single_live_task_without_wait_edges_skips_spectral_monitor() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
let _ = state
.create_task(root, Budget::unlimited(), async {})
.expect("create task");
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new_with_options(1, &state, 16, true, 1);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let suggestion = worker.governor_suggest();
assert_eq!(suggestion, SchedulingSuggestion::NoPreference);
assert_eq!(
worker
.spectral_monitor
.as_ref()
.expect("governor should install spectral monitor")
.history_len(),
0,
"benign singleton live-task states should not feed spectral history"
);
}
#[test]
fn test_governor_single_task_self_cycle_updates_spectral_monitor() {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
let (task_id, _handle) = state
.create_task(root, Budget::unlimited(), async {})
.expect("create task");
state.task_mut(task_id).expect("task").waiters.push(task_id);
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new_with_options(1, &state, 16, true, 1);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let suggestion = worker.governor_suggest();
assert_eq!(suggestion, SchedulingSuggestion::DrainObligations);
assert_eq!(
worker
.spectral_monitor
.as_ref()
.expect("governor should install spectral monitor")
.history_len(),
1,
"single-node trapped self-cycles should still update the spectral monitor"
);
}
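// Live work near its deadline makes the governor suggest MeetDeadlines, letting due
// timed work dispatch ahead of the cancel lane.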
#[test]
fn test_governor_meet_deadlines_dispatches_timed_first() {
use crate::time::{TimerDriverHandle, VirtualClock};
let clock = Arc::new(VirtualClock::starting_at(Time::from_nanos(999_000_000)));
let mut state = RuntimeState::new();
state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
state.now = Time::from_nanos(999_000_000);
let root = state.create_root_region(Budget::unlimited());
let (_task_id, _handle) = state
.create_task(root, Budget::with_deadline_ns(1_000_000_000), async {})
.expect("create task");
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new_with_options(1, &state, 16, true, 1);
let cancel_task = TaskId::new_for_test(1, 10);
let timed_task = TaskId::new_for_test(1, 11);
scheduler.inject_cancel(cancel_task, 100);
scheduler.inject_timed(timed_task, Time::from_nanos(500_000_000));
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let first = worker.next_task();
assert_eq!(
first,
Some(timed_task),
"timed should be dispatched first under MeetDeadlines"
);
let second = worker.next_task();
assert_eq!(
second,
Some(cancel_task),
"cancel follows timed under MeetDeadlines"
);
}
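// A pending obligation drives DrainObligations, which doubles the effective cancel
// streak limit (here 2 -> 4) so cancels drain ahead of ready work.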
#[test]
fn test_governor_drain_obligations_boosts_cancel_streak() {
use crate::record::ObligationKind;
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
let (task_id, _handle) = state
.create_task(root, Budget::unlimited(), async {})
.expect("create task");
let _obl = state
.create_obligation(ObligationKind::SendPermit, task_id, root, None)
.expect("create obligation");
state.now = Time::from_nanos(1_000_000_000);
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new_with_options(1, &state, 2, true, 1);
let c1 = TaskId::new_for_test(1, 20);
let c2 = TaskId::new_for_test(1, 21);
let c3 = TaskId::new_for_test(1, 22);
let c4 = TaskId::new_for_test(1, 23);
let ready = TaskId::new_for_test(1, 24);
scheduler.inject_cancel(c1, 100);
scheduler.inject_cancel(c2, 100);
scheduler.inject_cancel(c3, 100);
scheduler.inject_cancel(c4, 100);
scheduler.inject_ready(ready, 50);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let dispatched: Vec<_> = (0..5).filter_map(|_| worker.next_task()).collect();
assert_eq!(dispatched.len(), 5, "should dispatch all 5 tasks");
let cancel_tasks = [c1, c2, c3, c4];
for (i, &task) in dispatched.iter().take(4).enumerate() {
assert!(
cancel_tasks.contains(&task),
"task {i} should be a cancel task, got {task:?}"
);
}
assert_eq!(
dispatched[4], ready,
"ready task should come after all cancel tasks"
);
let cert = worker.preemption_fairness_certificate();
assert_eq!(cert.base_limit, 2);
assert_eq!(cert.effective_limit, 4);
assert_eq!(cert.observed_max_cancel_streak, 4);
assert!(
cert.base_limit_exceedances > 0,
"boosted mode should exceed base L while remaining within 2L"
);
assert_eq!(cert.effective_limit_exceedances, 0);
assert!(cert.invariant_holds());
}
#[test]
fn test_governor_interval_caches_suggestion() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_options(1, &state, 16, true, 4);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
assert_eq!(worker.steps_since_snapshot, 0);
assert_eq!(worker.cached_suggestion, SchedulingSuggestion::NoPreference);
for i in 1..=3u32 {
let s = worker.governor_suggest();
assert_eq!(s, SchedulingSuggestion::NoPreference);
assert_eq!(worker.steps_since_snapshot, i);
}
let s = worker.governor_suggest();
assert_eq!(s, SchedulingSuggestion::NoPreference);
// The fourth call crosses the snapshot interval (4), resetting the step counter.
assert_eq!(worker.steps_since_snapshot, 0);
}
#[test]
fn test_governor_deterministic_across_workers() {
use crate::record::ObligationKind;
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
let (task_id, _handle) = state
.create_task(root, Budget::unlimited(), async {})
.expect("create task");
let _obl = state
.create_obligation(ObligationKind::SendPermit, task_id, root, None)
.expect("create obligation");
state.now = Time::from_nanos(2_000_000_000);
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler = ThreeLaneScheduler::new_with_options(4, &state, 16, true, 1);
let mut workers = scheduler.take_workers();
let suggestions: Vec<_> = workers
.iter_mut()
.map(super::ThreeLaneWorker::governor_suggest)
.collect();
for s in &suggestions {
assert_eq!(
*s, suggestions[0],
"all workers must agree on scheduling suggestion"
);
}
assert_eq!(suggestions[0], SchedulingSuggestion::DrainObligations);
}
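// Helper: single-worker, governor-enabled scheduler whose ready lane is filled
// according to chunk_pattern.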
fn ready_only_governor_scheduler(
task_count: usize,
chunk_pattern: &[usize],
) -> ThreeLaneScheduler {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
let tasks: Vec<_> = (0..task_count)
.map(|_| {
state
.create_task(root, Budget::unlimited(), async {})
.expect("create task")
.0
})
.collect();
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let scheduler = ThreeLaneScheduler::new_with_options(1, &state, 16, true, 1);
let mut offset = 0usize;
for &chunk in chunk_pattern {
let end = offset.saturating_add(chunk).min(tasks.len());
for &task_id in &tasks[offset..end] {
scheduler.inject_ready(task_id, 100);
}
offset = end;
if offset == tasks.len() {
break;
}
}
for &task_id in &tasks[offset..] {
scheduler.inject_ready(task_id, 100);
}
scheduler
}
fn governor_total_potential(worker: &ThreeLaneWorker) -> f64 {
let governor = worker.governor.as_ref().expect("governor enabled");
let state = worker
.state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let ready_depth = worker.local.lock().len();
#[allow(clippy::cast_possible_truncation)]
let snapshot =
StateSnapshot::from_runtime_state(&state).with_ready_queue_depth(ready_depth as u32);
governor.compute_record(&snapshot).total
}
fn collect_ready_drain_potentials(worker: &mut ThreeLaneWorker, dispatches: usize) -> Vec<f64> {
let mut potentials = vec![governor_total_potential(worker)];
for _ in 0..dispatches {
let task_id = worker.next_task().expect("ready task should dispatch");
worker.execute(task_id);
potentials.push(governor_total_potential(worker));
}
potentials
}
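// Metamorphic property: chunked and batched injection of the same ready load must
// trace identical Lyapunov potential sequences.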
#[test]
fn metamorphic_lyapunov_chunked_ready_load_matches_batched_potential_sequence() {
let task_count = 12;
let mut batched = ready_only_governor_scheduler(task_count, &[task_count]);
let mut chunked = ready_only_governor_scheduler(task_count, &[3, 4, 5]);
let mut batched_workers = batched.take_workers();
let mut chunked_workers = chunked.take_workers();
let batched_worker = &mut batched_workers[0];
let chunked_worker = &mut chunked_workers[0];
let batched_potentials = collect_ready_drain_potentials(batched_worker, task_count);
let chunked_potentials = collect_ready_drain_potentials(chunked_worker, task_count);
assert_eq!(
batched_potentials.len(),
chunked_potentials.len(),
"equivalent ready loads should expose the same number of potential samples"
);
for (step, (&batched_total, &chunked_total)) in batched_potentials
.iter()
.zip(&chunked_potentials)
.enumerate()
{
assert!(
(batched_total - chunked_total).abs() <= f64::EPSILON,
"chunking equivalent ready work changed Lyapunov potential at step {step}: batched={batched_total}, chunked={chunked_total}"
);
}
}
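// Draining ready work must be monotone in potential and converge to zero once the
// lane empties.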
#[test]
fn metamorphic_lyapunov_ready_drain_potential_is_monotonic() {
let task_count = 10;
let mut scheduler = ready_only_governor_scheduler(task_count, &[2, 3, 5]);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let potentials = collect_ready_drain_potentials(worker, task_count);
for (step, window) in potentials.windows(2).enumerate() {
assert!(
window[1] <= window[0] + f64::EPSILON,
"draining ready work increased Lyapunov potential between steps {step} and {}: {:?}",
step + 1,
window
);
}
assert!(
potentials
.last()
.is_some_and(|last| last.abs() <= f64::EPSILON),
"fully drained ready workload should converge to zero potential: {potentials:?}"
);
}
#[test]
fn metamorphic_lyapunov_drain_boost_scales_with_base_limit() {
use crate::record::ObligationKind;
for &base_limit in &[2usize, 4, 8] {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
let (task_id, _handle) = state
.create_task(root, Budget::unlimited(), async {})
.expect("create task");
let _obligation = state
.create_obligation(ObligationKind::SendPermit, task_id, root, None)
.expect("create obligation");
state.now = Time::from_nanos(1_000_000_000);
let state = Arc::new(ContendedMutex::new("runtime_state", state));
let mut scheduler =
ThreeLaneScheduler::new_with_options(1, &state, base_limit, true, 1);
let cancel_count = base_limit * 2 + 1;
let cancel_tasks: Vec<_> = (0..cancel_count)
.map(|i| TaskId::new_for_test(42, i as u32))
.collect();
let ready_task = TaskId::new_for_test(42, 10_000 + base_limit as u32);
for &cancel_task in &cancel_tasks {
scheduler.inject_cancel(cancel_task, 100);
}
scheduler.inject_ready(ready_task, 50);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let dispatched: Vec<_> = (0..=cancel_count)
.map(|_| worker.next_task().expect("dispatch should continue"))
.collect();
let ready_position = dispatched
.iter()
.position(|&task| task == ready_task)
.expect("ready task should dispatch under boosted drain mode");
assert_eq!(
ready_position,
base_limit * 2,
"ready work should dispatch immediately after the boosted cancel streak for base limit {base_limit}: {dispatched:?}"
);
let cert = worker.preemption_fairness_certificate();
assert_eq!(cert.base_limit, base_limit);
assert_eq!(
cert.effective_limit,
base_limit * 2,
"drain mode should scale the effective cancel streak limit linearly"
);
assert_eq!(
cert.observed_max_cancel_streak,
base_limit * 2,
"cancel streak should stabilize exactly at the boosted limit"
);
assert_eq!(
cert.effective_limit_exceedances, 0,
"boosted drain mode must still preserve the effective limit invariant"
);
assert!(cert.invariant_holds());
}
}
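// With no governor pressure, enabling the governor must not change dispatch order.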
#[test]
fn test_governor_backward_compatible_dispatch() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut sched_off = ThreeLaneScheduler::new(1, &state);
let mut sched_on = ThreeLaneScheduler::new_with_options(1, &state, 16, true, 1);
let cancel = TaskId::new_for_test(1, 30);
let ready = TaskId::new_for_test(1, 31);
sched_off.inject_cancel(cancel, 100);
sched_off.inject_ready(ready, 50);
sched_on.inject_cancel(cancel, 100);
sched_on.inject_ready(ready, 50);
let mut workers_off = sched_off.take_workers();
let w_off = &mut workers_off[0];
let mut workers_on = sched_on.take_workers();
let w_on = &mut workers_on[0];
let off_1 = w_off.next_task();
let on_1 = w_on.next_task();
assert_eq!(off_1, on_1, "first dispatch should match");
assert_eq!(off_1, Some(cancel));
let off_2 = w_off.next_task();
let on_2 = w_on.next_task();
assert_eq!(off_2, on_2, "second dispatch should match");
assert_eq!(off_2, Some(ready));
}
#[test]
fn test_preemption_metrics_track_dispatches() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, 4);
for i in 0..3u32 {
scheduler.inject_cancel(TaskId::new_for_test(1, i), 100);
}
for i in 3..5u32 {
scheduler.inject_ready(TaskId::new_for_test(1, i), 50);
}
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
for _ in 0..5 {
worker.next_task();
}
let m = worker.preemption_metrics();
assert_eq!(m.cancel_dispatches, 3);
assert_eq!(m.ready_dispatches, 2);
assert_eq!(m.base_limit_exceedances, 0);
assert_eq!(m.effective_limit_exceedances, 0);
assert_eq!(
m.cancel_dispatches + m.ready_dispatches + m.timed_dispatches,
5
);
}
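// The browser ready-handoff limit caps consecutive ready dispatches so the host
// event loop gets a turn.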
#[test]
fn test_browser_ready_handoff_limit_bounds_ready_bursts() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
scheduler.set_browser_ready_handoff_limit(3);
for i in 0..10u32 {
scheduler.inject_ready(TaskId::new_for_test(1, i), 50);
}
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let mut dispatched = 0u32;
let mut current_burst = 0usize;
let mut max_burst = 0usize;
let mut handoff_yields = 0u32;
for _ in 0..64 {
if worker.next_task().is_some() {
dispatched = dispatched.saturating_add(1);
current_burst = current_burst.saturating_add(1);
max_burst = max_burst.max(current_burst);
} else {
if dispatched == 10 {
break;
}
if current_burst == 3 {
handoff_yields = handoff_yields.saturating_add(1);
}
current_burst = 0;
}
}
assert_eq!(dispatched, 10, "all ready tasks should dispatch");
assert!(
max_burst <= 3,
"ready burst should be capped by handoff limit: observed {max_burst}"
);
assert!(
handoff_yields >= 3,
"10 tasks with limit=3 should induce at least 3 handoff yields"
);
assert_eq!(
worker.preemption_metrics().browser_ready_handoff_yields,
u64::from(handoff_yields),
"metrics should track host-turn handoff yields"
);
}
#[test]
fn test_browser_ready_handoff_does_not_mask_cancel_priority() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
scheduler.set_browser_ready_handoff_limit(1);
let ready_a = TaskId::new_for_test(1, 1);
let ready_b = TaskId::new_for_test(1, 2);
let cancel = TaskId::new_for_test(1, 3);
scheduler.inject_ready(ready_a, 50);
scheduler.inject_ready(ready_b, 50);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
assert!(
worker.next_task().is_some(),
"first dispatch should consume a ready task"
);
worker.global.inject_cancel(cancel, 100);
let second = worker.next_task();
assert_eq!(
second,
Some(cancel),
"cancel work must preempt before ready-handoff yielding"
);
assert!(
worker.next_task().is_some(),
"remaining ready task should still dispatch"
);
assert_eq!(
worker.preemption_metrics().browser_ready_handoff_yields,
0,
"cancel preemption should prevent handoff yield in this sequence"
);
}
#[test]
fn test_preemption_fairness_yield_under_cancel_flood() {
let limit: usize = 4;
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, limit);
let cancel_count: u32 = 20;
let ready_count: u32 = 5;
for i in 0..cancel_count {
scheduler.inject_cancel(TaskId::new_for_test(1, i), 100);
}
for i in cancel_count..cancel_count + ready_count {
scheduler.inject_ready(TaskId::new_for_test(1, i), 50);
}
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
let total = cancel_count + ready_count;
for _ in 0..total {
worker.next_task();
}
let m = worker.preemption_metrics();
assert_eq!(m.cancel_dispatches, u64::from(cancel_count));
assert_eq!(m.ready_dispatches, u64::from(ready_count));
assert!(
m.max_cancel_streak <= limit,
"max cancel streak {} exceeded limit {}",
m.max_cancel_streak,
limit
);
assert!(m.fairness_yields > 0, "should yield under cancel flood");
assert_eq!(m.base_limit_exceedances, 0);
assert_eq!(m.effective_limit_exceedances, 0);
assert_eq!(
m.max_ready_dispatch_stall, limit,
"ready work should observe the configured stall ceiling under cancel flood"
);
assert_eq!(
m.max_non_cancel_dispatch_stall(),
limit,
"worst observed non-cancel stall should match the ready stall in this workload"
);
let cert = worker.preemption_fairness_certificate();
assert!(cert.invariant_holds());
assert_eq!(cert.ready_stall_bound_steps(), limit + 1);
assert_eq!(cert.observed_max_ready_stall_steps, limit);
assert_eq!(cert.observed_non_cancel_stall_steps(), limit);
let hash_a = cert.witness_hash();
let hash_b = cert.witness_hash();
assert_eq!(hash_a, hash_b, "witness hash should be deterministic");
}
#[test]
fn test_timed_dispatch_stall_recorded_under_cancel_flood() {
let limit: usize = 3;
let clock = Arc::new(VirtualClock::starting_at(Time::from_nanos(1_000)));
let mut runtime_state = RuntimeState::new();
runtime_state.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock));
let state = Arc::new(ContendedMutex::new("runtime_state", runtime_state));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, limit);
for i in 0..9u32 {
scheduler.inject_cancel(TaskId::new_for_test(11, i), 100);
}
scheduler.inject_timed(TaskId::new_for_test(12, 0), Time::from_nanos(500));
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().expect("worker");
for _ in 0..10 {
worker.next_task();
}
let metrics = worker.preemption_metrics();
assert_eq!(
metrics.max_timed_dispatch_stall, limit,
"due timed work should observe the configured stall ceiling under cancel flood"
);
assert_eq!(metrics.max_non_cancel_dispatch_stall(), limit);
let cert = worker.preemption_fairness_certificate();
assert_eq!(cert.observed_max_timed_stall_steps, limit);
assert_eq!(cert.observed_non_cancel_stall_steps(), limit);
assert!(cert.invariant_holds());
}
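// Documents current behavior: the global ready queue dispatches ahead of a
// higher-priority local task, and the inversion is recorded in metrics, monitors,
// and the fairness certificate.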
#[test]
fn test_global_ready_dispatch_records_local_priority_inversion() {
let state = LocalQueue::test_state(32);
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let low_global = TaskId::new_for_test(21, 0);
let high_local = TaskId::new_for_test(22, 0);
scheduler.workers[0].with_task_table(|tt| {
tt.task_mut(low_global)
.expect("global task record missing")
.sched_priority = 10;
tt.task_mut(high_local)
.expect("local task record missing")
.sched_priority = 200;
});
scheduler.inject_ready(low_global, 10);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
worker.schedule_local(high_local, 200);
let dispatched = worker.next_task();
assert_eq!(
dispatched,
Some(low_global),
"global ready queue currently dispatches before local ready heap"
);
let metrics = worker.preemption_metrics();
assert_eq!(metrics.ready_dispatches, 1);
assert_eq!(metrics.ready_priority_inversions, 1);
assert_eq!(metrics.max_ready_priority_inversion_gap, 190);
let starvation_stats = worker.starvation_stats();
assert_eq!(starvation_stats.total_priority_inversions, 1);
let invariant_stats = worker.invariant_stats();
assert_eq!(
invariant_stats.violations_by_category[&InvariantCategory::PriorityOrdering],
1
);
let violations = worker.invariant_violations();
let invariant_violation = violations
.back()
.expect("priority-order violation should be recorded");
match &invariant_violation.invariant {
SchedulerInvariant::PriorityOrderViolation {
high_priority_task,
high_priority,
low_priority_task,
low_priority,
} => {
assert_eq!(*high_priority_task, high_local);
assert_eq!(*high_priority, 200);
assert_eq!(*low_priority_task, low_global);
assert_eq!(*low_priority, 10);
}
other => panic!("expected priority-order violation, got {other:?}"),
}
let cert = worker.preemption_fairness_certificate();
assert_eq!(cert.ready_priority_inversions, 1);
assert_eq!(cert.max_ready_priority_inversion_gap, 190);
assert!(
!cert.invariant_holds(),
"priority inversions should invalidate the scheduler certificate"
);
}
#[test]
fn test_global_ready_dispatch_skips_inversion_when_local_priority_is_not_higher() {
let state = LocalQueue::test_state(32);
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let high_global = TaskId::new_for_test(23, 0);
let lower_local = TaskId::new_for_test(24, 0);
scheduler.workers[0].with_task_table(|tt| {
tt.task_mut(high_global)
.expect("global task record missing")
.sched_priority = 200;
tt.task_mut(lower_local)
.expect("local task record missing")
.sched_priority = 10;
});
scheduler.inject_ready(high_global, 200);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
worker.schedule_local(lower_local, 10);
let dispatched = worker.next_task();
assert_eq!(dispatched, Some(high_global));
let starvation_stats = worker.starvation_stats();
assert_eq!(starvation_stats.total_priority_inversions, 0);
let metrics = worker.preemption_metrics();
assert_eq!(metrics.ready_priority_inversions, 0);
assert_eq!(metrics.max_ready_priority_inversion_gap, 0);
assert!(worker.invariant_violations().is_empty());
let cert = worker.preemption_fairness_certificate();
assert_eq!(cert.ready_priority_inversions, 0);
assert_eq!(cert.max_ready_priority_inversion_gap, 0);
}
#[test]
fn test_fast_queue_dispatch_records_local_priority_inversion() {
let state = LocalQueue::test_state(32);
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let low_fast = TaskId::new_for_test(23, 0);
let high_local = TaskId::new_for_test(24, 0);
worker.with_task_table(|tt| {
tt.task_mut(low_fast)
.expect("fast task record missing")
.sched_priority = 10;
tt.task_mut(high_local)
.expect("local task record missing")
.sched_priority = 200;
});
worker.fast_queue.push(low_fast);
worker.schedule_local(high_local, 200);
let dispatched = worker.next_task();
assert_eq!(
dispatched,
Some(low_fast),
"fast_queue currently dispatches before the local ready heap"
);
let metrics = worker.preemption_metrics();
assert_eq!(metrics.ready_dispatches, 1);
let starvation_stats = worker.starvation_stats();
assert_eq!(starvation_stats.total_priority_inversions, 1);
let invariant_stats = worker.invariant_stats();
assert_eq!(
invariant_stats.violations_by_category[&InvariantCategory::PriorityOrdering],
1
);
}
#[test]
fn fairness_monitor_reports_priority_inversion_details() {
let mut monitor = FairnessMonitor::with_defaults();
let blocked = TaskId::new_for_test(30, 0);
let executing = TaskId::new_for_test(31, 0);
monitor.record_task_enqueue(blocked, 200, 1_000, 2);
monitor.record_task_skip(blocked, executing, 10, 1_250);
let stats = monitor.starvation_stats(1_250);
assert_eq!(stats.total_priority_inversions, 1);
assert_eq!(stats.max_priority_inversion_gap, 190);
let inversion = stats
.latest_priority_inversion
.expect("latest inversion should be reported");
assert_eq!(inversion.blocked_task_id, blocked);
assert_eq!(inversion.blocked_priority, 200);
assert_eq!(inversion.executing_task_id, executing);
assert_eq!(inversion.executing_priority, 10);
assert_eq!(inversion.priority_gap, 190);
assert_eq!(inversion.timestamp_ns, 1_250);
assert_eq!(inversion.duration_ns, 0);
let oldest = stats
.oldest_tracked_task
.expect("blocked task should remain tracked");
assert_eq!(oldest.task_id, blocked);
assert_eq!(oldest.priority, 200);
assert_eq!(oldest.current_lane, 2);
assert_eq!(oldest.skip_count, 1);
assert_eq!(oldest.wait_time_ns, 250);
assert_eq!(oldest.total_wait_time_ns, 250);
}
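// A freshly created window must report zero events; uninitialized ring slots must not count.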
#[test]
fn starvation_analysis_window_ignores_uninitialized_slots() {
let mut window = StarvationAnalysisWindow::new(16);
let current_time_ns = 500_000_000;
let window_duration_ns = 1_000_000_000;
assert_eq!(
window.events_in_window(window_duration_ns, current_time_ns),
0
);
assert!(!window.is_pattern_detected(10, window_duration_ns, current_time_ns));
for timestamp_ns in (410_000_000..=500_000_000).step_by(10_000_000) {
window.record_event(timestamp_ns);
}
assert_eq!(
window.events_in_window(window_duration_ns, current_time_ns),
10
);
assert!(window.is_pattern_detected(10, window_duration_ns, current_time_ns));
}
#[test]
fn starvation_analysis_window_comprehensive_uninitialized_edge_cases() {
let mut window = StarvationAnalysisWindow::new(8);
assert_eq!(window.events_in_window(1_000_000, 500_000), 0);
assert_eq!(window.events_in_window(u64::MAX, 1_000_000), 0);
assert_eq!(window.events_in_window(0, 0), 0);
window.record_event(1000);
assert_eq!(window.events_in_window(1, 1000), 1); // event at current time counts
assert_eq!(window.events_in_window(1, 999), 0); // recorded event is still in the future
assert_eq!(window.events_in_window(1, 1001), 1); // event sits at the window edge
let mut full_window = StarvationAnalysisWindow::new(8);
for i in 0..8 {
full_window.record_event(1000 + i * 100);
}
assert_eq!(full_window.events_in_window(10_000, 2000), 8);
let mut overfull_window = StarvationAnalysisWindow::new(4);
for i in 0..6 {
overfull_window.record_event(1000 + i * 100);
}
assert_eq!(overfull_window.events_in_window(10_000, 2000), 4);
let mut zero_window = StarvationAnalysisWindow::new(3);
zero_window.record_event(0);
zero_window.record_event(100);
assert_eq!(zero_window.events_in_window(200, 150), 2);
let mut pattern_window = StarvationAnalysisWindow::new(16);
assert!(!pattern_window.is_pattern_detected(10, 1_000_000, 500_000));
for i in 0..10 {
pattern_window.record_event(400_000 + i * 10_000);
}
assert!(pattern_window.is_pattern_detected(10, 1_000_000, 500_000));
assert!(!pattern_window.is_pattern_detected(11, 1_000_000, 500_000));
}
#[test]
fn fairness_monitor_integration_tracks_enqueue_and_dispatch() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = &mut scheduler.workers[0];
let task1 = TaskId::new_for_test(100, 1);
let task2 = TaskId::new_for_test(101, 1);
worker.with_fairness_monitor(|monitor| {
assert_eq!(monitor.tracked_tasks.len(), 0);
});
worker.schedule_local(task1, 50);
worker.schedule_local_cancel(task2, 100);
worker.with_fairness_monitor(|monitor| {
assert_eq!(monitor.tracked_tasks.len(), 2);
assert!(monitor.tracked_tasks.contains_key(&task1));
assert!(monitor.tracked_tasks.contains_key(&task2));
assert_eq!(monitor.tracked_tasks[&task1].current_lane, 2); // ready lane
assert_eq!(monitor.tracked_tasks[&task2].current_lane, 0); // cancel lane
});
if let Some(dispatched_task) = worker.next_task() {
worker.with_fairness_monitor(|monitor| {
assert_eq!(monitor.tracked_tasks.len(), 1);
assert!(!monitor.tracked_tasks.contains_key(&dispatched_task));
});
}
}
#[test]
fn comprehensive_invariant_monitor_integration() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = &mut scheduler.workers[0];
let task1 = TaskId::new_for_test(100, 1);
let task2 = TaskId::new_for_test(101, 1);
let task3 = TaskId::new_for_test(102, 1);
assert!(worker.invariant_monitor.lock().tracked_tasks().is_empty());
assert_eq!(worker.invariant_stats().operations_monitored, 0);
worker.schedule_local(task1, 50);
worker.schedule_local_cancel(task2, 100);
worker.schedule_local_timed(task3, Time::from_nanos(5000));
let tracked = worker.invariant_monitor.lock().tracked_tasks();
assert_eq!(tracked.len(), 3);
let task1_tracked = tracked.iter().find(|t| t.task_id == task1).unwrap();
let task2_tracked = tracked.iter().find(|t| t.task_id == task2).unwrap();
let task3_tracked = tracked.iter().find(|t| t.task_id == task3).unwrap();
assert!(
task1_tracked
.queues
.contains(&"local_ready_heap".to_string())
);
assert!(
task2_tracked
.queues
.contains(&"local_cancel_queue".to_string())
);
assert!(
task3_tracked
.queues
.contains(&"local_timed_queue".to_string())
);
if let Some(dispatched_task) = worker.next_task() {
assert_eq!(dispatched_task, task2);
let tracked_after = worker.invariant_monitor.lock().tracked_tasks();
assert_eq!(tracked_after.len(), 2);
assert!(!tracked_after.iter().any(|t| t.task_id == task2));
}
worker.verify_scheduler_invariants();
assert!(worker.invariant_violations().is_empty());
worker.record_task_completion(task2);
worker.record_task_cancellation(task1);
let stats = worker.invariant_stats();
assert!(stats.operations_monitored > 0);
assert_eq!(stats.violations_by_severity, [0, 0, 0, 0]);
}
#[test]
fn local_cancel_promotion_does_not_trigger_multiple_queue_violation() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = &mut scheduler.workers[0];
let task = TaskId::new_for_test(400, 1);
worker.schedule_local(task, 10);
worker.schedule_local_cancel(task, 90);
let violations = worker.invariant_violations();
assert!(
violations.iter().all(|violation| {
!matches!(
violation.invariant,
SchedulerInvariant::TaskInMultipleQueues { .. }
)
}),
"cancel promotion should relocate queue membership, not fabricate multiple-queue violations: {violations:?}"
);
let tracked = worker.invariant_monitor.lock().tracked_tasks();
assert_eq!(tracked.len(), 1);
assert_eq!(tracked[0].queues, vec!["local_cancel_queue".to_string()]);
assert_eq!(tracked[0].priority, 90);
}
#[test]
fn stolen_batch_requeues_do_not_trigger_multiple_queue_violation() {
let state = LocalQueue::test_state(10);
let mut scheduler = ThreeLaneScheduler::new(2, &state);
scheduler.set_steal_batch_size(2);
let mut workers = scheduler.take_workers();
let ready_a = TaskId::new_for_test(1, 0);
let ready_b = TaskId::new_for_test(2, 0);
workers[0].schedule_local(ready_a, 20);
workers[0].schedule_local(ready_b, 10);
let stolen = workers[1].try_steal();
assert!(stolen.is_some(), "steal should produce work");
let violations = workers[1].invariant_violations();
assert!(
violations.iter().all(|violation| {
!matches!(
violation.invariant,
SchedulerInvariant::TaskInMultipleQueues { .. }
)
}),
"steal batch transfer should move queue membership cleanly: {violations:?}"
);
}
#[test]
fn verify_scheduler_invariants_does_not_report_false_queue_mismatches() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = &mut scheduler.workers[0];
let local_task = TaskId::new_for_test(300, 1);
let fast_task = TaskId::new_for_test(301, 1);
worker.local_ready.lock().push_back(local_task);
worker.fast_queue.push(fast_task);
worker.verify_scheduler_invariants();
let queue_mismatches: Vec<_> = worker
.invariant_violations()
.into_iter()
.filter(|violation| {
matches!(
violation.invariant,
SchedulerInvariant::QueueDepthMismatch { .. }
)
})
.collect();
assert!(
queue_mismatches.is_empty(),
"queue verifier should not fabricate mismatches for exact queue snapshots: {queue_mismatches:?}"
);
}
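// Feeding the monitor a deliberately inverted dispatch order should surface
// exactly one PriorityOrderViolation naming both tasks.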
#[test]
fn invariant_monitor_detects_violations() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = &mut scheduler.workers[0];
let low_priority_task = TaskId::new_for_test(200, 1);
let high_priority_task = TaskId::new_for_test(201, 1);
worker.invariant_monitor.lock().verify_priority_ordering(
    low_priority_task,
    10,
    high_priority_task,
    50,
    Time::from_nanos(1000),
);
let violations = worker.invariant_violations();
assert_eq!(violations.len(), 1);
let violation = &violations[0];
match &violation.invariant {
SchedulerInvariant::PriorityOrderViolation {
high_priority_task: hp_task,
high_priority: hp,
low_priority_task: lp_task,
low_priority: lp,
} => {
assert_eq!(*hp_task, high_priority_task);
assert_eq!(*hp, 50);
assert_eq!(*lp_task, low_priority_task);
assert_eq!(*lp, 10);
}
_ => panic!("Expected PriorityOrderViolation"),
}
let stats = worker.invariant_stats();
assert_eq!(stats.violations_by_severity[2], 1);
}
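// Starvation stats should single out the longest-waiting tracked task and
// report its accumulated wait time.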
#[test]
fn fairness_monitor_reports_oldest_tracked_task_details() {
let mut monitor = FairnessMonitor::with_defaults();
let oldest = TaskId::new_for_test(32, 0);
let newer = TaskId::new_for_test(33, 0);
monitor.record_task_enqueue(oldest, 120, 1_000, 1);
monitor.record_task_enqueue(newer, 90, 1_200, 2);
let stats = monitor.starvation_stats(1_300);
assert_eq!(stats.tracked_tasks_count, 2);
assert_eq!(stats.total_tracked_wait_time_ns, 400);
let oldest = stats
.oldest_tracked_task
.expect("oldest tracked task should be reported");
assert_eq!(oldest.task_id, TaskId::new_for_test(32, 0));
assert_eq!(oldest.priority, 120);
assert_eq!(oldest.current_lane, 1);
assert_eq!(oldest.skip_count, 0);
assert_eq!(oldest.wait_time_ns, 300);
assert_eq!(oldest.total_wait_time_ns, 300);
}
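// Under a cancel flood, the observed max_cancel_streak must stay within the
// configured limit at every tested setting.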
#[test]
fn test_preemption_max_streak_bounded_by_limit() {
for limit in [1, 2, 4, 8, 16] {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, limit);
let n_cancel = (limit * 3) as u32;
for i in 0..n_cancel {
scheduler.inject_cancel(TaskId::new_for_test(1, i), 100);
}
scheduler.inject_ready(TaskId::new_for_test(1, n_cancel), 50);
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
for _ in 0..=n_cancel {
worker.next_task();
}
let m = worker.preemption_metrics();
assert!(
m.max_cancel_streak <= limit,
"limit={}: max_cancel_streak {} exceeded",
limit,
m.max_cancel_streak,
);
assert_eq!(m.base_limit_exceedances, 0);
assert_eq!(m.effective_limit_exceedances, 0);
}
}
#[test]
fn test_preemption_fallback_cancel_when_only_cancel_work() {
let limit: usize = 2;
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, limit);
for i in 0..6u32 {
scheduler.inject_cancel(TaskId::new_for_test(1, i), 100);
}
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
let mut count = 0u32;
for _ in 0..6 {
if worker.next_task().is_some() {
count += 1;
}
}
assert_eq!(count, 6);
let m = worker.preemption_metrics();
assert_eq!(m.cancel_dispatches, 6);
assert!(m.fallback_cancel_dispatches > 0, "should use fallback path");
assert_eq!(m.effective_limit_exceedances, 0);
assert_eq!(m.base_limit_exceedances, 0);
}
#[test]
fn test_fallback_cancel_streak_counts_toward_limit() {
let limit: usize = 3;
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, limit);
for i in 0..20u32 {
scheduler.inject_cancel(TaskId::new_for_test(1, i), 100);
}
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().unwrap();
for _ in 0..=limit {
assert!(worker.next_task().is_some(), "should dispatch cancel");
}
let ready_task = TaskId::new_for_test(99, 0);
worker.fast_queue.push(ready_task);
let mut dispatches_until_ready = 0;
for _ in 0..limit {
let task = worker.next_task().expect("should have work");
dispatches_until_ready += 1;
if task == ready_task {
break;
}
}
let last_task = worker.fast_queue.pop();
let ready_was_dispatched = dispatches_until_ready <= limit
&& (last_task.is_none() || last_task != Some(ready_task));
assert!(
ready_was_dispatched,
"ready task should be dispatched within {limit} steps after fallback, \
took {dispatches_until_ready}"
);
}
#[test]
fn test_preemption_fairness_certificate_deterministic() {
fn run(limit: usize) -> PreemptionFairnessCertificate {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, limit);
for i in 0..12u32 {
scheduler.inject_cancel(TaskId::new_for_test(7, i), 100);
}
for i in 12..18u32 {
scheduler.inject_ready(TaskId::new_for_test(7, i), 50);
}
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().expect("worker");
for _ in 0..18 {
worker.next_task();
}
worker.preemption_fairness_certificate()
}
let cert_a = run(4);
let cert_b = run(4);
assert_eq!(cert_a, cert_b, "certificate should be deterministic");
assert_eq!(
cert_a.witness_hash(),
cert_b.witness_hash(),
"witness hash should match for identical dispatch traces"
);
assert!(cert_a.invariant_holds());
}
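// Shared helper: replays a seeded cancel flood with the adaptive streak
// policy pinned to its smallest arm and returns the dispatch trace for the
// metamorphic same-seed comparison below.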
fn replay_adaptive_cancel_flood_trace(seed: u64) -> Vec<TaskId> {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, 4);
scheduler.set_adaptive_cancel_streak(true, 1);
let timed_task = TaskId::new_for_test(77, 1);
let ready_task = TaskId::new_for_test(77, 2);
for i in 0..24u32 {
scheduler.inject_cancel(TaskId::new_for_test(77, 100 + i), 100);
}
scheduler.inject_timed(timed_task, Time::ZERO);
scheduler.inject_ready(ready_task, 50);
let mut workers = scheduler.take_workers().into_iter();
let mut worker = workers.next().expect("worker");
worker.rng = crate::util::DetRng::new(seed);
let adaptive_limit = {
let policy = worker
.adaptive_cancel_policy
.as_mut()
.expect("adaptive policy enabled");
policy.selected_arm = 0;
policy.current_limit()
};
worker.preemption_metrics.adaptive_current_limit = adaptive_limit;
let mut trace = Vec::new();
for _ in 0..12 {
let Some(task) = worker.next_task() else {
break;
};
trace.push(task);
if trace.contains(&timed_task) && trace.contains(&ready_task) {
break;
}
}
let timed_index = trace
.iter()
.position(|task| *task == timed_task)
.expect("timed lane should make progress under cancel flood");
let ready_index = trace
.iter()
.position(|task| *task == ready_task)
.expect("ready lane should make progress under cancel flood");
assert!(
timed_index < ready_index,
"timed lane should preempt ready once fairness yields under cancel flood: {trace:?}"
);
assert!(
ready_index < adaptive_limit * 2 + 2,
"ready lane should progress within a bounded number of dispatches under cancel flood: {trace:?}"
);
assert!(
worker.preemption_fairness_certificate().invariant_holds(),
"adaptive cancel flood should preserve fairness certificate invariants"
);
trace
}
#[test]
fn metamorphic_adaptive_cancel_flood_progresses_lower_lanes_deterministically() {
let seed = 0xC0DE_CAFE_BEEF_0603;
let trace_a = replay_adaptive_cancel_flood_trace(seed);
let trace_b = replay_adaptive_cancel_flood_trace(seed);
assert_eq!(
trace_a, trace_b,
"same-seed adaptive cancel flood should replay the same dispatch trace"
);
assert!(
trace_a.contains(&TaskId::new_for_test(77, 1))
&& trace_a.contains(&TaskId::new_for_test(77, 2)),
"same-seed trace should include both lower-priority lanes: {trace_a:?}"
);
}
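// With a worker-local scheduler installed in TLS, spawn and wake should land
// on the local lane and leave the global injector untouched.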
#[test]
fn test_local_queue_fast_path() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let scheduler = ThreeLaneScheduler::new(1, &state);
let worker_local = scheduler.workers[0].local.clone();
assert!(!scheduler.global.has_ready_work());
{
let _guard = ScopedLocalScheduler::new(worker_local.clone());
scheduler.spawn(TaskId::new_for_test(1, 1), 100);
}
assert!(
!scheduler.global.has_ready_work(),
"Global queue should be empty"
);
let count = {
let local = worker_local.lock();
local.len()
};
assert_eq!(count, 1, "Local queue should have 1 task");
{
let _guard = ScopedLocalScheduler::new(worker_local.clone());
scheduler.wake(TaskId::new_for_test(1, 2), 100);
}
assert!(!scheduler.global.has_ready_work());
let count = {
let local = worker_local.lock();
local.len()
};
assert_eq!(count, 2, "Local queue should have 2 tasks");
scheduler.spawn(TaskId::new_for_test(1, 3), 100);
assert!(
scheduler.global.has_ready_work(),
"Global queue should have task"
);
}
#[test]
fn fast_queue_spawn_prefers_local_queue_tls() {
let state = LocalQueue::test_state(10);
let scheduler = ThreeLaneScheduler::new(1, &state);
let fast_queue = scheduler.workers[0].fast_queue.clone();
let priority_sched = scheduler.workers[0].local.clone();
{
let _sched_guard = ScopedLocalScheduler::new(priority_sched.clone());
let _queue_guard = LocalQueue::set_current(fast_queue.clone());
scheduler.spawn(TaskId::new_for_test(1, 0), 100);
}
assert!(!fast_queue.is_empty(), "task should be in fast_queue");
let priority_len = priority_sched.lock().len();
assert_eq!(priority_len, 0, "PriorityScheduler should be empty");
assert!(!scheduler.global.has_ready_work(), "global should be empty");
}
#[test]
fn fast_queue_wake_prefers_local_queue_tls() {
let state = LocalQueue::test_state(10);
let scheduler = ThreeLaneScheduler::new(1, &state);
let fast_queue = scheduler.workers[0].fast_queue.clone();
let priority_sched = scheduler.workers[0].local.clone();
{
let _sched_guard = ScopedLocalScheduler::new(priority_sched.clone());
let _queue_guard = LocalQueue::set_current(fast_queue.clone());
scheduler.wake(TaskId::new_for_test(1, 0), 100);
}
assert!(!fast_queue.is_empty(), "task should be in fast_queue");
let priority_len = priority_sched.lock().len();
assert_eq!(priority_len, 0, "PriorityScheduler should be empty");
}
#[test]
fn try_ready_work_drains_fast_queue_first() {
let state = LocalQueue::test_state(10);
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
worker.fast_queue.push(TaskId::new_for_test(1, 0));
worker.local.lock().schedule(TaskId::new_for_test(2, 0), 50);
let first = worker.try_ready_work();
assert_eq!(
first,
Some(TaskId::new_for_test(1, 0)),
"fast_queue task should come first"
);
let second = worker.try_ready_work();
assert_eq!(
second,
Some(TaskId::new_for_test(2, 0)),
"PriorityScheduler task should come second"
);
assert!(worker.try_ready_work().is_none());
}
#[test]
fn try_steal_tries_fast_stealers_first() {
let state = LocalQueue::test_state(10);
let mut scheduler = ThreeLaneScheduler::new(2, &state);
let fast_task = TaskId::new_for_test(1, 0);
scheduler.workers[0].fast_queue.push(fast_task);
let mut workers = scheduler.take_workers();
let thief = &mut workers[1];
let stolen = thief.try_steal();
assert_eq!(stolen, Some(fast_task), "should steal from fast_queue");
}
#[test]
fn try_steal_falls_back_to_priority_scheduler() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(2, &state);
let heap_task = TaskId::new_for_test(1, 1);
scheduler.workers[0].local.lock().schedule(heap_task, 50);
let mut workers = scheduler.take_workers();
let thief = &mut workers[1];
let stolen = thief.try_steal();
assert_eq!(
stolen,
Some(heap_task),
"should fall back to PriorityScheduler steal"
);
}
#[test]
fn fast_queue_no_loss_no_dup_single_worker() {
let state = LocalQueue::test_state(255);
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let count = 256u32;
for i in 0..count {
worker.fast_queue.push(TaskId::new_for_test(i, 0));
}
let mut seen = std::collections::HashSet::new();
while let Some(task) = worker.try_ready_work() {
assert!(seen.insert(task), "duplicate task: {task:?}");
}
assert_eq!(seen.len(), count as usize, "all tasks should be popped");
}
#[test]
fn fast_queue_no_loss_no_dup_two_workers_stealing() {
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrd};
use std::sync::{Arc as StdArc, Barrier};
use std::thread;
let total = 512usize;
let state = LocalQueue::test_state((total - 1) as u32);
let mut scheduler = ThreeLaneScheduler::new(2, &state);
for i in 0..total {
scheduler.workers[0]
.fast_queue
.push(TaskId::new_for_test(i as u32, 0));
}
let mut workers = scheduler.take_workers();
let w0 = workers.remove(0);
let mut w1 = workers.remove(0);
let counts: StdArc<Vec<AtomicUsize>> =
StdArc::new((0..total).map(|_| AtomicUsize::new(0)).collect());
let barrier = StdArc::new(Barrier::new(2));
let c0 = StdArc::clone(&counts);
let b0 = StdArc::clone(&barrier);
let t0 = thread::spawn(move || {
b0.wait();
while let Some(task) = w0.fast_queue.pop() {
let idx = task.0.index() as usize;
c0[idx].fetch_add(1, AtomicOrd::SeqCst);
thread::yield_now();
}
});
let c1 = StdArc::clone(&counts);
let b1 = StdArc::clone(&barrier);
let t1 = thread::spawn(move || {
b1.wait();
loop {
let stolen = w1.try_steal();
if let Some(task) = stolen {
let idx = task.0.index() as usize;
c1[idx].fetch_add(1, AtomicOrd::SeqCst);
thread::yield_now();
} else {
break;
}
}
});
t0.join().expect("owner join");
t1.join().expect("thief join");
let mut total_seen = 0usize;
for (idx, count) in counts.iter().enumerate() {
let v = count.load(AtomicOrd::SeqCst);
assert_eq!(v, 1, "task {idx} seen {v} times (expected 1)");
total_seen += v;
}
assert_eq!(total_seen, total);
}
#[test]
fn fast_queue_schedule_on_current_local_prefers_fast() {
let state = LocalQueue::test_state(10);
let scheduler = ThreeLaneScheduler::new(1, &state);
let fast_queue = scheduler.workers[0].fast_queue.clone();
let priority_sched = scheduler.workers[0].local.clone();
{
let _sched_guard = ScopedLocalScheduler::new(priority_sched.clone());
let _queue_guard = LocalQueue::set_current(fast_queue.clone());
let ok = schedule_on_current_local(TaskId::new_for_test(1, 0), 100);
assert!(ok);
}
assert!(!fast_queue.is_empty(), "should be in fast_queue");
assert_eq!(
priority_sched.lock().len(),
0,
"PriorityScheduler should be empty"
);
}
#[test]
fn fast_queue_cancel_timed_bypass_fast_path() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let cancel_task = TaskId::new_for_test(1, 1);
let timed_task = TaskId::new_for_test(1, 2);
scheduler.inject_cancel(cancel_task, 100);
scheduler.inject_timed(timed_task, Time::from_nanos(500));
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
assert!(
worker.fast_queue.is_empty(),
"fast_queue should not have cancel/timed tasks"
);
assert!(scheduler.global.has_cancel_work());
}
#[test]
fn fast_queue_waker_uses_local_ready_on_same_thread() {
let task_id = TaskId::new_for_test(1, 0);
let wake_state = Arc::new(crate::record::task::TaskWakeState::new());
let priority_sched = Arc::new(Mutex::new(PriorityScheduler::new()));
let parker = Parker::new();
let local_ready = Arc::new(LocalReadyQueue::new(VecDeque::new()));
let waker = Waker::from(Arc::new(ThreeLaneLocalWaker {
task_id,
priority: 0,
wake_state: Arc::clone(&wake_state),
local: Arc::clone(&priority_sched),
local_ready: Arc::clone(&local_ready),
parker,
fast_cancel: Arc::new(std::sync::atomic::AtomicBool::new(false)),
cx_inner: Weak::new(),
}));
let _ready_guard = ScopedLocalReady::new(Arc::clone(&local_ready));
waker.wake_by_ref();
{
let queue = local_ready.lock();
assert_eq!(queue.len(), 1, "local_ready should have 1 task");
assert_eq!(queue[0], task_id);
drop(queue);
}
assert_eq!(
priority_sched.lock().len(),
0,
"PriorityScheduler should be empty"
);
}
#[test]
fn fast_queue_waker_falls_back_to_local_ready_cross_thread() {
let task_id = TaskId::new_for_test(1, 1);
let wake_state = Arc::new(crate::record::task::TaskWakeState::new());
let priority_sched = Arc::new(Mutex::new(PriorityScheduler::new()));
let parker = Parker::new();
let local_ready = Arc::new(LocalReadyQueue::new(VecDeque::new()));
let waker = Waker::from(Arc::new(ThreeLaneLocalWaker {
task_id,
priority: 0,
wake_state: Arc::clone(&wake_state),
local: Arc::clone(&priority_sched),
local_ready: Arc::clone(&local_ready),
parker,
fast_cancel: Arc::new(std::sync::atomic::AtomicBool::new(false)),
cx_inner: Weak::new(),
}));
waker.wake_by_ref();
{
let queue = local_ready.lock();
assert_eq!(queue.len(), 1, "local_ready should have 1 task");
assert_eq!(queue[0], task_id);
drop(queue);
}
}
#[test]
fn fast_queue_stolen_tasks_go_to_thief_fast_queue() {
let state = LocalQueue::test_state(10);
let mut scheduler = ThreeLaneScheduler::new(2, &state);
scheduler.set_steal_batch_size(2);
for i in 0..8u32 {
scheduler.workers[0]
.local
.lock()
.schedule(TaskId::new_for_test(i, 0), 50);
}
let mut workers = scheduler.take_workers();
let thief = &mut workers[1];
let stolen = thief.try_steal();
assert!(stolen.is_some(), "should steal at least one task");
let fast_count = {
let mut count = 0;
while thief.fast_queue.pop().is_some() {
count += 1;
}
count
};
assert_eq!(
fast_count, 1,
"thief's fast_queue should have batch remainder, got {fast_count}"
);
}
#[test]
fn local_ready_queue_drains_before_fast_queue() {
let state = LocalQueue::test_state(10);
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let local_task = TaskId::new_for_test(1, 0);
let fast_task = TaskId::new_for_test(2, 0);
worker.local_ready.lock().push_back(local_task);
worker.fast_queue.push(fast_task);
let first = worker.try_ready_work();
assert_eq!(first, Some(local_task), "local_ready should drain first");
let second = worker.try_ready_work();
assert_eq!(second, Some(fast_task), "fast_queue should drain second");
assert!(
worker.try_ready_work().is_none(),
"no more ready work expected"
);
}
#[test]
fn local_ready_queue_preserves_fifo_order() {
let state = LocalQueue::test_state(10);
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let first = TaskId::new_for_test(10, 0);
let second = TaskId::new_for_test(11, 0);
let third = TaskId::new_for_test(12, 0);
worker.local_ready.lock().extend([first, second, third]);
assert_eq!(
worker.next_task(),
Some(first),
"first enqueued local task should dispatch first"
);
assert_eq!(
worker.next_task(),
Some(second),
"second enqueued local task should dispatch second"
);
assert_eq!(
worker.next_task(),
Some(third),
"third enqueued local task should dispatch third"
);
}
#[test]
fn local_ready_queue_not_visible_to_fast_stealers() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(2, &state);
let mut workers = scheduler.take_workers();
let local_task = TaskId::new_for_test(1, 1);
workers[0].local_ready.lock().push_back(local_task);
let stolen = workers[1].try_steal();
assert!(
stolen.is_none(),
"local_ready tasks must not be stealable, but got {stolen:?}"
);
let drained = workers[0].try_ready_work();
assert_eq!(
drained,
Some(local_task),
"local task should remain on owner worker"
);
}
#[test]
fn local_ready_queue_not_visible_to_priority_stealers() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(2, &state);
let mut workers = scheduler.take_workers();
let local_task = TaskId::new_for_test(1, 1);
workers[0].local_ready.lock().push_back(local_task);
let stolen = workers[1].try_steal();
assert!(
stolen.is_none(),
"local_ready tasks must not be stealable via PriorityScheduler"
);
}
#[test]
fn local_ready_survives_concurrent_steal_pressure() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(2, &state);
let mut workers = scheduler.take_workers();
let local_tasks: Vec<TaskId> = (1..=10).map(|i| TaskId::new_for_test(1, i)).collect();
{
let mut queue = workers[0].local_ready.lock();
for &task in &local_tasks {
queue.push_back(task);
}
}
for _ in 0..10 {
assert!(
workers[1].try_steal().is_none(),
"steal should fail for local_ready tasks"
);
}
let mut drained = Vec::new();
while let Some(task) = workers[0].try_ready_work() {
drained.push(task);
}
assert_eq!(
drained.len(),
local_tasks.len(),
"all local tasks should be drained by owner"
);
for task in &local_tasks {
assert!(
drained.contains(task),
"local task {task:?} should be in drained set"
);
}
}
#[test]
fn task_record_is_local_default_false() {
use crate::record::task::TaskRecord;
let record = TaskRecord::new(
TaskId::new_for_test(1, 0),
RegionId::new_for_test(0, 0),
Budget::INFINITE,
);
assert!(!record.is_local(), "default should be false");
}
#[test]
fn task_record_mark_local() {
use crate::record::task::TaskRecord;
let mut record = TaskRecord::new(
TaskId::new_for_test(1, 0),
RegionId::new_for_test(0, 0),
Budget::INFINITE,
);
assert!(!record.is_local());
record.mark_local();
assert!(record.is_local(), "mark_local should set is_local");
}
#[test]
fn backoff_loop_wakes_for_local_ready() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let task = TaskId::new_for_test(1, 1);
worker.local_ready.lock().push_back(task);
let found = worker.next_task();
assert_eq!(found, Some(task), "next_task should find local_ready task");
}
#[test]
fn schedule_local_task_uses_tls() {
let queue = Arc::new(LocalReadyQueue::new(VecDeque::new()));
let _guard = ScopedLocalReady::new(Arc::clone(&queue));
let task = TaskId::new_for_test(1, 1);
let scheduled = schedule_local_task(task);
assert!(scheduled, "should succeed when TLS is set");
let tasks = queue.lock();
assert_eq!(tasks.len(), 1);
assert_eq!(tasks[0], task);
drop(tasks);
}
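// Even while another thread holds the local_ready lock, try_ready_work must
// wait for ownership rather than skip ahead to the fast queue, preserving
// local-first dispatch order.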
#[test]
fn try_ready_work_waits_for_local_ready_lock_before_fast_queue() {
let state = LocalQueue::test_state(10);
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let mut worker = workers.remove(0);
let local_task = TaskId::new_for_test(1, 0);
let fast_task = TaskId::new_for_test(2, 0);
worker.local_ready.lock().push_back(local_task);
worker.fast_queue.push(fast_task);
let local_ready = Arc::clone(&worker.local_ready);
let held_guard = local_ready.lock();
let (started_tx, started_rx) = std::sync::mpsc::channel();
let (result_tx, result_rx) = std::sync::mpsc::channel();
let handle = std::thread::spawn(move || {
started_tx.send(()).expect("notify start");
let next = worker.try_ready_work();
result_tx.send(next).expect("send result");
});
started_rx
.recv_timeout(Duration::from_secs(1))
.expect("worker thread should start");
assert!(
result_rx.recv_timeout(Duration::from_millis(50)).is_err(),
"worker should wait for local_ready ownership instead of skipping to fast_queue"
);
drop(held_guard);
let next = result_rx
.recv_timeout(Duration::from_secs(1))
.expect("worker should return once local_ready lock is released");
assert_eq!(
next,
Some(local_task),
"local_ready task should still outrank fast_queue under contention"
);
handle.join().expect("worker join");
}
#[test]
fn schedule_local_task_fails_without_tls() {
let task = TaskId::new_for_test(1, 1);
let scheduled = schedule_local_task(task);
assert!(!scheduled, "should fail without TLS");
}
#[test]
fn local_waiter_routes_to_current_worker_local_ready() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (id, _) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
id
};
let waiter_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (id, _) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
if let Some(record) = guard.task_mut(id) {
record.mark_local();
}
drop(guard);
id
};
{
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(record) = guard.task_mut(task_id) {
record.add_waiter(waiter_id);
}
}
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let local_ready = Arc::clone(&worker.local_ready);
worker.execute(task_id);
let queued: Vec<TaskId> = local_ready.lock().drain(..).collect();
assert!(
queued.contains(&waiter_id),
"local waiter should be routed to current worker's local_ready, got {queued:?}"
);
assert!(
worker.global.pop_ready().is_none(),
"local waiter should not be in the global injector"
);
}
#[test]
fn local_waiter_pinned_routes_to_owner_worker() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (id, _) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
id
};
let waiter_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (id, _) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
if let Some(record) = guard.task_mut(id) {
record.pin_to_worker(1);
}
drop(guard);
id
};
{
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(record) = guard.task_mut(task_id) {
record.add_waiter(waiter_id);
}
}
let mut scheduler = ThreeLaneScheduler::new(2, &state);
let worker_pool = scheduler.take_workers();
let primary_worker = &worker_pool[0];
let worker1_local_ready = Arc::clone(&worker_pool[1].local_ready);
primary_worker.execute(task_id);
let queued: Vec<TaskId> = worker1_local_ready.lock().drain(..).collect();
assert!(
queued.contains(&waiter_id),
"local waiter should be routed to owner worker 1, got {queued:?}"
);
assert!(
!primary_worker.local_ready.lock().contains(&waiter_id),
"local waiter should NOT be in worker 0's local_ready"
);
assert!(
primary_worker.global.pop_ready().is_none(),
"local waiter should not be in the global injector"
);
}
#[test]
fn global_waiter_routes_to_global_injector() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let region = state
.lock()
.expect("lock")
.create_root_region(Budget::INFINITE);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (id, _) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
id
};
let waiter_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let (id, _) = guard
.create_task(region, Budget::INFINITE, async {})
.expect("create task");
id
};
{
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(record) = guard.task_mut(task_id) {
record.add_waiter(waiter_id);
}
}
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
worker.execute(task_id);
let popped = worker.global.pop_ready();
assert!(
popped.is_some(),
"global waiter should be in the global injector"
);
assert_eq!(popped.unwrap().task, waiter_id);
assert!(
worker.local_ready.lock().is_empty(),
"global waiter should NOT be in local_ready"
);
}
#[test]
#[allow(clippy::significant_drop_tightening)]
fn test_local_task_cross_thread_wake_routes_correctly() {
use crate::runtime::RuntimeState;
use crate::sync::ContendedMutex;
use crate::types::Budget;
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let scheduler = ThreeLaneScheduler::new(2, &state);
let task_id = {
let mut guard = state
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let region = guard.create_root_region(Budget::INFINITE);
let (tid, _) = guard
.create_task(region, Budget::INFINITE, async { 1 })
.unwrap();
let record = guard.task_mut(tid).unwrap();
record.mark_local();
record.pin_to_worker(0);
tid
};
let worker_1_ready = Arc::new(LocalReadyQueue::new(VecDeque::new()));
let _tls_guard = ScopedLocalReady::new(worker_1_ready.clone());
let _worker_guard = ScopedWorkerId::new(1);
scheduler.wake(task_id, 100);
let worker_1_has_it = worker_1_ready.lock().contains(&task_id);
let worker_0_ready = scheduler.local_ready[0].clone();
let worker_0_has_it = worker_0_ready.lock().contains(&task_id);
assert!(!worker_1_has_it, "Task incorrectly scheduled on Worker 1");
assert!(worker_0_has_it, "Task should be routed to pinned Worker 0");
}
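// Shared helper: builds a scheduler backed by a sharded task table so the
// table-backed scheduling paths below can be exercised directly.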
fn task_table_scheduler(
worker_count: usize,
max_task_id: u32,
) -> (
ThreeLaneScheduler,
Arc<ContendedMutex<RuntimeState>>,
Arc<ContendedMutex<TaskTable>>,
) {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let task_table = local_queue::LocalQueue::test_task_table(max_task_id);
let scheduler = ThreeLaneScheduler::new_with_options_and_task_table(
worker_count,
&state,
Some(Arc::clone(&task_table)),
DEFAULT_CANCEL_STREAK_LIMIT,
false,
32,
);
(scheduler, state, task_table)
}
#[test]
fn task_table_backed_inject_ready() {
let (scheduler, _state, task_table) = task_table_scheduler(1, 3);
let task_id = TaskId::new_for_test(1, 0);
assert!(
task_table
.lock()
.expect("task table lock poisoned")
.task(task_id)
.is_some(),
"task should be in sharded table"
);
scheduler.inject_ready(task_id, 100);
let popped = scheduler.global.pop_ready();
assert!(popped.is_some(), "task should be in global ready queue");
assert_eq!(popped.unwrap().task, task_id);
}
#[test]
fn task_table_backed_inject_cancel() {
let (scheduler, _state, _task_table) = task_table_scheduler(1, 3);
let task_id = TaskId::new_for_test(1, 0);
scheduler.inject_cancel(task_id, 100);
let popped = scheduler.global.pop_cancel();
assert!(popped.is_some(), "task should be in global cancel queue");
assert_eq!(popped.unwrap().task, task_id);
}
#[test]
fn task_table_backed_spawn_uses_task_table() {
let (scheduler, _state, _task_table) = task_table_scheduler(1, 3);
let task_id = TaskId::new_for_test(1, 0);
scheduler.spawn(task_id, 50);
let popped = scheduler.global.pop_ready();
assert!(popped.is_some(), "task should be in global ready queue");
assert_eq!(popped.unwrap().task, task_id);
}
#[test]
fn task_table_backed_schedule_local() {
let (mut scheduler, _state, _task_table) = task_table_scheduler(1, 3);
let task_id = TaskId::new_for_test(1, 0);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
worker.schedule_local(task_id, 50);
let next = worker.local.lock().pop_ready_only();
assert!(next.is_some(), "task should be in local scheduler");
assert_eq!(next.unwrap(), task_id);
}
#[test]
fn task_table_backed_schedule_local_cancel() {
let (mut scheduler, _state, _task_table) = task_table_scheduler(1, 3);
let task_id = TaskId::new_for_test(1, 0);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
worker.schedule_local_cancel(task_id, 50);
let next = worker.local.lock().pop_cancel_only();
assert!(next.is_some(), "task should be in local cancel lane");
assert_eq!(next.unwrap(), task_id);
}
#[test]
fn task_table_backed_schedule_local_timed() {
let (mut scheduler, _state, _task_table) = task_table_scheduler(1, 3);
let task_id = TaskId::new_for_test(1, 0);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let deadline = Time::from_nanos(1000);
worker.schedule_local_timed(task_id, deadline);
let next = worker.local.lock().pop_timed_only(Time::from_nanos(2000));
assert!(next.is_some(), "task should be in local timed lane");
assert_eq!(next.unwrap(), task_id);
}
#[test]
fn task_table_backed_wake_state_dedup() {
let (scheduler, _state, task_table) = task_table_scheduler(1, 3);
let task_id = TaskId::new_for_test(1, 0);
scheduler.inject_ready(task_id, 50);
scheduler.inject_ready(task_id, 50);
let first = scheduler.global.pop_ready();
assert!(first.is_some());
let second = scheduler.global.pop_ready();
assert!(second.is_none(), "duplicate should be deduplicated");
{
let tt = task_table
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(record) = tt.task(task_id) {
record.wake_state.clear();
}
}
scheduler.inject_ready(task_id, 50);
let third = scheduler.global.pop_ready();
assert!(
third.is_some(),
"should be injectable after wake_state clear"
);
}
#[test]
fn task_table_backed_consume_cancel_ack() {
let (mut scheduler, _state, task_table) = task_table_scheduler(1, 3);
let task_id = TaskId::new_for_test(1, 0);
let region_id = RegionId::new_for_test(0, 0);
let cx_inner = Arc::new(RwLock::new(CxInner::new(
region_id,
task_id,
Budget::INFINITE,
)));
{
let mut tt = task_table
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(record) = tt.task_mut(task_id) {
record.cx_inner = Some(cx_inner.clone());
}
}
{
let mut guard = cx_inner.write();
guard.cancel_acknowledged = true;
}
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let result = worker.consume_cancel_ack(task_id);
assert!(result, "cancel ack should be consumed from task table");
let ack = cx_inner.read().cancel_acknowledged;
assert!(!ack, "cancel_acknowledged should be cleared");
}
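// Conformance P0: freshly injected cancel work preempts the ready lane and
// drains completely before ready dispatch resumes.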
#[test]
fn conformance_p0_cancel_lane_never_starves() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, 16);
let ready_tasks: Vec<TaskId> = (0..50).map(|i| TaskId::new_for_test(i, 0)).collect();
for &task_id in &ready_tasks {
scheduler.inject_ready(task_id, 100);
}
let cancel_tasks: Vec<TaskId> = (100..110).map(|i| TaskId::new_for_test(i, 0)).collect();
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let _ready1 = worker.next_task();
let _ready2 = worker.next_task();
let _ready3 = worker.next_task();
for &task_id in &cancel_tasks {
scheduler.inject_cancel(task_id, 0);
}
for i in 0..10 {
let next_task = worker.next_task();
assert!(next_task.is_some(), "should get task {}", i);
let task_id = next_task.unwrap();
assert!(
cancel_tasks.contains(&task_id),
"task {} should be from cancel lane, got {:?}",
i,
task_id
);
}
let after_cancel = worker.next_task();
assert!(
after_cancel.is_some(),
"should get ready task after cancel drain"
);
let task_id = after_cancel.unwrap();
assert!(
ready_tasks.contains(&task_id),
"should resume ready lane after cancel completion"
);
}
#[test]
fn conformance_p1_preempts_p2_within_quantum() {
use crate::time::{TimerDriverHandle, VirtualClock};
let clock = Arc::new(VirtualClock::starting_at(Time::from_nanos(1000)));
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
{
let mut guard = state.lock().expect("lock state");
guard.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock.clone()));
}
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let ready_tasks: Vec<TaskId> = (0..20).map(|i| TaskId::new_for_test(i, 0)).collect();
for &task_id in &ready_tasks {
scheduler.inject_ready(task_id, 100);
}
let timed_tasks: Vec<TaskId> = (50..55).map(|i| TaskId::new_for_test(i, 0)).collect();
for &task_id in &timed_tasks {
scheduler.inject_timed(task_id, Time::from_nanos(1500));
}
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let ready_dispatch_count = 3;
for i in 0..ready_dispatch_count {
let task = worker.next_task();
assert!(task.is_some(), "should get ready task {}", i);
assert!(ready_tasks.contains(&task.unwrap()));
}
clock.advance_to(Time::from_nanos(1500));
let preempting_task = worker.next_task();
assert!(preempting_task.is_some(), "should get timed task");
let task_id = preempting_task.unwrap();
assert!(
timed_tasks.contains(&task_id),
"should preempt with timed task, got {:?}",
task_id
);
for i in 1..timed_tasks.len() {
let task = worker.next_task();
assert!(task.is_some(), "should get timed task {}", i);
assert!(timed_tasks.contains(&task.unwrap()));
}
let resume_ready = worker.next_task();
assert!(resume_ready.is_some(), "should resume ready lane");
assert!(ready_tasks.contains(&resume_ready.unwrap()));
}
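// Conformance: within the timed lane, dispatch must follow
// earliest-deadline-first order regardless of injection order.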
#[test]
fn conformance_edf_ordering_within_lane() {
use crate::time::{TimerDriverHandle, VirtualClock};
let clock = Arc::new(VirtualClock::starting_at(Time::from_nanos(2000)));
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
{
let mut guard = state.lock().expect("lock state");
guard.set_timer_driver(TimerDriverHandle::with_virtual_clock(clock.clone()));
}
let mut scheduler = ThreeLaneScheduler::new(1, &state);
// Deadlines are injected out of EDF order on purpose.
let deadlines = [
    Time::from_nanos(1800),
    Time::from_nanos(1900),
    Time::from_nanos(1700),
    Time::from_nanos(1950),
];
let task_ids: Vec<TaskId> = (10..14).map(|i| TaskId::new_for_test(i, 0)).collect();
for (i, &task_id) in task_ids.iter().enumerate() {
scheduler.inject_timed(task_id, deadlines[i]);
}
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
// Expected dispatch follows earliest deadline first: 1700, 1800, 1900, 1950.
let expected_edf_order = [
    task_ids[2],
    task_ids[0],
    task_ids[1],
    task_ids[3],
];
for (i, &expected_task) in expected_edf_order.iter().enumerate() {
let task = worker.next_task();
assert!(task.is_some(), "should get timed task {}", i);
let actual_task = task.unwrap();
assert_eq!(
actual_task, expected_task,
"EDF violation at position {}: expected {:?}, got {:?}",
i, expected_task, actual_task
);
}
let after_timed = worker.next_task();
assert!(
after_timed.is_none(),
"timed lane should be empty after EDF drain"
);
}
#[test]
fn conformance_cancel_promotion_to_front() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let existing_cancel_tasks: Vec<TaskId> =
(0..5).map(|i| TaskId::new_for_test(i, 0)).collect();
for &task_id in &existing_cancel_tasks {
scheduler.inject_cancel(task_id, 0);
}
let ready_task = TaskId::new_for_test(100, 0);
scheduler.inject_ready(ready_task, 100);
scheduler.inject_cancel(ready_task, 0);
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let first_cancel = worker.next_task();
assert!(first_cancel.is_some(), "should get cancel task");
let first_task_id = first_cancel.unwrap();
let mut dispatched_tasks = vec![first_task_id];
for _ in 0..5 {
if let Some(task_id) = worker.next_task() {
dispatched_tasks.push(task_id);
}
}
assert!(
dispatched_tasks.contains(&ready_task),
"promoted task {:?} should be dispatched from cancel lane, got: {:?}",
ready_task,
dispatched_tasks
);
assert_eq!(
dispatched_tasks.len(),
existing_cancel_tasks.len() + 1,
"should dispatch all cancel tasks first"
);
}
#[test]
fn conformance_cancel_fairness_prevents_starvation() {
let cancel_limit = 4;
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new_with_cancel_limit(1, &state, cancel_limit);
let ready_tasks: Vec<TaskId> = (0..10).map(|i| TaskId::new_for_test(i, 0)).collect();
for &task_id in &ready_tasks {
scheduler.inject_ready(task_id, 100);
}
let cancel_tasks: Vec<TaskId> = (100..120).map(|i| TaskId::new_for_test(i, 0)).collect();
for &task_id in &cancel_tasks {
scheduler.inject_cancel(task_id, 0);
}
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let mut cancel_dispatches = 0;
let mut ready_dispatches = 0;
let mut total_dispatches = 0;
while total_dispatches < 30 {
if let Some(task_id) = worker.next_task() {
total_dispatches += 1;
if cancel_tasks.contains(&task_id) {
cancel_dispatches += 1;
} else if ready_tasks.contains(&task_id) {
ready_dispatches += 1;
break;
}
assert!(
cancel_dispatches < cancel_limit * 2,
"Cancel fairness violated: {} cancel dispatches without ready dispatch",
cancel_dispatches
);
} else {
break;
}
}
assert!(
ready_dispatches > 0,
"Ready lane should not starve under cancel pressure. Cancel: {}, Ready: {}",
cancel_dispatches,
ready_dispatches
);
assert!(
cancel_dispatches >= cancel_limit,
"Should dispatch at least {} cancel tasks before fairness kicks in",
cancel_limit
);
}
#[test]
fn conformance_lyapunov_governor_bounded_queues() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let weights = PotentialWeights::default();
let governor = LyapunovGovernor::new(weights);
let task_burst_size = 200;
let ready_tasks: Vec<TaskId> = (0..task_burst_size)
.map(|i| TaskId::new_for_test(i, 0))
.collect();
let mut workers = scheduler.take_workers();
let worker = &mut workers[0];
let mut max_observed_ready_queue = 0;
for (i, &task_id) in ready_tasks.iter().enumerate() {
    // Push onto the owner's local_ready queue so the sampled depth reflects
    // the injected burst (inject_ready lands in the global injector, whose
    // depth is not observable from this test).
    worker.local_ready.lock().push_back(task_id);
    if i % 20 == 0 {
        let ready_queue_size = worker.local_ready.lock().len();
        max_observed_ready_queue = max_observed_ready_queue.max(ready_queue_size);
        let state_snapshot = StateSnapshot {
            ready_queue_depth: ready_queue_size as u32,
            ..Default::default()
        };
        // Exercise the governor on the sampled depth; the concrete
        // suggestion variant is not asserted here.
        let _suggestion = governor.suggest(&state_snapshot);
    }
}
assert!(
max_observed_ready_queue > 50,
"Should observe queue growth under burst load"
);
assert!(
max_observed_ready_queue < task_burst_size as usize,
"Queue should not grow unboundedly: max observed = {}, burst size = {}",
max_observed_ready_queue,
task_burst_size
);
for _ in 0..50 {
    worker.next_task();
}
let final_queue_size = worker.local_ready.lock().len();
assert!(
    final_queue_size < max_observed_ready_queue,
    "Queue should reduce after task consumption: final={}, max={}",
    final_queue_size,
    max_observed_ready_queue
);
}
#[test]
fn golden_test_exp3_weights_stabilize_after_n_cancel_events() {
let mut policy = AdaptiveCancelStreakPolicy::new(32);
let mut prob_history: Vec<[f64; 5]> = Vec::new();
let start = test_adaptive_epoch_snapshot(100.0, 0.25, 0, 0, 0);
let relaxed = test_adaptive_epoch_snapshot(72.0, 0.10, 0, 0, 0);
let pressured = test_adaptive_epoch_snapshot(128.0, 0.70, 2, 4, 2);
for step in 0..500usize {
    policy.refresh_probs();
    // Rotate through the arms deterministically; only arm 2 observes
    // relaxed epochs, so its probability mass should come to dominate.
    let selected = step % 5;
    policy.selected_arm = selected;
    policy.begin_epoch(start);
    let end = if selected == 2 { relaxed } else { pressured };
    let _reward = policy
        .complete_epoch(end, 0)
        .expect("epoch start snapshot should be present");
    if step % 50 == 49 {
        // Snapshot the normalized probabilities rather than the raw EXP3
        // weights: weights drift multiplicatively, while the probability
        // simplex is what stabilizes.
        policy.refresh_probs();
        prob_history.push(policy.probs);
    }
}
assert!(
    prob_history.len() >= 2,
    "Need at least 2 probability snapshots"
);
let second_last = &prob_history[prob_history.len() - 2];
let last = &prob_history[prob_history.len() - 1];
for i in 0..5 {
    let change_ratio = (last[i] - second_last[i]).abs() / second_last[i];
    assert!(
        change_ratio < 0.05,
        "Probability for arm {} should stabilize: change ratio {:.4} >= 0.05",
        i,
        change_ratio
    );
}
let best_arm = (0..5)
    .max_by(|&a, &b| last[a].partial_cmp(&last[b]).unwrap())
    .unwrap();
assert_eq!(
    best_arm, 2,
    "Arm 2 should have highest probability after convergence"
);
let prob_variance = {
    let mean: f64 = last.iter().sum::<f64>() / 5.0;
    last.iter().map(|&p| (p - mean).powi(2)).sum::<f64>() / 5.0
};
assert!(
    prob_variance > 0.01,
    "Probabilities should not be uniform after convergence"
);
}
#[test]
fn golden_test_cancel_streak_penalty_converges() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = &mut scheduler.workers[0];
let mut penalty_history: Vec<f64> = Vec::new();
for i in 0..200 {
let task_id = TaskId::new_for_test(1000, i);
worker.schedule_local_cancel(task_id, 100);
for _ in 0..3 {
worker.next_task();
}
if i % 20 == 19 {
// Penalty sampling: a constant stand-in value (no public accessor is
// exposed for the internal cancel-streak penalty); the variance and
// bounds checks below still constrain the sampled series.
let penalty = 0.0;
penalty_history.push(penalty);
}
}
assert!(
penalty_history.len() >= 3,
"Need at least 3 penalty snapshots"
);
let recent = &penalty_history[penalty_history.len() - 3..];
let penalty_variance = {
let mean: f64 = recent.iter().sum::<f64>() / recent.len() as f64;
recent.iter().map(|&p| (p - mean).powi(2)).sum::<f64>() / recent.len() as f64
};
assert!(
penalty_variance < 0.01,
"Cancel-streak penalty should converge: variance {:.6} >= 0.01",
penalty_variance
);
for &penalty in recent {
assert!(
(0.0..=2.0).contains(&penalty),
"Penalty {:.4} should be in bounds [0.0, 2.0]",
penalty
);
}
}
#[test]
fn golden_test_adaptive_threshold_updates_within_bounds() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = &mut scheduler.workers[0];
let mut threshold_history: Vec<usize> = Vec::new();
let initial_threshold = worker
    .adaptive_cancel_policy
    .as_mut()
    .expect("adaptive policy enabled")
    .current_limit();
for epoch in 0..20 {
for step in 0..50 {
let task_id = TaskId::new_for_test(2000 + epoch, step);
worker.schedule_local_cancel(task_id, 100);
worker.next_task();
if step % 10 == 9 {
    let current_threshold = worker
        .adaptive_cancel_policy
        .as_mut()
        .expect("adaptive policy enabled")
        .current_limit();
    threshold_history.push(current_threshold);
}
}
}
for &threshold in &threshold_history {
assert!(
ADAPTIVE_STREAK_ARMS.contains(&threshold),
"Threshold {} should be one of the valid arms {:?}",
threshold,
ADAPTIVE_STREAK_ARMS
);
}
let adaptation_occurred = threshold_history.iter().any(|&t| t != initial_threshold);
assert!(
adaptation_occurred,
"Threshold should adapt from initial value {} during varied workload",
initial_threshold
);
let extreme_jumps = threshold_history
    .windows(2)
    .filter(|window| {
        let diff = window[1].abs_diff(window[0]);
        diff > 24
    })
    .count();
let jump_ratio = extreme_jumps as f64 / (threshold_history.len() - 1) as f64;
assert!(
jump_ratio < 0.3,
"Too many extreme threshold jumps: {:.2}% >= 30%",
jump_ratio * 100.0
);
}
#[test]
fn golden_test_concurrent_cancel_events_no_double_penalize() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(2, &state);
let mut workers = scheduler.take_workers();
for worker in &workers {
let initial_weights: [f64; 5] = worker.adaptive_cancel_policy.as_ref().unwrap().weights;
assert_eq!(
initial_weights, [1.0; 5],
"Initial weights should be uniform"
);
}
let task_base = 3000;
for i in 0..50 {
for (worker_idx, worker) in workers.iter_mut().enumerate() {
let task_id = TaskId::new_for_test((task_base + worker_idx * 100) as u32, i as u32);
worker.schedule_local_cancel(task_id, 100);
}
}
let mut total_processed = [0; 2];
for _ in 0..100 {
for (worker_idx, worker) in workers.iter_mut().enumerate() {
if worker.next_task().is_some() {
total_processed[worker_idx] += 1;
}
}
}
assert!(
total_processed[0] > 0 && total_processed[1] > 0,
"Both workers should process cancel events: [{}, {}]",
total_processed[0],
total_processed[1]
);
for (worker_idx, worker) in workers.iter().enumerate() {
let final_weights: [f64; 5] = worker.adaptive_cancel_policy.as_ref().unwrap().weights;
for (arm_idx, &weight) in final_weights.iter().enumerate() {
assert!(
(1e-30..=1e30).contains(&weight),
"Worker {} arm {} weight {:.2e} out of bounds [1e-30, 1e30]",
worker_idx,
arm_idx,
weight
);
}
let weight_sum: f64 = final_weights.iter().sum();
assert!(
weight_sum > 1e-10 && weight_sum < 1e20,
"Worker {} total weight sum {:.2e} unreasonable",
worker_idx,
weight_sum
);
}
for (worker_idx, worker) in workers.iter().enumerate() {
let e_process = worker
.adaptive_cancel_policy
.as_ref()
.unwrap()
.e_process_log;
assert!(
e_process.is_finite() && e_process.abs() < 100.0,
"Worker {} e-process log {:.4} should be finite and bounded",
worker_idx,
e_process
);
}
}
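// Shared helper: builds an epoch snapshot from the given potential, deadline
// pressure, and exceedance counters.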
fn test_adaptive_epoch_snapshot(
potential: f64,
deadline_pressure: f64,
base_limit_exceedances: u64,
effective_limit_exceedances: u64,
fallback_cancel_dispatches: u64,
) -> AdaptiveEpochSnapshot {
AdaptiveEpochSnapshot {
potential,
deadline_pressure,
base_limit_exceedances,
effective_limit_exceedances,
fallback_cancel_dispatches,
}
}
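// Shared helper: drives the adaptive policy through alternating relaxed and
// pressured epochs with a seeded RNG, recording the limit chosen after each
// epoch.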
fn replay_adaptive_limit_trace(seed: u64, epochs: usize) -> Vec<usize> {
let mut policy = AdaptiveCancelStreakPolicy::new(4);
let mut rng = crate::util::DetRng::new(seed);
let start = test_adaptive_epoch_snapshot(100.0, 0.25, 0, 0, 0);
let relaxed = test_adaptive_epoch_snapshot(72.0, 0.10, 0, 0, 0);
let pressured = test_adaptive_epoch_snapshot(128.0, 0.70, 2, 4, 2);
let mut trace = Vec::with_capacity(epochs);
for epoch in 0..epochs {
policy.selected_arm = 2;
policy.begin_epoch(start);
let sample = rng.next_u64();
let end = if epoch % 2 == 0 { relaxed } else { pressured };
let reward = policy
.complete_epoch(end, sample)
.expect("epoch start snapshot should be present");
assert!(
reward.is_finite(),
"adaptive reward should stay finite across replay"
);
trace.push(policy.current_limit());
}
trace
}
#[test]
fn golden_test_cancel_streak_adaptivity_same_seed_replays_limit_trace() {
let trace_a = replay_adaptive_limit_trace(0xC0DE_CAFE_BEEF_0001, 24);
let trace_b = replay_adaptive_limit_trace(0xC0DE_CAFE_BEEF_0001, 24);
assert_eq!(
trace_a, trace_b,
"same-seed adaptive replay should produce the same limit trace"
);
let distinct_limits = trace_a
.iter()
.copied()
.collect::<std::collections::BTreeSet<_>>()
.len();
assert!(
distinct_limits >= 2,
"deterministic replay should still explore multiple cancel-streak limits: {:?}",
trace_a
);
}
#[test]
fn golden_test_cancel_streak_adaptivity_penalty_reduces_prob_mass() {
fn favored_arm_prob(end: AdaptiveEpochSnapshot) -> f64 {
let mut policy = AdaptiveCancelStreakPolicy::new(4);
let start = test_adaptive_epoch_snapshot(100.0, 0.25, 0, 0, 0);
for _ in 0..12 {
policy.selected_arm = 2;
policy.begin_epoch(start);
let _reward = policy
.complete_epoch(end, 0)
.expect("epoch start snapshot should be present");
}
policy.refresh_probs();
policy.probs[2]
}
let relaxed = test_adaptive_epoch_snapshot(70.0, 0.10, 0, 0, 0);
let pressured = test_adaptive_epoch_snapshot(130.0, 0.85, 4, 8, 4);
let relaxed_prob = favored_arm_prob(relaxed);
let pressured_prob = favored_arm_prob(pressured);
assert!(
relaxed_prob > pressured_prob,
"heavier cancel/fairness penalties should reduce EXP3 mass for the repeatedly selected arm: relaxed={relaxed_prob:.4}, pressured={pressured_prob:.4}"
);
assert!(
relaxed_prob - pressured_prob > 0.05,
"penalty-driven probability shift should be material: relaxed={relaxed_prob:.4}, pressured={pressured_prob:.4}"
);
}
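// Two identically constructed runs of the same cancel workload must produce
// bit-identical adaptive-policy traces (arm, epoch counters, weights, probs).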
#[test]
fn golden_test_lab_runtime_replay_determinism() {
let mut trace_a = Vec::new();
let mut trace_b = Vec::new();
{
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = &mut scheduler.workers[0];
for i in 0..100 {
let task_id = TaskId::new_for_test(4000, i);
worker.schedule_local_cancel(task_id, 100);
if i % 10 == 9 {
let policy = worker
    .adaptive_cancel_policy
    .as_ref()
    .expect("adaptive policy enabled");
trace_a.push((
    policy.selected_arm,
    policy.epoch_count,
    policy.steps_in_epoch,
    policy.weights,
    policy.probs,
));
}
worker.next_task();
}
}
{
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = ThreeLaneScheduler::new(1, &state);
let worker = &mut scheduler.workers[0];
for i in 0..100 {
let task_id = TaskId::new_for_test(4000, i);
worker.schedule_local_cancel(task_id, 100);
if i % 10 == 9 {
let policy = worker
    .adaptive_cancel_policy
    .as_ref()
    .expect("adaptive policy enabled");
trace_b.push((
    policy.selected_arm,
    policy.epoch_count,
    policy.steps_in_epoch,
    policy.weights,
    policy.probs,
));
}
worker.next_task();
}
}
assert_eq!(trace_a.len(), trace_b.len(), "Trace lengths should match");
for (step, (state_a, state_b)) in trace_a.iter().zip(trace_b.iter()).enumerate() {
assert_eq!(
state_a.0, state_b.0,
"Step {}: Selected arm should be deterministic: {} vs {}",
step, state_a.0, state_b.0
);
assert_eq!(
state_a.1, state_b.1,
"Step {}: Epoch count should be deterministic: {} vs {}",
step, state_a.1, state_b.1
);
assert_eq!(
state_a.2, state_b.2,
"Step {}: Steps in epoch should be deterministic: {} vs {}",
step, state_a.2, state_b.2
);
for arm in 0..5 {
assert_eq!(
state_a.3[arm], state_b.3[arm],
"Step {}: Weight[{}] should be deterministic: {:.6} vs {:.6}",
step, arm, state_a.3[arm], state_b.3[arm]
);
}
for arm in 0..5 {
assert_eq!(
state_a.4[arm], state_b.4[arm],
"Step {}: Prob[{}] should be deterministic: {:.6} vs {:.6}",
step, arm, state_a.4[arm], state_b.4[arm]
);
}
}
}
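// Golden snapshot: the scrubbed scheduler state dump should stay stable
// across empty, loaded, and cancel-streak configurations.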
#[test]
fn three_lane_scheduler_state_dump_scrubbed() {
insta::assert_json_snapshot!(
"three_lane_scheduler_state_dump_scrubbed",
json!({
"empty": empty_scheduler_state_dump(),
"loaded": loaded_scheduler_state_dump(),
"cancel_streak": cancel_streak_scheduler_state_dump(),
})
);
}
}