use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex, Weak};
use crate::advanced::Priority;
use crate::task::TaskId;
/// Priority stored in inverted form so that ascending order visits the
/// numerically largest `Priority` value first.
///
/// The wrapped byte is `u8::MAX - (priority as u8)`; applying the same
/// subtraction again recovers the original discriminant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct RevPriority(u8);

impl RevPriority {
    /// Builds the inverted key for `p`.
    fn new(p: Priority) -> Self {
        let raw = p as u8;
        RevPriority(u8::MAX - raw)
    }

    /// Undoes the inversion and converts the byte back through
    /// `Priority::from`.
    fn to_priority(self) -> Priority {
        let RevPriority(inverted) = self;
        let raw = u8::MAX - inverted;
        Priority::from(raw)
    }
}
/// Insertion sequence number assigned by `DynamicPriorityScheduler::push_inner`.
///
/// It is the second component of the scheduler's ordering key, so among
/// entries with the same `RevPriority` the lower sequence number sorts first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct SeqNum(u64);
/// Priority queue of task ids whose priority can be changed after insertion.
///
/// Two maps are kept in lock-step:
///
/// * `index` orders entries by `(RevPriority, SeqNum)`; its first entry is the
///   next task to run (largest priority value, then lowest sequence number).
/// * `reverse` maps a task id to its current key in `index`, which lets
///   `reprioritize`, `remove` and `peek_priority` find the entry by id.
///
/// A task id is queued at most once; pushing an id that is already queued
/// replaces the earlier entry.
pub struct DynamicPriorityScheduler {
    /// Entries in pop order.
    index: BTreeMap<(RevPriority, SeqNum), TaskId>,
    /// Task id -> current key in `index`.
    reverse: HashMap<TaskId, (RevPriority, SeqNum)>,
    /// Sequence number that the next push will receive.
    seq: u64,
}

impl DynamicPriorityScheduler {
    /// Creates an empty scheduler whose first push gets sequence number 0.
    pub fn new() -> Self {
        Self {
            index: BTreeMap::new(),
            reverse: HashMap::new(),
            seq: 0,
        }
    }

    /// Queues `task_id` at `priority`.
    ///
    /// If the id is already queued, its previous entry is replaced.
    pub fn push(&mut self, task_id: TaskId, priority: Priority) {
        self.push_inner(task_id, priority);
    }

    /// Queues `task_id` at `priority` and returns the sequence number the
    /// entry was given.
    ///
    /// If the id is already queued, its previous entry is replaced by the new
    /// one (new priority, new sequence number).
    pub(crate) fn push_inner(&mut self, task_id: TaskId, priority: Priority) -> SeqNum {
        let seq = SeqNum(self.seq);
        self.seq += 1;
        let key = (RevPriority::new(priority), seq);
        // `HashMap::insert` hands back the key this id was previously filed
        // under. Dropping that entry from `index` keeps both maps describing
        // the same set of tasks; otherwise the stale entry would still be
        // counted by `len` and returned by `pop`.
        if let Some(stale_key) = self.reverse.insert(task_id, key) {
            self.index.remove(&stale_key);
        }
        self.index.insert(key, task_id);
        seq
    }

    /// Removes and returns the task at the front of the queue, or `None` when
    /// the queue is empty.
    pub fn pop(&mut self) -> Option<TaskId> {
        let (&key, &task_id) = self.index.iter().next()?;
        self.index.remove(&key);
        self.reverse.remove(&task_id);
        Some(task_id)
    }

    /// Returns the task at the front of the queue and its priority without
    /// removing it.
    pub fn peek(&self) -> Option<(TaskId, Priority)> {
        self.index
            .iter()
            .next()
            .map(|((rp, _), &tid)| (tid, rp.to_priority()))
    }

    /// Moves `task_id` to `new_priority`.
    ///
    /// The task keeps its original sequence number, so inside the new priority
    /// level it is ordered by when it was first pushed. Returns `false` when
    /// the id is not queued.
    pub fn reprioritize(&mut self, task_id: TaskId, new_priority: Priority) -> bool {
        let (old_rp, seq) = match self.reverse.remove(&task_id) {
            Some(key) => key,
            None => return false,
        };
        self.index.remove(&(old_rp, seq));
        let new_key = (RevPriority::new(new_priority), seq);
        self.index.insert(new_key, task_id);
        self.reverse.insert(task_id, new_key);
        true
    }

    /// Returns the current priority of `task_id`, or `None` when it is not
    /// queued.
    pub fn peek_priority(&self, task_id: TaskId) -> Option<Priority> {
        self.reverse.get(&task_id).map(|(rp, _)| rp.to_priority())
    }

    /// Removes `task_id` from the queue. Returns `true` if it was queued.
    pub fn remove(&mut self, task_id: TaskId) -> bool {
        match self.reverse.remove(&task_id) {
            Some(key) => {
                self.index.remove(&key);
                true
            }
            None => false,
        }
    }

    /// Number of queued tasks.
    pub fn len(&self) -> usize {
        self.index.len()
    }

    /// `true` when no task is queued.
    pub fn is_empty(&self) -> bool {
        self.index.is_empty()
    }

    /// Empties the queue and returns every task, in pop order, paired with the
    /// priority it held at the time of draining.
    pub fn drain_ordered(&mut self) -> Vec<(TaskId, Priority)> {
        self.reverse.clear();
        // Taking the map by value walks it once in key order and leaves an
        // empty map behind; the priority is decoded from each entry's key.
        std::mem::take(&mut self.index)
            .into_iter()
            .map(|((rp, _), task_id)| (task_id, rp.to_priority()))
            .collect()
    }

    /// Returns every queued task with its priority, in pop order, leaving the
    /// queue untouched.
    pub fn snapshot(&self) -> Vec<(TaskId, Priority)> {
        self.index
            .iter()
            .map(|((rp, _), &tid)| (tid, rp.to_priority()))
            .collect()
    }
}

impl Default for DynamicPriorityScheduler {
    /// Same as [`DynamicPriorityScheduler::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Cloneable handle to one `DynamicPriorityScheduler` stored behind
/// `Arc<Mutex<..>>`.
///
/// Clones refer to the same queue. Every method takes the mutex for the
/// duration of a single operation and panics if the mutex is poisoned.
#[derive(Clone)]
pub struct SharedDynamicScheduler(Arc<Mutex<DynamicPriorityScheduler>>);

impl SharedDynamicScheduler {
    /// Creates a handle to a fresh, empty scheduler.
    pub fn new() -> Self {
        let inner = DynamicPriorityScheduler::new();
        SharedDynamicScheduler(Arc::new(Mutex::new(inner)))
    }

    /// Locks the inner scheduler, panicking on a poisoned mutex.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, DynamicPriorityScheduler> {
        self.0.lock().unwrap()
    }

    /// Queues `task_id` at `priority` and returns a handle that can later
    /// reprioritize or cancel it. The handle holds only a weak reference to
    /// the scheduler.
    pub fn push(&self, task_id: TaskId, priority: Priority) -> PriorityHandle {
        let seq = self.lock_inner().push_inner(task_id, priority);
        let scheduler = Arc::downgrade(&self.0);
        PriorityHandle {
            task_id,
            seq,
            scheduler,
        }
    }

    /// Removes and returns the task at the front of the queue.
    pub fn pop(&self) -> Option<TaskId> {
        self.lock_inner().pop()
    }

    /// Returns the front task and its priority without removing it.
    pub fn peek(&self) -> Option<(TaskId, Priority)> {
        self.lock_inner().peek()
    }

    /// Moves `task_id` to `new_priority`; `false` when the id is not queued.
    pub fn reprioritize(&self, task_id: TaskId, new_priority: Priority) -> bool {
        self.lock_inner().reprioritize(task_id, new_priority)
    }

    /// Current priority of `task_id`, or `None` when it is not queued.
    pub fn peek_priority(&self, task_id: TaskId) -> Option<Priority> {
        self.lock_inner().peek_priority(task_id)
    }

    /// Removes `task_id`; `true` when it was queued.
    pub fn remove(&self, task_id: TaskId) -> bool {
        self.lock_inner().remove(task_id)
    }

    /// Number of queued tasks at the moment of the call.
    pub fn len(&self) -> usize {
        self.lock_inner().len()
    }

    /// `true` when no task is queued at the moment of the call.
    pub fn is_empty(&self) -> bool {
        self.lock_inner().is_empty()
    }

    /// Copy of the queue contents in pop order.
    pub fn snapshot(&self) -> Vec<(TaskId, Priority)> {
        self.lock_inner().snapshot()
    }
}

impl Default for SharedDynamicScheduler {
    /// Same as [`SharedDynamicScheduler::new`].
    fn default() -> Self {
        SharedDynamicScheduler::new()
    }
}
#[derive(Debug, Clone)]
pub struct PriorityHandle {
pub task_id: TaskId,
seq: SeqNum,
scheduler: Weak<Mutex<DynamicPriorityScheduler>>,
}
impl PriorityHandle {
pub fn reprioritize(&self, new_priority: Priority) -> bool {
self.scheduler
.upgrade()
.map(|arc| arc.lock().unwrap().reprioritize(self.task_id, new_priority))
.unwrap_or(false)
}
pub fn current_priority(&self) -> Option<Priority> {
self.scheduler
.upgrade()
.and_then(|arc| arc.lock().unwrap().peek_priority(self.task_id))
}
pub fn cancel(&self) -> bool {
self.scheduler
.upgrade()
.map(|arc| arc.lock().unwrap().remove(self.task_id))
.unwrap_or(false)
}
pub fn is_pending(&self) -> bool {
self.current_priority().is_some()
}
pub fn sequence_number(&self) -> u64 {
self.seq.0
}
}
/// Periodically raises the priority of queued tasks by one step.
///
/// `tick` counts calls and runs `escalate` on every `tick_interval`-th call.
pub struct EscalationPolicy {
    /// Queue whose tasks are escalated.
    scheduler: SharedDynamicScheduler,
    /// Number of `tick` calls between escalation passes; `0` disables the
    /// periodic pass.
    tick_interval: u64,
    /// Number of `tick` calls seen so far.
    tick_count: u64,
    // Stored by `new` but not read by any code in this type.
    #[allow(dead_code)]
    low_age_ticks: u64,
    // Stored by `new` but not read by any code in this type.
    #[allow(dead_code)]
    normal_age_ticks: u64,
}

impl EscalationPolicy {
    /// Creates a policy for `scheduler`.
    ///
    /// * `tick_interval` - run an escalation pass every this many `tick`
    ///   calls; `0` means `tick` never escalates.
    /// * `low_age_ticks`, `normal_age_ticks` - stored as given; the
    ///   escalation pass does not currently consult them.
    pub fn new(
        scheduler: SharedDynamicScheduler,
        tick_interval: u64,
        low_age_ticks: u64,
        normal_age_ticks: u64,
    ) -> Self {
        Self {
            scheduler,
            tick_interval,
            tick_count: 0,
            low_age_ticks,
            normal_age_ticks,
        }
    }

    /// Records one tick and runs `escalate` when the tick count is a multiple
    /// of `tick_interval`.
    pub fn tick(&mut self) {
        self.tick_count += 1;
        // `x % 0` panics, so a zero interval is treated as "periodic
        // escalation disabled" instead of being fed to the remainder.
        if self.tick_interval != 0 && self.tick_count % self.tick_interval == 0 {
            self.escalate();
        }
    }

    /// Runs one escalation pass: every `Low` task becomes `Normal` and every
    /// `Normal` task becomes `High`; all other priorities are left alone.
    ///
    /// Each task is raised by at most one step per pass. Panics if the
    /// scheduler mutex is poisoned.
    pub fn escalate(&self) {
        // Hold the lock for the whole pass so the priorities read from the
        // snapshot cannot be changed by another thread before they are
        // rewritten.
        let mut queue = self.scheduler.0.lock().unwrap();
        let snapshot = queue.snapshot();
        for (task_id, priority) in snapshot {
            let new_priority = match priority {
                Priority::Low => Priority::Normal,
                Priority::Normal => Priority::High,
                _ => continue,
            };
            queue.reprioritize(task_id, new_priority);
        }
    }
}
/// Adapts `DynamicPriorityScheduler` to the crate's `Scheduler` trait.
///
/// Every method forwards to the scheduler's own inherent implementation. The
/// calls are written with the explicit `DynamicPriorityScheduler::` path (or
/// go through `push_inner`) so it is clear at the call site that the inherent
/// method, not this trait method of the same name, is being invoked.
impl crate::scheduler::Scheduler for DynamicPriorityScheduler {
// Queues the task; the sequence number returned by `push_inner` is discarded.
fn push(&mut self, task_id: TaskId, priority: Priority) {
self.push_inner(task_id, priority);
}
// Removes and returns the task at the front of the queue.
fn pop(&mut self) -> Option<TaskId> {
DynamicPriorityScheduler::pop(self)
}
// Reports whether the queue holds no tasks.
fn is_empty(&self) -> bool {
DynamicPriorityScheduler::is_empty(self)
}
// Reports the number of queued tasks.
fn len(&self) -> usize {
DynamicPriorityScheduler::len(self)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Tasks come out from the highest priority to the lowest.
    #[test]
    fn basic_push_pop_priority_order() {
        let mut sched = DynamicPriorityScheduler::new();
        sched.push(1, Priority::Low);
        sched.push(2, Priority::High);
        sched.push(3, Priority::Normal);
        sched.push(4, Priority::Critical);
        assert_eq!(sched.pop(), Some(4));
        assert_eq!(sched.pop(), Some(2));
        assert_eq!(sched.pop(), Some(3));
        assert_eq!(sched.pop(), Some(1));
        assert_eq!(sched.pop(), None);
    }

    /// Equal priorities are served in push order.
    #[test]
    fn fifo_within_equal_priority() {
        let mut sched = DynamicPriorityScheduler::new();
        sched.push(10, Priority::Normal);
        sched.push(20, Priority::Normal);
        sched.push(30, Priority::Normal);
        assert_eq!(sched.pop(), Some(10));
        assert_eq!(sched.pop(), Some(20));
        assert_eq!(sched.pop(), Some(30));
    }

    #[test]
    fn reprioritize_elevates_task() {
        let mut sched = DynamicPriorityScheduler::new();
        sched.push(1, Priority::Low);
        sched.push(2, Priority::Normal);
        assert!(sched.reprioritize(1, Priority::Critical));
        assert_eq!(sched.pop(), Some(1));
        assert_eq!(sched.pop(), Some(2));
    }

    #[test]
    fn reprioritize_demotes_task() {
        let mut sched = DynamicPriorityScheduler::new();
        sched.push(1, Priority::Critical);
        sched.push(2, Priority::Normal);
        assert!(sched.reprioritize(1, Priority::Low));
        assert_eq!(sched.pop(), Some(2));
        assert_eq!(sched.pop(), Some(1));
    }

    #[test]
    fn reprioritize_nonexistent_returns_false() {
        let mut sched = DynamicPriorityScheduler::new();
        assert!(!sched.reprioritize(999, Priority::High));
    }

    /// A reprioritized task keeps its original sequence number, so it is
    /// ordered ahead of tasks pushed later at the same priority.
    #[test]
    fn reprioritize_preserves_fifo_within_new_priority() {
        let mut sched = DynamicPriorityScheduler::new();
        sched.push(1, Priority::Normal);
        sched.push(2, Priority::Normal);
        sched.push(3, Priority::High);
        assert!(sched.reprioritize(1, Priority::High));
        assert_eq!(sched.pop(), Some(1));
        assert_eq!(sched.pop(), Some(3));
        assert_eq!(sched.pop(), Some(2));
    }

    #[test]
    fn remove_works() {
        let mut sched = DynamicPriorityScheduler::new();
        sched.push(1, Priority::Normal);
        sched.push(2, Priority::High);
        assert!(sched.remove(2));
        assert_eq!(sched.len(), 1);
        assert_eq!(sched.pop(), Some(1));
        // Removing an id that is no longer queued reports `false`.
        assert!(!sched.remove(2));
    }

    #[test]
    fn peek_priority() {
        let mut sched = DynamicPriorityScheduler::new();
        sched.push(5, Priority::High);
        assert_eq!(sched.peek_priority(5), Some(Priority::High));
        sched.pop();
        assert_eq!(sched.peek_priority(5), None);
    }

    /// `snapshot` lists tasks in pop order and leaves the queue intact.
    #[test]
    fn snapshot_ordered() {
        let mut sched = DynamicPriorityScheduler::new();
        sched.push(1, Priority::Low);
        sched.push(2, Priority::Critical);
        sched.push(3, Priority::Normal);
        let snap = sched.snapshot();
        assert_eq!(snap[0].1, Priority::Critical);
        assert_eq!(snap[1].1, Priority::Normal);
        assert_eq!(snap[2].1, Priority::Low);
        assert_eq!(sched.len(), 3);
    }

    #[test]
    fn shared_scheduler_push_pop() {
        let sched = SharedDynamicScheduler::new();
        sched.push(1, Priority::Normal);
        sched.push(2, Priority::Critical);
        assert_eq!(sched.pop(), Some(2));
        assert_eq!(sched.pop(), Some(1));
    }

    #[test]
    fn priority_handle_reprioritize() {
        let sched = SharedDynamicScheduler::new();
        let handle = sched.push(42, Priority::Low);
        sched.push(7, Priority::Normal);
        assert_eq!(handle.current_priority(), Some(Priority::Low));
        assert!(handle.reprioritize(Priority::Critical));
        assert_eq!(handle.current_priority(), Some(Priority::Critical));
        assert_eq!(sched.pop(), Some(42));
        assert_eq!(sched.pop(), Some(7));
        // Once popped, the handle no longer sees the task.
        assert_eq!(handle.current_priority(), None);
        assert!(!handle.is_pending());
    }

    #[test]
    fn priority_handle_cancel() {
        let sched = SharedDynamicScheduler::new();
        let h = sched.push(99, Priority::High);
        sched.push(1, Priority::Normal);
        assert!(h.cancel());
        // A second cancel finds nothing to remove.
        assert!(!h.cancel());
        assert_eq!(sched.len(), 1);
        assert_eq!(sched.pop(), Some(1));
    }

    /// One escalation pass raises each task by exactly one step.
    #[test]
    fn escalation_policy_bumps_low_to_normal() {
        let sched = SharedDynamicScheduler::new();
        sched.push(1, Priority::Low);
        sched.push(2, Priority::Normal);
        let policy = EscalationPolicy::new(sched.clone(), 1, 1, 2);
        policy.escalate();
        assert_eq!(sched.peek_priority(1), Some(Priority::Normal));
        assert_eq!(sched.peek_priority(2), Some(Priority::High));
    }

    /// A handle moved to another thread still reaches the shared queue.
    #[test]
    fn concurrent_push_reprioritize() {
        let sched = SharedDynamicScheduler::new();
        let sched2 = sched.clone();
        let handle = sched.push(10, Priority::Low);
        let t = std::thread::spawn(move || {
            handle.reprioritize(Priority::Critical);
        });
        t.join().unwrap();
        assert_eq!(sched2.pop(), Some(10));
    }
}