use std::collections::HashMap;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct TaskId(pub u64);
impl TaskId {
pub fn new(id: u64) -> Self {
TaskId(id)
}
pub fn raw(self) -> u64 {
self.0
}
}
impl std::fmt::Display for TaskId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "task#{}", self.0)
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(u8)]
pub enum TaskPriority {
Critical = 0,
High = 1,
Normal = 2,
Low = 3,
Background = 4,
}
impl TaskPriority {
pub fn level(self) -> u8 {
self as u8
}
pub fn from_level(level: u8) -> Self {
match level {
0 => TaskPriority::Critical,
1 => TaskPriority::High,
2 => TaskPriority::Normal,
3 => TaskPriority::Low,
_ => TaskPriority::Background,
}
}
}
impl std::fmt::Display for TaskPriority {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
TaskPriority::Critical => "critical",
TaskPriority::High => "high",
TaskPriority::Normal => "normal",
TaskPriority::Low => "low",
TaskPriority::Background => "background",
};
write!(f, "{}", s)
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TaskState {
Pending,
Running {
worker: usize,
},
Completed,
Failed(String),
Cancelled,
}
impl TaskState {
pub fn is_terminal(&self) -> bool {
matches!(
self,
TaskState::Completed | TaskState::Failed(_) | TaskState::Cancelled
)
}
pub fn is_pending(&self) -> bool {
matches!(self, TaskState::Pending)
}
pub fn is_running(&self) -> bool {
matches!(self, TaskState::Running { .. })
}
}
impl std::fmt::Display for TaskState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TaskState::Pending => write!(f, "pending"),
TaskState::Running { worker } => write!(f, "running(worker={})", worker),
TaskState::Completed => write!(f, "completed"),
TaskState::Failed(msg) => write!(f, "failed({})", msg),
TaskState::Cancelled => write!(f, "cancelled"),
}
}
}
#[derive(Clone, Debug)]
pub struct Task {
pub id: TaskId,
pub priority: TaskPriority,
pub state: TaskState,
pub enqueued_at: u64,
pub started_at: Option<u64>,
pub completed_at: Option<u64>,
}
impl Task {
pub fn new(id: TaskId, priority: TaskPriority, enqueued_at: u64) -> Self {
Task {
id,
priority,
state: TaskState::Pending,
enqueued_at,
started_at: None,
completed_at: None,
}
}
pub fn latency_ns(&self) -> Option<u64> {
self.completed_at
.map(|c| c.saturating_sub(self.enqueued_at))
}
pub fn queue_delay_ns(&self) -> Option<u64> {
self.started_at.map(|s| s.saturating_sub(self.enqueued_at))
}
pub fn execution_ns(&self) -> Option<u64> {
match (self.started_at, self.completed_at) {
(Some(s), Some(c)) => Some(c.saturating_sub(s)),
_ => None,
}
}
}
#[derive(Clone, Debug, Default)]
pub struct WorkerStats {
pub id: usize,
pub tasks_completed: u64,
pub tasks_stolen: u64,
pub idle_time_ns: u64,
pub busy_time_ns: u64,
}
impl WorkerStats {
pub fn new(id: usize) -> Self {
WorkerStats {
id,
..Default::default()
}
}
pub fn utilization(&self) -> f64 {
let total = self.busy_time_ns + self.idle_time_ns;
if total == 0 {
return 0.0;
}
self.busy_time_ns as f64 / total as f64
}
pub fn total_tasks(&self) -> u64 {
self.tasks_completed + self.tasks_stolen
}
}
impl std::fmt::Display for WorkerStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Worker[{}] completed={} stolen={} util={:.2}",
self.id,
self.tasks_completed,
self.tasks_stolen,
self.utilization()
)
}
}
#[derive(Clone, Debug, Default)]
pub struct SchedulerMetrics {
pub total_tasks: u64,
pub completed: u64,
pub failed: u64,
pub workers: Vec<WorkerStats>,
pub throughput_per_sec: f64,
pub avg_latency_ns: u64,
}
impl SchedulerMetrics {
pub fn in_flight(&self) -> u64 {
self.total_tasks
.saturating_sub(self.completed + self.failed)
}
pub fn success_rate(&self) -> f64 {
let terminal = self.completed + self.failed;
if terminal == 0 {
return 1.0;
}
self.completed as f64 / terminal as f64
}
pub fn busiest_worker(&self) -> Option<&WorkerStats> {
self.workers.iter().max_by_key(|w| w.tasks_completed)
}
pub fn least_loaded_worker(&self) -> Option<&WorkerStats> {
self.workers.iter().min_by_key(|w| w.tasks_completed)
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LoadBalancePolicy {
RoundRobin,
LeastLoaded,
WorkStealing,
PriorityFirst,
}
impl std::fmt::Display for LoadBalancePolicy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
LoadBalancePolicy::RoundRobin => "round-robin",
LoadBalancePolicy::LeastLoaded => "least-loaded",
LoadBalancePolicy::WorkStealing => "work-stealing",
LoadBalancePolicy::PriorityFirst => "priority-first",
};
write!(f, "{}", s)
}
}
pub struct AdaptiveScheduler {
pub policy: LoadBalancePolicy,
pub workers: Vec<WorkerStats>,
pub metrics: SchedulerMetrics,
pub(super) tasks: HashMap<TaskId, Task>,
pub(super) next_task_id: u64,
pub(super) clock: u64,
pub(super) rr_cursor: usize,
pub(super) total_latency_ns: u64,
pub(super) latency_sample_count: u64,
}