use hierarchical_hash_wheel_timer::*;
use std::hash::Hash;
use std::time::Duration;
use channel::select;
use crossbeam_channel as channel;
use hierarchical_hash_wheel_timer::wheels::{cancellable::*, *};
use std::{cmp::Ordering, fmt, io, rc::Rc, thread, time::Instant};
/// Messages consumed by the timer thread from its work queue.
#[derive(Debug)]
enum TimerMsg<I, O, P>
where
    I: Hash + Clone + Eq,
    O: OneshotState<Id = I>,
    P: PeriodicState<Id = I>,
{
    /// Insert a new one-shot or periodic entry into the timer wheel.
    Schedule(TimerEntry<I, O, P>),
    /// Cancel the entry registered under this id.
    Cancel(I),
    /// Leave the timer thread's run loop.
    Stop,
    /// Advance the timer wheel by a single tick.
    Tick,
}
/// A handle to the timer thread.
///
/// All operations are turned into messages and pushed onto the timer
/// thread's work queue.
pub struct TimerRef<I, O, P>
where
    I: Hash + Clone + Eq,
    O: OneshotState<Id = I>,
    P: PeriodicState<Id = I>,
{
    /// Sending side of the timer thread's work queue.
    work_queue: channel::Sender<TimerMsg<I, O, P>>,
}
/// Something that can be asked to advance a timer by one tick.
pub trait TimerTicking {
    /// Request a single tick.
    fn tick(&mut self);
}
impl<I, O, P> TimerTicking for TimerRef<I, O, P>
where
    I: Hash + Clone + Eq,
    O: OneshotState<Id = I>,
    P: PeriodicState<Id = I>,
{
    /// Queue a `Tick` message for the timer thread.
    ///
    /// A failed send is only reported on stderr.
    fn tick(&mut self) {
        if let Err(e) = self.work_queue.send(TimerMsg::Tick) {
            eprintln!("Could not send Tick msg: {e:?}");
        }
    }
}
impl<I, O, P> Timer for TimerRef<I, O, P>
where
    I: Hash + Clone + Eq,
    O: OneshotState<Id = I>,
    P: PeriodicState<Id = I>,
{
    type Id = I;
    type OneshotState = O;
    type PeriodicState = P;

    /// Queue a one-shot entry that is due after `timeout`.
    ///
    /// A failed send is only reported on stderr.
    fn schedule_once(&mut self, timeout: Duration, state: Self::OneshotState) {
        let request = TimerMsg::Schedule(TimerEntry::OneShot { timeout, state });
        if let Err(e) = self.work_queue.send(request) {
            eprintln!("Could not send Schedule msg: {e:?}");
        }
    }

    /// Queue a periodic entry that first fires after `delay` and is then
    /// rescheduled every `period`.
    ///
    /// A failed send is only reported on stderr.
    fn schedule_periodic(&mut self, delay: Duration, period: Duration, state: Self::PeriodicState) {
        let request = TimerMsg::Schedule(TimerEntry::Periodic {
            delay,
            period,
            state,
        });
        if let Err(e) = self.work_queue.send(request) {
            eprintln!("Could not send Schedule msg: {e:?}");
        }
    }

    /// Queue a cancellation request for the entry with the given `id`.
    ///
    /// A failed send is only reported on stderr.
    fn cancel(&mut self, id: &Self::Id) {
        let request = TimerMsg::Cancel(id.clone());
        if let Err(e) = self.work_queue.send(request) {
            eprintln!("Could not send Cancel msg: {e:?}");
        }
    }
}
impl<I, O, P> Clone for TimerRef<I, O, P>
where
    I: Hash + Clone + Eq,
    O: OneshotState<Id = I>,
    P: PeriodicState<Id = I>,
{
    /// Create another handle that feeds the same work queue.
    fn clone(&self) -> Self {
        let work_queue = self.work_queue.clone();
        Self { work_queue }
    }
}
/// A timer that runs on its own dedicated thread.
///
/// Holds the join handle of that thread together with the sending side of
/// the thread's work queue.
pub struct TimerWithThread<I, O, P>
where
    I: Hash + Clone + Eq,
    O: OneshotState<Id = I>,
    P: PeriodicState<Id = I>,
{
    /// Join handle of the spawned timer thread.
    timer_thread: thread::JoinHandle<()>,
    /// Sending side of the timer thread's work queue.
    work_queue: channel::Sender<TimerMsg<I, O, P>>,
}
impl<I, O, P> TimerWithThread<I, O, P>
where
    I: Hash + Clone + Eq + fmt::Debug + Send + 'static,
    O: OneshotState<Id = I> + fmt::Debug + Send + 'static,
    P: PeriodicState<Id = I> + fmt::Debug + Send + 'static,
{
    /// Spawn a timer thread that advances its wheel from the wall clock.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the OS if the thread cannot be spawned.
    pub fn new() -> io::Result<TimerWithThread<I, O, P>> {
        Self::spawn_timer_thread(true)
    }

    /// Spawn a timer thread that only advances when it receives `Tick` messages.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the OS if the thread cannot be spawned.
    fn new_sans_autotick() -> io::Result<TimerWithThread<I, O, P>> {
        Self::spawn_timer_thread(false)
    }

    /// Shared constructor: create the work queue and start the named timer thread.
    fn spawn_timer_thread(autoticking: bool) -> io::Result<TimerWithThread<I, O, P>> {
        let (s, r) = channel::unbounded();
        let handle = thread::Builder::new()
            .name("timer-thread".to_string())
            .spawn(move || {
                // The timer state is built on the new thread itself.
                let timer = if autoticking {
                    TimerThread::new(r)
                } else {
                    TimerThread::new_sans_autotick(r)
                };
                timer.run();
            })?;
        Ok(TimerWithThread {
            timer_thread: handle,
            work_queue: s,
        })
    }

    /// Create a new [`TimerRef`] that feeds this timer's work queue.
    pub fn timer_ref(&self) -> TimerRef<I, O, P> {
        TimerRef {
            work_queue: self.work_queue.clone(),
        }
    }

    /// Ask the timer thread to stop and wait for it to finish.
    ///
    /// If the `Stop` message cannot be sent, this is reported on stderr and
    /// the thread is joined regardless, so that a panic of the timer thread
    /// is still surfaced to the caller.
    ///
    /// # Errors
    ///
    /// Returns [`ThreadTimerError::CouldNotJoinThread`] if the timer thread panicked.
    pub fn shutdown(self) -> Result<(), ThreadTimerError<I, O, P>> {
        if let Err(e) = self.work_queue.send(TimerMsg::Stop) {
            eprintln!("Could not send Stop msg: {e:?}");
        }
        match self.timer_thread.join() {
            Ok(_) => Ok(()),
            Err(_) => {
                eprintln!("Timer thread panicked!");
                Err(ThreadTimerError::CouldNotJoinThread)
            }
        }
    }

    /// Ask the timer thread to stop without waiting for it to finish.
    ///
    /// # Errors
    ///
    /// Returns [`ThreadTimerError::CouldNotSendStopAsync`] if the `Stop`
    /// message could not be placed on the work queue. The failure is also
    /// reported on stderr.
    pub fn shutdown_async(&self) -> Result<(), ThreadTimerError<I, O, P>> {
        self.work_queue.send(TimerMsg::Stop).map_err(|e| {
            eprintln!("Could not send Stop msg: {e:?}");
            ThreadTimerError::CouldNotSendStopAsync
        })
    }
}
impl<I, O, P> fmt::Debug for TimerWithThread<I, O, P>
where
    I: Hash + Clone + Eq,
    O: OneshotState<Id = I>,
    P: PeriodicState<Id = I>,
{
    /// Print a fixed placeholder; none of the fields are shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("<TimerWithThread>")
    }
}
impl
    TimerWithThread<uuid::Uuid, OneShotClosureState<uuid::Uuid>, PeriodicClosureState<uuid::Uuid>>
{
    /// Shorthand for a timer whose entries are closures identified by UUIDs.
    ///
    /// # Panics
    ///
    /// Panics if the timer thread cannot be spawned.
    pub fn for_uuid_closures() -> Self {
        let spawned = Self::new();
        spawned.expect("timer")
    }

    /// Same as [`Self::for_uuid_closures`], but the timer only advances when
    /// it is ticked explicitly.
    ///
    /// # Panics
    ///
    /// Panics if the timer thread cannot be spawned.
    pub fn for_uuid_closures_sans_autotick() -> Self {
        let spawned = Self::new_sans_autotick();
        spawned.expect("timer")
    }
}
/// Errors that can be reported when shutting down a [`TimerWithThread`].
#[derive(Debug)]
pub enum ThreadTimerError<I, O, P>
where
    I: Hash + Clone + Eq,
    O: OneshotState<Id = I>,
    P: PeriodicState<Id = I>,
{
    /// Named for a failure to send the `Stop` message during an
    /// asynchronous (non-joining) shutdown.
    CouldNotSendStopAsync,
    /// Named for a failure to send the `Stop` message during a joining
    /// shutdown; carries a timer instance.
    // NOTE(review): not constructed by any code visible here.
    CouldNotSendStop(TimerWithThread<I, O, P>),
    /// Joining the timer thread failed.
    CouldNotJoinThread,
}
/// The form in which entries are stored inside the timer wheel.
///
/// The initial delay is not part of the entry; it is handed to the wheel
/// separately on insertion.
#[derive(Debug)]
enum ThreadTimerEntry<I, O, P>
where
    I: Hash + Clone + Eq,
    O: OneshotState<Id = I>,
    P: PeriodicState<Id = I>,
{
    /// Fires once and is then dropped.
    OneShot { state: O },
    /// May be rescheduled after `period` each time it fires.
    Periodic { period: Duration, state: P },
}
impl<I, O, P> ThreadTimerEntry<I, O, P>
where
    I: Hash + Clone + Eq + fmt::Debug,
    O: OneshotState<Id = I> + fmt::Debug,
    P: PeriodicState<Id = I> + fmt::Debug,
{
    /// Split a public [`TimerEntry`] into the stored entry and the delay
    /// after which it must fire for the first time.
    fn from(e: TimerEntry<I, O, P>) -> (Self, Duration) {
        match e {
            TimerEntry::OneShot { timeout, state } => (Self::OneShot { state }, timeout),
            TimerEntry::Periodic {
                delay,
                period,
                state,
            } => (Self::Periodic { period, state }, delay),
        }
    }

    /// Trigger the entry's state.
    ///
    /// Returns the follow-up entry together with the delay until it fires
    /// again when a periodic state asks to be rescheduled, and `None` for
    /// one-shot entries and cancelled periodic entries.
    fn execute(self) -> Option<(Self, Duration)> {
        let (period, outcome) = match self {
            Self::OneShot { state } => {
                state.trigger();
                return None;
            }
            Self::Periodic { period, state } => (period, state.trigger()),
        };
        match outcome {
            TimerReturn::Reschedule(next_state) => {
                let next_entry = Self::Periodic {
                    period,
                    state: next_state,
                };
                Some((next_entry, period))
            }
            TimerReturn::Cancel => None,
        }
    }

    /// Take the entry out of its `Rc`, execute it and re-wrap any follow-up entry.
    ///
    /// # Panics
    ///
    /// Panics if `unique_ref` is not the only reference to the entry.
    fn execute_unique_ref(unique_ref: Rc<Self>) -> Option<(Rc<Self>, Duration)> {
        let owned = Rc::try_unwrap(unique_ref).expect("shouldn't hold on to these refs anywhere");
        let (follow_up, delay) = owned.execute()?;
        Some((Rc::new(follow_up), delay))
    }
}
impl<I, O, P> CancellableTimerEntry for ThreadTimerEntry<I, O, P>
where
    I: Hash + Clone + Eq + fmt::Debug,
    O: OneshotState<Id = I> + fmt::Debug,
    P: PeriodicState<Id = I> + fmt::Debug,
{
    type Id = I;

    /// The id of the wrapped state, whichever kind of entry this is.
    fn id(&self) -> &Self::Id {
        // The two arms cannot be merged: `state` has a different type in each.
        match self {
            Self::OneShot { state } => state.id(),
            Self::Periodic { state, .. } => state.id(),
        }
    }
}
/// State owned by the timer thread while it runs.
struct TimerThread<I, O, P>
where
    I: Hash + Clone + Eq + fmt::Debug,
    O: OneshotState<Id = I> + fmt::Debug,
    P: PeriodicState<Id = I> + fmt::Debug,
{
    /// The timer wheel holding all scheduled entries.
    timer: QuadWheelWithOverflow<ThreadTimerEntry<I, O, P>>,
    /// Receiving side of the work queue.
    work_queue: channel::Receiver<TimerMsg<I, O, P>>,
    /// Cleared by a `Stop` message to end the run loop.
    running: bool,
    /// Whether the wheel is advanced from the wall clock.
    autoticking: bool,
    /// Reference point for measuring elapsed time.
    start: Instant,
    /// Milliseconds since `start` at the most recent elapsed-time check.
    last_check: u128,
}
impl<I, O, P> TimerThread<I, O, P>
where
    I: Hash + Clone + Eq + fmt::Debug,
    O: OneshotState<Id = I> + fmt::Debug,
    P: PeriodicState<Id = I> + fmt::Debug,
{
    /// Create the state for a timer that follows the wall clock.
    fn new(work_queue: channel::Receiver<TimerMsg<I, O, P>>) -> TimerThread<I, O, P> {
        Self {
            autoticking: true,
            ..Self::new_sans_autotick(work_queue)
        }
    }

    /// Create the state for a timer that only advances on `Tick` messages.
    fn new_sans_autotick(work_queue: channel::Receiver<TimerMsg<I, O, P>>) -> TimerThread<I, O, P> {
        Self {
            timer: QuadWheelWithOverflow::new(),
            work_queue,
            running: true,
            autoticking: false,
            start: Instant::now(),
            last_check: 0u128,
        }
    }

    /// The timer thread's main loop; returns once a `Stop` message was handled.
    ///
    /// # Panics
    ///
    /// Panics if the work queue is disconnected while the loop is running.
    fn run(mut self) {
        while self.running {
            if self.autoticking {
                // Catch up on every millisecond that passed since the last check.
                for _ in 0..self.elapsed() {
                    self.tick();
                }
            }

            // Pending messages take priority over waiting.
            match self.work_queue.try_recv() {
                Ok(msg) => {
                    self.handle_msg(msg);
                    continue;
                }
                Err(channel::TryRecvError::Disconnected) => {
                    panic!("Timer work_queue unexpectedly shut down!")
                }
                Err(channel::TryRecvError::Empty) => {}
            }

            // Nothing queued: decide how long the thread may stay idle.
            match self.timer.can_skip() {
                Skip::None => {
                    thread::yield_now();
                }
                Skip::Empty => {
                    // No entries at all, so block until the next message arrives.
                    let received = self.work_queue.recv();
                    match received {
                        Err(channel::RecvError) => {
                            panic!("Timer work_queue unexpectedly shut down!")
                        }
                        Ok(msg) => {
                            // The time spent blocked is discarded.
                            self.reset();
                            self.handle_msg(msg)
                        }
                    }
                }
                Skip::Millis(can_skip) if can_skip > 5 => {
                    // Wait for a message, but give up 5 ms before the skippable
                    // window ends.
                    let timeout = Duration::from_millis(u64::from(can_skip - 5));
                    let res = select! {
                        recv(self.work_queue) -> msg => msg.ok(),
                        default(timeout) => None,
                    };
                    // Account for the time spent waiting before handling the message.
                    let waited = self.elapsed();
                    self.skip_and_tick(can_skip, waited);
                    if let Some(msg) = res {
                        self.handle_msg(msg)
                    }
                }
                Skip::Millis(can_skip) => {
                    // Too short to be worth a timed wait.
                    thread::yield_now();
                    let waited = self.elapsed();
                    self.skip_and_tick(can_skip, waited);
                }
            }
        }
    }

    /// Advance the wheel by `elapsed` milliseconds, of which at most
    /// `can_skip` are skipped and the remainder is ticked one by one.
    ///
    /// Does nothing unless the timer is autoticking.
    #[inline(always)]
    fn skip_and_tick(&mut self, can_skip: u32, elapsed: u128) {
        if !self.autoticking {
            return;
        }
        let skippable = u128::from(can_skip);
        match elapsed.cmp(&skippable) {
            Ordering::Less => {
                // `elapsed` is below a `u32` value here, so the cast is lossless.
                self.timer.skip(elapsed as u32);
            }
            Ordering::Equal | Ordering::Greater => {
                self.timer.skip(can_skip);
                for _ in 0..(elapsed - skippable) {
                    self.tick();
                }
            }
        }
    }

    /// Milliseconds that passed since the previous call (or since the last reset).
    #[inline(always)]
    fn elapsed(&mut self) -> u128 {
        let now_ms = self.start.elapsed().as_millis();
        let previous_ms = std::mem::replace(&mut self.last_check, now_ms);
        now_ms - previous_ms
    }

    /// Restart time measurement from the current instant.
    #[inline(always)]
    fn reset(&mut self) {
        self.last_check = 0;
        self.start = Instant::now();
    }

    /// Apply a single message from the work queue.
    ///
    /// # Panics
    ///
    /// Panics if the wheel reports an unexpected error on insertion or cancellation.
    #[inline(always)]
    fn handle_msg(&mut self, msg: TimerMsg<I, O, P>) {
        match msg {
            TimerMsg::Stop => self.running = false,
            TimerMsg::Tick => self.tick(),
            TimerMsg::Schedule(entry) => {
                let (timer_entry, delay) = ThreadTimerEntry::from(entry);
                let inserted = self.timer.insert_ref_with_delay(Rc::new(timer_entry), delay);
                match inserted {
                    Ok(_) => {}
                    // Already due: run it right away instead of storing it.
                    Err(TimerError::Expired(expired)) => self.trigger_entry(expired),
                    Err(f) => panic!("Could not insert timer entry! {f:?}"),
                }
            }
            TimerMsg::Cancel(id) => match self.timer.cancel(&id) {
                // An unknown id is silently ignored.
                Ok(_) | Err(TimerError::NotFound) => {}
                Err(f) => panic!("Unexpected error cancelling timer! {f:?}"),
            },
        }
    }

    /// Execute an expired entry and put any follow-up entry back into the wheel.
    ///
    /// # Panics
    ///
    /// Panics if the follow-up entry cannot be inserted, in particular when
    /// the wheel reports it as already expired.
    fn trigger_entry(&mut self, e: Rc<ThreadTimerEntry<I, O, P>>) {
        let (follow_up, delay) = match ThreadTimerEntry::execute_unique_ref(e) {
            Some(rescheduled) => rescheduled,
            None => return,
        };
        match self.timer.insert_ref_with_delay(follow_up, delay) {
            Ok(_) => {}
            Err(TimerError::Expired(e)) => {
                panic!("Trying to insert periodic timer entry with 0ms period! {e:?}")
            }
            Err(f) => panic!("Could not insert timer entry! {f:?}"),
        }
    }

    /// Advance the wheel by one tick and execute everything that expired.
    #[inline(always)]
    fn tick(&mut self) {
        let expired = self.timer.tick();
        for entry in expired {
            self.trigger_entry(entry);
        }
    }
}