async_foundation/timer/
timer.rs

1use crate::timer::timer_future::TimerFuture;
2use crate::timer::timer_state::TimerState;
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::{Duration, Instant};
6
7/// A simple async timer that can schedule waits and manage many concurrent timers.
8///
9/// The timer maintains an internal queue of expirations and a dedicated thread
10/// that sleeps until the next expiration or is woken when a shorter timeout is
11/// scheduled. Each call to [`Timer::wait`] returns a [`TimerFuture`] that
12/// becomes ready when the configured duration elapses.
13pub struct Timer {
14    state: Arc<Mutex<TimerState>>,
15    condvar: Arc<Condvar>,
16}
17
18impl Timer {
19    /// Creates a new empty timer with no scheduled expirations.
20    pub fn new() -> Timer {
21        Timer {
22            state: Arc::new(Mutex::new(TimerState::new())),
23            condvar: Arc::new(Condvar::new()),
24        }
25    }
26
27    /// Schedules a new timeout and returns a future that resolves when it expires.
28    ///
29    /// Multiple waits can be outstanding at the same time; the timer keeps them
30    /// ordered by expiration and wakes the appropriate futures when their
31    /// deadlines are reached.
32    pub fn wait(&mut self, duration: Duration) -> TimerFuture {
33        let state = &mut *self.state.lock().unwrap();
34        let expiration = Instant::now() + duration;
35        let launched = state.queue.len() > 0;
36        let (time_future, shortest) = state.add_to_queue(expiration);
37        if !launched {
38            self.launch();
39        } else if shortest {
40            self.condvar.notify_one();
41        }
42        time_future
43    }
44
45    fn launch(&self) {
46        let lock = self.state.clone();
47        let condvar = self.condvar.clone();
48        thread::spawn(move || {
49            // eprintln!("timer: spawn new timer thread");
50            loop {
51                let mut state = lock.lock().unwrap();
52                let expiration = state.current_expiration();
53                if expiration.is_none() {
54                    break;
55                }
56                let expiration = expiration.unwrap();
57                let now = Instant::now();
58                let duration = if expiration > now {
59                    expiration - now
60                } else {
61                    Duration::ZERO
62                };
63                // eprintln!("waiting for {:?}  {:?}", duration, std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH));
64                let result = match condvar.wait_timeout(state, duration) {
65                    Ok(result) => result,
66                    Err(err) => {
67                        eprintln!("Err condvar.wait_timeout: {:?}", &err);
68                        err.into_inner()
69                    }
70                };
71                state = result.0;
72                if result.1.timed_out() {
73                    // eprintln!("timer: timeout {:?} {:?}", duration, std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH));
74                    let guard = state.queue.pop().unwrap();
75                    let mut future_state = guard.lock().unwrap();
76                    future_state.completed = true;
77                    if let Some(waker) = future_state.waker.take() {
78                        waker.wake()
79                    }
80                } else {
81                    // eprintln!("timer: interrupted");
82                }
83            }
84            // eprintln!("timer: thread has been dropped");
85        });
86    }
87}
88
89impl Clone for Timer {
90    fn clone(&self) -> Self {
91        Timer {
92            state: self.state.clone(),
93            condvar: self.condvar.clone(),
94        }
95    }
96}
97
98impl Drop for Timer {
99    fn drop(&mut self) {
100        let mut state = self.state.lock().unwrap();
101        state.queue.clear();
102        self.condvar.notify_one();
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use super::*;
109    use futures::executor::block_on;
110    use futures::join;
111    use std::time::Instant;
112
113    #[test]
114    fn test_timer() {
115        let mut timer = Timer::new();
116
117        let future = async {
118            let benchmark = Instant::now();
119            let future1 = timer.wait(Duration::from_millis(100));
120            thread::sleep(Duration::from_millis(30));
121            let future2 = timer.wait(Duration::from_millis(50));
122            let future3 = timer.wait(Duration::from_millis(100));
123
124            join!(future1, future2, future3);
125            assert!(benchmark.elapsed() <= Duration::from_millis(140));
126            let benchmark = Instant::now();
127            let future1 = timer.wait(Duration::from_millis(100));
128            thread::sleep(Duration::from_millis(30));
129            let future2 = timer.wait(Duration::from_millis(50));
130            let future3 = timer.wait(Duration::from_millis(100));
131            join!(future1, future2, future3);
132            let elapsed = benchmark.elapsed();
133            // dbg!(elapsed);
134            assert!(elapsed <= Duration::from_millis(140));
135        };
136        block_on(future);
137    }
138
139    #[test]
140    fn test_many_timers() {
141        let mut timer = Timer::new();
142        let futures = (0..100)
143            .map(|i| timer.wait(Duration::from_millis(i % 10 + 1)))
144            .collect::<Vec<_>>();
145        block_on(futures::future::join_all(futures));
146        assert!(timer.state.lock().unwrap().queue.is_empty());
147    }
148
149    #[test]
150    fn test_timer_new() {
151        let timer = Timer::new();
152        assert!(timer.state.lock().unwrap().queue.is_empty());
153    }
154
155
156    #[test]
157    fn test_timer_wait_single() {
158        let mut timer = Timer::new();
159        let start = Instant::now();
160        let future = timer.wait(Duration::from_millis(50));
161        
162        block_on(future);
163        let elapsed = start.elapsed();
164        
165        // Should take approximately 50ms, with some tolerance
166        assert!(elapsed >= Duration::from_millis(45));
167        assert!(elapsed <= Duration::from_millis(100));
168    }
169
170    #[test]
171    fn test_timer_wait_multiple_sequential() {
172        let mut timer = Timer::new();
173        let start = Instant::now();
174        
175        // Wait for multiple timers sequentially
176        block_on(timer.wait(Duration::from_millis(10)));
177        block_on(timer.wait(Duration::from_millis(10)));
178        block_on(timer.wait(Duration::from_millis(10)));
179        
180        let elapsed = start.elapsed();
181        
182        // Should take approximately 30ms total
183        assert!(elapsed >= Duration::from_millis(25));
184        assert!(elapsed <= Duration::from_millis(80));
185    }
186
187    #[test]
188    fn test_timer_wait_multiple_concurrent() {
189        let mut timer = Timer::new();
190        let start = Instant::now();
191        
192        // Wait for multiple timers concurrently
193        let future1 = timer.wait(Duration::from_millis(50));
194        let future2 = timer.wait(Duration::from_millis(30));
195        let future3 = timer.wait(Duration::from_millis(40));
196        
197        block_on(async {
198            join!(future1, future2, future3);
199        });
200        let elapsed = start.elapsed();
201        
202        // Should take approximately 50ms (the longest)
203        assert!(elapsed >= Duration::from_millis(45));
204        assert!(elapsed <= Duration::from_millis(100));
205    }
206
207    #[test]
208    fn test_timer_wait_zero_duration() {
209        let mut timer = Timer::new();
210        let start = Instant::now();
211        let future = timer.wait(Duration::ZERO);
212        
213        block_on(future);
214        let elapsed = start.elapsed();
215        
216        // Should complete very quickly
217        assert!(elapsed <= Duration::from_millis(10));
218    }
219
220    #[test]
221    fn test_timer_wait_very_short_duration() {
222        let mut timer = Timer::new();
223        let start = Instant::now();
224        let future = timer.wait(Duration::from_millis(1));
225        
226        block_on(future);
227        let elapsed = start.elapsed();
228        
229        // Should complete quickly
230        assert!(elapsed <= Duration::from_millis(20));
231    }
232
233    #[test]
234    fn test_timer_queue_ordering() {
235        let mut timer = Timer::new();
236        let start = Instant::now();
237        
238        // Add timers in reverse order
239        let future1 = timer.wait(Duration::from_millis(100));
240        let future2 = timer.wait(Duration::from_millis(50));
241        let future3 = timer.wait(Duration::from_millis(75));
242        
243        // All should complete in approximately 100ms (the longest)
244        block_on(async {
245            join!(future1, future2, future3);
246        });
247        let elapsed = start.elapsed();
248        
249        assert!(elapsed >= Duration::from_millis(90));
250        assert!(elapsed <= Duration::from_millis(150));
251    }
252
253    #[test]
254    fn test_timer_drop_clears_queue() {
255        let mut timer = Timer::new();
256        
257        // Add some timers
258        let _future1 = timer.wait(Duration::from_millis(100));
259        let _future2 = timer.wait(Duration::from_millis(200));
260        
261        // Queue should have timers
262        assert!(!timer.state.lock().unwrap().queue.is_empty());
263        
264        // Drop the timer
265        drop(timer);
266        
267        // Note: We can't test the queue after drop since we don't have access to it
268        // But the drop implementation should clear the queue and notify the condvar
269    }
270
271    #[test]
272    fn test_timer_concurrent_access() {
273        let timer = Timer::new();
274        let timer_arc = std::sync::Arc::new(std::sync::Mutex::new(timer));
275        let mut handles = vec![];
276        
277        // Spawn multiple threads that add timers
278        for i in 0..5 {
279            let timer_clone = timer_arc.clone();
280            let handle = std::thread::spawn(move || {
281                let mut timer = timer_clone.lock().unwrap();
282                let future = timer.wait(Duration::from_millis(10 + i * 10));
283                futures::executor::block_on(future);
284                i
285            });
286            handles.push(handle);
287        }
288        
289        // All should complete successfully
290        for handle in handles {
291            let id = handle.join().expect("Thread panicked");
292            println!("Timer thread {} completed", id);
293        }
294    }
295
296    #[test]
297    fn test_timer_shortest_detection() {
298        let mut timer = Timer::new();
299        let start = Instant::now();
300        
301        let future1 = timer.wait(Duration::from_millis(100));
302        let future2 = timer.wait(Duration::from_millis(20));
303        block_on(async {
304            join!(future1, future2);
305        });
306        let elapsed = start.elapsed();
307        
308        assert!(elapsed >= Duration::from_millis(90));
309        assert!(elapsed <= Duration::from_millis(150));
310    }
311}