swnb_timer/
time.rs

1use std::alloc::System;
2use std::cmp::{Eq, Ord, Ordering};
3use std::collections::BinaryHeap;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::{
7    atomic::{AtomicBool, Ordering::SeqCst},
8    Arc, Condvar, Mutex,
9};
10use std::task::{Context, Poll};
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12
/// The user-supplied closure carried by a [`Task`].
enum Callback {
    /// A closure that may be invoked a single time.
    ///
    /// It is wrapped in an `Option` so that it can be moved out through a
    /// mutable reference (`take()`), leaving `None` behind once it has run.
    Once(Option<Box<dyn FnOnce() + Send>>),
    /// A closure that may be invoked any number of times.
    Mut(Box<dyn FnMut() + Send>),
}
17
18// scheduler unit
19struct Task {
20    time: std::time::Instant,
21    is_deleted: Arc<AtomicBool>,
22    reuseable: bool,
23    callback: Callback,
24    duration: Duration,
25}
26
27impl Task {
28    fn call(&mut self) {
29        match self.callback {
30            Callback::Once(ref mut f) => {
31                if let Some(f) = f.take() {
32                    f()
33                }
34            }
35            Callback::Mut(ref mut f) => f(),
36        }
37    }
38
39    fn is_deleted(&self) -> bool {
40        self.is_deleted.load(SeqCst)
41    }
42}
43
44impl PartialOrd for Task {
45    #[inline]
46    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
47        Some(other.time.cmp(&self.time))
48    }
49}
50
51impl PartialEq for Task {
52    #[inline]
53    fn eq(&self, other: &Self) -> bool {
54        self.time == other.time
55    }
56}
57
58impl Ord for Task {
59    #[inline]
60    fn cmp(&self, other: &Self) -> Ordering {
61        other.time.cmp(&self.time)
62    }
63}
64
65impl Eq for Task {}
66
67/// Timer store all timeout callback base on binaryHeap;
68/// The callback function will be triggered when the time expires
69///
70/// Example
71///
72/// ```
73/// use swnb_timer::Timer;
74/// use std::time::Duration;
75///
76/// let timer = Timer::new();
77///
78/// timer.set_timeout(||{
79///     println!("after 1 sec");
80/// },Duration::from_secs(1));
81///
82/// std::thread::sleep(Duration::from_secs(2));
83/// ```
84pub struct Timer {
85    thread_handler: std::thread::JoinHandle<()>,
86    cond: Arc<(Condvar, Mutex<BinaryHeap<Task>>)>,
87}
88
89impl Default for Timer {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl Timer {
96    /// create new Timer, this method will create one thread to handle all task base on binaryHeap;
97    ///
98    /// # Examples
99    ///
100    /// Basic usage:
101    /// ```
102    /// use swnb_timer::Timer;
103    /// use std::time::Duration;
104    ///
105    /// let timer = Timer::new();
106    ///
107    /// timer.set_timeout(||{
108    ///     println!("after 1 sec");
109    /// },Duration::from_secs(1));
110    ///
111    /// timer.set_timeout(||{
112    ///     println!("after 2 sec");
113    /// },Duration::from_secs(2));
114    ///
115    /// std::thread::sleep(Duration::from_secs(3));
116    ///
117    /// ```
118    ///
119    /// Async usage:
120    /// ```
121    /// use swnb_timer::Timer;
122    /// use std::time::Duration;
123    ///
124    /// let timer = Timer::new();
125    ///
126    /// let async_block = async {
127    ///     timer.wait(Duration::from_secs(1)).await;
128    ///     println!("after 1 sec");
129    /// };
130    /// // blocking_on(async_block);
131    /// ```
132    ///
133    pub fn new() -> Self {
134        let heap = BinaryHeap::new();
135        let cond = Arc::new((Condvar::new(), Mutex::new(heap)));
136        let thread_handler = Timer::handle_task(cond.clone());
137        Timer {
138            thread_handler,
139            cond,
140        }
141    }
142
143    fn handle_task(cond: Arc<(Condvar, Mutex<BinaryHeap<Task>>)>) -> std::thread::JoinHandle<()> {
144        let worker = move || {
145            let mut locker = cond.1.lock().unwrap();
146            loop {
147                loop {
148                    match locker.peek() {
149                        Some(&Task {
150                            time,
151                            ref is_deleted,
152                            ..
153                        }) => {
154                            if is_deleted.load(SeqCst) {
155                                locker.pop();
156                            } else {
157                                let now = std::time::Instant::now();
158                                if time <= now {
159                                    break;
160                                } else {
161                                    let (new_locker, _) = cond
162                                        .0
163                                        .wait_timeout(locker, time.duration_since(now))
164                                        .unwrap();
165                                    locker = new_locker;
166                                }
167                            }
168                        }
169                        None => {
170                            locker = cond.0.wait(locker).unwrap();
171                        }
172                    }
173                }
174
175                while let Some(task) = locker.peek() {
176                    if task.is_deleted() {
177                        locker.pop();
178                        continue;
179                    }
180                    let now = Instant::now();
181
182                    if task.time <= now {
183                        let mut task = locker.pop().unwrap();
184                        task.call();
185                        if task.reuseable {
186                            task.time = now + task.duration;
187                            locker.push(task);
188                        }
189                    } else {
190                        break;
191                    }
192                }
193            }
194        };
195
196        std::thread::Builder::new()
197            .name("swnb-timer".into())
198            .spawn(worker)
199            .unwrap()
200    }
201
202    /// set_timeout accept two arguments, callback and duration;
203    /// callback will run after duration;
204    /// if you want to cancel callback before the deadline,
205    /// set_timeout return cancel function,
206    /// run it will cancel current timeout callback;
207    ///
208    /// # Examples
209    ///
210    /// set_timeout:
211    ///
212    /// ```
213    /// use swnb_timer::Timer;
214    /// use std::time::Duration;
215    ///
216    /// let timer = Timer::new();
217    ///
218    /// timer.set_timeout(||{
219    ///     println!("after 1 sec");
220    /// },Duration::from_secs(1));
221    ///
222    /// timer.set_timeout(||{
223    ///     println!("after 2 sec");
224    /// },Duration::from_secs(2));
225    ///
226    /// std::thread::sleep(Duration::from_secs(3));
227    /// ```
228    ///
229    /// cancel_callback:
230    ///
231    /// ```
232    /// use swnb_timer::Timer;
233    /// use std::time::Duration;
234    ///
235    /// let timer = Timer::new();
236    ///
237    /// let cancel = timer.set_timeout(||{
238    ///     println!("after 2 sec");
239    /// },Duration::from_secs(2));
240    ///
241    /// timer.set_timeout(move ||{
242    ///    cancel();
243    ///    println!("cancel previous timeout callback");
244    /// },Duration::from_secs(1));
245    ///
246    /// std::thread::sleep(Duration::from_secs(3));
247    /// ```
248    pub fn set_timeout(
249        &self,
250        callback: impl FnOnce() + 'static + Send,
251        duration: std::time::Duration,
252    ) -> impl FnOnce() + Sync + 'static {
253        let now = std::time::Instant::now();
254        let is_deleted = Arc::new(AtomicBool::new(false));
255
256        let task = Task {
257            callback: Callback::Once(Some(Box::new(callback))),
258            is_deleted: is_deleted.clone(),
259            time: now + duration,
260            reuseable: false,
261            duration,
262        };
263
264        self.push_task(task);
265
266        move || is_deleted.store(true, SeqCst)
267    }
268
269    /// set_interval is basically consistent with set_timeout
270    /// callback will run every duration;
271    /// if you want to cancel interval,
272    /// set_interval return cancel function,
273    /// run it will cancel current interval callback;
274    ///
275    /// # Examples
276    ///
277    /// set_interval:
278    ///
279    /// ```
280    /// use swnb_timer::Timer;
281    /// use std::time::Duration;
282    ///
283    /// let timer = Timer::new();
284    ///
285    /// timer.set_interval(||{
286    ///     println!("every 1 sec");
287    /// },Duration::from_secs(1));
288    ///
289    /// timer.set_interval(||{
290    ///     println!("every 2 sec");
291    /// },Duration::from_secs(2));
292    ///
293    /// std::thread::sleep(Duration::from_secs(3));
294    /// ```
295    ///
296    /// cancel_interval:
297    ///
298    /// ```
299    /// use swnb_timer::Timer;
300    /// use std::time::Duration;
301    ///
302    /// let timer = Timer::new();
303    ///
304    /// let cancel = timer.set_interval(||{
305    ///     println!("every 2 sec");
306    /// },Duration::from_secs(2));
307    ///
308    /// timer.set_timeout(move ||{
309    ///    cancel();
310    ///    println!("cancel previous timeout callback");
311    /// },Duration::from_secs(1));
312    ///
313    /// std::thread::sleep(Duration::from_secs(7));
314    /// ```
315    pub fn set_interval(
316        &self,
317        callback: impl FnMut() + 'static + Send,
318        duration: std::time::Duration,
319    ) -> impl FnOnce() + Sync + 'static {
320        let now = std::time::Instant::now();
321        let is_deleted: Arc<AtomicBool> = Default::default();
322
323        let task = Task {
324            callback: Callback::Mut(Box::new(callback)),
325            is_deleted: is_deleted.clone(),
326            time: now + duration,
327            reuseable: true,
328            duration,
329        };
330
331        self.push_task(task);
332
333        move || is_deleted.store(true, SeqCst)
334    }
335
336    fn push_task(&self, task: Task) {
337        let mut locker = self.cond.1.lock().unwrap();
338        locker.push(task);
339        drop(locker);
340        self.cond.0.notify_one();
341    }
342
343    /// wait for `duration` time
344    ///
345    /// Examples
346    ///
347    /// ```
348    /// use swnb_timer::Timer;
349    /// use std::time::Duration;
350    ///
351    /// let timer = Timer::new();
352    ///
353    /// let async_block = async {
354    ///     timer.wait(Duration::from_secs(1)).await;
355    /// };
356    ///
357    /// // blocking_on(async_block);
358    /// ```
359    ///
360    pub async fn wait(&self, duration: std::time::Duration) {
361        let future_timer = FutureTimer::new(duration, self);
362        future_timer.await
363    }
364}
365
366// FutureTimer impl Future
367// for Timer to do async wait
368struct FutureTimer<'a> {
369    duration: std::time::Duration,
370    is_set_timeout: AtomicBool,
371    is_resolved: Arc<AtomicBool>,
372    timer: &'a Timer,
373}
374
375impl<'a> Future for FutureTimer<'a> {
376    type Output = ();
377    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
378        let result = self
379            .is_set_timeout
380            .compare_exchange(false, true, SeqCst, SeqCst);
381
382        if result.is_ok() {
383            let is_resolved = self.is_resolved.clone();
384            let waker = cx.waker().clone();
385            let _ = self.timer.set_timeout(
386                move || {
387                    is_resolved.store(true, SeqCst);
388                    waker.wake();
389                },
390                self.duration,
391            );
392            Poll::Pending
393        } else if self.is_resolved.load(SeqCst) {
394            Poll::Ready(())
395        } else {
396            Poll::Pending
397        }
398    }
399}
400
401impl<'a> FutureTimer<'a> {
402    fn new<'b: 'a>(duration: std::time::Duration, timer: &'b Timer) -> Self {
403        FutureTimer {
404            duration,
405            is_set_timeout: AtomicBool::new(false),
406            is_resolved: Arc::new(AtomicBool::new(false)),
407            timer,
408        }
409    }
410}
411
// Compiled only for test builds so that the imports below do not trigger
// unused-import warnings in regular builds.
#[cfg(test)]
mod test {
    use std::{mem::size_of, time::Instant};

    /// Diagnostic test: prints the sizes of `Task` and `Instant` and the
    /// default `bool` value. It makes no assertions.
    #[test]
    fn test_size() {
        use super::Task;
        dbg!(size_of::<Task>());
        dbg!(size_of::<Instant>());
        let v: bool = Default::default();
        dbg!(v);
    }
}