// async-cpupool 0.4.0
//
// A simple async threadpool for CPU-bound tasks
// Documentation
#![cfg(loom)]

/// Checks the queue and notify counters before and after dropping `queue`.
///
/// While `queue` is still alive the queue counter must read exactly one and
/// the notify counter must be greater than zero. After `queue` is dropped,
/// both counters must read zero.
fn assert_counts<T>(queue: async_cpupool::tests::queue::Queue<T>) {
    // Counters while the queue handed to us is still alive.
    let live_queues = async_cpupool::tests::queue::queue_count();
    assert_eq!(live_queues, 1, "Should have created one queue");

    let live_notifies = async_cpupool::tests::notify::notify_count();
    assert!(live_notifies > 0, "Should have created notifies");

    drop(queue);

    // Counters once the queue has been dropped.
    let remaining_queues = async_cpupool::tests::queue::queue_count();
    assert_eq!(remaining_queues, 0, "Should have dropped queue");

    let remaining_notifies = async_cpupool::tests::notify::notify_count();
    assert_eq!(remaining_notifies, 0, "Should have dropped notifies");
}

/// Exercises the non-blocking `try_push` / `try_pop` API from a single thread
/// on a queue created with `bounded(1)`, inside a loom model.
///
/// The assertions treat `try_push` returning `None` as "item accepted" and
/// `Some(_)` as "item rejected"; `try_pop` returning `Some(_)` means an item
/// was taken.
#[test]
fn push_pop() {
    loom::model(|| {
        let queue = async_cpupool::tests::queue::bounded(1);

        // The first push is expected to be accepted, the second rejected.
        assert!(queue.try_push(()).is_none(), "Failed to push item");

        assert!(queue.try_push(()).is_some(), "Shouldn't have pushed item");

        // Only the single accepted item can be popped.
        assert!(queue.try_pop().is_some(), "Failed to pop item");

        assert!(queue.try_pop().is_none(), "Shouldn't have popped item");

        assert_eq!(queue.len(), 0, "Should have popped pushed item");

        // Push once more so the queue still holds an item when
        // `assert_counts` drops it.
        assert!(queue.try_push(()).is_none(), "Failed to push item");

        assert_counts(queue);
    });
}

/// Pushes one item and pops it again through the async API on a single
/// thread, then checks that the queue is empty and the counters are released.
#[test]
fn async_push_pop() {
    loom::model(|| {
        let q = async_cpupool::tests::queue::bounded(1);

        // Describe the round trip first, then drive it to completion.
        let round_trip = async {
            q.push(()).await;
            q.pop().await;
        };
        loom::future::block_on(round_trip);

        assert_eq!(q.len(), 0, "Should have popped pushed item");

        assert_counts(q);
    });
}

/// One spawned thread awaits a `pop` while the model's main thread performs a
/// single `try_push`; afterwards the queue must be empty.
#[test]
fn threaded_push_pop() {
    loom::model(|| {
        let producer = async_cpupool::tests::queue::bounded(1);
        let consumer = producer.clone();

        let popper = loom::thread::spawn(move || {
            let take_one = async {
                consumer.pop().await;
            };
            loom::future::block_on(take_one);

            // Release this thread's handle explicitly before the thread ends.
            drop(consumer);
        });

        assert!(producer.try_push(()).is_none(), "failed to push item");
        popper.join().unwrap();

        assert_eq!(producer.len(), 0, "Should have popped pushed item");

        assert_counts(producer);
    });
}

/// One spawned thread awaits two pops while the model's main thread awaits
/// two pushes on a queue created with `bounded(1)`.
#[test]
fn multiple_threaded_push_pop() {
    loom::model(|| {
        let sender = async_cpupool::tests::queue::bounded(1);
        let receiver = sender.clone();

        let popper = loom::thread::spawn(move || {
            let take_two = async {
                receiver.pop().await;
                receiver.pop().await;
            };
            loom::future::block_on(take_two);

            drop(receiver);
        });

        let send_two = async {
            sender.push(()).await;
            sender.push(()).await;
        };
        loom::future::block_on(send_two);

        popper.join().unwrap();

        assert_eq!(sender.len(), 0, "Should have popped both pushed items");

        assert_counts(sender);
    });
}

/// Two spawned threads each await one push while the model's main thread
/// awaits two pops on a queue created with `bounded(1)`.
#[test]
fn multiple_threaded_push_pop_2() {
    loom::model(|| {
        let q = async_cpupool::tests::queue::bounded(1);

        // Clone-then-spawn, twice, in order; `collect` forces both spawns to
        // happen before the main thread starts popping.
        let pushers: Vec<_> = (0..2)
            .map(|_| {
                let producer = q.clone();
                loom::thread::spawn(move || {
                    let send_one = async {
                        producer.push(()).await;
                    };
                    loom::future::block_on(send_one);

                    drop(producer);
                })
            })
            .collect();

        let take_two = async {
            q.pop().await;
            q.pop().await;
        };
        loom::future::block_on(take_two);

        // Join in spawn order.
        for worker in pushers {
            worker.join().unwrap();
        }

        assert_eq!(q.len(), 0, "Should have popped both pushed items");

        assert_counts(q);
    });
}

/// Two spawned threads each await one push, a third spawned thread awaits one
/// pop, and the model's main thread awaits the other pop, all on a queue
/// created with `bounded(1)`.
#[test]
fn multiple_threaded_push_pop_3() {
    loom::model(|| {
        let q = async_cpupool::tests::queue::bounded(1);

        // Clone-then-spawn, twice, in order; `collect` forces both producer
        // threads to be spawned before the consumer thread below.
        let pushers: Vec<_> = (0..2)
            .map(|_| {
                let producer = q.clone();
                loom::thread::spawn(move || {
                    let send_one = async {
                        producer.push(()).await;
                    };
                    loom::future::block_on(send_one);

                    drop(producer);
                })
            })
            .collect();

        let consumer = q.clone();
        let popper = loom::thread::spawn(move || {
            let take_one = async {
                consumer.pop().await;
            };
            loom::future::block_on(take_one);

            drop(consumer);
        });

        let take_one = async {
            q.pop().await;
        };
        loom::future::block_on(take_one);

        // Join the producers in spawn order, then the consumer.
        for worker in pushers {
            worker.join().unwrap();
        }
        popper.join().unwrap();

        assert_eq!(q.len(), 0, "Should have popped both pushed items");

        assert_counts(q);
    });
}