use mio::{Events, Poll, PollOpt, Ready, Token};
use mio_extras::timer;
use spin::Mutex;
use std::fmt;
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::thread;
use std::time::Duration;
const TIMER: Token = Token(1);
type State = Weak<TimerInner>;
struct TimerInner {
timer: Arc<Mutex<timer::Timer<State>>>,
pending: AtomicBool,
timeout: Mutex<Option<timer::Timeout>>,
callback: Box<dyn Fn() + Send + Sync>,
}
impl fmt::Debug for TimerInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Timer {{ pending = {} }} ",
self.pending.load(Ordering::Acquire)
)
}
}
pub struct Runner {
timer: Arc<Mutex<timer::Timer<State>>>,
handle: Option<thread::JoinHandle<()>>,
running: Arc<AtomicBool>,
}
impl fmt::Debug for Runner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Runner {{ running = {} }} ",
self.running.load(Ordering::Acquire)
)
}
}
#[derive(Clone)]
pub struct Timer(Arc<TimerInner>);
impl fmt::Debug for Timer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl Runner {
pub fn new(tick: Duration, slots: usize, capacity: usize) -> Runner {
let builder: timer::Builder = Default::default();
let builder = timer::Builder::tick_duration(builder, tick);
let builder = timer::Builder::num_slots(builder, slots);
let builder = timer::Builder::capacity(builder, capacity);
let timer = timer::Builder::build(builder);
let poll = Poll::new().unwrap();
poll.register(&timer, TIMER, Ready::readable(), PollOpt::level())
.unwrap();
let timer = Arc::new(Mutex::new(timer));
let running = Arc::new(AtomicBool::new(true));
let handle = {
let timer = timer.clone();
let running = running.clone();
thread::spawn(move || {
let mut events = Events::with_capacity(256);
while running.load(Ordering::Acquire) {
poll.poll(&mut events, None).unwrap();
for event in &events {
match event.token() {
TIMER => {
let timer: Arc<TimerInner> = match timer
.lock()
.poll()
.and_then(|weak: Weak<TimerInner>| weak.upgrade())
{
Some(v) => v,
None => continue,
};
if timer.pending.swap(false, Ordering::SeqCst) {
(timer.callback)()
}
}
_ => unreachable!(),
}
}
}
})
};
Runner {
timer,
handle: Some(handle),
running,
}
}
pub fn timer<F>(&self, callback: F) -> Timer
where
F: 'static + Fn() + Send + Sync,
{
Timer(Arc::new(TimerInner {
callback: Box::new(callback),
pending: AtomicBool::new(false),
timer: self.timer.clone(),
timeout: Mutex::new(None),
}))
}
}
impl Drop for Runner {
fn drop(&mut self) {
self.running.store(false, Ordering::SeqCst);
self.timer
.lock()
.set_timeout(Duration::from_millis(0), Weak::new());
if let Some(handle) = mem::replace(&mut self.handle, None) {
handle.join().unwrap();
}
}
}
impl TimerInner {
fn stop(&self) {
if self.pending.swap(false, Ordering::Acquire) {
if let Some(tm) = self.timeout.lock().take() {
self.timer.lock().cancel_timeout(&tm);
}
}
}
fn fire(&self) {
self.stop();
(self.callback)()
}
}
impl Timer {
pub fn stop(&self) {
self.0.stop()
}
pub fn reset(&self, duration: Duration) {
let inner = &self.0;
inner.pending.store(true, Ordering::SeqCst);
let mut timeout = inner.timeout.lock();
let mut timer = inner.timer.lock();
let new = timer.set_timeout(duration, Arc::downgrade(&self.0));
if let Some(tm) = mem::replace(&mut *timeout, Some(new)) {
timer.cancel_timeout(&tm);
}
}
pub fn start(&self, duration: Duration) -> bool {
let inner = &self.0;
if inner.pending.load(Ordering::Acquire) {
return false;
}
let mut timeout = inner.timeout.lock();
let mut timer = inner.timer.lock();
if inner.pending.load(Ordering::Acquire) {
return false;
}
*timeout = Some(timer.set_timeout(duration, Arc::downgrade(&self.0)));
inner.pending.store(true, Ordering::SeqCst);
true
}
pub fn fire(&self) {
self.0.fire();
}
}
impl Drop for TimerInner {
fn drop(&mut self) {
self.stop()
}
}