use super::{byte_wheel::*, *};
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
/// An entry parked in the overflow list together with the delay that is
/// still outstanding for it.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(
    feature = "serde",
    serde(bound = "EntryType: serde::Serialize + serde::de::DeserializeOwned")
)]
#[derive(Clone)]
struct OverflowEntry<EntryType> {
    /// The payload to schedule once the delay fits onto the wheels.
    entry: EntryType,
    /// The delay that is still left for `entry`.
    remaining_delay: Duration,
}

impl<EntryType> OverflowEntry<EntryType> {
    /// Pairs `entry` with its `remaining_delay`.
    fn new(entry: EntryType, remaining_delay: Duration) -> Self {
        Self {
            entry,
            remaining_delay,
        }
    }
}
/// Tells the wheel whether an entry should be kept or discarded.
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[derive(PartialEq, Copy, Clone, Eq, Debug)]
pub enum PruneDecision {
    /// The entry stays in the timer.
    Keep,
    /// The entry is discarded.
    Drop,
}

impl PruneDecision {
    /// `true` exactly when this decision is [`PruneDecision::Keep`].
    #[inline(always)]
    pub fn should_keep(&self) -> bool {
        matches!(self, PruneDecision::Keep)
    }

    /// `true` exactly when this decision is [`PruneDecision::Drop`].
    #[inline(always)]
    pub fn should_drop(&self) -> bool {
        matches!(self, PruneDecision::Drop)
    }
}

/// A pruner that never drops anything: the entry is ignored and the answer
/// is always [`PruneDecision::Keep`].
pub fn no_prune<E>(_e: &E) -> PruneDecision {
    PruneDecision::Keep
}
/// Hierarchical timer wheel made of four byte wheels plus an overflow list.
///
/// The positions of the four wheels together form the 32-bit in-cycle time
/// (see `current_time_in_cycle`). Entries whose delay is too long for the
/// wheels are kept in `overflow` until `tick` re-inserts them.
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
#[cfg_attr(
feature = "serde",
serde(bound = "EntryType: serde::Serialize + serde::de::DeserializeOwned")
)]
pub struct QuadWheelWithOverflow<EntryType> {
// Supplies the least significant byte of the in-cycle time; its entries
// carry no further address bytes.
primary: Box<ByteWheel<EntryType, [u8; 0]>>,
// Supplies the second-lowest byte; each entry carries its primary slot.
secondary: Box<ByteWheel<EntryType, [u8; 1]>>,
// Supplies the third-lowest byte; each entry carries its secondary and
// primary slots.
tertiary: Box<ByteWheel<EntryType, [u8; 2]>>,
// Supplies the most significant byte; each entry carries its tertiary,
// secondary and primary slots.
quarternary: Box<ByteWheel<EntryType, [u8; 3]>>,
// Entries that could not be placed on the wheels at insertion time.
overflow: Vec<OverflowEntry<EntryType>>,
// Decision consulted by `tick` before an entry is returned or moved.
pruner: PruneDecision,
}
/// Threshold delay (`u32::MAX` milliseconds) that `insert_with_delay` uses to
/// choose between the wheels and the overflow list.
const MAX_SCHEDULE_DUR: Duration = Duration::from_millis(u32::MAX as u64);

/// Length of one full cycle of all four wheels: 2^32.
const CYCLE_LENGTH: u64 = 1 << 32;

/// Number of slots covered by the primary wheel: 2^8.
const PRIMARY_LENGTH: u32 = 1 << 8;

/// Span covered by the primary and secondary wheels together: 2^16.
const SECONDARY_LENGTH: u32 = 1 << 16;

/// Span covered by the three lowest wheels together: 2^24.
const TERTIARY_LENGTH: u32 = 1 << 24;
/// The default wheel is an empty one constructed with
/// [`PruneDecision::Keep`] as its pruner.
impl<EntryType> Default for QuadWheelWithOverflow<EntryType> {
    fn default() -> Self {
        Self::new(PruneDecision::Keep)
    }
}
impl<EntryType> QuadWheelWithOverflow<EntryType>
where
    EntryType: TimerEntryWithDelay,
{
    /// Schedules `e` using the delay the entry reports about itself.
    ///
    /// This forwards to `insert_with_delay` and returns its result unchanged.
    pub fn insert(&mut self, e: EntryType) -> Result<(), TimerError<EntryType>> {
        // Read the delay first: `e` is moved into the call below.
        let own_delay = e.delay();
        self.insert_with_delay(e, own_delay)
    }
}
impl<EntryType> QuadWheelWithOverflow<EntryType> {
pub fn new(pruner: PruneDecision) -> Self {
QuadWheelWithOverflow {
primary: Box::new(ByteWheel::new()),
secondary: Box::new(ByteWheel::new()),
tertiary: Box::new(ByteWheel::new()),
quarternary: Box::new(ByteWheel::new()),
overflow: Vec::new(),
pruner,
}
}
/// Time left until the in-cycle time wraps around, i.e. `CYCLE_LENGTH`
/// minus `current_time_in_cycle()`.
///
/// The result lies in `1..=2^32`, which is why it is returned as a `u64`.
pub fn remaining_time_in_cycle(&self) -> u64 {
    let elapsed = u64::from(self.current_time_in_cycle());
    CYCLE_LENGTH - elapsed
}
/// Current position within the cycle, assembled from the four wheel
/// positions.
///
/// The bytes are combined big-endian: the quarternary wheel supplies the most
/// significant byte and the primary wheel the least significant one.
pub fn current_time_in_cycle(&self) -> u32 {
    u32::from_be_bytes([
        self.quarternary.current(),
        self.tertiary.current(),
        self.secondary.current(),
        self.primary.current(),
    ])
}
/// Schedules `e` to expire `delay` after the current in-cycle time.
///
/// Delays are handled at millisecond granularity; any sub-millisecond part is
/// ignored when an entry is placed on a wheel.
///
/// * A delay of more than `u32::MAX` milliseconds (at least one full cycle)
///   is parked in the overflow list, reduced by the time left in the current
///   cycle.
/// * Any shorter delay is placed on the wheel that corresponds to the most
///   significant byte in which the target time differs from the current
///   time. The less significant target bytes travel with the entry.
///
/// # Errors
///
/// Returns `TimerError::Expired(e)` when the target time equals the current
/// time, i.e. the delay is shorter than one millisecond.
pub fn insert_with_delay(
    &mut self,
    e: EntryType,
    delay: Duration,
) -> Result<(), TimerError<EntryType>> {
    let delay_ms = delay.as_millis();
    if delay_ms > MAX_SCHEDULE_DUR.as_millis() {
        // `delay` is at least 2^32 ms here, while the time left in the cycle
        // is at most 2^32 ms, so the subtraction cannot underflow. Comparing
        // with `>=` on the raw durations would also send delays in
        // [u32::MAX ms, 2^32 ms) down this path, and those panic in the
        // subtraction when the in-cycle time is 0.
        let remaining_delay = Duration::from_millis(self.remaining_time_in_cycle());
        let new_delay = delay - remaining_delay;
        self.overflow.push(OverflowEntry::new(e, new_delay));
        return Ok(());
    }

    // Lossless: `delay_ms <= u32::MAX` was established above.
    let delay = delay_ms as u32;
    let current_time = self.current_time_in_cycle();
    // Wrapping is intended: the wheels are circular.
    let absolute_time = delay.wrapping_add(current_time);
    let absolute_bytes: [u8; 4] = absolute_time.to_be_bytes();
    // A zero byte in the XOR means target and current time agree in that
    // byte; the first non-zero byte selects the wheel.
    let zero_time = absolute_time ^ current_time;
    let zero_bytes: [u8; 4] = zero_time.to_be_bytes();
    match zero_bytes {
        [0, 0, 0, 0] => Err(TimerError::Expired(e)),
        [0, 0, 0, _] => {
            self.primary.insert(absolute_bytes[3], e, []);
            Ok(())
        }
        [0, 0, _, _] => {
            self.secondary
                .insert(absolute_bytes[2], e, [absolute_bytes[3]]);
            Ok(())
        }
        [0, _, _, _] => {
            self.tertiary.insert(
                absolute_bytes[1],
                e,
                [absolute_bytes[2], absolute_bytes[3]],
            );
            Ok(())
        }
        [_, _, _, _] => {
            self.quarternary.insert(
                absolute_bytes[0],
                e,
                [absolute_bytes[1], absolute_bytes[2], absolute_bytes[3]],
            );
            Ok(())
        }
    }
}
/// Ticks the wheel once and returns the entries that expired.
///
/// The primary wheel is ticked on every call. Each higher wheel is ticked
/// only when the wheel below it reports position 0 after its own tick.
/// Entries taken from a higher wheel are either returned (all their remaining
/// address bytes are 0) or re-inserted into the lower wheel selected by their
/// first non-zero remaining byte. When the quarternary wheel reports
/// position 0 as well, every overflow entry is re-inserted with its remaining
/// delay.
///
/// Entries are only returned or moved while the pruner says
/// `should_keep()`; otherwise they are dropped.
///
/// # Panics
///
/// Panics if re-inserting an overflow entry fails with anything other than
/// `TimerError::Expired`.
pub fn tick(&mut self) -> Vec<EntryType> {
    let mut expired: Vec<EntryType> = Vec::new();

    // Primary wheel: everything in the slot is due now.
    let (due0, pos0) = self.primary.tick();
    if let Some(slot) = due0 {
        expired.reserve(slot.len());
        for we in slot {
            if self.pruner.should_keep() {
                expired.push(we.entry);
            }
        }
    }
    if pos0 != 0u8 {
        return expired;
    }

    // Secondary wheel: one remaining byte (the primary slot).
    let (due1, pos1) = self.secondary.tick();
    if let Some(slot) = due1 {
        for we in slot {
            if !self.pruner.should_keep() {
                continue;
            }
            let primary_slot = we.rest[0];
            if primary_slot == 0u8 {
                expired.push(we.entry);
            } else {
                self.primary.insert(primary_slot, we.entry, []);
            }
        }
    }
    if pos1 != 0u8 {
        return expired;
    }

    // Tertiary wheel: two remaining bytes.
    let (due2, pos2) = self.tertiary.tick();
    if let Some(slot) = due2 {
        for we in slot {
            if !self.pruner.should_keep() {
                continue;
            }
            match we.rest {
                [0, 0] => {
                    expired.push(we.entry);
                }
                [0, b0] => {
                    self.primary.insert(b0, we.entry, []);
                }
                [b1, b0] => {
                    self.secondary.insert(b1, we.entry, [b0]);
                }
            }
        }
    }
    if pos2 != 0u8 {
        return expired;
    }

    // Quarternary wheel: three remaining bytes.
    let (due3, pos3) = self.quarternary.tick();
    if let Some(slot) = due3 {
        for we in slot {
            if !self.pruner.should_keep() {
                continue;
            }
            match we.rest {
                [0, 0, 0] => {
                    expired.push(we.entry);
                }
                [0, 0, b0] => {
                    self.primary.insert(b0, we.entry, []);
                }
                [0, b1, b0] => {
                    self.secondary.insert(b1, we.entry, [b0]);
                }
                [b2, b1, b0] => {
                    self.tertiary.insert(b2, we.entry, [b1, b0]);
                }
            }
        }
    }
    if pos3 != 0u8 || self.overflow.is_empty() {
        return expired;
    }

    // Overflow list: take the old list out and leave a fresh one, sized for
    // half of the old length, to receive entries that overflow again.
    let fresh = Vec::with_capacity(self.overflow.len() / 2);
    let pending = core::mem::replace(&mut self.overflow, fresh);
    for parked in pending {
        if !self.pruner.should_keep() {
            continue;
        }
        match self.insert_with_delay(parked.entry, parked.remaining_delay) {
            Ok(()) => (),
            Err(TimerError::Expired(e)) => expired.push(e),
            Err(_f) => panic!("Unexpected error during insert"),
        }
    }
    expired
}
/// Moves the in-cycle time forward by `amount`, wrapping at 2^32.
///
/// The new time is split into its four big-endian bytes and each wheel is
/// advanced to its byte. Nothing else is touched here: no entries are
/// collected and the overflow list is not consulted.
pub fn skip(&mut self, amount: u32) {
    let target = self.current_time_in_cycle().wrapping_add(amount);
    let [quarternary_pos, tertiary_pos, secondary_pos, primary_pos] = target.to_be_bytes();
    self.primary.advance(primary_pos);
    self.secondary.advance(secondary_pos);
    self.tertiary.advance(tertiary_pos);
    self.quarternary.advance(quarternary_pos);
}
/// Reports how far the wheel may be skipped ahead.
///
/// The wheels are checked from lowest to highest and the first non-empty one
/// decides the answer:
///
/// * primary non-empty: `Skip::None`;
/// * secondary non-empty: up to one tick before the primary wheel wraps;
/// * tertiary non-empty: up to one tick before the lowest two bytes wrap;
/// * quarternary non-empty: up to one tick before the lowest three bytes wrap;
/// * only overflow entries left: up to one tick before the whole cycle wraps;
/// * nothing scheduled at all: `Skip::Empty`.
pub fn can_skip(&self) -> Skip {
    if !self.primary.is_empty() {
        return Skip::None;
    }
    if !self.secondary.is_empty() {
        let elapsed = self.primary.current() as u32;
        let until_wrap = PRIMARY_LENGTH - elapsed;
        return Skip::from_millis(until_wrap - 1u32);
    }
    if !self.tertiary.is_empty() {
        // Mask away the two highest bytes.
        let elapsed = self.current_time_in_cycle() & (SECONDARY_LENGTH - 1u32);
        let until_wrap = SECONDARY_LENGTH - elapsed;
        return Skip::from_millis(until_wrap - 1u32);
    }
    if !self.quarternary.is_empty() {
        // Mask away the highest byte.
        let elapsed = self.current_time_in_cycle() & (TERTIARY_LENGTH - 1u32);
        let until_wrap = TERTIARY_LENGTH - elapsed;
        return Skip::from_millis(until_wrap - 1u32);
    }
    if self.overflow.is_empty() {
        Skip::Empty
    } else {
        // At most 2^32 - 1, so the cast cannot truncate.
        Skip::from_millis((self.remaining_time_in_cycle() - 1u64) as u32)
    }
}
}
#[cfg(test)]
mod u64_tests {
use super::*;
/// A zero-delay entry must be rejected as already expired and handed back.
#[test]
fn single_schedule_fail() {
    let id = 1u64;
    let mut timer = QuadWheelWithOverflow::default();
    let zero_delay_entry = IdOnlyTimerEntry {
        id,
        delay: Duration::from_millis(0),
    };
    let res = timer.insert(zero_delay_entry);
    assert!(res.is_err());
    match res {
        Err(TimerError::Expired(e)) => assert_eq!(e.id(), &id),
        _ => panic!("Unexpected result {:?}", res),
    }
}
/// An entry scheduled 1ms ahead is returned by the very next tick.
#[test]
fn single_ms_schedule() {
    let id = 1u64;
    let mut timer = QuadWheelWithOverflow::default();
    let entry = IdOnlyTimerEntry {
        id,
        delay: Duration::from_millis(1),
    };
    timer.insert(entry).expect("Could not insert timer entry!");
    let fired = timer.tick();
    assert_eq!(fired.len(), 1);
    assert_eq!(fired[0].id(), &id);
}
/// Re-inserting a 1ms entry right after it fires makes it fire on each of
/// 1000 consecutive ticks.
#[test]
fn single_ms_reschedule() {
    let id = 1u64;
    let mut timer = QuadWheelWithOverflow::default();
    timer
        .insert(IdOnlyTimerEntry {
            id,
            delay: Duration::from_millis(1),
        })
        .expect("Could not insert timer entry!");
    for _ in 0..1000 {
        let mut fired = timer.tick();
        assert_eq!(fired.len(), 1);
        let entry = fired.pop().unwrap();
        assert_eq!(entry.id(), &id);
        // Put the same entry straight back for the next round.
        timer.insert(entry).expect("Could not insert timer entry!");
    }
}
/// Schedules entries at 2^0 .. 2^24 ms and ticks one millisecond at a time,
/// checking that each entry fires exactly at its timeout and nothing fires in
/// between.
#[test]
fn increasing_schedule_no_overflow() {
    let mut timer = QuadWheelWithOverflow::default();
    let mut ids: [u64; 25] = [0; 25];
    for (exp, slot) in ids.iter_mut().enumerate() {
        *slot = exp as u64;
        let entry = IdOnlyTimerEntry {
            id: *slot,
            delay: Duration::from_millis(1u64 << exp),
        };
        timer.insert(entry).expect("Could not insert timer entry!");
    }
    for (exp, expected_id) in ids.iter().enumerate() {
        let target: u64 = 1 << exp;
        // Previous power of two, or 0 for the very first timeout.
        let prev: u64 = target >> 1;
        println!("target={} and prev={}", target, prev);
        // Silent ticks between the previous timeout and this one.
        for _ in (prev + 1)..target {
            assert_eq!(timer.tick().len(), 0);
        }
        let mut fired = timer.tick();
        assert_eq!(fired.len(), 1);
        let entry = fired.pop().unwrap();
        assert_eq!(entry.id(), expected_id);
    }
}
/// Schedules entries at 2^0 .. 2^32 ms, the last of which does not fit onto
/// the wheels, and uses `skip` to jump to one tick before each timeout.
#[test]
fn increasing_schedule_overflow() {
    let mut timer = QuadWheelWithOverflow::default();
    let mut ids: [u64; 33] = [0; 33];
    for (exp, slot) in ids.iter_mut().enumerate() {
        *slot = exp as u64;
        let entry = IdOnlyTimerEntry {
            id: *slot,
            delay: Duration::from_millis(1u64 << exp),
        };
        timer.insert(entry).expect("Could not insert timer entry!");
    }
    for (exp, expected_id) in ids.iter().enumerate() {
        let target: u64 = 1 << exp;
        // Previous power of two, or 0 for the very first timeout.
        let prev: u64 = target >> 1;
        println!("target={} (2^{}) and prev={}", target, exp, prev);
        // Jump so that the next tick lands exactly on `target`.
        let gap = (target - prev - 1) as u32;
        timer.skip(gap);
        let mut fired = timer.tick();
        assert_eq!(fired.len(), 1);
        let entry = fired.pop().unwrap();
        assert_eq!(entry.id(), expected_id);
    }
}
/// Schedules entries at 2^0 .. 2^32 ms and drives the timer with
/// `can_skip`/`skip`/`tick`, checking that every entry fires in order and at
/// exactly its timeout, and that the timer reports empty afterwards.
#[test]
fn increasing_skip() {
    let mut timer = QuadWheelWithOverflow::default();
    let mut ids: [u64; 33] = [0; 33];
    let mut timeouts: [u128; 33] = [0; 33];
    let slots = ids.iter_mut().zip(timeouts.iter_mut());
    for (i, (id_slot, timeout_slot)) in slots.enumerate() {
        let timeout: u64 = 1 << i;
        *timeout_slot = timeout as u128;
        let id = i as u64;
        *id_slot = id;
        let entry = IdOnlyTimerEntry {
            id,
            delay: Duration::from_millis(timeout),
        };
        timer.insert(entry).expect("Could not insert timer entry!");
        println!("Added timeout at index={} with time={}", i, timeout);
    }
    let mut index = 0usize;
    let mut millis = 0u128;
    while index < ids.len() {
        match timer.can_skip() {
            Skip::Empty => panic!(
                "Timer ran empty with index={} and millis={}!",
                index, millis
            ),
            Skip::Millis(skip) => {
                timer.skip(skip);
                millis += skip as u128;
                println!("Skipped {}ms to {}", skip, millis);
            }
            Skip::None => (),
        }
        let mut fired = timer.tick();
        millis += 1u128;
        // At most one entry is expected per tick; an empty result just means
        // the loop goes around again.
        if let Some(entry) = fired.pop() {
            assert_eq!(entry.id, ids[index]);
            assert_eq!(millis, timeouts[index]);
            println!("Handled timeout {} at {}ms", index, millis);
            index += 1usize;
        }
    }
    assert_eq!(timer.can_skip(), Skip::Empty);
}
}