use crate::common::ready_future::ReadyFuture;
/// Readiness flag that hands out [`ReadyFuture`]s to callers waiting for it.
///
/// Futures requested through `wait` before `complete` is called are kept in
/// `observers` and completed when `complete` runs. Nothing in this file ever
/// sets `ready` back to `false`.
pub struct ReadyObservable {
/// `true` once `complete` has been called.
ready: bool,
/// Futures handed out by `wait` while not yet ready; drained by `complete`.
observers: Vec<ReadyFuture<()>>,
}
impl ReadyObservable {
    /// Marks the observable as ready and completes every pending observer.
    ///
    /// The observer list is emptied in the process, so a repeated call only
    /// re-sets the flag and has no observers left to notify.
    pub fn complete(&mut self) {
        self.ready = true;
        self.observers.drain(..).for_each(|pending| {
            pending.get_state().complete(());
        });
    }

    /// Returns `true` once [`ReadyObservable::complete`] has been called.
    pub fn is_ready(&self) -> bool {
        self.ready
    }

    /// Returns a future tied to this observable's readiness.
    ///
    /// When already ready, a future built with `ReadyFuture::new_completed`
    /// is returned and nothing is stored. Otherwise a fresh future is
    /// created, a clone of it is remembered so that
    /// [`ReadyObservable::complete`] can complete it later, and the original
    /// is handed to the caller.
    pub fn wait(&mut self) -> ReadyFuture<()> {
        if self.ready {
            return ReadyFuture::new_completed(());
        }

        let pending: ReadyFuture<()> = ReadyFuture::new();
        self.observers.push(pending.clone());
        pending
    }
}
impl Default for ReadyObservable {
    /// Builds an observable that is not ready and has no observers yet.
    ///
    /// The observer list is allocated with room for 50 entries up front.
    fn default() -> Self {
        let observers = Vec::with_capacity(50);
        Self {
            ready: false,
            observers,
        }
    }
}
#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        thread,
        time::Duration,
    };

    use super::*;
    use crate::common::ready_future_state::ReadyFutureResult;

    /// Several threads block on futures obtained before completion; all of
    /// them must be joinable once `complete` has been called.
    #[test]
    fn test_multiple_awaits_on_observable() {
        const COUNT: usize = 5;
        let mut observable = ReadyObservable::default();

        let handles: Vec<_> = (0..COUNT)
            .map(|_| {
                let pending = observable.wait();
                thread::spawn(move || futures::executor::block_on(async move { pending.await }))
            })
            .collect();
        assert_eq!(handles.len(), COUNT);

        // Give the spawned threads a moment to start blocking before completing.
        thread::sleep(Duration::from_millis(100));
        observable.complete();

        let mut joined = 0usize;
        for (counter, handle) in handles.into_iter().enumerate() {
            handle.join().expect("Thread panicked");
            eprintln!("handler ready {counter}");
            joined = counter + 1;
        }
        assert_eq!(joined, COUNT);
    }

    /// A default-constructed observable starts out not ready.
    #[test]
    fn test_ready_observable_default() {
        let observable = ReadyObservable::default();
        assert!(!observable.is_ready());
    }

    /// `complete` flips the readiness flag.
    #[test]
    fn test_ready_observable_complete() {
        let mut observable = ReadyObservable::default();
        assert!(!observable.is_ready());

        observable.complete();
        assert!(observable.is_ready());
    }

    /// Waiting on an already completed observable resolves immediately.
    #[test]
    fn test_wait_when_ready() {
        let mut observable = ReadyObservable::default();
        observable.complete();

        let result = futures::executor::block_on(observable.wait());
        assert!(matches!(result, ReadyFutureResult::Completed(())));
    }

    /// A future obtained before completion resolves once another thread
    /// calls `complete`.
    #[test]
    fn test_wait_when_not_ready() {
        let mut observable = ReadyObservable::default();
        assert!(!observable.is_ready());
        let pending = observable.wait();
        assert!(!observable.is_ready());

        let shared = Arc::new(Mutex::new(observable));
        let completer = {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                shared.lock().unwrap().complete();
            })
        };

        let result = futures::executor::block_on(pending);
        assert!(matches!(result, ReadyFutureResult::Completed(())));

        completer.join().expect("Thread panicked");
        assert!(shared.lock().unwrap().is_ready());
    }

    /// Every future handed out before `complete` is resolved by it.
    #[test]
    fn test_multiple_waits_before_complete() {
        let mut observable = ReadyObservable::default();
        let first = observable.wait();
        let second = observable.wait();
        let third = observable.wait();
        assert!(!observable.is_ready());

        observable.complete();
        assert!(observable.is_ready());

        let first_result = futures::executor::block_on(first);
        let second_result = futures::executor::block_on(second);
        let third_result = futures::executor::block_on(third);
        assert!(matches!(first_result, ReadyFutureResult::Completed(())));
        assert!(matches!(second_result, ReadyFutureResult::Completed(())));
        assert!(matches!(third_result, ReadyFutureResult::Completed(())));
    }

    /// Futures requested after `complete` are already resolved.
    #[test]
    fn test_wait_after_complete() {
        let mut observable = ReadyObservable::default();
        observable.complete();
        assert!(observable.is_ready());

        let first = observable.wait();
        let second = observable.wait();
        let third = observable.wait();

        let first_result = futures::executor::block_on(first);
        let second_result = futures::executor::block_on(second);
        let third_result = futures::executor::block_on(third);
        assert!(matches!(first_result, ReadyFutureResult::Completed(())));
        assert!(matches!(second_result, ReadyFutureResult::Completed(())));
        assert!(matches!(third_result, ReadyFutureResult::Completed(())));
    }

    /// After `complete`, a new `wait` still yields a resolved future even
    /// though earlier futures were registered and never awaited.
    #[test]
    fn test_observers_cleared_after_complete() {
        let mut observable = ReadyObservable::default();
        let _first = observable.wait();
        let _second = observable.wait();
        observable.complete();

        let late = observable.wait();
        let result = futures::executor::block_on(late);
        assert!(matches!(result, ReadyFutureResult::Completed(())));
    }

    /// Ten threads obtain a future through a shared, mutex-guarded observable
    /// and block on it; all must finish after the main thread completes it.
    #[test]
    fn test_concurrent_waits_and_complete() {
        let shared = Arc::new(Mutex::new(ReadyObservable::default()));

        let workers: Vec<_> = (0..10)
            .map(|i| {
                let shared = Arc::clone(&shared);
                thread::spawn(move || {
                    // The lock guard is released at the end of this statement,
                    // before the thread starts blocking on the future.
                    let pending = shared.lock().unwrap().wait();
                    let result = futures::executor::block_on(pending);
                    (i, result)
                })
            })
            .collect();

        thread::sleep(Duration::from_millis(100));
        shared.lock().unwrap().complete();

        for worker in workers {
            let (id, result) = worker.join().expect("Thread panicked");
            assert!(matches!(result, ReadyFutureResult::Completed(())));
            println!("Thread {} completed", id);
        }
    }
}
#[cfg(test)]
#[allow(dead_code)]
mod web_tests {
    use super::*;
    use futures::join;
    use wasm_bindgen_test::wasm_bindgen_test;

    /// Shared scenario: three futures are requested before `complete` is
    /// called and are then awaited together with `join!`.
    async fn multiple_simultaneous_awaits() {
        let mut observable = ReadyObservable::default();
        let first = observable.wait();
        let second = observable.wait();
        let third = observable.wait();

        // Each wrapper discards the awaited result so that `join!` yields units.
        let first_done = async {
            first.await;
        };
        let second_done = async {
            second.await;
        };
        let third_done = async {
            third.await;
        };

        observable.complete();
        assert_eq!(join!(first_done, second_done, third_done), ((), (), ()));
    }

    /// Runs the shared scenario with the browser test configuration.
    #[wasm_bindgen_test]
    async fn test_web_multiple_simultaneous_awaits() {
        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
        multiple_simultaneous_awaits().await;
    }

    /// Runs the shared scenario a second time.
    ///
    /// NOTE(review): despite the `node` in its name, this test also configures
    /// `run_in_browser` — confirm whether a Node.js run was intended.
    #[wasm_bindgen_test]
    async fn test_node_multiple_simultaneous_awaits() {
        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
        multiple_simultaneous_awaits().await;
    }
}