use std::cmp::Ordering;
use std::collections::BinaryHeap;
use crate::ids::CommandId;
/// Payload carried by a scheduled timer.
///
/// A value of this type is passed to `Scheduler::schedule` and handed back by
/// `Scheduler::poll_matured` once the timer's maturity time has been reached.
/// The scheduler code in this file treats it as opaque data: it is stored,
/// logged via `Debug` when a timer is dropped, and returned unchanged.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TimerKind {
/// Timer tagged with a `CommandId`.
/// NOTE(review): presumably the command that requested the sleep — confirm against callers.
Sleep(CommandId),
/// Timer carrying a period and a key.
/// NOTE(review): the scheduler code in this file never reads `period_ns`, so
/// re-arming a periodic timer is presumably the caller's job — confirm.
Interval {
/// Period in nanoseconds (per the `_ns` suffix).
period_ns: u64,
/// Opaque `u64` key; its interpretation is not visible in this file.
key: u64,
},
/// Timer carrying only a key.
/// NOTE(review): name suggests a one-shot "fire after a delay" timer — confirm.
After {
/// Opaque `u64` key; its interpretation is not visible in this file.
key: u64,
},
/// Timer tagged with a `CommandId`.
/// NOTE(review): presumably signals completion of that command — confirm against callers.
Completion(CommandId),
}
/// One pending timer as stored in the scheduler's heap.
///
/// The comparison impls for this type look only at `maturity_ns`; `kind` is
/// payload and takes no part in ordering or equality.
#[derive(Debug)]
struct Entry {
/// Time at which the timer matures, in nanoseconds; compared against the
/// `now_ns` value passed to the scheduler's maturity checks.
maturity_ns: u64,
/// Payload handed back to the caller when the timer matures.
kind: TimerKind,
}
impl Ord for Entry {
    /// Orders entries by `maturity_ns` in *reverse*: the entry that matures
    /// sooner compares as greater.
    ///
    /// `std::collections::BinaryHeap` is a max-heap, so reversing the natural
    /// order makes the earliest timer the one returned by `peek`/`pop`.
    fn cmp(&self, other: &Self) -> Ordering {
        self.maturity_ns.cmp(&other.maturity_ns).reverse()
    }
}
impl PartialOrd for Entry {
    /// Always `Some`: defers to the total order defined by `Ord::cmp`, so the
    /// two orderings cannot disagree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl PartialEq for Entry {
fn eq(&self, other: &Self) -> bool {
self.maturity_ns == other.maturity_ns
}
}
// Marker impl required by `Ord`. Equality compares the `u64` `maturity_ns`
// only, which is reflexive, symmetric and transitive.
impl Eq for Entry {}
/// Default maximum number of pending timers a `Scheduler` will hold; this is
/// the `cap` installed by `Scheduler::default`.
pub const DEFAULT_TIMER_HEAP_CAP: usize = 65_536;
/// Timer queue with a hard cap on the number of pending timers.
///
/// Timers are added with `schedule` and retrieved with `poll_matured`. Once
/// the heap holds `cap` entries, further timers are dropped and counted.
pub struct Scheduler {
/// Pending timers. `Entry`'s reversed ordering puts the smallest
/// `maturity_ns` at the top of this max-heap.
heap: BinaryHeap<Entry>,
/// Last time recorded via `set_now`, in nanoseconds. The maturity checks
/// visible in this file take `now_ns` as an argument and do not read this field.
now_ns: u64,
/// Maximum number of pending timers; `schedule` rejects new timers once the
/// heap holds this many.
cap: usize,
/// Running count of timers rejected because the heap was at `cap`
/// (incremented with saturating arithmetic).
dropped: u64,
}
impl Default for Scheduler {
fn default() -> Self {
Self {
heap: BinaryHeap::new(),
now_ns: 0,
cap: DEFAULT_TIMER_HEAP_CAP,
dropped: 0,
}
}
}
impl Scheduler {
    /// Creates a scheduler with the default settings (identical to
    /// `Scheduler::default()`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the maximum number of pending timers.
    ///
    /// A `cap` of zero is raised to 1. Timers already queued are left in
    /// place even if they exceed the new cap; only later `schedule` calls
    /// are affected.
    pub fn set_cap(&mut self, cap: usize) {
        self.cap = std::cmp::max(cap, 1);
    }

    /// Returns how many timers have been rejected because the heap was full.
    pub fn dropped(&self) -> u64 {
        self.dropped
    }

    /// Records `now_ns` as the scheduler's current time.
    ///
    /// This only stores the value; `has_matured` and `poll_matured` use the
    /// `now_ns` passed to them, not the stored one.
    pub fn set_now(&mut self, now_ns: u64) {
        self.now_ns = now_ns;
    }

    /// Returns the time most recently stored by `set_now` (0 initially).
    pub fn now_ns(&self) -> u64 {
        self.now_ns
    }

    /// Queues a timer that matures at `maturity_ns` and carries `kind`.
    ///
    /// If the heap already holds `cap` (or more) timers, the new timer is
    /// discarded instead: the drop counter is bumped (saturating) and a
    /// warning is logged. The caller gets no other signal.
    pub fn schedule(&mut self, maturity_ns: u64, kind: TimerKind) {
        let has_room = self.heap.len() < self.cap;
        if has_room {
            self.heap.push(Entry { maturity_ns, kind });
            return;
        }

        // Heap is full: count the rejected timer and report it.
        self.dropped = self.dropped.saturating_add(1);
        tracing::warn!(
            cap = self.cap,
            dropped_total = self.dropped,
            ?kind,
            "Scheduler: timer dropped, heap at cap",
        );
    }

    /// Returns `true` if at least one queued timer has
    /// `maturity_ns <= now_ns`. An empty scheduler yields `false`.
    pub fn has_matured(&self, now_ns: u64) -> bool {
        // The heap top is the soonest timer, so checking it alone suffices.
        matches!(self.heap.peek(), Some(soonest) if soonest.maturity_ns <= now_ns)
    }

    /// Removes every timer with `maturity_ns <= now_ns` and returns their
    /// payloads in non-decreasing order of maturity.
    ///
    /// The relative order of timers sharing the same maturity is whatever the
    /// heap produces. Returns an empty vector when nothing is due.
    pub fn poll_matured(&mut self, now_ns: u64) -> Vec<TimerKind> {
        let mut fired = Vec::new();
        while self.has_matured(now_ns) {
            // `has_matured` just saw a top entry, so the pop cannot fail.
            let entry = self.heap.pop().expect("just peeked");
            fired.push(entry.kind);
        }
        fired
    }

    /// Returns the number of timers currently queued.
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Returns `true` when no timers are queued.
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }
}
#[cfg(test)]
mod cap_tests {
    use super::*;
    use crate::ids::CommandId;

    /// Once the heap holds `cap` timers, a further `schedule` call must leave
    /// the heap untouched and bump the drop counter.
    #[test]
    fn schedule_drops_at_cap() {
        let mut s = Scheduler::new();
        s.set_cap(3);
        for i in 0..3 {
            s.schedule(i, TimerKind::Sleep(CommandId::from(i)));
        }
        assert_eq!(s.len(), 3);
        s.schedule(100, TimerKind::Sleep(CommandId::from(100)));
        assert_eq!(s.len(), 3, "4th schedule dropped");
        assert_eq!(s.dropped(), 1);
    }

    /// After polling frees a slot, scheduling must succeed again: the new
    /// timer is stored and nothing is counted as dropped.
    #[test]
    fn schedule_resumes_when_heap_drains_below_cap() {
        let mut s = Scheduler::new();
        s.set_cap(2);
        s.schedule(0, TimerKind::Sleep(CommandId::from(0)));
        s.schedule(1, TimerKind::Sleep(CommandId::from(1)));

        // Only the timer due at t=0 matures, leaving one free slot.
        let matured = s.poll_matured(0);
        assert_eq!(matured.len(), 1, "exactly one timer is due at t=0");
        assert_eq!(s.len(), 1, "one timer remains pending after the poll");

        s.schedule(2, TimerKind::Sleep(CommandId::from(2)));
        // Checking `dropped` alone would not prove the timer was stored.
        assert_eq!(s.len(), 2, "timer accepted once the heap has room again");
        assert_eq!(s.dropped(), 0);
    }
}