async-foundation 0.2.1

Foundational async primitives for Rust - timers, networking, and common utilities
Documentation
use crate::timer::timer_future::TimerFuture;
use crate::timer::timer_state::TimerState;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// A simple async timer that can schedule waits and manage many concurrent timers.
///
/// The timer maintains an internal queue of expirations and a dedicated thread
/// that sleeps until the next expiration or is woken when a shorter timeout is
/// scheduled. Each call to [`Timer::wait`] returns a [`TimerFuture`] that
/// becomes ready when the configured duration elapses.
pub struct Timer {
    state: Arc<Mutex<TimerState>>,
    condvar: Arc<Condvar>,
}

impl Timer {
    /// Creates a new empty timer with no scheduled expirations.
    pub fn new() -> Timer {
        Timer {
            state: Arc::new(Mutex::new(TimerState::new())),
            condvar: Arc::new(Condvar::new()),
        }
    }

    /// Schedules a new timeout and returns a future that resolves when it expires.
    ///
    /// Multiple waits can be outstanding at the same time; the timer keeps them
    /// ordered by expiration and wakes the appropriate futures when their
    /// deadlines are reached.
    pub fn wait(&mut self, duration: Duration) -> TimerFuture {
        let state = &mut *self.state.lock().unwrap();
        let expiration = Instant::now() + duration;
        let launched = state.queue.len() > 0;
        let (time_future, shortest) = state.add_to_queue(expiration);
        if !launched {
            self.launch();
        } else if shortest {
            self.condvar.notify_one();
        }
        time_future
    }

    fn launch(&self) {
        let lock = self.state.clone();
        let condvar = self.condvar.clone();
        thread::spawn(move || {
            // eprintln!("timer: spawn new timer thread");
            loop {
                let mut state = lock.lock().unwrap();
                let expiration = state.current_expiration();
                if expiration.is_none() {
                    break;
                }
                let expiration = expiration.unwrap();
                let now = Instant::now();
                let duration = if expiration > now {
                    expiration - now
                } else {
                    Duration::ZERO
                };
                // eprintln!("waiting for {:?}  {:?}", duration, std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH));
                let result = match condvar.wait_timeout(state, duration) {
                    Ok(result) => result,
                    Err(err) => {
                        eprintln!("Err condvar.wait_timeout: {:?}", &err);
                        err.into_inner()
                    }
                };
                state = result.0;
                if result.1.timed_out() {
                    // eprintln!("timer: timeout {:?} {:?}", duration, std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH));
                    let guard = state.queue.pop().unwrap();
                    let mut future_state = guard.lock().unwrap();
                    future_state.completed = true;
                    if let Some(waker) = future_state.waker.take() {
                        waker.wake()
                    }
                } else {
                    // eprintln!("timer: interrupted");
                }
            }
            // eprintln!("timer: thread has been dropped");
        });
    }
}

impl Clone for Timer {
    fn clone(&self) -> Self {
        Timer {
            state: self.state.clone(),
            condvar: self.condvar.clone(),
        }
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        let mut state = self.state.lock().unwrap();
        state.queue.clear();
        self.condvar.notify_one();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor::block_on;
    use futures::join;
    use std::time::Instant;

    #[test]
    fn test_timer() {
        let mut timer = Timer::new();

        let future = async {
            let benchmark = Instant::now();
            let future1 = timer.wait(Duration::from_millis(100));
            thread::sleep(Duration::from_millis(30));
            let future2 = timer.wait(Duration::from_millis(50));
            let future3 = timer.wait(Duration::from_millis(100));

            join!(future1, future2, future3);
            assert!(benchmark.elapsed() <= Duration::from_millis(140));
            let benchmark = Instant::now();
            let future1 = timer.wait(Duration::from_millis(100));
            thread::sleep(Duration::from_millis(30));
            let future2 = timer.wait(Duration::from_millis(50));
            let future3 = timer.wait(Duration::from_millis(100));
            join!(future1, future2, future3);
            let elapsed = benchmark.elapsed();
            // dbg!(elapsed);
            assert!(elapsed <= Duration::from_millis(140));
        };
        block_on(future);
    }

    #[test]
    fn test_many_timers() {
        let mut timer = Timer::new();
        let futures = (0..100)
            .map(|i| timer.wait(Duration::from_millis(i % 10 + 1)))
            .collect::<Vec<_>>();
        block_on(futures::future::join_all(futures));
        assert!(timer.state.lock().unwrap().queue.is_empty());
    }

    #[test]
    fn test_timer_new() {
        let timer = Timer::new();
        assert!(timer.state.lock().unwrap().queue.is_empty());
    }


    #[test]
    fn test_timer_wait_single() {
        let mut timer = Timer::new();
        let start = Instant::now();
        let future = timer.wait(Duration::from_millis(50));
        
        block_on(future);
        let elapsed = start.elapsed();
        
        // Should take approximately 50ms, with some tolerance
        assert!(elapsed >= Duration::from_millis(45));
        assert!(elapsed <= Duration::from_millis(100));
    }

    #[test]
    fn test_timer_wait_multiple_sequential() {
        let mut timer = Timer::new();
        let start = Instant::now();
        
        // Wait for multiple timers sequentially
        block_on(timer.wait(Duration::from_millis(10)));
        block_on(timer.wait(Duration::from_millis(10)));
        block_on(timer.wait(Duration::from_millis(10)));
        
        let elapsed = start.elapsed();
        
        // Should take approximately 30ms total
        assert!(elapsed >= Duration::from_millis(25));
        assert!(elapsed <= Duration::from_millis(80));
    }

    #[test]
    fn test_timer_wait_multiple_concurrent() {
        let mut timer = Timer::new();
        let start = Instant::now();
        
        // Wait for multiple timers concurrently
        let future1 = timer.wait(Duration::from_millis(50));
        let future2 = timer.wait(Duration::from_millis(30));
        let future3 = timer.wait(Duration::from_millis(40));
        
        block_on(async {
            join!(future1, future2, future3);
        });
        let elapsed = start.elapsed();
        
        // Should take approximately 50ms (the longest)
        assert!(elapsed >= Duration::from_millis(45));
        assert!(elapsed <= Duration::from_millis(100));
    }

    #[test]
    fn test_timer_wait_zero_duration() {
        let mut timer = Timer::new();
        let start = Instant::now();
        let future = timer.wait(Duration::ZERO);
        
        block_on(future);
        let elapsed = start.elapsed();
        
        // Should complete very quickly
        assert!(elapsed <= Duration::from_millis(10));
    }

    #[test]
    fn test_timer_wait_very_short_duration() {
        let mut timer = Timer::new();
        let start = Instant::now();
        let future = timer.wait(Duration::from_millis(1));
        
        block_on(future);
        let elapsed = start.elapsed();
        
        // Should complete quickly
        assert!(elapsed <= Duration::from_millis(20));
    }

    #[test]
    fn test_timer_queue_ordering() {
        let mut timer = Timer::new();
        let start = Instant::now();
        
        // Add timers in reverse order
        let future1 = timer.wait(Duration::from_millis(100));
        let future2 = timer.wait(Duration::from_millis(50));
        let future3 = timer.wait(Duration::from_millis(75));
        
        // All should complete in approximately 100ms (the longest)
        block_on(async {
            join!(future1, future2, future3);
        });
        let elapsed = start.elapsed();
        
        assert!(elapsed >= Duration::from_millis(90));
        assert!(elapsed <= Duration::from_millis(150));
    }

    #[test]
    fn test_timer_drop_clears_queue() {
        let mut timer = Timer::new();
        
        // Add some timers
        let _future1 = timer.wait(Duration::from_millis(100));
        let _future2 = timer.wait(Duration::from_millis(200));
        
        // Queue should have timers
        assert!(!timer.state.lock().unwrap().queue.is_empty());
        
        // Drop the timer
        drop(timer);
        
        // Note: We can't test the queue after drop since we don't have access to it
        // But the drop implementation should clear the queue and notify the condvar
    }

    #[test]
    fn test_timer_concurrent_access() {
        let timer = Timer::new();
        let timer_arc = std::sync::Arc::new(std::sync::Mutex::new(timer));
        let mut handles = vec![];
        
        // Spawn multiple threads that add timers
        for i in 0..5 {
            let timer_clone = timer_arc.clone();
            let handle = std::thread::spawn(move || {
                let mut timer = timer_clone.lock().unwrap();
                let future = timer.wait(Duration::from_millis(10 + i * 10));
                futures::executor::block_on(future);
                i
            });
            handles.push(handle);
        }
        
        // All should complete successfully
        for handle in handles {
            let id = handle.join().expect("Thread panicked");
            println!("Timer thread {} completed", id);
        }
    }

    #[test]
    fn test_timer_shortest_detection() {
        let mut timer = Timer::new();
        let start = Instant::now();
        
        let future1 = timer.wait(Duration::from_millis(100));
        let future2 = timer.wait(Duration::from_millis(20));
        block_on(async {
            join!(future1, future2);
        });
        let elapsed = start.elapsed();
        
        assert!(elapsed >= Duration::from_millis(90));
        assert!(elapsed <= Duration::from_millis(150));
    }
}