1use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd};
2use std::convert::From;
3use std::ops::Deref;
4use std::sync::{Arc, Condvar, Mutex};
5use std::thread;
6use std::time::{Duration, Instant};
7
8use super::error::TimeEventError;
9use super::heap::{Heap, SlotHandle};
10
11lazy_static! {
12 pub static ref GLOBAL: TimeEventLoop = TimeEventLoop::new();
13}
14
/// Value returned by a timer callback, telling the loop what to do with the
/// event that has just fired.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CallBackState {
    /// The event is finished; the loop pops it from the heap.
    None,
    /// Re-arm the event: it becomes due again after the given delay, counted
    /// from the instant the loop sampled right before invoking the callback.
    Recall(Duration),
}
19
20pub(crate) struct TimeEvent {
22 when: Instant,
23 callback: Box<dyn Fn() -> CallBackState>,
24}
25
26pub struct TimeEventLoop {
27 inner: Arc<Inner>,
28 thread: Option<thread::JoinHandle<()>>,
29}
30
31struct Inner {
32 heap: Mutex<Heap<TimeEvent>>,
33 cond: Condvar,
34}
35pub struct EventHandle {
36 slot: SlotHandle,
37}
38
39impl Deref for EventHandle {
40 type Target = SlotHandle;
41
42 fn deref(&self) -> &Self::Target {
43 &self.slot
44 }
45}
46
47impl From<SlotHandle> for EventHandle {
48 fn from(from: SlotHandle) -> Self {
49 EventHandle { slot: from }
50 }
51}
52
53impl TimeEventLoop {
54 pub fn new() -> Self {
55 let mut event_loop = TimeEventLoop {
56 inner: Arc::new(Inner {
57 heap: Mutex::new(Heap::new()),
58 cond: Condvar::new(),
59 }),
60 thread: None,
61 };
62 let clone_inner = event_loop.inner.clone();
63 event_loop
64 .thread
65 .replace(thread::spawn(move || TimeEventLoop::run(clone_inner)));
66 event_loop
67 }
68
69 pub fn register(
70 &self,
71 instant: Instant,
72 callback: Box<dyn Fn() -> CallBackState>,
73 ) -> EventHandle {
74 let handle = self.inner.heap.lock().unwrap().push(TimeEvent {
75 when: instant,
76 callback: callback,
77 });
78 self.inner.cond.notify_one();
79 handle.into()
80 }
81
82 pub fn reset(&self, handle: &EventHandle, dur: Duration) -> Result<(), TimeEventError> {
83 let ret = self
84 .inner
85 .heap
86 .lock()
87 .unwrap()
88 .update(&handle.slot, |data| data.when = Instant::now() + dur)
89 .ok_or(TimeEventError::EventCanceled);
90 self.inner.cond.notify_one();
91 ret
92 }
93
94 pub fn remove(&self, handle: &EventHandle) {
95 match self
96 .inner
97 .heap
98 .lock()
99 .unwrap()
100 .remove_with_handle(&handle.slot)
101 {
102 Some(_) => self.inner.cond.notify_one(),
103 None => {}
104 }
105 }
106
107 fn run(inner: Arc<Inner>) {
108 let mut raw_heap = inner.heap.lock().unwrap();
109 'x: loop {
110 let timeout = match raw_heap.peek_mut() {
111 Some(event) => {
112 let now = Instant::now();
113 match now.cmp(&event.when) {
114 Ordering::Less => {
115 event.when - now
116 }
117 _ => {
118 match (event.callback)() {
119 CallBackState::None => {
120 raw_heap.pop();
121 }
122 CallBackState::Recall(dur) => {
123 event.when = now + dur;
124 raw_heap.percolate_down(0);
125 }
126 }
127 continue 'x;
128 }
129 }
130 }
131 None => Duration::from_secs(0),
132 };
133 if timeout == Duration::from_secs(0) {
134 raw_heap = inner.cond.wait(raw_heap).unwrap()
135 } else {
136 raw_heap = inner.cond.wait_timeout(raw_heap, timeout).unwrap().0
137 }
138 }
139 }
140}
141
142impl PartialEq for TimeEvent {
143 fn eq(&self, other: &Self) -> bool {
144 self.when.eq(&other.when)
145 }
146}
147impl PartialOrd for TimeEvent {
148 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
149 self.when.partial_cmp(&other.when)
150 }
151}
152impl Eq for TimeEvent {}
153impl Ord for TimeEvent {
154 fn cmp(&self, other: &Self) -> Ordering {
155 self.when.cmp(&other.when)
156 }
157}
158
159unsafe impl Sync for TimeEvent {}
160unsafe impl Send for TimeEvent {}
161unsafe impl Sync for Inner {}
162unsafe impl Send for Inner {}
163
#[cfg(test)]
mod test {
    use super::*;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Manual smoke test for the global loop: registers two self-re-arming
    /// events, pushes the first one back after two seconds, then lets the
    /// loop run. It only prints and makes no assertions; it sleeps for about
    /// thirteen seconds in total.
    #[test]
    fn test_run() {
        let start = Instant::now();

        // Due three seconds after registration, then re-armed every second.
        let three = GLOBAL.register(
            start + Duration::from_secs(3),
            Box::new(|| {
                println!("3秒后");
                CallBackState::Recall(Duration::from_secs(1))
            }),
        );

        // Due five seconds after registration, then re-armed every second.
        // The handle is discarded immediately.
        let _ = GLOBAL.register(
            Instant::now() + Duration::from_secs(5),
            Box::new(|| {
                println!("5秒后");
                CallBackState::Recall(Duration::from_secs(1))
            }),
        );

        thread::sleep(Duration::from_secs(2));

        // Postpone the first event to three seconds from now.
        match GLOBAL.reset(&three, Duration::from_secs(3)) {
            Ok(()) => println!("success changed"),
            Err(err) => println!("{:?}", err),
        }

        thread::sleep(Duration::from_secs(1));
        println!("3秒过去了");

        thread::sleep(Duration::from_secs(10));
        println!("13秒过去了");
    }
}