pub mod decision_contract;
#[cfg(test)]
pub mod edf_priority_metamorphic;
pub mod global_injector;
pub mod global_queue;
pub mod intrusive;
pub mod intrusive_heap;
pub mod invariant_monitor;
pub mod local_queue;
#[cfg(test)]
pub mod metamorphic_tests;
pub mod priority;
pub mod priority_inversion_oracle;
pub mod stealing;
pub mod three_lane;
pub mod work_stealing_checker;
pub mod worker;
pub use global_injector::GlobalInjector;
pub use global_queue::GlobalQueue;
pub use intrusive::{IntrusiveRing, IntrusiveStack, QUEUE_TAG_CANCEL, QUEUE_TAG_READY};
pub use intrusive_heap::IntrusivePriorityHeap;
pub use invariant_monitor::{
InvariantCategory, InvariantConfig, InvariantStats, InvariantViolation, QueueSnapshot,
SchedulerInvariant, SchedulerInvariantMonitor, WorkerLoadSnapshot,
};
pub use local_queue::LocalQueue;
pub use priority::{
DispatchLane, ScheduleCertificate, Scheduler as PriorityScheduler, SchedulerMode,
};
pub use priority_inversion_oracle::{
InversionId, InversionImpact, InversionOracleConfig, InversionSeverity, InversionStats,
InversionType, Priority, PriorityInversion, PriorityInversionOracle, ResourceId,
};
pub use three_lane::{ThreeLaneScheduler, ThreeLaneWorker};
pub use work_stealing_checker::{
OwnershipState, StealingStats, ViolationType, WorkStealingChecker,
};
pub use worker::{Parker, Worker};
use crate::types::TaskId;
#[derive(Debug)]
pub struct WorkStealingScheduler {
inner: ThreeLaneScheduler,
}
impl WorkStealingScheduler {
pub fn new(
worker_count: usize,
state: &std::sync::Arc<crate::sync::ContendedMutex<crate::runtime::RuntimeState>>,
) -> Self {
let worker_count = worker_count.max(1);
Self {
inner: ThreeLaneScheduler::new(worker_count, state),
}
}
pub fn spawn(&self, task: TaskId) {
self.inner.spawn(task, 0);
}
pub fn wake(&self, task: TaskId) {
self.inner.wake(task, 0);
}
pub fn take_workers(&mut self) -> Vec<ThreeLaneWorker> {
self.inner.take_workers()
}
pub fn shutdown(&self) {
self.inner.shutdown();
}
}
pub use priority::Scheduler;
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::RuntimeState;
use crate::sync::ContendedMutex;
use std::sync::Arc;
use std::time::Duration;
#[test]
fn test_worker_shutdown() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = WorkStealingScheduler::new(2, &state);
let workers = scheduler.take_workers();
assert_eq!(workers.len(), 2);
let handles: Vec<_> = workers
.into_iter()
.map(|mut worker| {
std::thread::spawn(move || {
worker.run_loop();
})
})
.collect();
std::thread::sleep(Duration::from_millis(10));
scheduler.shutdown();
for handle in handles {
handle.join().unwrap();
}
}
#[test]
fn test_zero_worker_count_is_clamped_to_one() {
let state = Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()));
let mut scheduler = WorkStealingScheduler::new(0, &state);
let workers = scheduler.take_workers();
assert_eq!(
workers.len(),
1,
"scheduler must clamp zero workers to one for forward progress"
);
}
}