async_foundation/timer/
timer.rs

1use crate::timer::timer_future::TimerFuture;
2use crate::timer::timer_state::TimerState;
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::{Duration, Instant};
6
7pub struct Timer {
8    state: Arc<Mutex<TimerState>>,
9    condvar: Arc<Condvar>,
10}
11
12impl Timer {
13    pub fn new() -> Timer {
14        Timer {
15            state: Arc::new(Mutex::new(TimerState::new())),
16            condvar: Arc::new(Condvar::new()),
17        }
18    }
19
20    pub fn wait(&mut self, duration: Duration) -> TimerFuture {
21        let state = &mut *self.state.lock().unwrap();
22        let expiration = Instant::now() + duration;
23        let launched = state.queue.len() > 0;
24        let (time_future, shortest) = state.add_to_queue(expiration);
25        if !launched {
26            self.launch();
27        } else if shortest {
28            self.condvar.notify_one();
29        }
30        time_future
31    }
32
33    fn launch(&self) {
34        let lock = self.state.clone();
35        let condvar = self.condvar.clone();
36        thread::spawn(move || {
37            // eprintln!("timer: spawn new timer thread");
38            loop {
39                let mut state = lock.lock().unwrap();
40                let expiration = state.current_expiration();
41                if expiration.is_none() {
42                    break;
43                }
44                let expiration = expiration.unwrap();
45                let now = Instant::now();
46                let duration = if expiration > now {
47                    expiration - now
48                } else {
49                    Duration::ZERO
50                };
51                // eprintln!("waiting for {:?}  {:?}", duration, std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH));
52                let result = match condvar.wait_timeout(state, duration) {
53                    Ok(result) => result,
54                    Err(err) => {
55                        eprintln!("Err condvar.wait_timeout: {:?}", &err);
56                        err.into_inner()
57                    }
58                };
59                state = result.0;
60                if result.1.timed_out() {
61                    // eprintln!("timer: timeout {:?} {:?}", duration, std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH));
62                    let guard = state.queue.pop().unwrap();
63                    let mut future_state = guard.lock().unwrap();
64                    future_state.completed = true;
65                    if let Some(waker) = future_state.waker.take() {
66                        waker.wake()
67                    }
68                } else {
69                    // eprintln!("timer: interrupted");
70                }
71            }
72            // eprintln!("timer: thread has been dropped");
73        });
74    }
75}
76
77impl Clone for Timer {
78    fn clone(&self) -> Self {
79        Timer {
80            state: self.state.clone(),
81            condvar: self.condvar.clone(),
82        }
83    }
84}
85
86impl Drop for Timer {
87    fn drop(&mut self) {
88        let mut state = self.state.lock().unwrap();
89        state.queue.clear();
90        self.condvar.notify_one();
91    }
92}
93
#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor::block_on;
    use futures::join;
    use std::time::Instant;

    /// Runs two identical rounds of three overlapping waits on one timer. In each round
    /// the later registrations happen after a 30 ms blocking sleep, and the whole round
    /// must still finish within 140 ms.
    #[test]
    fn test_timer() {
        let mut timer = Timer::new();

        let future = async {
            let benchmark = Instant::now();
            let future1 = timer.wait(Duration::from_millis(100));
            // Deliberate blocking sleep: it delays the following registrations.
            thread::sleep(Duration::from_millis(30));
            let future2 = timer.wait(Duration::from_millis(50));
            let future3 = timer.wait(Duration::from_millis(100));

            join!(future1, future2, future3);
            assert!(benchmark.elapsed() <= Duration::from_millis(140));

            // Second round: the queue drained completely, so the timer must start over.
            let benchmark = Instant::now();
            let future1 = timer.wait(Duration::from_millis(100));
            thread::sleep(Duration::from_millis(30));
            let future2 = timer.wait(Duration::from_millis(50));
            let future3 = timer.wait(Duration::from_millis(100));
            join!(future1, future2, future3);
            let elapsed = benchmark.elapsed();
            assert!(elapsed <= Duration::from_millis(140));
        };
        block_on(future);
    }

    /// Registers 100 short waits (1..=10 ms) and checks the queue is empty once all of
    /// them have completed.
    #[test]
    fn test_many_timers() {
        let mut timer = Timer::new();
        let futures = (0..100)
            .map(|i| timer.wait(Duration::from_millis(i % 10 + 1)))
            .collect::<Vec<_>>();
        block_on(futures::future::join_all(futures));
        assert!(timer.state.lock().unwrap().queue.is_empty());
    }

    /// A new timer starts with an empty queue.
    #[test]
    fn test_timer_new() {
        let timer = Timer::new();
        assert!(timer.state.lock().unwrap().queue.is_empty());
    }

    /// A single 50 ms wait completes after roughly 50 ms.
    #[test]
    fn test_timer_wait_single() {
        let mut timer = Timer::new();
        let start = Instant::now();
        let future = timer.wait(Duration::from_millis(50));

        block_on(future);
        let elapsed = start.elapsed();

        // Should take approximately 50ms, with some tolerance
        assert!(elapsed >= Duration::from_millis(45));
        assert!(elapsed <= Duration::from_millis(100));
    }

    /// Three 10 ms waits run back to back take roughly 30 ms in total.
    #[test]
    fn test_timer_wait_multiple_sequential() {
        let mut timer = Timer::new();
        let start = Instant::now();

        // Each wait is registered only after the previous one has completed.
        block_on(timer.wait(Duration::from_millis(10)));
        block_on(timer.wait(Duration::from_millis(10)));
        block_on(timer.wait(Duration::from_millis(10)));

        let elapsed = start.elapsed();

        // Should take approximately 30ms total
        assert!(elapsed >= Duration::from_millis(25));
        assert!(elapsed <= Duration::from_millis(80));
    }

    /// Three waits registered up front and joined together take as long as the longest.
    #[test]
    fn test_timer_wait_multiple_concurrent() {
        let mut timer = Timer::new();
        let start = Instant::now();

        let future1 = timer.wait(Duration::from_millis(50));
        let future2 = timer.wait(Duration::from_millis(30));
        let future3 = timer.wait(Duration::from_millis(40));

        block_on(async {
            join!(future1, future2, future3);
        });
        let elapsed = start.elapsed();

        // Should take approximately 50ms (the longest)
        assert!(elapsed >= Duration::from_millis(45));
        assert!(elapsed <= Duration::from_millis(100));
    }

    /// A zero-length wait completes almost immediately.
    #[test]
    fn test_timer_wait_zero_duration() {
        let mut timer = Timer::new();
        let start = Instant::now();
        let future = timer.wait(Duration::ZERO);

        block_on(future);
        let elapsed = start.elapsed();

        // Should complete very quickly
        assert!(elapsed <= Duration::from_millis(10));
    }

    /// A 1 ms wait completes quickly.
    #[test]
    fn test_timer_wait_very_short_duration() {
        let mut timer = Timer::new();
        let start = Instant::now();
        let future = timer.wait(Duration::from_millis(1));

        block_on(future);
        let elapsed = start.elapsed();

        // Should complete quickly
        assert!(elapsed <= Duration::from_millis(20));
    }

    /// Waits registered out of deadline order all complete, and joining them takes as
    /// long as the longest one.
    #[test]
    fn test_timer_queue_ordering() {
        let mut timer = Timer::new();
        let start = Instant::now();

        // Registered longest first, so later entries have earlier deadlines.
        let future1 = timer.wait(Duration::from_millis(100));
        let future2 = timer.wait(Duration::from_millis(50));
        let future3 = timer.wait(Duration::from_millis(75));

        // All should complete in approximately 100ms (the longest)
        block_on(async {
            join!(future1, future2, future3);
        });
        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(90));
        assert!(elapsed <= Duration::from_millis(150));
    }

    /// Dropping the timer empties the shared queue.
    #[test]
    fn test_timer_drop_clears_queue() {
        let mut timer = Timer::new();

        let _future1 = timer.wait(Duration::from_millis(100));
        let _future2 = timer.wait(Duration::from_millis(200));

        // Queue should have timers
        assert!(!timer.state.lock().unwrap().queue.is_empty());

        // Keep our own reference to the shared state so it can still be inspected after
        // the handle is gone.
        let state = timer.state.clone();
        drop(timer);

        assert!(state.lock().unwrap().queue.is_empty());
    }

    /// Several threads register waits on one shared timer and block on them at the same
    /// time; every thread must finish without panicking.
    #[test]
    fn test_timer_concurrent_access() {
        let timer_arc = std::sync::Arc::new(std::sync::Mutex::new(Timer::new()));

        // Collect first so every thread is spawned before any of them is joined.
        let handles: Vec<_> = (0..5u64)
            .map(|i| {
                let timer_clone = timer_arc.clone();
                std::thread::spawn(move || {
                    // Hold the outer lock only while registering. Keeping it across
                    // `block_on` would make the threads wait one after another and the
                    // timer would never see more than one pending entry.
                    let future = timer_clone
                        .lock()
                        .unwrap()
                        .wait(Duration::from_millis(10 + i * 10));
                    block_on(future);
                    i
                })
            })
            .collect();

        // All should complete successfully
        let ids: Vec<u64> = handles
            .into_iter()
            .map(|handle| handle.join().expect("Thread panicked"))
            .collect();
        assert_eq!(ids, vec![0, 1, 2, 3, 4]);
    }

    /// A shorter wait registered after a longer one does not delay or cut short the
    /// longer one: joining both takes roughly as long as the longer wait.
    #[test]
    fn test_timer_shortest_detection() {
        let mut timer = Timer::new();
        let start = Instant::now();

        let future1 = timer.wait(Duration::from_millis(100));
        let future2 = timer.wait(Duration::from_millis(20));
        block_on(async {
            join!(future1, future2);
        });
        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(90));
        assert!(elapsed <= Duration::from_millis(150));
    }
}