use std::cmp::{min, Ordering};
use std::time::{Duration, Instant};
use crate::ProcessId;
#[cfg(test)]
#[path = "timers_tests.rs"]
mod timers_tests;
/// Number of bits needed to index a slot.
const SLOT_BITS: usize = 6;
/// Number of slots, `2^SLOT_BITS` (64).
const SLOTS: usize = 1 << SLOT_BITS;
/// Number of bits of a nanosecond offset that fall within a single slot.
const NS_PER_SLOT_BITS: usize = 30;
/// Nanoseconds covered by a single slot, `2^30` ns (roughly 1.07 seconds).
const NS_PER_SLOT: TimeOffset = 1 << NS_PER_SLOT_BITS;
/// `NS_PER_SLOT` as a `Duration`.
const DURATION_PER_SLOT: Duration = Duration::from_nanos(NS_PER_SLOT as u64);
/// Nanoseconds covered by all slots together. Deadlines this far (or further)
/// past the epoch are stored in the overflow list instead of a slot.
const NS_OVERFLOW: u64 = SLOTS as u64 * NS_PER_SLOT as u64;
/// `NS_OVERFLOW` as a `Duration`.
const OVERFLOW_DURATION: Duration = Duration::from_nanos(NS_OVERFLOW);
/// Mask selecting the part of a nanosecond offset that lies within one slot.
const NS_SLOT_MASK: u128 = (1 << NS_PER_SLOT_BITS) - 1;
/// Offset in nanoseconds relative to the start of a slot; always less than
/// `NS_PER_SLOT` for timers stored in a slot.
type TimeOffset = u32;
/// Collection of timers, bucketed into fixed-duration slots relative to
/// `epoch`.
///
/// Deadlines less than `NS_OVERFLOW` nanoseconds after `epoch` are stored in
/// `slots`; all later deadlines are stored in `overflow`.
#[derive(Debug)]
pub(crate) struct Timers {
/// Start time of the slot at `index`; slot lookups are computed relative to
/// this instant.
epoch: Instant,
/// Index into `slots` of the slot that starts at `epoch`.
index: u8,
/// The slots. Each timer stores its deadline as a nanosecond offset from the
/// start of its slot. Every slot is kept sorted according to `Timer`'s `Ord`
/// implementation, which places the earliest deadline last.
slots: [Vec<Timer<TimeOffset>>; SLOTS],
/// Timers that do not (yet) fit in `slots`, stored with their absolute
/// deadline and sorted the same way as the slots.
overflow: Vec<Timer<Instant>>,
/// Cache for the value returned by `next`.
cached_next_deadline: CachedInstant,
}
impl Timers {
/// Create an empty collection of timers whose epoch is the current time.
pub(crate) fn new() -> Timers {
    Timers {
        epoch: Instant::now(),
        index: 0,
        // Every slot starts out as an empty (unallocated) vector.
        slots: std::array::from_fn(|_| Vec::new()),
        overflow: Vec::new(),
        cached_next_deadline: CachedInstant::Empty,
    }
}
/// Returns the total number of timers, counting both the slots and the
/// overflow list.
pub(crate) fn len(&self) -> usize {
    let in_slots: usize = self.slots.iter().map(Vec::len).sum();
    in_slots + self.overflow.len()
}
/// Returns the earliest deadline, or `None` if there are no timers.
///
/// The result is served from `cached_next_deadline` when that is populated,
/// otherwise it is recomputed and the cache is refreshed.
pub(crate) fn next(&mut self) -> Option<Instant> {
    match self.cached_next_deadline {
        CachedInstant::Empty => return None,
        CachedInstant::Set(deadline) => return Some(deadline),
        CachedInstant::Unset => {}
    }

    let epoch = self.epoch;
    // Walk the slots in time order: first the current slot up to the end of
    // the array, then the slots that wrapped around to the front.
    let (wrapped, upcoming) = self.slots.split_at(self.index as usize);
    let from_slots = upcoming
        .iter()
        .chain(wrapped)
        .enumerate()
        .find_map(|(distance, slot)| {
            // The earliest timer of a slot is its last element.
            slot.last().map(|timer| {
                let ns_since_epoch =
                    u64::from(timer.deadline) + (distance as u64 * u64::from(NS_PER_SLOT));
                epoch + Duration::from_nanos(ns_since_epoch)
            })
        });

    // Only when all slots are empty can an overflow timer be the next one.
    let next = from_slots.or_else(|| self.overflow.last().map(|timer| timer.deadline));
    self.cached_next_deadline = match next {
        Some(deadline) => CachedInstant::Set(deadline),
        None => CachedInstant::Empty,
    };
    next
}
/// Same as `next`, but returns the time remaining until the next deadline
/// instead of the deadline itself.
///
/// Returns `None` if there are no timers and a zero duration if the next
/// deadline has already passed.
pub(crate) fn next_timer(&mut self) -> Option<Duration> {
    // The remaining time is `deadline - now`; it saturates to zero once the
    // deadline lies in the past.
    self.next()
        .map(|deadline| deadline.saturating_duration_since(Instant::now()))
}
/// Add a timer for the process `pid` that expires at `deadline`.
///
/// A `deadline` before the current epoch is treated as the epoch itself.
pub(crate) fn add(&mut self, pid: ProcessId, deadline: Instant) {
    let clamped = self.checked_deadline(deadline);
    // The new timer may be earlier than the cached next deadline.
    self.cached_next_deadline.update(clamped);
    self.get_timers(pid, clamped, add_timer, add_timer);
}
/// Remove a timer for the process `pid` that expires at `deadline`.
///
/// A `deadline` before the current epoch is treated as the epoch itself.
pub(crate) fn remove(&mut self, pid: ProcessId, deadline: Instant) {
    let clamped = self.checked_deadline(deadline);
    // If the removed timer was the cached next deadline the cache has to be
    // recomputed on the next call to `next`.
    self.cached_next_deadline.invalidate(clamped);
    self.get_timers(pid, clamped, remove_timer, remove_timer);
}
pub(crate) fn change(&mut self, pid: ProcessId, deadline: Instant, new_pid: ProcessId) {
let deadline = self.checked_deadline(deadline);
self.cached_next_deadline.update(deadline);
self.get_timers(
pid,
deadline,
|timers, timer| change_timer(timers, timer, new_pid),
|timers, timer| change_timer(timers, timer, new_pid),
);
}
/// Locate the list of timers that `deadline` belongs to and call the matching
/// function with that list and the timer to operate on.
///
/// `slot_f` is called when the deadline falls within the range covered by the
/// slots, `overflow_f` otherwise. Exactly one of the two is called.
fn get_timers<SF, OF>(&mut self, pid: ProcessId, deadline: Instant, slot_f: SF, overflow_f: OF)
where
    SF: FnOnce(&mut Vec<Timer<TimeOffset>>, Timer<TimeOffset>),
    OF: FnOnce(&mut Vec<Timer<Instant>>, Timer<Instant>),
{
    let nanos = deadline.saturating_duration_since(self.epoch).as_nanos();
    if nanos >= u128::from(NS_OVERFLOW) {
        // Too far into the future to fit in any slot.
        overflow_f(&mut self.overflow, Timer { pid, deadline });
        return;
    }

    // The low bits are the offset within the slot, the bits above that select
    // how many slots ahead of the current one the deadline is.
    #[allow(clippy::cast_possible_truncation)]
    let offset = (nanos & NS_SLOT_MASK) as TimeOffset;
    let slots_ahead = ((nanos >> NS_PER_SLOT_BITS) & ((1 << SLOT_BITS) - 1)) as usize;
    #[rustfmt::skip]
    debug_assert_eq!(
        deadline,
        self.epoch + Duration::from_nanos((slots_ahead as u64 * u64::from(NS_PER_SLOT)) + u64::from(offset))
    );

    // Slots form a ring that starts at `self.index`.
    let slot = &mut self.slots[(self.index as usize + slots_ahead) % SLOTS];
    slot_f(slot, Timer { pid, deadline: offset });
}
/// Returns an iterator that removes and yields the process id of every timer
/// with a deadline at or before `now`.
pub(crate) fn deadlines(&mut self, now: Instant) -> Deadlines<'_> {
Deadlines { timers: self, now }
}
/// Remove the next timer with a deadline at or before `now`, returning the
/// process id it belongs to. Returns `None` if no timer has expired.
fn remove_next(&mut self, now: Instant) -> Option<ProcessId> {
    loop {
        // Recomputed on every iteration because `maybe_update_epoch` moves the
        // epoch forward.
        let since_epoch = now.duration_since(self.epoch).as_nanos();
        // Clamping is fine: anything at or above `NS_PER_SLOT` already means
        // the entire current slot has expired.
        #[allow(clippy::cast_possible_truncation)]
        let epoch_offset = min(since_epoch, u128::from(TimeOffset::MAX)) as TimeOffset;

        let slot_is_empty = match remove_if_before(self.current_slot(), epoch_offset) {
            Ok(expired) => {
                // The removed timer was the earliest one, so the cached
                // deadline is stale.
                self.cached_next_deadline = CachedInstant::Unset;
                return Some(expired.pid);
            }
            Err(is_empty) => is_empty,
        };

        // Only an empty current slot allows moving on to the next slot, and
        // only if `now` is past the end of the current one.
        if !slot_is_empty || !self.maybe_update_epoch(epoch_offset) {
            return None;
        }
    }
}
/// Attempt to move the epoch forward by one slot.
///
/// `epoch_offset` is the number of nanoseconds the current time lies past the
/// epoch. Returns `false`, without changing anything, if that is still within
/// the current slot. Otherwise the current slot (which must be empty) becomes
/// the last slot, it is filled with the overflow timers that now fit in it,
/// and `true` is returned.
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::debug_assert_with_mut_call)]
fn maybe_update_epoch(&mut self, epoch_offset: TimeOffset) -> bool {
    if epoch_offset < NS_PER_SLOT {
        // The current slot hasn't fully elapsed yet.
        return false;
    }
    debug_assert!(self.current_slot().is_empty());

    // The slot being left behind becomes the last slot of the ring once the
    // index and epoch are advanced.
    let last_index = self.index as usize;
    self.index = (self.index + 1) % SLOTS as u8;
    self.epoch += DURATION_PER_SLOT;

    // End (exclusive) of the range covered by the slots and the start of the
    // (new) last slot.
    let horizon = self.epoch + OVERFLOW_DURATION;
    let slot_epoch = horizon - DURATION_PER_SLOT;
    let timers = &mut self.slots[last_index];
    // Move all overflow timers that now fit into the last slot. The comparison
    // is strict: a deadline exactly at `horizon` is a full `NS_PER_SLOT` past
    // `slot_epoch`, which does not fit in the slot (its offset would be masked
    // to zero), so it stays in the overflow list until the next rotation. This
    // matches the `< NS_OVERFLOW` check used when inserting timers.
    while let Some(timer) = self
        .overflow
        .last()
        .copied()
        .filter(|timer| timer.deadline < horizon)
    {
        let _ = self.overflow.pop();
        timers.push(Timer {
            pid: timer.pid,
            deadline: as_offset(slot_epoch, timer.deadline),
        });
    }
    // Timers were taken from the overflow list earliest first, but a slot
    // keeps its earliest timer last.
    timers.reverse();
    debug_assert!(timers.is_sorted());
    true
}
/// Returns `deadline`, or the current epoch if `deadline` lies before it.
fn checked_deadline(&self, deadline: Instant) -> Instant {
    deadline.max(self.epoch)
}
/// Returns the slot at the current index, i.e. the slot that starts at the
/// epoch.
fn current_slot(&mut self) -> &mut Vec<Timer<TimeOffset>> {
    let current = usize::from(self.index);
    &mut self.slots[current]
}
}
/// Insert `timer` into `timers`, keeping the list sorted.
fn add_timer<T>(timers: &mut Vec<Timer<T>>, timer: Timer<T>)
where
    Timer<T>: Ord,
{
    // Whether or not an equal timer already exists, the index returned by the
    // search is a position that keeps the list sorted.
    let position = timers.binary_search(&timer).unwrap_or_else(|insert_at| insert_at);
    timers.insert(position, timer);
}
/// Remove `timer` from `timers`, keeping the list sorted. Does nothing if the
/// timer is not in the list.
///
/// The ordering of timers only considers the deadline, so several timers of
/// different processes can compare as equal. Only a timer whose process id
/// also matches is removed, so a timer of another process sharing the same
/// deadline is never removed by accident.
fn remove_timer<T>(timers: &mut Vec<Timer<T>>, timer: Timer<T>)
where
    Timer<T>: Ord + Copy,
{
    // Start of the run of timers that have the same deadline as `timer`.
    let start = timers.partition_point(|other| *other < timer);
    let found = timers[start..]
        .iter()
        .take_while(|other| **other <= timer)
        .position(|other| other.pid == timer.pid);
    if let Some(position) = found {
        let _ = timers.remove(start + position);
    }
}
/// Change the process id of `timer` in `timers` to `new_pid`. If the timer is
/// not in the list a new timer for `new_pid`, with the same deadline, is
/// inserted instead.
///
/// The ordering of timers only considers the deadline, so several timers of
/// different processes can compare as equal. Only a timer whose process id
/// also matches is changed, so a timer of another process sharing the same
/// deadline is never reassigned by accident.
fn change_timer<T>(timers: &mut Vec<Timer<T>>, timer: Timer<T>, new_pid: ProcessId)
where
    Timer<T>: Ord + Copy,
{
    // Start of the run of timers that have the same deadline as `timer`; also
    // a valid position to insert a new timer with that deadline.
    let start = timers.partition_point(|other| *other < timer);
    let found = timers[start..]
        .iter()
        .take_while(|other| **other <= timer)
        .position(|other| other.pid == timer.pid);
    match found {
        // Changing the process id doesn't affect the ordering.
        Some(position) => timers[start + position].pid = new_pid,
        None => timers.insert(
            start,
            Timer {
                pid: new_pid,
                deadline: timer.deadline,
            },
        ),
    }
}
/// Returns the number of nanoseconds `time` lies after `epoch` as a
/// `TimeOffset`.
///
/// The difference is expected to be less than `NS_PER_SLOT` (checked in debug
/// builds only); any higher bits are masked off.
#[allow(clippy::cast_possible_truncation)]
fn as_offset(epoch: Instant, time: Instant) -> TimeOffset {
    let elapsed_ns = time.duration_since(epoch).as_nanos();
    debug_assert!(elapsed_ns < u128::from(NS_PER_SLOT));
    let within_slot = elapsed_ns & NS_SLOT_MASK;
    within_slot as TimeOffset
}
/// Remove and return the last timer in `timers` if its deadline is at or
/// before `time`.
///
/// Otherwise returns `Err(true)` if `timers` is empty, or `Err(false)` if the
/// last timer has not yet expired.
fn remove_if_before<T>(timers: &mut Vec<Timer<T>>, time: T) -> Result<Timer<T>, bool>
where
    T: Ord + Copy,
{
    let has_expired = timers.last().map(|timer| timer.deadline <= time);
    match has_expired {
        // The list is known to be non-empty here, so the `pop` can't fail.
        Some(true) => Ok(timers.pop().unwrap()),
        Some(false) => Err(false),
        None => Err(true),
    }
}
/// Cached value of the next deadline.
#[derive(Debug)]
enum CachedInstant {
    /// There are no deadlines.
    Empty,
    /// The next deadline is not known and has to be recomputed.
    Unset,
    /// The next deadline.
    Set(Instant),
}

impl CachedInstant {
    /// Record that a timer with `deadline` exists.
    ///
    /// An empty cache is set to `deadline`, a set cache is lowered to
    /// `deadline` if that is earlier. An unset cache is left unset.
    fn update(&mut self, deadline: Instant) {
        let replace = match self {
            CachedInstant::Empty => true,
            CachedInstant::Set(current) => deadline < *current,
            CachedInstant::Unset => false,
        };
        if replace {
            *self = CachedInstant::Set(deadline);
        }
    }

    /// Mark the cache as unset if the cached deadline is equal to `deadline`.
    fn invalidate(&mut self, deadline: Instant) {
        if matches!(self, CachedInstant::Set(current) if *current == deadline) {
            *self = CachedInstant::Unset;
        }
    }
}
/// A timer for a process, generic over how the deadline is represented.
///
/// # Notes
///
/// The `Ord` implementation is reversed and only compares `deadline`: the
/// timer that expires first orders last and `pid` is ignored. The derived
/// `PartialEq` on the other hand compares both fields, so two timers can
/// compare as `Ordering::Equal` without being `==`.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct Timer<T> {
/// Process the timer belongs to.
pid: ProcessId,
/// Moment the timer expires.
deadline: T,
}
/// Orders timers by deadline only, in reverse: an earlier deadline orders
/// after a later one, so a sorted list has its earliest timer last.
impl<T> Ord for Timer<T>
where
    T: Ord,
{
    fn cmp(&self, other: &Self) -> Ordering {
        self.deadline.cmp(&other.deadline).reverse()
    }
}
/// Always agrees with the `Ord` implementation.
impl<T> PartialOrd for Timer<T>
where
    T: Ord,
{
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Iterator over the processes whose timers have expired, created by
/// `Timers::deadlines`. Yielded timers are removed from the `Timers`.
#[derive(Debug)]
pub(crate) struct Deadlines<'t> {
/// The timers to remove expired timers from.
timers: &'t mut Timers,
/// Time used to determine whether a timer has expired.
now: Instant,
}
impl<'t> Iterator for Deadlines<'t> {
type Item = ProcessId;
/// Removes the next timer with a deadline at or before `now` and returns the
/// process id it belongs to, or `None` once no more timers have expired.
fn next(&mut self) -> Option<Self::Item> {
self.timers.remove_next(self.now)
}
}