time_event/
global.rs

1use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd};
2use std::convert::From;
3use std::ops::Deref;
4use std::sync::{Arc, Condvar, Mutex};
5use std::thread;
6use std::time::{Duration, Instant};
7
8use super::error::TimeEventError;
9use super::heap::{Heap, SlotHandle};
10
lazy_static! {
    /// Process-wide timer loop shared by every caller.
    ///
    /// It is built lazily on first access; building it calls
    /// `TimeEventLoop::new()`, which spawns the background timer thread.
    pub static ref GLOBAL: TimeEventLoop = TimeEventLoop::new();
}
14
/// What the timer loop should do with an event after its callback has run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CallBackState {
    /// The event is finished; the loop pops it from the heap.
    None,
    /// Fire the event again after the given delay. The delay is added to the
    /// instant at which the loop found the event due, i.e. the time sampled
    /// just before the callback was invoked.
    Recall(Duration),
}
19
/// A scheduled callback stored in the timer heap.
///
/// Equality and ordering (implemented further down in this file) look only at
/// `when`.
pub(crate) struct TimeEvent {
    /// Deadline: the timer loop invokes `callback` once `Instant::now()` is no
    /// longer earlier than this.
    when: Instant,
    /// Invoked by the timer loop; the returned `CallBackState` decides whether
    /// the event is dropped or rescheduled.
    // NOTE(review): the trait object carries no `Send` bound although the
    // callback is executed on the timer thread — see the `unsafe impl Send`
    // for `TimeEvent` near the end of this file.
    callback: Box<dyn Fn() -> CallBackState>,
}
25
/// Timer facility: a shared heap of pending events plus the background thread
/// that fires them.
pub struct TimeEventLoop {
    /// State shared with the timer thread.
    inner: Arc<Inner>,
    /// Join handle of the timer thread, stored by `new`. Nothing in the code
    /// visible here joins it.
    thread: Option<thread::JoinHandle<()>>,
}
30
/// State shared between the public API and the timer thread.
struct Inner {
    /// Pending events, guarded by a mutex.
    heap: Mutex<Heap<TimeEvent>>,
    /// Notified when an event is registered, rescheduled or removed so the
    /// timer thread re-examines the heap and recomputes how long to sleep.
    cond: Condvar,
}
/// Handle returned by `TimeEventLoop::register`; pass it to `reset` or
/// `remove` to reschedule or cancel that event.
pub struct EventHandle {
    /// Handle into the underlying heap.
    slot: SlotHandle,
}
38
39impl Deref for EventHandle {
40    type Target = SlotHandle;
41
42    fn deref(&self) -> &Self::Target {
43        &self.slot
44    }
45}
46
47impl From<SlotHandle> for EventHandle {
48    fn from(from: SlotHandle) -> Self {
49        EventHandle { slot: from }
50    }
51}
52
53impl TimeEventLoop {
54    pub fn new() -> Self {
55        let mut event_loop = TimeEventLoop {
56            inner: Arc::new(Inner {
57                heap: Mutex::new(Heap::new()),
58                cond: Condvar::new(),
59            }),
60            thread: None,
61        };
62        let clone_inner = event_loop.inner.clone();
63        event_loop
64            .thread
65            .replace(thread::spawn(move || TimeEventLoop::run(clone_inner)));
66        event_loop
67    }
68
69    pub fn register(
70        &self,
71        instant: Instant,
72        callback: Box<dyn Fn() -> CallBackState>,
73    ) -> EventHandle {
74        let handle = self.inner.heap.lock().unwrap().push(TimeEvent {
75            when: instant,
76            callback: callback,
77        });
78        self.inner.cond.notify_one();
79        handle.into()
80    }
81
82    pub fn reset(&self, handle: &EventHandle, dur: Duration) -> Result<(), TimeEventError> {
83        let ret = self
84            .inner
85            .heap
86            .lock()
87            .unwrap()
88            .update(&handle.slot, |data| data.when = Instant::now() + dur)
89            .ok_or(TimeEventError::EventCanceled);
90        self.inner.cond.notify_one();
91        ret
92    }
93
94    pub fn remove(&self, handle: &EventHandle) {
95        match self
96            .inner
97            .heap
98            .lock()
99            .unwrap()
100            .remove_with_handle(&handle.slot)
101        {
102            Some(_) => self.inner.cond.notify_one(),
103            None => {}
104        }
105    }
106
107    fn run(inner: Arc<Inner>) {
108        let mut raw_heap = inner.heap.lock().unwrap();
109        'x: loop {
110            let timeout = match raw_heap.peek_mut() {
111                Some(event) => {
112                    let now = Instant::now();
113                    match now.cmp(&event.when) {
114                        Ordering::Less => {
115                            event.when - now
116                        }
117                        _ => {
118                            match (event.callback)() {
119                                CallBackState::None => {
120                                    raw_heap.pop();
121                                }
122                                CallBackState::Recall(dur) => {
123                                    event.when = now + dur;
124                                    raw_heap.percolate_down(0);
125                                }
126                            }
127                            continue 'x;
128                        }
129                    }
130                }
131                None => Duration::from_secs(0),
132            };
133            if timeout == Duration::from_secs(0) {
134                raw_heap = inner.cond.wait(raw_heap).unwrap()
135            } else {
136                raw_heap = inner.cond.wait_timeout(raw_heap, timeout).unwrap().0
137            }
138        }
139    }
140}
141
142impl PartialEq for TimeEvent {
143    fn eq(&self, other: &Self) -> bool {
144        self.when.eq(&other.when)
145    }
146}
147impl PartialOrd for TimeEvent {
148    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
149        self.when.partial_cmp(&other.when)
150    }
151}
152impl Eq for TimeEvent {}
153impl Ord for TimeEvent {
154    fn cmp(&self, other: &Self) -> Ordering {
155        self.when.cmp(&other.when)
156    }
157}
158
// NOTE(review): `TimeEvent` is not automatically `Send`/`Sync` because its
// `callback` is a `Box<dyn Fn() -> CallBackState>` with no `Send`/`Sync`
// bound. These impls assert thread-safety by hand so the heap can be shared
// with the timer thread, but nothing in `register`'s signature prevents a
// caller from passing a closure that captures non-`Send` data (e.g. an `Rc`),
// which would then be invoked on the timer thread. Consider requiring `Send`
// on the callback type instead of asserting it here.
unsafe impl Sync for TimeEvent {}
unsafe impl Send for TimeEvent {}
// NOTE(review): presumably needed because `Mutex<Heap<TimeEvent>>` is only
// `Send`/`Sync` when its contents are `Send`. Whether these two impls are
// still required given the ones above depends on `Heap`, which is not visible
// here — confirm before removing.
unsafe impl Sync for Inner {}
unsafe impl Send for Inner {}
163
#[cfg(test)]
mod test {
    use super::*;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Instant lying `secs` seconds in the future.
    fn after(secs: u64) -> Instant {
        Instant::now() + Duration::from_secs(secs)
    }

    /// Blocks the test thread for `secs` seconds.
    fn pause(secs: u64) {
        thread::sleep(Duration::from_secs(secs));
    }

    /// Manual smoke test against the shared `GLOBAL` loop.
    ///
    /// It makes no assertions; it only prints, and it sleeps for about
    /// 13 seconds in total.
    #[test]
    fn test_run() {
        // First timer: due in 3 s, then repeats every second.
        // The message reads "after 3 seconds".
        let three = GLOBAL.register(
            after(3),
            Box::new(|| {
                println!("3秒后");
                CallBackState::Recall(Duration::from_secs(1))
            }),
        );

        // Second timer: due in 5 s, then repeats every second. Its handle is
        // discarded right away. The message reads "after 5 seconds".
        let _ = GLOBAL.register(
            after(5),
            Box::new(|| {
                println!("5秒后");
                CallBackState::Recall(Duration::from_secs(1))
            }),
        );

        // Two seconds in, push the first timer's deadline out to 3 s from now.
        pause(2);
        match GLOBAL.reset(&three, Duration::from_secs(3)) {
            Ok(()) => println!("success changed"),
            Err(e) => println!("{:?}", e),
        }

        pause(1);
        // "3 seconds have passed"
        println!("3秒过去了");

        pause(10);
        // "13 seconds have passed"
        println!("13秒过去了");
    }
}