// compact-waitgroup 0.1.3
//
// A compact asynchronous WaitGroup synchronization primitive.
// Documentation
use core::{
    panic::{RefUnwindSafe, UnwindSafe},
    pin::Pin,
    task::{Context, Poll},
};

use futures_test::task::new_count_waker;
use static_assertions::{assert_impl_all, assert_not_impl_any};

use crate::{MonoWaitGroup, MonoWorkerHandle, WaitGroup, WorkerHandle};

assert_impl_all!(WaitGroup: Sync, Send, UnwindSafe, RefUnwindSafe);
assert_impl_all!(WorkerHandle: Sync, Send, UnwindSafe, RefUnwindSafe, Clone);
assert_impl_all!(MonoWaitGroup: Sync, Send, UnwindSafe, RefUnwindSafe);
assert_impl_all!(MonoWorkerHandle: Sync, Send, UnwindSafe, RefUnwindSafe);

assert_not_impl_any!(WaitGroup: Clone);
assert_not_impl_any!(MonoWorkerHandle: Clone);
assert_not_impl_any!(MonoWaitGroup: Clone);

/// A `WaitGroup` polled once while pending becomes ready after its single
/// handle calls `done()`, and the registered waker is woken exactly once.
#[test]
fn test_wg_done() {
    let (waker, wake_count) = new_count_waker();
    let mut cx = Context::from_waker(&waker);
    let (wait_group, worker) = WaitGroup::new();
    let mut wait_group = core::pin::pin!(wait_group);

    // The handle has not reported completion yet.
    assert!(wait_group.as_mut().poll(&mut cx).is_pending());

    worker.done();

    assert!(matches!(wait_group.as_mut().poll(&mut cx), Poll::Ready(())));
    assert_eq!(wake_count.get(), 1);
}

/// With two handles (the original and a clone), the `WaitGroup` stays pending
/// after the first `done()` and only resolves after the second. The waker is
/// woken once in total.
#[test]
fn test_wg_done_twice() {
    let (waker, wake_count) = new_count_waker();
    let mut cx = Context::from_waker(&waker);
    let (wait_group, first) = WaitGroup::new();
    let second = first.clone();
    let mut wait_group = core::pin::pin!(wait_group);

    assert!(wait_group.as_mut().poll(&mut cx).is_pending());

    // Finishing the original handle alone is not enough.
    first.done();
    assert!(wait_group.as_mut().poll(&mut cx).is_pending());

    // The clone finishing releases the waiter.
    second.done();
    assert!(matches!(wait_group.as_mut().poll(&mut cx), Poll::Ready(())));

    assert_eq!(wake_count.get(), 1);
}

/// Same scenario as the two-handle test, but the cloned handle finishes
/// before the original one. The outcome must not depend on that order.
#[test]
fn test_wg_done_twice_rev() {
    let (waker, wake_count) = new_count_waker();
    let mut cx = Context::from_waker(&waker);
    let (wait_group, original) = WaitGroup::new();
    let cloned = original.clone();
    let mut wait_group = core::pin::pin!(wait_group);

    assert!(wait_group.as_mut().poll(&mut cx).is_pending());

    // The clone finishes first; the original is still outstanding.
    cloned.done();
    assert!(wait_group.as_mut().poll(&mut cx).is_pending());

    original.done();
    assert!(matches!(wait_group.as_mut().poll(&mut cx), Poll::Ready(())));

    assert_eq!(wake_count.get(), 1);
}

/// A `MonoWaitGroup` polled once while pending becomes ready after its handle
/// calls `done()`, and the registered waker is woken exactly once.
#[test]
fn test_mono_wg_done() {
    let (waker, wake_count) = new_count_waker();
    let mut cx = Context::from_waker(&waker);
    let (wait_group, worker) = MonoWaitGroup::new();
    let mut wait_group = core::pin::pin!(wait_group);

    // The handle has not reported completion yet.
    assert!(wait_group.as_mut().poll(&mut cx).is_pending());

    worker.done();

    assert!(matches!(wait_group.as_mut().poll(&mut cx), Poll::Ready(())));
    assert_eq!(wake_count.get(), 1);
}

/// If the handle calls `done()` before the `WaitGroup` is ever polled, the
/// first poll is immediately ready and the waker is never woken.
#[test]
fn test_wg_send_before_poll() {
    let (waker, wake_count) = new_count_waker();
    let mut cx = Context::from_waker(&waker);
    let (wait_group, worker) = WaitGroup::new();

    // Complete the work before any waker could have been registered.
    worker.done();

    let mut wait_group = core::pin::pin!(wait_group);
    assert!(matches!(wait_group.as_mut().poll(&mut cx), Poll::Ready(())));
    assert_eq!(wake_count.get(), 0);
}

/// If the handle calls `done()` before the `MonoWaitGroup` is ever polled,
/// the first poll is immediately ready and the waker is never woken.
#[test]
fn test_mono_wg_send_before_poll() {
    let (waker, wake_count) = new_count_waker();
    let mut cx = Context::from_waker(&waker);
    let (wait_group, worker) = MonoWaitGroup::new();

    // Complete the work before any waker could have been registered.
    worker.done();

    let mut wait_group = core::pin::pin!(wait_group);
    assert!(matches!(wait_group.as_mut().poll(&mut cx), Poll::Ready(())));
    assert_eq!(wake_count.get(), 0);
}

/// Calling `done()` on a handle whose `WaitGroup` was already dropped must
/// complete without panicking.
#[test]
fn test_wg_drop_before_send() {
    let (waiter, worker) = WaitGroup::new();
    // Discard the waiting side first; nobody is left to be notified.
    drop(waiter);
    worker.done();
}

/// Calling `done()` on a handle whose `MonoWaitGroup` was already dropped
/// must complete without panicking.
#[test]
fn test_mono_wg_drop_before_send() {
    let (waiter, worker) = MonoWaitGroup::new();
    // Discard the waiting side first; nobody is left to be notified.
    drop(waiter);
    worker.done();
}

/// Panics while both the `WaitGroup` and its handle are still alive, so both
/// are dropped during unwinding. The test passes when the panic is observed.
#[test]
#[should_panic]
// The drops after `panic!()` are never reached, hence the lint allowance.
#[allow(unreachable_code)]
fn test_wg_panic_both() {
    let (_waiter, _worker) = WaitGroup::new();
    panic!();
    drop(_waiter);
    drop(_worker);
}

/// Drops the handle (without calling `done()`) and then panics while the
/// `WaitGroup` is still alive, so it is dropped during unwinding.
#[test]
#[should_panic]
// The drop after `panic!()` is never reached, hence the lint allowance.
#[allow(unreachable_code)]
fn test_wg_panic_wg() {
    let (_waiter, worker) = WaitGroup::new();
    drop(worker);
    panic!();
    drop(_waiter);
}

/// Drops the `WaitGroup` and then panics while the handle is still alive, so
/// the handle is dropped during unwinding.
#[test]
#[should_panic]
// The drop after `panic!()` is never reached, hence the lint allowance.
#[allow(unreachable_code)]
fn test_wg_panic_handle() {
    let (waiter, _worker) = WaitGroup::new();
    drop(waiter);
    panic!();
    drop(_worker);
}

/// Panics while both the `MonoWaitGroup` and its handle are still alive, so
/// both are dropped during unwinding. The test passes when the panic is
/// observed.
#[test]
#[should_panic]
// The drops after `panic!()` are never reached, hence the lint allowance.
#[allow(unreachable_code)]
fn test_mono_wg_panic_both() {
    let (_waiter, _worker) = MonoWaitGroup::new();
    panic!();
    drop(_waiter);
    drop(_worker);
}

/// Drops the handle (without calling `done()`) and then panics while the
/// `MonoWaitGroup` is still alive, so it is dropped during unwinding.
#[test]
#[should_panic]
// The drop after `panic!()` is never reached, hence the lint allowance.
#[allow(unreachable_code)]
fn test_mono_wg_panic_wg() {
    let (_waiter, worker) = MonoWaitGroup::new();
    drop(worker);
    panic!();
    drop(_waiter);
}

/// Drops the `MonoWaitGroup` and then panics while the handle is still alive,
/// so the handle is dropped during unwinding.
#[test]
#[should_panic]
// The drop after `panic!()` is never reached, hence the lint allowance.
#[allow(unreachable_code)]
fn test_mono_wg_panic_handle() {
    let (waiter, _worker) = MonoWaitGroup::new();
    drop(waiter);
    panic!();
    drop(_worker);
}

/// When the same `WaitGroup` is polled with two different wakers in turn,
/// only the most recently supplied waker is woken on completion. Polling
/// again after completion stays ready and triggers no further wake-ups.
#[test]
fn test_wg_poll_by_others() {
    let (first_waker, first_wakes) = new_count_waker();
    let (second_waker, second_wakes) = new_count_waker();

    let (wait_group, worker) = WaitGroup::new();
    let mut wait_group = core::pin::pin!(wait_group);

    // First poll uses the first waker.
    let mut first_cx = Context::from_waker(&first_waker);
    assert!(wait_group.as_mut().poll(&mut first_cx).is_pending());
    assert_eq!(first_wakes.get(), 0);

    // Second poll supplies a different waker; neither has been woken so far.
    let mut second_cx = Context::from_waker(&second_waker);
    assert!(wait_group.as_mut().poll(&mut second_cx).is_pending());
    assert_eq!(first_wakes.get(), 0);
    assert_eq!(second_wakes.get(), 0);

    worker.done();

    // Only the waker from the latest poll is notified.
    assert_eq!(first_wakes.get(), 0);
    assert_eq!(second_wakes.get(), 1);

    // Polling twice after completion yields `Ready` both times.
    for _ in 0..2 {
        assert!(matches!(
            wait_group.as_mut().poll(&mut second_cx),
            Poll::Ready(())
        ));
    }

    assert_eq!(first_wakes.get(), 0);
    assert_eq!(second_wakes.get(), 1);
}

/// A `WaitGroup` that was polled once (still pending) and then dropped must
/// not have its waker woken when the handle later calls `done()`.
#[test]
fn test_wg_drop_early() {
    let (waker, wake_count) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let (mut wait_group, worker) = WaitGroup::new();
    // Poll through a temporary `Pin` so the group can still be dropped by value.
    assert!(Pin::new(&mut wait_group).poll(&mut cx).is_pending());

    drop(wait_group);

    worker.done();
    assert_eq!(wake_count.get(), 0);
}