tokio_interval/
timer.rs

1use once_cell::sync::Lazy;
2use std::collections::HashMap;
3use std::future::Future;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Mutex;
6use tokio::task::JoinHandle;
7use tokio::time::{interval_at, sleep, Duration, Instant};
8#[allow(dead_code)]
9static TIMER_ID: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(0));
10#[allow(dead_code)]
11static TIMERS: Lazy<Mutex<HashMap<u64, JoinHandle<()>>>> = Lazy::new(|| Mutex::new(HashMap::new()));
12/// 清除指定的定时器,推荐使用宏 clear_timer!
13#[doc(hidden)]
14pub fn _clear_timer(id: u64) {
15    let mut timer_map = TIMERS.lock().unwrap();
16    // 如果定时器不存在,则提前释放锁
17    if !timer_map.contains_key(&id) {
18        return;
19    }
20    let handler = timer_map.get(&id).unwrap();
21    // 如果未完成,则停止
22    if !handler.is_finished() {
23        handler.abort();
24    }
25    // 从map中删除对应的数据
26    timer_map.remove(&id);
27}
28
29/// 清除所有通过tokio_interval的定时器,包含set_interval!、set_timeout!、set_interval_async!、set_timeout_async!
30#[doc(hidden)]
31pub fn _clear_all_timer() {
32    let mut timer_map = TIMERS.lock().unwrap();
33    if timer_map.len() == 0 {
34        // 提前释放锁
35        return;
36    }
37    for (_, h) in timer_map.drain() {
38        // 如果未完成,则停止
39        if !h.is_finished() {
40            h.abort();
41        }
42    }
43}
44
45/// 开启定时器,指定ms后,执行传入的回调函数,推荐使用宏 set_interval!
46#[doc(hidden)]
47pub fn _set_interval<F: Fn() + Send + 'static>(f: F, ms: u64) -> u64 {
48    let start = Instant::now() + Duration::from_millis(ms);
49    let period = Duration::from_millis(ms);
50    let handler: JoinHandle<()> = tokio::spawn(async move {
51        let mut int = interval_at(start, period);
52        loop {
53            int.tick().await;
54            f();
55        }
56    });
57    // 保存timer数据
58    let id = TIMER_ID.fetch_add(1, Ordering::SeqCst);
59    TIMERS.lock().unwrap().insert(id, handler);
60    id
61}
62/// 定时器的异步版本,指定ms后,执行传入的Future,,推荐使用宏 set_interval_async!
63#[doc(hidden)]
64pub fn _set_interval_async<
65    F: (Fn() -> Fut) + Sync + Send + 'static,
66    Fut: Future + Sync + Send + 'static,
67>(
68    f: F,
69    ms: u64,
70) -> u64 {
71    let start = Instant::now() + Duration::from_millis(ms);
72    let period = Duration::from_millis(ms);
73    let handler: JoinHandle<()> = tokio::spawn(async move {
74        let mut int = interval_at(start, period);
75        loop {
76            int.tick().await;
77            f().await;
78        }
79    });
80    // 保存timer数据
81    let id = TIMER_ID.fetch_add(1, Ordering::SeqCst);
82    TIMERS.lock().unwrap().insert(id, handler);
83    id
84}
85/// 延时器, 推荐使用宏 set_timeout!
86#[doc(hidden)]
87pub fn _set_timeout<F: Fn() + Send + 'static>(f: F, ms: u64) -> u64 {
88    let delay = Duration::from_millis(ms);
89    let id = TIMER_ID.fetch_add(1, Ordering::SeqCst);
90    let handler = tokio::spawn(async move {
91        sleep(delay).await;
92        f();
93        // 执行完毕后自动清除定时器
94        _clear_timer(id);
95    });
96    // 保存timer数据
97    TIMERS.lock().unwrap().insert(id, handler);
98    id
99}
100/// 延时器的异步版本, 推荐使用宏 set_timeout_async!
101#[doc(hidden)]
102pub fn _set_timeout_async<
103    F: (Fn() -> Fut) + Send + Sync + 'static,
104    Fut: Future + Send + Sync + 'static,
105>(
106    f: F,
107    ms: u64,
108) -> u64 {
109    let delay = Duration::from_millis(ms);
110    let id = TIMER_ID.fetch_add(1, Ordering::SeqCst);
111    let handler = tokio::spawn(async move {
112        sleep(delay).await;
113        f().await;
114        // 执行完毕后,自动清除定时器
115        _clear_timer(id);
116    });
117    // 保存timer数据
118    TIMERS.lock().unwrap().insert(id, handler);
119    id
120}
121
122#[cfg(test)]
123mod tests {
124    use super::*;
125    use std::sync::atomic::{AtomicU64, Ordering};
126    use std::sync::Arc;
127    async fn test_set_interval() {
128        let times = 3;
129        let counter = Arc::new(AtomicU64::new(0));
130        {
131            let counter = counter.clone();
132            _set_interval(
133                move || {
134                    counter.clone().fetch_add(1, Ordering::SeqCst);
135                },
136                1 * 100,
137            );
138        }
139        {
140            let counter = counter.clone();
141            _set_interval(
142                move || {
143                    counter.clone().fetch_add(1, Ordering::SeqCst);
144                },
145                1 * 100,
146            );
147        }
148        assert_eq!(TIMERS.lock().unwrap().len(), 2);
149        tokio::time::sleep(Duration::from_millis(times * 110)).await;
150        assert_eq!(counter.load(Ordering::SeqCst), times * 2);
151        counter.store(0, Ordering::SeqCst);
152        _clear_all_timer();
153    }
154    async fn test_set_interval_async() {
155        let times = 3;
156        let counter = Arc::new(AtomicU64::new(0));
157        {
158            let counter = counter.clone();
159            let closure_async = move || {
160                let counter_inner = counter.clone();
161                async move {
162                    counter_inner.fetch_add(1, Ordering::SeqCst);
163                }
164            };
165            _set_interval_async(closure_async, 1 * 100);
166        }
167        {
168            let counter = counter.clone();
169            let closure_async = move || {
170                let counter_inner = counter.clone();
171                async move {
172                    counter_inner.fetch_add(1, Ordering::SeqCst);
173                }
174            };
175            _set_interval_async(closure_async, 1 * 100);
176        }
177        assert_eq!(TIMERS.lock().unwrap().len(), 2);
178        tokio::time::sleep(Duration::from_millis(times * 110)).await;
179        assert_eq!(counter.load(Ordering::SeqCst), times * 2);
180        counter.store(0, Ordering::SeqCst);
181        _clear_all_timer();
182    }
183    async fn test_set_timeout() {
184        let counter = Arc::new(AtomicU64::new(0));
185        {
186            let counter = counter.clone();
187            _set_timeout(
188                move || {
189                    counter.fetch_add(1, Ordering::SeqCst);
190                },
191                100,
192            );
193        }
194        {
195            let counter = counter.clone();
196            _set_timeout(
197                move || {
198                    counter.fetch_add(1, Ordering::SeqCst);
199                },
200                100,
201            );
202        }
203        assert_eq!(TIMERS.lock().unwrap().len(), 2);
204        tokio::time::sleep(Duration::from_millis(110)).await;
205        assert_eq!(TIMERS.lock().unwrap().len(), 0);
206        assert_eq!(counter.load(Ordering::SeqCst), 2);
207        counter.store(0, Ordering::SeqCst);
208        _clear_all_timer();
209    }
210    async fn test_set_timeout_async() {
211        let counter = Arc::new(AtomicU64::new(0));
212        {
213            let counter = counter.clone();
214            let closure_async = move || {
215                let counter_inner = counter.clone();
216                async move {
217                    counter_inner.fetch_add(1, Ordering::SeqCst);
218                }
219            };
220            _set_timeout_async(closure_async, 100);
221        }
222        {
223            let counter = counter.clone();
224            let closure_async = move || {
225                let counter_inner = counter.clone();
226                async move {
227                    counter_inner.fetch_add(1, Ordering::SeqCst);
228                }
229            };
230            _set_timeout_async(closure_async, 100);
231        }
232        assert_eq!(TIMERS.lock().unwrap().len(), 2);
233        tokio::time::sleep(Duration::from_millis(200)).await;
234        assert_eq!(TIMERS.lock().unwrap().len(), 0);
235        assert_eq!(counter.load(Ordering::SeqCst), 2);
236        counter.store(0, Ordering::SeqCst);
237        _clear_all_timer();
238    }
239    async fn test_clear_timer() {
240        let times = 3;
241        let counter = Arc::new(AtomicU64::new(0));
242        {
243            let counter = counter.clone();
244            _set_interval(
245                move || {
246                    counter.clone().fetch_add(1, Ordering::SeqCst);
247                },
248                1 * 100,
249            );
250        }
251        {
252            let counter = counter.clone();
253            let closure_async = move || {
254                let counter_inner = counter.clone();
255                async move {
256                    counter_inner.fetch_add(1, Ordering::SeqCst);
257                }
258            };
259            let id = _set_interval_async(closure_async, 1 * 100);
260            _clear_timer(id);
261        }
262        {
263            let counter = counter.clone();
264            let id = _set_timeout(
265                move || {
266                    counter.fetch_add(1, Ordering::SeqCst);
267                },
268                100,
269            );
270            _clear_timer(id);
271        }
272        {
273            let counter = counter.clone();
274            let closure_async = move || {
275                let counter_inner = counter.clone();
276                async move {
277                    counter_inner.fetch_add(1, Ordering::SeqCst);
278                }
279            };
280            _set_timeout_async(closure_async, 100);
281        }
282        assert_eq!(TIMERS.lock().unwrap().len(), 2);
283        tokio::time::sleep(Duration::from_millis(times * 110)).await;
284        assert_eq!(TIMERS.lock().unwrap().len(), 1);
285        assert_eq!(counter.load(Ordering::SeqCst), times * 1 + 1);
286        counter.store(0, Ordering::SeqCst);
287        _clear_all_timer();
288    }
289    async fn test_clear_all_timer() {
290        _set_interval(|| println!("hello1"), 100);
291        _set_interval(|| println!("hello2"), 100);
292        _set_interval(|| println!("hello3"), 100);
293        _set_timeout(|| println!("hello4"), 100);
294        _set_timeout(|| println!("hello5"), 250);
295        assert_eq!(TIMERS.lock().unwrap().len(), 5);
296        tokio::time::sleep(Duration::from_millis(200)).await;
297        assert_eq!(TIMERS.lock().unwrap().len(), 4);
298        _clear_all_timer();
299        assert_eq!(TIMERS.lock().unwrap().len(), 0);
300        tokio::time::sleep(Duration::from_millis(200)).await;
301    }
302    // 统一在这里测试,避免同时进行,导致counter数值无法确定
303    #[tokio::test]
304    async fn test_timer() {
305        test_set_interval().await;
306        test_set_interval_async().await;
307        test_set_timeout().await;
308        test_set_timeout_async().await;
309        test_clear_timer().await;
310        test_clear_all_timer().await;
311    }
312}