async-cpupool 0.4.0

A simple async threadpool for CPU-bound tasks
Documentation
#![cfg(loom)]

use async_cpupool::tests::notify::Notify;
use std::future::Future;

struct NoopWaker;

impl std::task::Wake for NoopWaker {
    fn wake(self: std::sync::Arc<Self>) {}
    fn wake_by_ref(self: &std::sync::Arc<Self>) {}
}

fn noop_waker() -> std::task::Waker {
    std::sync::Arc::new(NoopWaker).into()
}

#[test]
fn dropped_notified_listener() {
    loom::model(|| {
        let notify = Notify::new();

        loom::future::block_on(async {
            let listener = notify.listen().await;

            notify.notify_one();
            drop(listener);

            assert_eq!(
                notify.token(),
                1,
                "Dropped notify should not have consumed token"
            );
        });

        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            1,
            "Should have created one Notify"
        );
        drop(notify);
        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            0,
            "Should have dropped notify"
        );
    });
}

#[test]
fn threaded_dropped_notified_listener() {
    loom::model(|| {
        let notify = loom::sync::Arc::new(Notify::new());

        let notify2 = notify.clone();
        let handle = loom::thread::spawn(move || {
            loom::future::block_on(async move {
                drop(notify2.listen().await);
            });
        });

        notify.notify_one();

        handle.join().unwrap();

        assert_eq!(
            notify.token(),
            1,
            "Dropped notify should not have consumed token"
        );

        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            1,
            "Should have created one Notify"
        );
        drop(notify);
        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            0,
            "Should have dropped notify"
        );
    });
}

#[test]
fn notified_listener() {
    loom::model(|| {
        let notify = Notify::new();

        loom::future::block_on(async {
            let mut listener = notify.listen().await;

            notify.notify_one();

            let waker = noop_waker();
            let mut cx = std::task::Context::from_waker(&waker);

            assert_eq!(
                std::pin::Pin::new(&mut listener).poll(&mut cx),
                std::task::Poll::Ready(()),
                "Polled listen should be notified"
            );
            assert_eq!(
                notify.token(),
                0,
                "Dropped notify should have consumed token"
            );
        });

        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            1,
            "Should have created one Notify"
        );
        drop(notify);
        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            0,
            "Should have dropped notify"
        );
    });
}

#[test]
fn threaded_notified_listener() {
    loom::model(|| {
        let notify = loom::sync::Arc::new(Notify::new());

        let notify2 = notify.clone();
        let handle = loom::thread::spawn(move || {
            loom::future::block_on(async move {
                notify2.listen().await.await;
            });
        });

        notify.notify_one();

        handle.join().unwrap();

        assert_eq!(
            notify.token(),
            0,
            "Dropped notify should have consumed token"
        );

        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            1,
            "Should have created one Notify"
        );
        drop(notify);
        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            0,
            "Should have dropped notify"
        );
    });
}

#[test]
fn multiple_listeners() {
    loom::model(|| {
        let notify = Notify::new();

        loom::future::block_on(async {
            let mut listener_1 = notify.listen().await;
            let mut listener_2 = notify.listen().await;

            notify.notify_one();

            let waker = noop_waker();
            let mut cx = std::task::Context::from_waker(&waker);

            assert_eq!(
                std::pin::Pin::new(&mut listener_1).poll(&mut cx),
                std::task::Poll::Ready(()),
                "Polled listen_1 should be notified"
            );

            assert_eq!(
                std::pin::Pin::new(&mut listener_2).poll(&mut cx),
                std::task::Poll::Pending,
                "Polled listen_2 should not be notified"
            );
        });

        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            1,
            "Should have created one Notify"
        );
        drop(notify);
        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            0,
            "Should have dropped notify"
        );
    });
}

#[test]
fn multiple_notifies() {
    loom::model(|| {
        let notify = Notify::new();

        loom::future::block_on(async {
            let mut listener_1 = notify.listen().await;
            let mut listener_2 = notify.listen().await;

            notify.notify_one();
            notify.notify_one();

            assert_eq!(notify.token(), 0, "listeners should have consumed tokens");

            let waker = noop_waker();
            let mut cx = std::task::Context::from_waker(&waker);

            assert_eq!(
                std::pin::Pin::new(&mut listener_1).poll(&mut cx),
                std::task::Poll::Ready(()),
                "Polled listen_1 should be notified"
            );

            assert_eq!(
                std::pin::Pin::new(&mut listener_2).poll(&mut cx),
                std::task::Poll::Ready(()),
                "Polled listen_2 should be notified"
            );
        });

        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            1,
            "Should have created one Notify"
        );
        drop(notify);
        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            0,
            "Should have dropped notify"
        );
    });
}

#[test]
fn threaded_multiple_notifies() {
    loom::model(|| {
        let notify = loom::sync::Arc::new(Notify::new());

        let notify2 = notify.clone();
        let handle1 = loom::thread::spawn(move || {
            loom::future::block_on(async move {
                notify2.listen().await.await;
            })
        });

        let notify2 = notify.clone();
        let handle2 = loom::thread::spawn(move || {
            loom::future::block_on(async {
                notify2.listen().await.await;
            });

            drop(notify2);
        });

        notify.notify_one();
        notify.notify_one();

        handle1.join().unwrap();
        handle2.join().unwrap();

        assert_eq!(
            notify.token(),
            0,
            "threaded notifies should have consumed tokens"
        );

        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            1,
            "Should have created one Notify"
        );
        drop(notify);
        assert_eq!(
            async_cpupool::tests::notify::notify_count(),
            0,
            "Should have dropped notify"
        );
    });
}