pub mod autotuner;
pub mod content;
#[cfg(test)]
pub mod content_tests;
pub mod decision_contract;
#[cfg(test)]
pub mod edf_priority_metamorphic;
pub mod global_injector;
pub mod global_queue;
pub mod intrusive;
pub mod intrusive_heap;
pub mod invariant_monitor;
#[cfg(test)]
pub mod lane_pressure_scaling_metamorphic;
pub mod local_queue;
#[cfg(test)]
pub mod metamorphic_tests;
pub mod priority;
#[cfg(test)]
pub mod priority_inversion_metamorphic;
pub mod priority_inversion_oracle;
#[cfg(test)]
pub mod ready_dispatch_invariance_metamorphic;
#[cfg(test)]
pub mod shutdown_behavior_audit_test;
pub mod state_backing;
pub mod stealing;
pub mod stream_priority;
pub mod swarm_evidence;
pub mod three_lane;
pub mod work_stealing_checker;
#[cfg(test)]
pub mod work_stealing_fairness_metamorphic;
pub mod worker;
pub use crate::runtime::config::SchedulerPlacementMode;
pub use autotuner::{
AutotunerConfig, AutotunerRecommendation, HotPathObservation,
SchedulerAdmissionControlThresholds, SchedulerAutotuner, SchedulerFeedbackClamp,
SchedulerFeedbackClampReason, SchedulerFeedbackCurrentKnobs, SchedulerFeedbackEvidence,
SchedulerFeedbackKnob, SchedulerFeedbackKnobSet, SchedulerFeedbackMetrics,
SchedulerFeedbackPolicy, SchedulerFeedbackProtectedInvariants, SchedulerFeedbackReason,
SchedulerFeedbackRecommendation, SchedulerFeedbackSignal, SchedulerFeedbackWorkloadClass,
extract_observation, recommend_scheduler_feedback,
};
pub use content::{
ContentId, ContentItem, ContentScheduler, PressureSnapshot, PriorityClass, ScheduleEvidence,
ScheduleReason,
};
pub use global_injector::GlobalInjector;
pub use global_queue::GlobalQueue;
pub use intrusive::{IntrusiveRing, IntrusiveStack, QUEUE_TAG_CANCEL, QUEUE_TAG_READY};
pub use intrusive_heap::IntrusivePriorityHeap;
pub use invariant_monitor::{
InvariantCategory, InvariantConfig, InvariantStats, InvariantViolation, QueueSnapshot,
SchedulerInvariant, SchedulerInvariantMonitor, WorkerLoadSnapshot,
};
pub use local_queue::LocalQueue;
pub use priority::{
DispatchLane, ScheduleCertificate, Scheduler as PriorityScheduler, SchedulerMode,
};
pub use priority_inversion_oracle::{
InversionId, InversionImpact, InversionOracleConfig, InversionSeverity, InversionStats,
InversionType, Priority, PriorityInversion, PriorityInversionOracle, ResourceId,
};
pub use stream_priority::{
SchedulerIntegration, SchedulerStats, StreamAssignment, StreamPriority, StreamPriorityScheduler,
};
pub use swarm_evidence::{
CoordinationPressureFamily, SCHEDULER_COORDINATION_EVIDENCE_SCHEMA_VERSION,
SCHEDULER_EVIDENCE_SCHEMA_VERSION, SWARM_ADMISSION_POLICY_REPORT_SCHEMA_VERSION,
SWARM_CAPACITY_SNAPSHOT_SCHEMA_VERSION, SWARM_MEMORY_BUDGET_PLAN_SCHEMA_VERSION,
SWARM_MEMORY_RESIDENCY_POLICY_SCHEMA_VERSION, SchedulerCoordinationEvidenceInput,
SchedulerCoordinationEvidenceInputs, SchedulerEvidenceArtifact, SchedulerEvidenceError,
SchedulerEvidenceMetrics, SchedulerKnobProfile, SchedulerRecommendationReason,
SchedulerTopologyDescriptor, SchedulerTuneReport, SchedulerWorkloadClass,
SwarmAdmissionDecision, SwarmAdmissionLane, SwarmAdmissionReasonCode, SwarmAdmissionReport,
SwarmCapacitySnapshot, SwarmCoordinationBacklogSignals, SwarmCpuTopologyHints,
SwarmDiskCapacity, SwarmDiskPressureLevel, SwarmLaneAdmission, SwarmMemoryBrownoutClass,
SwarmMemoryBudgetPlan, SwarmMemoryCapacity, SwarmMemoryHostTier, SwarmMemoryPressureTier,
SwarmMemoryProtectedInvariant, SwarmMemoryResidencyDecision, SwarmMemoryResidencyEnvelope,
SwarmMemoryResidencyFallbackReason, SwarmMemoryResidencyPlan, SwarmMemoryResidencyRequest,
SwarmMemoryResidencyTier, SwarmMemoryResidencyWorkloadClass, SwarmRchAdmissibility,
SwarmRchCapacity, SwarmValidationClass,
};
pub use three_lane::{ThreeLaneScheduler, ThreeLaneWorker};
pub use work_stealing_checker::{
OwnershipState, StealingStats, ViolationType, WorkStealingChecker,
};
pub use worker::{Parker, Worker};
use crate::types::TaskId;
/// Scheduler facade that wraps a [`ThreeLaneScheduler`].
///
/// The type owns a single inner [`ThreeLaneScheduler`] and exposes a reduced
/// API (`new`, `spawn`, `wake`, `take_workers`, `shutdown`) whose methods
/// forward to that inner scheduler.
#[derive(Debug)]
pub struct WorkStealingScheduler {
    /// The wrapped scheduler that receives every forwarded call.
    inner: ThreeLaneScheduler,
}
impl WorkStealingScheduler {
    /// Creates a scheduler backed by a [`ThreeLaneScheduler`].
    ///
    /// A `worker_count` of zero is raised to one before the inner scheduler is
    /// constructed, so the scheduler always has at least one worker. Any other
    /// value is passed through unchanged. `state` is handed to
    /// [`ThreeLaneScheduler::new`] as-is.
    pub fn new(
        worker_count: usize,
        state: &std::sync::Arc<crate::sync::ContendedMutex<crate::runtime::RuntimeState>>,
    ) -> Self {
        // Zero workers could never make progress; fall back to a single worker.
        let effective_workers = if worker_count == 0 { 1 } else { worker_count };
        let inner = ThreeLaneScheduler::new(effective_workers, state);
        Self { inner }
    }

    /// Submits `task` to the inner scheduler via `ThreeLaneScheduler::spawn`.
    ///
    /// The second argument is always `0`.
    // NOTE(review): `0` is presumably the default priority — confirm against
    // `ThreeLaneScheduler::spawn`.
    pub fn spawn(&self, task: TaskId) {
        let scheduler = &self.inner;
        scheduler.spawn(task, 0);
    }

    /// Wakes `task` on the inner scheduler via `ThreeLaneScheduler::wake`.
    ///
    /// The second argument is always `0`.
    // NOTE(review): `0` is presumably the default priority — confirm against
    // `ThreeLaneScheduler::wake`.
    pub fn wake(&self, task: TaskId) {
        let scheduler = &self.inner;
        scheduler.wake(task, 0);
    }

    /// Returns the workers handed out by the inner scheduler's `take_workers`.
    ///
    /// Callers are expected to drive each returned [`ThreeLaneWorker`]
    /// themselves (for example by calling its `run_loop` on a dedicated
    /// thread).
    // NOTE(review): behaviour of a second call depends on
    // `ThreeLaneScheduler::take_workers` — verify before relying on it.
    pub fn take_workers(&mut self) -> Vec<ThreeLaneWorker> {
        let scheduler = &mut self.inner;
        scheduler.take_workers()
    }

    /// Forwards a shutdown request to the inner scheduler.
    pub fn shutdown(&self) {
        let scheduler = &self.inner;
        scheduler.shutdown();
    }
}
pub use priority::Scheduler;
#[cfg(test)]
mod tests {
    // Test code favours directness over lint cleanliness, so the noisier
    // clippy groups are silenced for this module only.
    #![allow(
        clippy::pedantic,
        clippy::nursery,
        clippy::expect_fun_call,
        clippy::map_unwrap_or,
        clippy::cast_possible_wrap,
        clippy::future_not_send
    )]
    use super::*;
    use crate::runtime::RuntimeState;
    use crate::sync::ContendedMutex;
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds a fresh, shared runtime state for a single test.
    fn fresh_state() -> Arc<ContendedMutex<RuntimeState>> {
        Arc::new(ContendedMutex::new("runtime_state", RuntimeState::new()))
    }

    /// Workers running their loops on separate threads must return once the
    /// scheduler is shut down, so that every thread can be joined.
    #[test]
    fn test_worker_shutdown() {
        let state = fresh_state();
        let mut scheduler = WorkStealingScheduler::new(2, &state);

        let workers = scheduler.take_workers();
        assert_eq!(workers.len(), 2);

        // One OS thread per worker, each driving that worker's run loop.
        let mut handles = Vec::with_capacity(workers.len());
        for mut worker in workers {
            let handle = std::thread::spawn(move || {
                worker.run_loop();
            });
            handles.push(handle);
        }

        // Give the worker threads a moment to start before asking them to stop.
        std::thread::sleep(Duration::from_millis(10));
        scheduler.shutdown();

        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// Requesting zero workers must still yield exactly one worker.
    #[test]
    fn test_zero_worker_count_is_clamped_to_one() {
        let state = fresh_state();
        let mut scheduler = WorkStealingScheduler::new(0, &state);

        let workers = scheduler.take_workers();
        assert_eq!(
            workers.len(),
            1,
            "scheduler must clamp zero workers to one for forward progress"
        );
    }
}