rust_rcs_core/util/
timer.rs1use std::sync::{Arc, Condvar, Mutex};
16use std::thread;
17use std::time::{Duration, Instant};
18
19type Callback = Box<dyn FnOnce() + Send + 'static>;
20
21struct TimerSource {
22 t: Instant,
23 cb: Callback,
24}
25
26pub struct Timer {
27 cond_pair: Arc<(Mutex<(bool, Vec<TimerSource>, bool)>, Condvar)>,
28 thread: Option<thread::JoinHandle<()>>,
29}
30
31impl Timer {
32 pub fn new() -> Timer {
36 let v: Vec<TimerSource> = Vec::with_capacity(16);
37
38 let cond_pair = Arc::new((Mutex::new((false, v, false)), Condvar::new()));
39
40 let cloned_pair = Arc::clone(&cond_pair);
41
42 let thread = thread::spawn(move || loop {
43 let (mutex, cond) = &*cloned_pair;
44
45 let v = mutex.lock().unwrap();
46
47 let now = Instant::now();
48
49 let mut delay = Duration::MAX;
50
51 for elem in &*v.1 {
52 let d = elem.t.saturating_duration_since(now);
53 if d < delay {
54 delay = d;
55 }
56 }
57
58 dbg!(delay);
59
60 let result = cond
61 .wait_timeout_while(v, delay, |v| {
62 if v.0 || v.2 {
63 return false;
64 }
65 let now = Instant::now();
66 for elem in &*v.1 {
67 if elem.t < now {
68 return false;
69 }
70 }
71 true
72 })
73 .unwrap();
74
75 let mut v = result.0;
76
77 v.0 = false;
78
79 let mut i = 0;
80
81 while i < v.1.len() {
82 let a = &v.1[i];
83
84 let now = Instant::now();
85
86 if a.t < now {
87 let b = v.1.swap_remove(i);
88 let cb = b.cb;
89 cb();
90 } else {
91 i = i + 1;
92 }
93 }
94
95 if v.2 {
96 dbg!("thread exit\n");
97 return;
98 }
99 });
100
101 Timer {
102 cond_pair,
103 thread: Some(thread),
104 }
105 }
106
107 pub fn schedule<F>(&self, delay: Duration, f: F)
120 where
121 F: FnOnce() + Send + 'static,
122 {
123 assert!(delay > Duration::ZERO);
124
125 let (mutex, cond) = &*self.cond_pair;
126
127 let mut v = mutex.lock().unwrap();
128
129 v.0 = true;
130
131 v.1.push(TimerSource {
132 t: Instant::now() + delay,
133 cb: Box::new(f),
134 });
135
136 cond.notify_one();
137 }
138}
139
140impl Drop for Timer {
141 fn drop(&mut self) {
142 dbg!("drop\n");
143
144 {
145 let (mutex, cond) = &*self.cond_pair;
146
147 let mut v = mutex.lock().unwrap();
148
149 v.2 = true;
150
151 cond.notify_one();
152 }
153
154 if let Some(t) = self.thread.take() {
155 t.join().unwrap();
156 }
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 #[test]
163 fn multiple_sources() {
164 let timer = super::Timer::new();
165 let (tx, rx) = std::sync::mpsc::channel();
166 let tx = std::sync::Arc::new(std::sync::Mutex::new(tx));
167 for i in 1..6 {
168 let tx = std::sync::Arc::clone(&tx);
169 print!("schedule {} over timer after {} seconds\n", i, i);
170 timer.schedule(super::Duration::from_secs(i), move || {
171 print!("send {} to callback\n", i);
172 tx.lock().unwrap().send(i).unwrap();
173 });
174 }
175
176 assert_eq!(Ok(1), rx.recv());
177 assert_eq!(Ok(2), rx.recv());
178 assert_eq!(Ok(3), rx.recv());
179 assert_eq!(Ok(4), rx.recv());
180 assert_eq!(Ok(5), rx.recv());
181
182 for i in 1..6 {
183 let tx = std::sync::Arc::clone(&tx);
184 print!("schedule {} over timer after {} seconds\n", i, 6 - i);
185 timer.schedule(super::Duration::from_secs(6 - i), move || {
186 print!("send {} to callback\n", i);
187 tx.lock().unwrap().send(i).unwrap();
188 });
189 }
190
191 assert_eq!(Ok(5), rx.recv());
192 assert_eq!(Ok(4), rx.recv());
193 assert_eq!(Ok(3), rx.recv());
194 assert_eq!(Ok(2), rx.recv());
195 assert_eq!(Ok(1), rx.recv());
196
197 std::thread::sleep(std::time::Duration::from_secs(8));
198 }
199}