use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd};
use std::convert::From;
use std::ops::Deref;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use super::error::TimeEventError;
use super::heap::{Heap, SlotHandle};
lazy_static! {
/// Process-wide event loop, built with `TimeEventLoop::new()` the first time
/// `GLOBAL` is dereferenced. Construction spawns the loop's worker thread.
pub static ref GLOBAL: TimeEventLoop = TimeEventLoop::new();
}
/// Value returned by an event callback, telling the loop what to do with the
/// event that has just fired.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CallBackState {
    /// The event is finished; the loop removes it from the heap.
    None,
    /// Fire the event again: the loop moves its deadline to the given duration
    /// after the instant it sampled just before invoking the callback.
    Recall(Duration),
}
/// A scheduled callback stored in the event loop's heap.
///
/// The comparison impls further down this file look only at `when`, so events
/// are ordered (and considered equal) purely by deadline.
pub(crate) struct TimeEvent {
/// Deadline: the worker invokes `callback` once `Instant::now()` is no longer
/// earlier than this value.
when: Instant,
/// Invoked by the worker thread; the returned `CallBackState` decides whether
/// the event is popped or rescheduled.
callback: Box<dyn Fn() -> CallBackState>,
}
/// Timer loop that owns a heap of `TimeEvent`s and a worker thread that fires
/// them when their deadlines pass.
pub struct TimeEventLoop {
/// State shared with the worker thread (event heap + wake-up condvar).
inner: Arc<Inner>,
/// Join handle of the worker spawned in `new`. It is stored but never joined
/// anywhere in this file.
thread: Option<thread::JoinHandle<()>>,
}
/// State shared between the public `TimeEventLoop` handle and its worker.
struct Inner {
/// Pending events; every access goes through this mutex.
heap: Mutex<Heap<TimeEvent>>,
/// Signalled by `register`, `reset` and `remove` so the worker re-examines
/// the heap instead of sleeping until a stale deadline.
cond: Condvar,
}
/// Handle to a registered event, returned by `TimeEventLoop::register` and
/// accepted by `TimeEventLoop::reset` / `TimeEventLoop::remove`.
pub struct EventHandle {
/// The heap's own handle for the slot holding the event.
slot: SlotHandle,
}
impl Deref for EventHandle {
type Target = SlotHandle;
fn deref(&self) -> &Self::Target {
&self.slot
}
}
impl From<SlotHandle> for EventHandle {
fn from(from: SlotHandle) -> Self {
EventHandle { slot: from }
}
}
impl TimeEventLoop {
pub fn new() -> Self {
let mut event_loop = TimeEventLoop {
inner: Arc::new(Inner {
heap: Mutex::new(Heap::new()),
cond: Condvar::new(),
}),
thread: None,
};
let clone_inner = event_loop.inner.clone();
event_loop
.thread
.replace(thread::spawn(move || TimeEventLoop::run(clone_inner)));
event_loop
}
pub fn register(
&self,
instant: Instant,
callback: Box<dyn Fn() -> CallBackState>,
) -> EventHandle {
let handle = self.inner.heap.lock().unwrap().push(TimeEvent {
when: instant,
callback: callback,
});
self.inner.cond.notify_one();
handle.into()
}
pub fn reset(&self, handle: &EventHandle, dur: Duration) -> Result<(), TimeEventError> {
let ret = self
.inner
.heap
.lock()
.unwrap()
.update(&handle.slot, |data| data.when = Instant::now() + dur)
.ok_or(TimeEventError::EventCanceled);
self.inner.cond.notify_one();
ret
}
pub fn remove(&self, handle: &EventHandle) {
match self
.inner
.heap
.lock()
.unwrap()
.remove_with_handle(&handle.slot)
{
Some(_) => self.inner.cond.notify_one(),
None => {}
}
}
fn run(inner: Arc<Inner>) {
let mut raw_heap = inner.heap.lock().unwrap();
'x: loop {
let timeout = match raw_heap.peek_mut() {
Some(event) => {
let now = Instant::now();
match now.cmp(&event.when) {
Ordering::Less => {
event.when - now
}
_ => {
match (event.callback)() {
CallBackState::None => {
raw_heap.pop();
}
CallBackState::Recall(dur) => {
event.when = now + dur;
raw_heap.percolate_down(0);
}
}
continue 'x;
}
}
}
None => Duration::from_secs(0),
};
if timeout == Duration::from_secs(0) {
raw_heap = inner.cond.wait(raw_heap).unwrap()
} else {
raw_heap = inner.cond.wait_timeout(raw_heap, timeout).unwrap().0
}
}
}
}
/// Two events are equal when they share the same deadline; the callback is
/// not part of the comparison.
impl PartialEq for TimeEvent {
    fn eq(&self, rhs: &Self) -> bool {
        self.when == rhs.when
    }
}
impl PartialOrd for TimeEvent {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.when.partial_cmp(&other.when)
}
}
// Marker impl: equality on `TimeEvent` compares only `when` (an `Instant`),
// which is a total equivalence relation.
impl Eq for TimeEvent {}
/// Total ordering by deadline: an event with an earlier `when` compares as
/// smaller.
impl Ord for TimeEvent {
    fn cmp(&self, rhs: &Self) -> Ordering {
        Ord::cmp(&self.when, &rhs.when)
    }
}
// NOTE(review): `TimeEvent` owns a `Box<dyn Fn() -> CallBackState>` with no
// `Send`/`Sync` bound, so the compiler cannot check these impls. Callbacks are
// invoked on the worker thread spawned in `TimeEventLoop::new`, yet the public
// `register` accepts any boxed closure; one that captures non-thread-safe
// state (e.g. an `Rc`) would be moved across threads from safe code. Making
// this sound needs a `Send` bound on the callback type in `register` and
// `TimeEvent` — TODO confirm with callers before tightening the signature.
unsafe impl Sync for TimeEvent {}
unsafe impl Send for TimeEvent {}
// `Inner` is shared with the worker through an `Arc` and stored in the
// `GLOBAL` static, both of which require `Send + Sync`.
// NOTE(review): whether `Heap<TimeEvent>` is actually safe to move/share
// between threads cannot be seen from this file — TODO confirm.
unsafe impl Sync for Inner {}
unsafe impl Send for Inner {}
#[cfg(test)]
mod test {
    use super::*;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Instant `secs` seconds from now.
    fn after(secs: u64) -> Instant {
        Instant::now() + Duration::from_secs(secs)
    }

    /// Blocks the current thread for `secs` seconds.
    fn pause(secs: u64) {
        thread::sleep(Duration::from_secs(secs));
    }

    /// Manual smoke test for the global loop: registers two self-rescheduling
    /// events, postpones the first one after two seconds, then lets the loop
    /// run. It only prints; nothing is asserted. Takes about 13 seconds.
    #[test]
    fn test_run() {
        // Due in 3 s, then repeats every second.
        let three = GLOBAL.register(
            after(3),
            Box::new(|| {
                println!("3秒后");
                CallBackState::Recall(Duration::from_secs(1))
            }),
        );
        // Due in 5 s, then repeats every second; the handle is not needed.
        let _ = GLOBAL.register(
            after(5),
            Box::new(|| {
                println!("5秒后");
                CallBackState::Recall(Duration::from_secs(1))
            }),
        );

        pause(2);
        // Push the first event's deadline to three seconds from now.
        match GLOBAL.reset(&three, Duration::from_secs(3)) {
            Err(e) => println!("{:?}", e),
            Ok(()) => println!("success changed"),
        }

        pause(1);
        println!("3秒过去了");
        pause(10);
        println!("13秒过去了");
    }
}