stopper 0.2.8

an async stream and future stopper mechanism
Documentation
use futures_lite::{
    future::{self, poll_once},
    stream, StreamExt,
};
use std::future::{Future, IntoFuture};
use stopper::Stopper;
use test_harness::test;

#[cfg(not(all(loom, feature = "loom")))]
use std::thread::spawn;

#[cfg(all(loom, feature = "loom"))]
use loom::thread::spawn;

#[cfg(not(all(loom, feature = "loom")))]
#[track_caller]
fn harness<F, Fut>(test: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = ()> + Send,
{
    future::block_on(test());
}

#[cfg(all(loom, feature = "loom"))]
#[track_caller]
fn harness<F, Fut>(test: F)
where
    F: Fn() -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send,
{
    loom::model(move || loom::future::block_on(test()));
}

#[test(harness)]
async fn future_stopper() {
    let stopper = Stopper::new();
    let future = stopper.stop_future(future::pending::<()>());
    spawn(move || {
        stopper.stop();
    });
    assert_eq!(future.await, None);
}

#[test(harness)]
async fn stream_stopper() {
    let stopper = Stopper::new();
    let mut stream = stopper.stop_stream(stream::repeat("infinite stream"));
    spawn(move || {
        stopper.stop();
    });

    while let Some(item) = stream.next().await {
        println!("{}", item);
        #[cfg(all(loom, feature = "loom"))]
        loom::thread::yield_now();
    }

    assert_eq!(poll_once(stream.next()).await, Some(None));
}

#[test(harness)]
async fn stopped() {
    let stopper = Stopper::new();
    let future = stopper.clone().into_future();
    assert!(!stopper.is_stopped());
    assert!(future::poll_once(stopper.clone().into_future())
        .await
        .is_none());

    spawn({
        let stopper = stopper.clone();
        move || stopper.stop()
    });

    future.await;
    assert!(stopper.is_stopped());
    assert!(future::poll_once(stopper.into_future()).await.is_some());
}