async_foundation/timer/
timer.rs1use crate::timer::timer_future::TimerFuture;
2use crate::timer::timer_state::TimerState;
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::{Duration, Instant};
6
7pub struct Timer {
8 state: Arc<Mutex<TimerState>>,
9 condvar: Arc<Condvar>,
10}
11
12impl Timer {
13 pub fn new() -> Timer {
14 Timer {
15 state: Arc::new(Mutex::new(TimerState::new())),
16 condvar: Arc::new(Condvar::new()),
17 }
18 }
19
20 pub fn wait(&mut self, duration: Duration) -> TimerFuture {
21 let state = &mut *self.state.lock().unwrap();
22 let expiration = Instant::now() + duration;
23 let launched = state.queue.len() > 0;
24 let (time_future, shortest) = state.add_to_queue(expiration);
25 if !launched {
26 self.launch();
27 } else if shortest {
28 self.condvar.notify_one();
29 }
30 time_future
31 }
32
33 fn launch(&self) {
34 let lock = self.state.clone();
35 let condvar = self.condvar.clone();
36 thread::spawn(move || {
37 loop {
39 let mut state = lock.lock().unwrap();
40 let expiration = state.current_expiration();
41 if expiration.is_none() {
42 break;
43 }
44 let expiration = expiration.unwrap();
45 let now = Instant::now();
46 let duration = if expiration > now {
47 expiration - now
48 } else {
49 Duration::ZERO
50 };
51 let result = match condvar.wait_timeout(state, duration) {
53 Ok(result) => result,
54 Err(err) => {
55 eprintln!("Err condvar.wait_timeout: {:?}", &err);
56 err.into_inner()
57 }
58 };
59 state = result.0;
60 if result.1.timed_out() {
61 let guard = state.queue.pop().unwrap();
63 let mut future_state = guard.lock().unwrap();
64 future_state.completed = true;
65 if let Some(waker) = future_state.waker.take() {
66 waker.wake()
67 }
68 } else {
69 }
71 }
72 });
74 }
75}
76
77impl Clone for Timer {
78 fn clone(&self) -> Self {
79 Timer {
80 state: self.state.clone(),
81 condvar: self.condvar.clone(),
82 }
83 }
84}
85
86impl Drop for Timer {
87 fn drop(&mut self) {
88 let mut state = self.state.lock().unwrap();
89 state.queue.clear();
90 self.condvar.notify_one();
91 }
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97 use futures::executor::block_on;
98 use futures::join;
99 use std::time::Instant;
100
101 #[test]
102 fn test_timer() {
103 let mut timer = Timer::new();
104
105 let future = async {
106 let benchmark = Instant::now();
107 let future1 = timer.wait(Duration::from_millis(100));
108 thread::sleep(Duration::from_millis(30));
109 let future2 = timer.wait(Duration::from_millis(50));
110 let future3 = timer.wait(Duration::from_millis(100));
111
112 join!(future1, future2, future3);
113 assert!(benchmark.elapsed() <= Duration::from_millis(140));
114 let benchmark = Instant::now();
115 let future1 = timer.wait(Duration::from_millis(100));
116 thread::sleep(Duration::from_millis(30));
117 let future2 = timer.wait(Duration::from_millis(50));
118 let future3 = timer.wait(Duration::from_millis(100));
119 join!(future1, future2, future3);
120 let elapsed = benchmark.elapsed();
121 assert!(elapsed <= Duration::from_millis(140));
123 };
124 block_on(future);
125 }
126
127 #[test]
128 fn test_many_timers() {
129 let mut timer = Timer::new();
130 let futures = (0..100)
131 .map(|i| timer.wait(Duration::from_millis(i % 10 + 1)))
132 .collect::<Vec<_>>();
133 block_on(futures::future::join_all(futures));
134 assert!(timer.state.lock().unwrap().queue.is_empty());
135 }
136
137 #[test]
138 fn test_timer_new() {
139 let timer = Timer::new();
140 assert!(timer.state.lock().unwrap().queue.is_empty());
141 }
142
143
144 #[test]
145 fn test_timer_wait_single() {
146 let mut timer = Timer::new();
147 let start = Instant::now();
148 let future = timer.wait(Duration::from_millis(50));
149
150 block_on(future);
151 let elapsed = start.elapsed();
152
153 assert!(elapsed >= Duration::from_millis(45));
155 assert!(elapsed <= Duration::from_millis(100));
156 }
157
158 #[test]
159 fn test_timer_wait_multiple_sequential() {
160 let mut timer = Timer::new();
161 let start = Instant::now();
162
163 block_on(timer.wait(Duration::from_millis(10)));
165 block_on(timer.wait(Duration::from_millis(10)));
166 block_on(timer.wait(Duration::from_millis(10)));
167
168 let elapsed = start.elapsed();
169
170 assert!(elapsed >= Duration::from_millis(25));
172 assert!(elapsed <= Duration::from_millis(80));
173 }
174
175 #[test]
176 fn test_timer_wait_multiple_concurrent() {
177 let mut timer = Timer::new();
178 let start = Instant::now();
179
180 let future1 = timer.wait(Duration::from_millis(50));
182 let future2 = timer.wait(Duration::from_millis(30));
183 let future3 = timer.wait(Duration::from_millis(40));
184
185 block_on(async {
186 join!(future1, future2, future3);
187 });
188 let elapsed = start.elapsed();
189
190 assert!(elapsed >= Duration::from_millis(45));
192 assert!(elapsed <= Duration::from_millis(100));
193 }
194
195 #[test]
196 fn test_timer_wait_zero_duration() {
197 let mut timer = Timer::new();
198 let start = Instant::now();
199 let future = timer.wait(Duration::ZERO);
200
201 block_on(future);
202 let elapsed = start.elapsed();
203
204 assert!(elapsed <= Duration::from_millis(10));
206 }
207
208 #[test]
209 fn test_timer_wait_very_short_duration() {
210 let mut timer = Timer::new();
211 let start = Instant::now();
212 let future = timer.wait(Duration::from_millis(1));
213
214 block_on(future);
215 let elapsed = start.elapsed();
216
217 assert!(elapsed <= Duration::from_millis(20));
219 }
220
221 #[test]
222 fn test_timer_queue_ordering() {
223 let mut timer = Timer::new();
224 let start = Instant::now();
225
226 let future1 = timer.wait(Duration::from_millis(100));
228 let future2 = timer.wait(Duration::from_millis(50));
229 let future3 = timer.wait(Duration::from_millis(75));
230
231 block_on(async {
233 join!(future1, future2, future3);
234 });
235 let elapsed = start.elapsed();
236
237 assert!(elapsed >= Duration::from_millis(90));
238 assert!(elapsed <= Duration::from_millis(150));
239 }
240
241 #[test]
242 fn test_timer_drop_clears_queue() {
243 let mut timer = Timer::new();
244
245 let _future1 = timer.wait(Duration::from_millis(100));
247 let _future2 = timer.wait(Duration::from_millis(200));
248
249 assert!(!timer.state.lock().unwrap().queue.is_empty());
251
252 drop(timer);
254
255 }
258
259 #[test]
260 fn test_timer_concurrent_access() {
261 let timer = Timer::new();
262 let timer_arc = std::sync::Arc::new(std::sync::Mutex::new(timer));
263 let mut handles = vec![];
264
265 for i in 0..5 {
267 let timer_clone = timer_arc.clone();
268 let handle = std::thread::spawn(move || {
269 let mut timer = timer_clone.lock().unwrap();
270 let future = timer.wait(Duration::from_millis(10 + i * 10));
271 futures::executor::block_on(future);
272 i
273 });
274 handles.push(handle);
275 }
276
277 for handle in handles {
279 let id = handle.join().expect("Thread panicked");
280 println!("Timer thread {} completed", id);
281 }
282 }
283
284 #[test]
285 fn test_timer_shortest_detection() {
286 let mut timer = Timer::new();
287 let start = Instant::now();
288
289 let future1 = timer.wait(Duration::from_millis(100));
290 let future2 = timer.wait(Duration::from_millis(20));
291 block_on(async {
292 join!(future1, future2);
293 });
294 let elapsed = start.elapsed();
295
296 assert!(elapsed >= Duration::from_millis(90));
297 assert!(elapsed <= Duration::from_millis(150));
298 }
299}