// async-foundation 0.2.1
//
// Foundational async primitives for Rust: timers, networking, and common utilities.
// Documentation
use crate::common::ready_future::ReadyFuture;

/// A simple observable that lets multiple waiters be notified when an event occurs.
///
/// `ReadyObservable` starts in a non-ready state; callers can obtain a
/// [`ReadyFuture<()>`] via [`ReadyObservable::wait`]. When [`ReadyObservable::complete`]
/// is called, all pending waiters are woken and subsequent calls to `wait` return
/// an already-completed future.
pub struct ReadyObservable {
    /// Whether `complete` has been called; `false` for a default-constructed value.
    ready: bool,
    /// Clones of the futures handed out by `wait` while not ready; drained by `complete`.
    observers: Vec<ReadyFuture<()>>,
}

impl ReadyObservable {
    /// Marks the observable as ready and completes every future registered so far.
    ///
    /// The observer list is drained, so each registered future is completed exactly
    /// once with `()`. A later call finds the list empty and only re-sets the flag.
    pub fn complete(&mut self) {
        self.ready = true;
        self.observers.drain(..).for_each(|pending| {
            pending.get_state().complete(());
        });
    }

    /// Returns `true` once [`ReadyObservable::complete`] has been called.
    pub fn is_ready(&self) -> bool {
        self.ready
    }

    /// Hands out a future tied to this observable.
    ///
    /// While the observable is not ready, a new future is created, a clone of it is
    /// stored in the observer list, and the future is returned to the caller. Once the
    /// observable is ready, a future built with `ReadyFuture::new_completed(())` is
    /// returned instead and nothing is stored.
    pub fn wait(&mut self) -> ReadyFuture<()> {
        if !self.is_ready() {
            let pending = ReadyFuture::new();
            self.observers.push(pending.clone());
            return pending;
        }

        ReadyFuture::new_completed(())
    }
}

impl Default for ReadyObservable {
    /// Builds a not-yet-ready observable with room reserved for 50 observers.
    fn default() -> Self {
        // Up-front capacity for the observer list; the vector can still grow past it.
        const INITIAL_OBSERVER_CAPACITY: usize = 50;

        let observers = Vec::with_capacity(INITIAL_OBSERVER_CAPACITY);
        Self {
            ready: false,
            observers,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        thread,
        time::Duration,
    };

    use super::*;
    use crate::common::ready_future_state::ReadyFutureResult;

    /// Asserts that a resolved future produced `ReadyFutureResult::Completed(())`.
    ///
    /// A macro (rather than a function) so the assertion failure points at the
    /// calling test line and the result type never has to be spelled out.
    macro_rules! assert_completed {
        ($result:expr) => {
            assert!(matches!($result, ReadyFutureResult::Completed(())))
        };
    }

    /// Several threads block on futures obtained before `complete`; all must finish
    /// with a `Completed` result once `complete` is called.
    #[test]
    fn test_multiple_awaits_on_observable() {
        let mut ready = ReadyObservable::default();
        const COUNT: usize = 5;
        let mut handles = vec![];

        for _ in 0..COUNT {
            let ready = ready.wait();
            let handle =
                thread::spawn(move || futures::executor::block_on(async move { ready.await }));
            handles.push(handle);
        }
        assert_eq!(handles.len(), COUNT);
        // Give the spawned threads time to start blocking before completing.
        thread::sleep(Duration::from_millis(100));
        ready.complete();

        let mut joined = 0usize;
        for handle in handles {
            let result = handle.join().expect("Thread panicked");
            // Previously the result was discarded; check it like the other tests do.
            assert_completed!(result);
            eprintln!("handler ready {joined}");
            joined += 1;
        }
        assert_eq!(joined, COUNT);
    }

    /// A default-constructed observable is not ready.
    #[test]
    fn test_ready_observable_default() {
        let ready = ReadyObservable::default();
        assert!(!ready.is_ready());
    }

    /// `complete` flips the ready flag.
    #[test]
    fn test_ready_observable_complete() {
        let mut ready = ReadyObservable::default();
        assert!(!ready.is_ready());

        ready.complete();
        assert!(ready.is_ready());
    }

    /// Waiting on an already-completed observable resolves immediately.
    #[test]
    fn test_wait_when_ready() {
        let mut ready = ReadyObservable::default();
        ready.complete();

        let future = ready.wait();
        let result = futures::executor::block_on(future);
        assert_completed!(result);
    }

    /// A future obtained before completion resolves once another thread calls `complete`.
    #[test]
    fn test_wait_when_not_ready() {
        let mut ready = ReadyObservable::default();
        assert!(!ready.is_ready());

        let future = ready.wait();
        assert!(!ready.is_ready()); // Should still not be ready

        let ready_clone = Arc::new(Mutex::new(ready));
        let ready_for_complete = ready_clone.clone();
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            ready_for_complete.lock().unwrap().complete();
        });

        let result = futures::executor::block_on(future);
        assert_completed!(result);

        handle.join().expect("Thread panicked");
        assert!(ready_clone.lock().unwrap().is_ready());
    }

    /// Every future registered before `complete` resolves as completed.
    #[test]
    fn test_multiple_waits_before_complete() {
        let mut ready = ReadyObservable::default();

        let future1 = ready.wait();
        let future2 = ready.wait();
        let future3 = ready.wait();

        assert!(!ready.is_ready());

        ready.complete();
        assert!(ready.is_ready());

        for future in [future1, future2, future3] {
            assert_completed!(futures::executor::block_on(future));
        }
    }

    /// Futures requested after `complete` are already completed.
    #[test]
    fn test_wait_after_complete() {
        let mut ready = ReadyObservable::default();
        ready.complete();
        assert!(ready.is_ready());

        let future1 = ready.wait();
        let future2 = ready.wait();
        let future3 = ready.wait();

        for future in [future1, future2, future3] {
            assert_completed!(futures::executor::block_on(future));
        }
    }

    /// `complete` drains the observer list, and later waits do not refill it.
    #[test]
    fn test_observers_cleared_after_complete() {
        let mut ready = ReadyObservable::default();

        let _future1 = ready.wait();
        let _future2 = ready.wait();
        // Both pending waits must have been registered.
        assert_eq!(ready.observers.len(), 2);

        ready.complete();
        // This is the property the test name promises.
        assert!(ready.observers.is_empty());

        let future3 = ready.wait();
        // Waiting on a ready observable must not register a new observer.
        assert!(ready.observers.is_empty());
        let result = futures::executor::block_on(future3);
        assert_completed!(result);
    }

    /// Threads sharing the observable behind a mutex all observe completion.
    #[test]
    fn test_concurrent_waits_and_complete() {
        let ready = ReadyObservable::default();
        let ready_arc = Arc::new(Mutex::new(ready));
        let mut handles = vec![];

        for i in 0..10 {
            let ready_clone = ready_arc.clone();
            let handle = thread::spawn(move || {
                // The guard is a temporary of this statement, so the lock is released
                // before blocking; holding it would deadlock with `complete` below.
                let future = ready_clone.lock().unwrap().wait();
                let result = futures::executor::block_on(future);
                (i, result)
            });
            handles.push(handle);
        }

        thread::sleep(Duration::from_millis(100));
        ready_arc.lock().unwrap().complete();

        for handle in handles {
            let (id, result) = handle.join().expect("Thread panicked");
            assert_completed!(result);
            println!("Thread {} completed", id);
        }
    }
}

#[cfg(test)]
#[allow(dead_code)]
mod web_tests {
    use super::*;
    use futures::join;
    use wasm_bindgen_test::wasm_bindgen_test;

    /// Shared scenario: three futures are obtained before `complete`, then awaited
    /// together after it; each awaiting block must finish and yield `()`.
    async fn multiple_simultaneous_awaits() {
        let mut observable = ReadyObservable::default();
        let first = observable.wait();
        let second = observable.wait();
        let third = observable.wait();

        // Each block awaits its future and discards the result, so it yields `()`.
        let first_waiter = async {
            first.await;
        };
        let second_waiter = async {
            second.await;
        };
        let third_waiter = async {
            third.await;
        };

        observable.complete();
        let outcomes = join!(first_waiter, second_waiter, third_waiter);
        assert_eq!(outcomes, ((), (), ()));
    }

    /// Runs the shared scenario with the browser configuration.
    #[wasm_bindgen_test]
    async fn test_web_multiple_simultaneous_awaits() {
        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
        multiple_simultaneous_awaits().await;
    }

    /// Runs the shared scenario a second time.
    ///
    /// NOTE(review): the name says "node" but this also configures `run_in_browser`;
    /// confirm whether a Node-specific configuration was intended.
    #[wasm_bindgen_test]
    async fn test_node_multiple_simultaneous_awaits() {
        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
        multiple_simultaneous_awaits().await;
    }
}