1use std::time::{Duration, Instant};
2use std::sync::{Arc, Mutex, Condvar};
3use std::collections::BinaryHeap;
4use std::thread;
5use std::cmp::{Ord, PartialOrd, Ordering, Eq};
6
7struct Entry {
8 pub instant: Instant,
9 pub callback: Box<FnMut() + Send>
10}
11
12impl PartialOrd for Entry {
13 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
14 Some(self.cmp(other))
15 }
16}
17
18impl Eq for Entry {}
19
20impl PartialEq for Entry {
21 fn eq(&self, other: &Self) -> bool {
22 self.instant == other.instant
23 }
24}
25
26impl Ord for Entry {
27 fn cmp(&self, other: &Self) -> Ordering {
28 match self.instant.cmp(&other.instant) {
29 Ordering::Greater => Ordering::Less,
30 Ordering::Less => Ordering::Greater,
31 Ordering::Equal => Ordering::Equal
32 }
33 }
34}
35
36struct SharedData {
37 pub cond_var: Condvar,
38 pub callbacks: Mutex<BinaryHeap<Entry>>
39}
40
41pub struct Scheduler {
42 data: Arc<SharedData>
43}
44
45impl Scheduler {
46 pub fn new() -> Scheduler {
47 let shared_data = Arc::new(SharedData {
48 cond_var: Condvar::new(),
49 callbacks: Mutex::new(BinaryHeap::new())
50 });
51 {
52 let shared_data = shared_data.clone();
53 thread::spawn(move || {
54 let mut callbacks = shared_data.callbacks.lock().unwrap();
55 loop {
56 let entry = callbacks.pop();
57 match entry {
58 Some(mut entry) => {
59 let now = Instant::now();
60 if entry.instant > now {
61 let wait_duration = entry.instant - now;
62 callbacks.push(entry);
63 callbacks = shared_data.cond_var
64 .wait_timeout(callbacks, wait_duration).unwrap().0;
65 } else {
66 (entry.callback)()
67 }
68 }
69 None => {
70 callbacks = shared_data.cond_var.wait(callbacks).unwrap();
71 }
72 }
73 }
74 });
75 }
76
77 Scheduler {
78 data: shared_data
79 }
80 }
81
82 pub fn after_instant<F>(&self, instant: Instant, func: F)
83 where F: FnOnce() + Send + 'static {
84 let mut func = Some(func);
85 self.data.callbacks.lock().unwrap().push(Entry {
86 instant,
87 callback: Box::new(move || {
88 if let Some(func) = func.take() {
89 (func)()
90 }
91 }),
92 });
93 self.data.cond_var.notify_all();
94 }
95
96 pub fn after_duration<F>(&self, duration: Duration, func: F)
97 where F: FnOnce() + Send + 'static {
98 self.after_instant(Instant::now() + duration, func)
99 }
100}
101
102#[test]
103fn test() {
104 use std::sync::atomic::{AtomicBool, Ordering};
105
106 let atomic = Arc::new(AtomicBool::new(false));
107 let scheduler = Scheduler::new();
108 {
109 let atomic = atomic.clone();
110 scheduler.after_instant(Instant::now() + Duration::from_millis(10), move || {
111 atomic.store(true, Ordering::Relaxed);
112 });
113 }
114 thread::sleep(Duration::from_millis(100));
115 assert_eq!(atomic.load(Ordering::Relaxed), true);
116}