swnb_timer/time.rs
1use std::alloc::System;
2use std::cmp::{Eq, Ord, Ordering};
3use std::collections::BinaryHeap;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::{
7 atomic::{AtomicBool, Ordering::SeqCst},
8 Arc, Condvar, Mutex,
9};
10use std::task::{Context, Poll};
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12
13enum Callback {
14 Once(Option<Box<dyn FnOnce() + Send>>),
15 Mut(Box<dyn FnMut() + Send>),
16}
17
18// scheduler unit
19struct Task {
20 time: std::time::Instant,
21 is_deleted: Arc<AtomicBool>,
22 reuseable: bool,
23 callback: Callback,
24 duration: Duration,
25}
26
27impl Task {
28 fn call(&mut self) {
29 match self.callback {
30 Callback::Once(ref mut f) => {
31 if let Some(f) = f.take() {
32 f()
33 }
34 }
35 Callback::Mut(ref mut f) => f(),
36 }
37 }
38
39 fn is_deleted(&self) -> bool {
40 self.is_deleted.load(SeqCst)
41 }
42}
43
44impl PartialOrd for Task {
45 #[inline]
46 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
47 Some(other.time.cmp(&self.time))
48 }
49}
50
51impl PartialEq for Task {
52 #[inline]
53 fn eq(&self, other: &Self) -> bool {
54 self.time == other.time
55 }
56}
57
58impl Ord for Task {
59 #[inline]
60 fn cmp(&self, other: &Self) -> Ordering {
61 other.time.cmp(&self.time)
62 }
63}
64
65impl Eq for Task {}
66
67/// Timer store all timeout callback base on binaryHeap;
68/// The callback function will be triggered when the time expires
69///
70/// Example
71///
72/// ```
73/// use swnb_timer::Timer;
74/// use std::time::Duration;
75///
76/// let timer = Timer::new();
77///
78/// timer.set_timeout(||{
79/// println!("after 1 sec");
80/// },Duration::from_secs(1));
81///
82/// std::thread::sleep(Duration::from_secs(2));
83/// ```
84pub struct Timer {
85 thread_handler: std::thread::JoinHandle<()>,
86 cond: Arc<(Condvar, Mutex<BinaryHeap<Task>>)>,
87}
88
89impl Default for Timer {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl Timer {
96 /// create new Timer, this method will create one thread to handle all task base on binaryHeap;
97 ///
98 /// # Examples
99 ///
100 /// Basic usage:
101 /// ```
102 /// use swnb_timer::Timer;
103 /// use std::time::Duration;
104 ///
105 /// let timer = Timer::new();
106 ///
107 /// timer.set_timeout(||{
108 /// println!("after 1 sec");
109 /// },Duration::from_secs(1));
110 ///
111 /// timer.set_timeout(||{
112 /// println!("after 2 sec");
113 /// },Duration::from_secs(2));
114 ///
115 /// std::thread::sleep(Duration::from_secs(3));
116 ///
117 /// ```
118 ///
119 /// Async usage:
120 /// ```
121 /// use swnb_timer::Timer;
122 /// use std::time::Duration;
123 ///
124 /// let timer = Timer::new();
125 ///
126 /// let async_block = async {
127 /// timer.wait(Duration::from_secs(1)).await;
128 /// println!("after 1 sec");
129 /// };
130 /// // blocking_on(async_block);
131 /// ```
132 ///
133 pub fn new() -> Self {
134 let heap = BinaryHeap::new();
135 let cond = Arc::new((Condvar::new(), Mutex::new(heap)));
136 let thread_handler = Timer::handle_task(cond.clone());
137 Timer {
138 thread_handler,
139 cond,
140 }
141 }
142
143 fn handle_task(cond: Arc<(Condvar, Mutex<BinaryHeap<Task>>)>) -> std::thread::JoinHandle<()> {
144 let worker = move || {
145 let mut locker = cond.1.lock().unwrap();
146 loop {
147 loop {
148 match locker.peek() {
149 Some(&Task {
150 time,
151 ref is_deleted,
152 ..
153 }) => {
154 if is_deleted.load(SeqCst) {
155 locker.pop();
156 } else {
157 let now = std::time::Instant::now();
158 if time <= now {
159 break;
160 } else {
161 let (new_locker, _) = cond
162 .0
163 .wait_timeout(locker, time.duration_since(now))
164 .unwrap();
165 locker = new_locker;
166 }
167 }
168 }
169 None => {
170 locker = cond.0.wait(locker).unwrap();
171 }
172 }
173 }
174
175 while let Some(task) = locker.peek() {
176 if task.is_deleted() {
177 locker.pop();
178 continue;
179 }
180 let now = Instant::now();
181
182 if task.time <= now {
183 let mut task = locker.pop().unwrap();
184 task.call();
185 if task.reuseable {
186 task.time = now + task.duration;
187 locker.push(task);
188 }
189 } else {
190 break;
191 }
192 }
193 }
194 };
195
196 std::thread::Builder::new()
197 .name("swnb-timer".into())
198 .spawn(worker)
199 .unwrap()
200 }
201
202 /// set_timeout accept two arguments, callback and duration;
203 /// callback will run after duration;
204 /// if you want to cancel callback before the deadline,
205 /// set_timeout return cancel function,
206 /// run it will cancel current timeout callback;
207 ///
208 /// # Examples
209 ///
210 /// set_timeout:
211 ///
212 /// ```
213 /// use swnb_timer::Timer;
214 /// use std::time::Duration;
215 ///
216 /// let timer = Timer::new();
217 ///
218 /// timer.set_timeout(||{
219 /// println!("after 1 sec");
220 /// },Duration::from_secs(1));
221 ///
222 /// timer.set_timeout(||{
223 /// println!("after 2 sec");
224 /// },Duration::from_secs(2));
225 ///
226 /// std::thread::sleep(Duration::from_secs(3));
227 /// ```
228 ///
229 /// cancel_callback:
230 ///
231 /// ```
232 /// use swnb_timer::Timer;
233 /// use std::time::Duration;
234 ///
235 /// let timer = Timer::new();
236 ///
237 /// let cancel = timer.set_timeout(||{
238 /// println!("after 2 sec");
239 /// },Duration::from_secs(2));
240 ///
241 /// timer.set_timeout(move ||{
242 /// cancel();
243 /// println!("cancel previous timeout callback");
244 /// },Duration::from_secs(1));
245 ///
246 /// std::thread::sleep(Duration::from_secs(3));
247 /// ```
248 pub fn set_timeout(
249 &self,
250 callback: impl FnOnce() + 'static + Send,
251 duration: std::time::Duration,
252 ) -> impl FnOnce() + Sync + 'static {
253 let now = std::time::Instant::now();
254 let is_deleted = Arc::new(AtomicBool::new(false));
255
256 let task = Task {
257 callback: Callback::Once(Some(Box::new(callback))),
258 is_deleted: is_deleted.clone(),
259 time: now + duration,
260 reuseable: false,
261 duration,
262 };
263
264 self.push_task(task);
265
266 move || is_deleted.store(true, SeqCst)
267 }
268
269 /// set_interval is basically consistent with set_timeout
270 /// callback will run every duration;
271 /// if you want to cancel interval,
272 /// set_interval return cancel function,
273 /// run it will cancel current interval callback;
274 ///
275 /// # Examples
276 ///
277 /// set_interval:
278 ///
279 /// ```
280 /// use swnb_timer::Timer;
281 /// use std::time::Duration;
282 ///
283 /// let timer = Timer::new();
284 ///
285 /// timer.set_interval(||{
286 /// println!("every 1 sec");
287 /// },Duration::from_secs(1));
288 ///
289 /// timer.set_interval(||{
290 /// println!("every 2 sec");
291 /// },Duration::from_secs(2));
292 ///
293 /// std::thread::sleep(Duration::from_secs(3));
294 /// ```
295 ///
296 /// cancel_interval:
297 ///
298 /// ```
299 /// use swnb_timer::Timer;
300 /// use std::time::Duration;
301 ///
302 /// let timer = Timer::new();
303 ///
304 /// let cancel = timer.set_interval(||{
305 /// println!("every 2 sec");
306 /// },Duration::from_secs(2));
307 ///
308 /// timer.set_timeout(move ||{
309 /// cancel();
310 /// println!("cancel previous timeout callback");
311 /// },Duration::from_secs(1));
312 ///
313 /// std::thread::sleep(Duration::from_secs(7));
314 /// ```
315 pub fn set_interval(
316 &self,
317 callback: impl FnMut() + 'static + Send,
318 duration: std::time::Duration,
319 ) -> impl FnOnce() + Sync + 'static {
320 let now = std::time::Instant::now();
321 let is_deleted: Arc<AtomicBool> = Default::default();
322
323 let task = Task {
324 callback: Callback::Mut(Box::new(callback)),
325 is_deleted: is_deleted.clone(),
326 time: now + duration,
327 reuseable: true,
328 duration,
329 };
330
331 self.push_task(task);
332
333 move || is_deleted.store(true, SeqCst)
334 }
335
336 fn push_task(&self, task: Task) {
337 let mut locker = self.cond.1.lock().unwrap();
338 locker.push(task);
339 drop(locker);
340 self.cond.0.notify_one();
341 }
342
343 /// wait for `duration` time
344 ///
345 /// Examples
346 ///
347 /// ```
348 /// use swnb_timer::Timer;
349 /// use std::time::Duration;
350 ///
351 /// let timer = Timer::new();
352 ///
353 /// let async_block = async {
354 /// timer.wait(Duration::from_secs(1)).await;
355 /// };
356 ///
357 /// // blocking_on(async_block);
358 /// ```
359 ///
360 pub async fn wait(&self, duration: std::time::Duration) {
361 let future_timer = FutureTimer::new(duration, self);
362 future_timer.await
363 }
364}
365
366// FutureTimer impl Future
367// for Timer to do async wait
368struct FutureTimer<'a> {
369 duration: std::time::Duration,
370 is_set_timeout: AtomicBool,
371 is_resolved: Arc<AtomicBool>,
372 timer: &'a Timer,
373}
374
375impl<'a> Future for FutureTimer<'a> {
376 type Output = ();
377 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
378 let result = self
379 .is_set_timeout
380 .compare_exchange(false, true, SeqCst, SeqCst);
381
382 if result.is_ok() {
383 let is_resolved = self.is_resolved.clone();
384 let waker = cx.waker().clone();
385 let _ = self.timer.set_timeout(
386 move || {
387 is_resolved.store(true, SeqCst);
388 waker.wake();
389 },
390 self.duration,
391 );
392 Poll::Pending
393 } else if self.is_resolved.load(SeqCst) {
394 Poll::Ready(())
395 } else {
396 Poll::Pending
397 }
398 }
399}
400
401impl<'a> FutureTimer<'a> {
402 fn new<'b: 'a>(duration: std::time::Duration, timer: &'b Timer) -> Self {
403 FutureTimer {
404 duration,
405 is_set_timeout: AtomicBool::new(false),
406 is_resolved: Arc::new(AtomicBool::new(false)),
407 timer,
408 }
409 }
410}
411
412mod test {
413 use std::{default, mem::size_of, time::Instant};
414
415 #[test]
416 fn test_size() {
417 use super::Task;
418 dbg!(size_of::<Task>());
419 dbg!(size_of::<Instant>());
420 let v: bool = Default::default();
421 dbg!(v);
422 }
423}