//! Module with the shared timers implementation.
//!
//! Also see the local timers implementation.
use std::cmp::{min, Ordering};
use std::sync::RwLock;
use std::time::{Duration, Instant};
use crate::ProcessId;
#[cfg(test)]
#[path = "timers_tests.rs"]
mod timers_tests;
/// Bits needed for the number of slots.
const SLOT_BITS: usize = 6;
/// Number of slots in the [`Timers`] wheel, 64.
const SLOTS: usize = 1 << SLOT_BITS;
/// Bits needed for the nanoseconds per slot.
const NS_PER_SLOT_BITS: usize = 30;
/// Nanoseconds per slot, 1073741824 ns ~= 1 second.
const NS_PER_SLOT: TimeOffset = 1 << NS_PER_SLOT_BITS;
/// Duration per slot, [`NS_PER_SLOT`] as [`Duration`].
const DURATION_PER_SLOT: Duration = Duration::from_nanos(NS_PER_SLOT as u64);
/// Timers within `((1 << 6) * (1 << 30))` ~= 68 seconds since the epoch fit in
/// the wheel, others get added to the overflow.
const NS_OVERFLOW: u64 = SLOTS as u64 * NS_PER_SLOT as u64;
/// Duration of the entire wheel, [`NS_OVERFLOW`] as [`Duration`].
const OVERFLOW_DURATION: Duration = Duration::from_nanos(NS_OVERFLOW);
/// Mask to get the nanoseconds for a slot.
const NS_SLOT_MASK: u128 = (1 << NS_PER_SLOT_BITS) - 1;
/// Time offset since the epoch of [`Timers::epoch`].
///
/// Must fit [`NS_PER_SLOT`].
type TimeOffset = u32;
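// A small compile-time sanity check of the wheel geometry described above;
// these assertions merely restate relations implied by the constants and can
// be removed without affecting behaviour.
const _: () = assert!((NS_PER_SLOT as u64) <= TimeOffset::MAX as u64);
const _: () = assert!(NS_OVERFLOW == 68_719_476_736); // ~68 seconds.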
/// Timers.
///
/// This implementation is based on a Timing Wheel as discussed in the paper
/// "Hashed and hierarchical timing wheels: efficient data structures for
/// implementing a timer facility" by George Varghese and Anthony Lauck (1997).
///
/// This uses a scheme that splits the timers based on when they're going to
/// expire. It has 64 ([`SLOTS`]) slots each representing roughly a second of
/// time ([`NS_PER_SLOT`]). This allows us to only consider a portion of all
/// timers when processing the timers. Any timers that don't fit into these
/// slots, i.e. timers with a deadline more than 68 seconds ([`NS_OVERFLOW`])
/// past `epoch`, are put in an overflow list. Ideally this overflow list is
/// empty, however.
///
/// The `slots` hold the timers with a [`TimeOffset`], the number of
/// nanoseconds the deadline lies past the start of its slot (i.e. the
/// deadline's offset since `epoch` modulo [`NS_PER_SLOT`]). The `index` field
/// determines the current zero-slot, meaning its timers will expire next and
/// all have a deadline within `0..NS_PER_SLOT` nanoseconds after `epoch`. The
/// `slots[index+1]` list will have timers that expire
/// `NS_PER_SLOT..2*NS_PER_SLOT` nanoseconds after `epoch`. In other words each
/// slot holds the timers that expire in the ~second after the previous slot.
///
/// Whenever timers are removed by `remove_next` it will attempt to update the
/// `epoch`, which is used as the anchor point to determine into what slot (or
/// the overflow list) a timer must go (see above). When updating the epoch it
/// will increase the `index` by 1 and the `epoch` by [`NS_PER_SLOT`]
/// nanoseconds in a single atomic step (thus requiring a lock around `Epoch`).
/// This means the new current slot (the old `slots[index+1]`) holds timers
/// that expire `0..NS_PER_SLOT` nanoseconds after the new `epoch`.
///
/// Note that it's possible for a thread to read the epoch (index and time),
/// then get descheduled while another thread updates the epoch, and finally
/// for the first thread to insert a timer based on the now-outdated epoch.
/// This situation is fine as the timer will still be added to the correct
/// slot, but it has a higher chance of being added to the overflow list (which
/// `maybe_update_epoch` deals with correctly).
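///
/// A rough sketch of the slot calculation for the non-overflow case (the real
/// calculation lives in `get_timers`; the concrete 10 second figure in the
/// comments below is only illustrative):
///
/// ```ignore
/// let ns_since_epoch = deadline.duration_since(epoch.time).as_nanos();
/// // The offset within the timer's slot, stored as the timer's `TimeOffset`.
/// let offset = (ns_since_epoch & NS_SLOT_MASK) as TimeOffset;
/// // The slot index, relative to the current `epoch.index`.
/// let index = (epoch.index as usize
///     + (ns_since_epoch >> NS_PER_SLOT_BITS) as usize) % SLOTS;
/// // E.g. a deadline ~10 seconds after `epoch.time` lands 9 slots past
/// // `epoch.index`, with an offset of roughly 0.34 seconds worth of ns.
/// ```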
#[derive(Debug)]
pub(crate) struct Timers {
epoch: RwLock<Epoch>,
/// The vectors are sorted.
slots: [RwLock<Vec<Timer<TimeOffset>>>; SLOTS],
/// The vector is sorted.
overflow: RwLock<Vec<Timer<Instant>>>,
}
/// Separate struct because both fields need to be updated atomically.
#[derive(Debug)]
struct Epoch {
time: Instant,
index: u8,
}
impl Timers {
/// Create a new collection of timers.
pub(crate) fn new() -> Timers {
Timers {
epoch: RwLock::new(Epoch {
time: Instant::now(),
index: 0,
}),
            // TODO: replace with `[RwLock::new(Vec::new()); SLOTS]` once
            // possible.
#[rustfmt::skip]
slots: [RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new()), RwLock::new(Vec::new())],
overflow: RwLock::new(Vec::new()),
}
}
/// Returns the total number of timers.
pub(crate) fn len(&self) -> usize {
let mut timers = 0;
for slots in &self.slots {
let slots = slots.read().unwrap();
let len = slots.len();
drop(slots);
timers += len;
}
{
let overflow = self.overflow.read().unwrap();
timers += overflow.len()
}
timers
}
/// Returns the next deadline, if any.
///
    /// If this returns `Some`, `woke_from_polling` must be called after
    /// polling, before removing timers. That thread must also wake other
    /// worker threads as they will see `None` here, **even if there is a
    /// timer set**.
pub(crate) fn next(&self) -> Option<Instant> {
let (epoch_time, index) = {
let epoch = self.epoch.read().unwrap();
(epoch.time, epoch.index as usize)
};
        // Iterate the slots starting at the current `index`, wrapping around
        // the end of the wheel.
        let (second, first) = self.slots.split_at(index);
let iter = first.iter().chain(second.iter());
for (n, slot) in iter.enumerate() {
let deadline = { slot.read().unwrap().last().map(|timer| timer.deadline) };
if let Some(deadline) = deadline {
let ns_since_epoch = u64::from(deadline) + (n as u64 * u64::from(NS_PER_SLOT));
let deadline = epoch_time + Duration::from_nanos(ns_since_epoch);
return Some(deadline);
}
}
#[rustfmt::skip]
self.overflow.read().unwrap().last().map(|timer| timer.deadline)
}
/// Same as [`next`], but returns a [`Duration`] instead. If the next
/// deadline is already passed this returns a duration of zero.
///
/// [`next`]: Timers::next
    pub(crate) fn next_timer(&self) -> Option<Duration> {
        self.next().map(|deadline| {
            // Duration until the deadline, or zero if it has already passed.
            deadline
                .checked_duration_since(Instant::now())
                .unwrap_or(Duration::ZERO)
        })
    }
/// Add a new deadline.
pub(crate) fn add(&self, pid: ProcessId, deadline: Instant) {
// NOTE: it's possible that we call `add_timer` based on an outdated
// epoch.
self.get_timers(pid, deadline, add_timer, add_timer);
}
/// Remove a previously added deadline.
pub(crate) fn remove(&self, pid: ProcessId, deadline: Instant) {
self.get_timers(pid, deadline, remove_timer, remove_timer);
}
/// Change the `ProcessId` of a previously added deadline.
pub(crate) fn change(&self, pid: ProcessId, deadline: Instant, new_pid: ProcessId) {
self.get_timers(
pid,
deadline,
|timers, timer| change_timer(timers, timer, new_pid),
|timers, timer| change_timer(timers, timer, new_pid),
);
}
    /// Determines in what list of timers a timer with `pid` and `deadline`
    /// would go (or already is). Then calls `slot_f` with the timer list of
    /// the matching slot, or `overflow_f` with the overflow list.
fn get_timers<SF, OF>(&self, pid: ProcessId, deadline: Instant, slot_f: SF, overflow_f: OF)
where
SF: FnOnce(&mut Vec<Timer<TimeOffset>>, Timer<TimeOffset>),
OF: FnOnce(&mut Vec<Timer<Instant>>, Timer<Instant>),
{
let (epoch_time, epoch_index) = {
let epoch = self.epoch.read().unwrap();
(epoch.time, epoch.index)
};
let ns_since_epoch = deadline.saturating_duration_since(epoch_time).as_nanos();
if ns_since_epoch < u128::from(NS_OVERFLOW) {
#[allow(clippy::cast_possible_truncation)] // OK to truncate.
let deadline = (ns_since_epoch & NS_SLOT_MASK) as TimeOffset;
let index = ((ns_since_epoch >> NS_PER_SLOT_BITS) & ((1 << SLOT_BITS) - 1)) as usize;
let index = (epoch_index as usize + index) % SLOTS;
let mut timers = self.slots[index].write().unwrap();
slot_f(&mut timers, Timer { pid, deadline });
} else {
// Too far into the future to fit in the slots.
let mut overflow = self.overflow.write().unwrap();
overflow_f(&mut overflow, Timer { pid, deadline });
}
}
    /// Remove the next deadline that has passed `now`, returning its
    /// `ProcessId`.
///
/// # Safety
///
/// `now` may never go backwards between calls.
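    ///
    /// # Examples
    ///
    /// A minimal usage sketch (the surrounding scheduler loop and the
    /// `schedule` function are hypothetical, not part of this module):
    ///
    /// ```ignore
    /// let now = Instant::now();
    /// // Keep removing timers until none has a deadline at or before `now`.
    /// while let Some(pid) = timers.remove_next(now) {
    ///     schedule(pid); // Hypothetical: mark the process as ready to run.
    /// }
    /// ```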
pub(crate) fn remove_next(&self, now: Instant) -> Option<ProcessId> {
loop {
// NOTE: Each loop iteration needs to calculate the `epoch_offset`
// as the epoch changes each iteration.
let (epoch_time, index) = {
let epoch = self.epoch.read().unwrap();
(epoch.time, epoch.index as usize)
};
// Safety: `now` can't go backwards, otherwise this will panic.
let epoch_offset = now.duration_since(epoch_time).as_nanos();
// NOTE: this truncates, which is fine as we need a max. of
// `NS_PER_SLOT` anyway.
#[allow(clippy::cast_possible_truncation)]
let epoch_offset = min(epoch_offset, u128::from(TimeOffset::MAX)) as TimeOffset;
let res = {
let mut timers = self.slots[index].write().unwrap();
remove_if_before(&mut timers, epoch_offset)
};
match res {
Ok(timer) => return Some(timer.pid),
Err(true) => {
// Safety: slot is empty, which makes calling
// `maybe_update_epoch` OK.
if !self.maybe_update_epoch(now) {
// Didn't update epoch, no more timers to process.
return None;
}
// Else try again in the next loop.
}
// Slot has timers with a deadline past `now`.
Err(false) => return None,
}
}
}
/// Attempt to update the epoch based on the current time.
///
/// # Panics
///
/// This panics if the current slot is not empty.
#[allow(clippy::cast_possible_truncation)] // TODO: move to new `epoch.index` line.
fn maybe_update_epoch(&self, now: Instant) -> bool {
let epoch_time = {
let mut epoch = self.epoch.write().unwrap();
let new_epoch = epoch.time + DURATION_PER_SLOT;
if new_epoch > now {
// Can't move to the next slot yet.
return false;
}
            // The current slot must be empty: its timers are relative to the
            // old epoch and would get a different absolute time after the
            // update.
debug_assert!(self.slots[epoch.index as usize].read().unwrap().is_empty());
// Move to the next slot and update the epoch.
epoch.index = (epoch.index + 1) % self.slots.len() as u8;
epoch.time = new_epoch;
new_epoch
};
// Next move all the overflow timers that now fit in the slots.
let time = epoch_time + OVERFLOW_DURATION;
while let Ok(timer) = { remove_if_before(&mut self.overflow.write().unwrap(), time) } {
// NOTE: we can't use the same optimisation as we do in the local
// version where we know that all timers removed here go into the
// `self.index-1` slot.
            // Because `add` has to work with outdated epoch information it
            // could be that it added a timer to the overflow list which could
            // have fit in one of the slots. So we have to deal with that
            // possibility here.
self.add(timer.pid, timer.deadline);
}
true
}
}
/// Add `timer` to `timers`, ensuring it remains sorted.
fn add_timer<T>(timers: &mut Vec<Timer<T>>, timer: Timer<T>)
where
Timer<T>: Ord + Copy,
{
let idx = match timers.binary_search(&timer) {
Ok(idx) | Err(idx) => idx,
};
timers.insert(idx, timer);
}
/// Remove a previously added `timer` from `timers`, ensuring it remains sorted.
fn remove_timer<T>(timers: &mut Vec<Timer<T>>, timer: Timer<T>)
where
Timer<T>: Ord + Copy,
{
if let Ok(idx) = timers.binary_search(&timer) {
let _ = timers.remove(idx);
}
}
/// Change the `ProcessId` of a previously added `timer` in `timers`.
fn change_timer<T>(timers: &mut Vec<Timer<T>>, timer: Timer<T>, new_pid: ProcessId)
where
Timer<T>: Ord + Copy,
{
match timers.binary_search(&timer) {
Ok(idx) => timers[idx].pid = new_pid,
#[rustfmt::skip]
Err(idx) => timers.insert(idx, Timer { pid: new_pid, deadline: timer.deadline }),
}
}
/// Remove the first timer if it's before `time`.
///
/// Returns `Ok(timer)` if there is a timer with a deadline before `time`.
/// Returns `Err(is_empty)` otherwise, indicating whether `timers` is empty:
/// `Err(true)` if `timers` is empty, `Err(false)` if there are more timers in
/// `timers`, but none with a deadline before `time`.
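///
/// A short illustrative sketch using integer deadlines (a `pid` is assumed to
/// be in scope):
///
/// ```ignore
/// let mut timers = vec![Timer { pid, deadline: 10 }];
/// assert!(remove_if_before(&mut timers, 20).is_ok());       // Deadline passed.
/// assert_eq!(remove_if_before(&mut timers, 20), Err(true)); // Now empty.
/// ```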
fn remove_if_before<T>(timers: &mut Vec<Timer<T>>, time: T) -> Result<Timer<T>, bool>
where
T: Ord + Copy,
{
match timers.last() {
        // TODO: is the `unwrap` optimised away here? Or do we need
        // `unwrap_unchecked`?
Some(timer) if timer.deadline <= time => Ok(timers.pop().unwrap()),
Some(_) => Err(false),
None => Err(true),
}
}
/// A timer.
///
/// # Notes
///
/// The [`Ord`] implementation is in reverse order, i.e. the timer that expires
/// first will have the highest ordering value. Furthermore the ordering is
/// only based on the deadline; the process id is ignored. This allows
/// `change_timer` to not worry about the order when changing the process id of
/// a timer.
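///
/// A small illustration of the reversed, deadline-only ordering (integer
/// deadlines and an assumed `pid` for brevity):
///
/// ```ignore
/// let earlier = Timer { pid, deadline: 1 };
/// let later = Timer { pid, deadline: 2 };
/// // The earlier deadline compares as the greater value, so it ends up last
/// // in a sorted `Vec` and is cheap to `pop`.
/// assert!(earlier > later);
/// ```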
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct Timer<T> {
pid: ProcessId,
deadline: T,
}
impl<T> Ord for Timer<T>
where
T: Ord,
{
fn cmp(&self, other: &Self) -> Ordering {
other.deadline.cmp(&self.deadline)
}
}
impl<T> PartialOrd for Timer<T>
where
T: Ord,
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}