async-lock 3.3.0

Async synchronization primitives
Documentation
mod common;

use std::future::Future;
use std::mem::forget;
use std::pin::Pin;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    mpsc, Arc,
};
use std::task::Context;
use std::task::Poll;
use std::thread;

use common::check_yields_when_contended;

use async_lock::Semaphore;
use futures_lite::{future, pin};

#[test]
fn try_acquire() {
    let s = Semaphore::new(2);
    let g1 = s.try_acquire().unwrap();
    let _g2 = s.try_acquire().unwrap();

    assert!(s.try_acquire().is_none());
    drop(g1);
    assert!(s.try_acquire().is_some());
}

#[test]
fn stress() {
    const COUNT: usize = if cfg!(miri) { 500 } else { 10_000 };

    let s = Arc::new(Semaphore::new(5));
    let (tx, rx) = mpsc::channel::<()>();

    for _ in 0..50 {
        let s = s.clone();
        let tx = tx.clone();

        thread::spawn(move || {
            future::block_on(async {
                for _ in 0..COUNT {
                    s.acquire().await;
                }
                drop(tx);
            })
        });
    }

    drop(tx);
    let _ = rx.recv();

    let _g1 = s.try_acquire().unwrap();
    let g2 = s.try_acquire().unwrap();
    let _g3 = s.try_acquire().unwrap();
    let _g4 = s.try_acquire().unwrap();
    let _g5 = s.try_acquire().unwrap();

    assert!(s.try_acquire().is_none());
    drop(g2);
    assert!(s.try_acquire().is_some());
}

#[test]
fn as_mutex() {
    let s = Arc::new(Semaphore::new(1));
    let s2 = s.clone();
    let _t = thread::spawn(move || {
        future::block_on(async {
            let _g = s2.acquire().await;
        });
    });
    future::block_on(async {
        let _g = s.acquire().await;
    });
}

#[test]
fn multi_resource() {
    let s = Arc::new(Semaphore::new(2));
    let s2 = s.clone();
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();
    let _t = thread::spawn(move || {
        future::block_on(async {
            let _g = s2.acquire().await;
            let _ = rx2.recv();
            tx1.send(()).unwrap();
        });
    });
    future::block_on(async {
        let _g = s.acquire().await;
        tx2.send(()).unwrap();
        rx1.recv().unwrap();
    });
}

#[test]
fn lifetime() {
    // Show that the future keeps the semaphore alive.
    let _fut = {
        let mutex = Arc::new(Semaphore::new(2));
        mutex.acquire_arc()
    };
}

#[test]
fn yields_when_contended() {
    let s = Semaphore::new(1);
    check_yields_when_contended(s.try_acquire().unwrap(), s.acquire());

    let s = Arc::new(s);
    check_yields_when_contended(s.try_acquire_arc().unwrap(), s.acquire_arc());
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[test]
fn smoke_blocking() {
    let s = Semaphore::new(2);
    let g1 = s.acquire_blocking();
    let _g2 = s.acquire_blocking();
    assert!(s.try_acquire().is_none());
    drop(g1);
    assert!(s.try_acquire().is_some());
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[test]
fn smoke_arc_blocking() {
    let s = Arc::new(Semaphore::new(2));
    let g1 = s.acquire_arc_blocking();
    let _g2 = s.acquire_arc_blocking();
    assert!(s.try_acquire().is_none());
    drop(g1);
    assert!(s.try_acquire().is_some());
}

#[test]
fn add_permits() {
    static COUNTER: AtomicUsize = AtomicUsize::new(0);

    let s = Arc::new(Semaphore::new(0));
    let (tx, rx) = mpsc::channel::<()>();

    for _ in 0..50 {
        let s = s.clone();
        let tx = tx.clone();

        thread::spawn(move || {
            future::block_on(async {
                let perm = s.acquire().await;
                forget(perm);
                COUNTER.fetch_add(1, Ordering::Relaxed);
                drop(tx);
            })
        });
    }

    assert_eq!(COUNTER.load(Ordering::Relaxed), 0);

    s.add_permits(50);

    drop(tx);
    let _ = rx.recv();

    assert_eq!(COUNTER.load(Ordering::Relaxed), 50);
}

#[test]
fn add_permits_2() {
    future::block_on(AddPermitsTest);
}

struct AddPermitsTest;

impl Future for AddPermitsTest {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let s = Semaphore::new(0);
        let acq = s.acquire();
        pin!(acq);
        let acq_2 = s.acquire();
        pin!(acq_2);
        assert!(acq.as_mut().poll(cx).is_pending());
        assert!(acq_2.as_mut().poll(cx).is_pending());
        s.add_permits(1);
        let g = acq.poll(cx);
        assert!(g.is_ready());
        assert!(acq_2.poll(cx).is_pending());

        Poll::Ready(())
    }
}