async_foundation/timer/
timer.rs1use crate::timer::timer_future::TimerFuture;
2use crate::timer::timer_state::TimerState;
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::{Duration, Instant};
6
7pub struct Timer {
14 state: Arc<Mutex<TimerState>>,
15 condvar: Arc<Condvar>,
16}
17
18impl Timer {
19 pub fn new() -> Timer {
21 Timer {
22 state: Arc::new(Mutex::new(TimerState::new())),
23 condvar: Arc::new(Condvar::new()),
24 }
25 }
26
27 pub fn wait(&mut self, duration: Duration) -> TimerFuture {
33 let state = &mut *self.state.lock().unwrap();
34 let expiration = Instant::now() + duration;
35 let launched = state.queue.len() > 0;
36 let (time_future, shortest) = state.add_to_queue(expiration);
37 if !launched {
38 self.launch();
39 } else if shortest {
40 self.condvar.notify_one();
41 }
42 time_future
43 }
44
45 fn launch(&self) {
46 let lock = self.state.clone();
47 let condvar = self.condvar.clone();
48 thread::spawn(move || {
49 loop {
51 let mut state = lock.lock().unwrap();
52 let expiration = state.current_expiration();
53 if expiration.is_none() {
54 break;
55 }
56 let expiration = expiration.unwrap();
57 let now = Instant::now();
58 let duration = if expiration > now {
59 expiration - now
60 } else {
61 Duration::ZERO
62 };
63 let result = match condvar.wait_timeout(state, duration) {
65 Ok(result) => result,
66 Err(err) => {
67 eprintln!("Err condvar.wait_timeout: {:?}", &err);
68 err.into_inner()
69 }
70 };
71 state = result.0;
72 if result.1.timed_out() {
73 let guard = state.queue.pop().unwrap();
75 let mut future_state = guard.lock().unwrap();
76 future_state.completed = true;
77 if let Some(waker) = future_state.waker.take() {
78 waker.wake()
79 }
80 } else {
81 }
83 }
84 });
86 }
87}
88
89impl Clone for Timer {
90 fn clone(&self) -> Self {
91 Timer {
92 state: self.state.clone(),
93 condvar: self.condvar.clone(),
94 }
95 }
96}
97
98impl Drop for Timer {
99 fn drop(&mut self) {
100 let mut state = self.state.lock().unwrap();
101 state.queue.clear();
102 self.condvar.notify_one();
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor::block_on;
    use futures::join;
    use std::time::Instant;

    /// Two rounds of overlapping timeouts on the same timer; each round must
    /// finish within the longest timeout plus scheduling slack.
    #[test]
    fn test_timer() {
        let mut timer = Timer::new();

        let future = async {
            let benchmark = Instant::now();
            let future1 = timer.wait(Duration::from_millis(100));
            // Deliberate blocking sleep: the second timeout is registered
            // while the worker is already waiting on the first one.
            thread::sleep(Duration::from_millis(30));
            let future2 = timer.wait(Duration::from_millis(50));
            let future3 = timer.wait(Duration::from_millis(100));

            join!(future1, future2, future3);
            assert!(benchmark.elapsed() <= Duration::from_millis(140));

            let benchmark = Instant::now();
            let future1 = timer.wait(Duration::from_millis(100));
            thread::sleep(Duration::from_millis(30));
            let future2 = timer.wait(Duration::from_millis(50));
            let future3 = timer.wait(Duration::from_millis(100));
            join!(future1, future2, future3);
            let elapsed = benchmark.elapsed();
            assert!(elapsed <= Duration::from_millis(140));
        };
        block_on(future);
    }

    /// A hundred short timeouts all complete and leave the queue empty.
    #[test]
    fn test_many_timers() {
        let mut timer = Timer::new();
        let futures = (0..100)
            .map(|i| timer.wait(Duration::from_millis(i % 10 + 1)))
            .collect::<Vec<_>>();
        block_on(futures::future::join_all(futures));
        assert!(timer.state.lock().unwrap().queue.is_empty());
    }

    /// A new timer starts with an empty queue.
    #[test]
    fn test_timer_new() {
        let timer = Timer::new();
        assert!(timer.state.lock().unwrap().queue.is_empty());
    }

    /// A single timeout completes close to its requested duration.
    #[test]
    fn test_timer_wait_single() {
        let mut timer = Timer::new();
        let start = Instant::now();
        let future = timer.wait(Duration::from_millis(50));

        block_on(future);
        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(45));
        assert!(elapsed <= Duration::from_millis(100));
    }

    /// Sequential waits add up; the worker is relaunched after each drain.
    #[test]
    fn test_timer_wait_multiple_sequential() {
        let mut timer = Timer::new();
        let start = Instant::now();

        block_on(timer.wait(Duration::from_millis(10)));
        block_on(timer.wait(Duration::from_millis(10)));
        block_on(timer.wait(Duration::from_millis(10)));

        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(25));
        assert!(elapsed <= Duration::from_millis(80));
    }

    /// Concurrent waits finish when the longest one does.
    #[test]
    fn test_timer_wait_multiple_concurrent() {
        let mut timer = Timer::new();
        let start = Instant::now();

        let future1 = timer.wait(Duration::from_millis(50));
        let future2 = timer.wait(Duration::from_millis(30));
        let future3 = timer.wait(Duration::from_millis(40));

        block_on(async {
            join!(future1, future2, future3);
        });
        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(45));
        assert!(elapsed <= Duration::from_millis(100));
    }

    /// A zero-length timeout completes almost immediately.
    #[test]
    fn test_timer_wait_zero_duration() {
        let mut timer = Timer::new();
        let start = Instant::now();
        let future = timer.wait(Duration::ZERO);

        block_on(future);
        let elapsed = start.elapsed();

        assert!(elapsed <= Duration::from_millis(10));
    }

    /// A one-millisecond timeout completes promptly.
    #[test]
    fn test_timer_wait_very_short_duration() {
        let mut timer = Timer::new();
        let start = Instant::now();
        let future = timer.wait(Duration::from_millis(1));

        block_on(future);
        let elapsed = start.elapsed();

        assert!(elapsed <= Duration::from_millis(20));
    }

    /// Timeouts registered out of order still finish by the longest one.
    #[test]
    fn test_timer_queue_ordering() {
        let mut timer = Timer::new();
        let start = Instant::now();

        let future1 = timer.wait(Duration::from_millis(100));
        let future2 = timer.wait(Duration::from_millis(50));
        let future3 = timer.wait(Duration::from_millis(75));

        block_on(async {
            join!(future1, future2, future3);
        });
        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(90));
        assert!(elapsed <= Duration::from_millis(150));
    }

    /// Dropping the timer empties the shared queue.
    #[test]
    fn test_timer_drop_clears_queue() {
        let mut timer = Timer::new();

        let _future1 = timer.wait(Duration::from_millis(100));
        let _future2 = timer.wait(Duration::from_millis(200));

        // Keep a handle on the shared state so the queue can still be
        // inspected once the timer itself is gone.
        let state = timer.state.clone();
        assert!(!state.lock().unwrap().queue.is_empty());

        drop(timer);

        assert!(state.lock().unwrap().queue.is_empty());
    }

    /// Several threads register timeouts on one timer and wait concurrently.
    #[test]
    fn test_timer_concurrent_access() {
        let timer = Timer::new();
        let timer_arc = std::sync::Arc::new(std::sync::Mutex::new(timer));
        let mut handles = vec![];

        for i in 0..5 {
            let timer_clone = timer_arc.clone();
            let handle = std::thread::spawn(move || {
                // Hold the outer lock only while registering the timeout;
                // keeping it across `block_on` would serialise the threads.
                let future = timer_clone
                    .lock()
                    .unwrap()
                    .wait(Duration::from_millis(10 + i * 10));
                futures::executor::block_on(future);
                i
            });
            handles.push(handle);
        }

        for handle in handles {
            let id = handle.join().expect("Thread panicked");
            println!("Timer thread {} completed", id);
        }
    }

    /// A shorter timeout added after a longer one does not cut the longer
    /// one short.
    #[test]
    fn test_timer_shortest_detection() {
        let mut timer = Timer::new();
        let start = Instant::now();

        let future1 = timer.wait(Duration::from_millis(100));
        let future2 = timer.wait(Duration::from_millis(20));
        block_on(async {
            join!(future1, future2);
        });
        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(90));
        assert!(elapsed <= Duration::from_millis(150));
    }
}