tokio 0.2.19

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
use crate::sync::batch_semaphore::*;

use futures::future::poll_fn;
use loom::future::block_on;
use loom::sync::atomic::AtomicUsize;
use loom::thread;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::task::Poll::Ready;
use std::task::{Context, Poll};

#[test]
fn basic_usage() {
    const NUM: usize = 2;

    struct Shared {
        semaphore: Semaphore,
        active: AtomicUsize,
    }

    async fn actor(shared: Arc<Shared>) {
        shared.semaphore.acquire(1).await.unwrap();
        let actual = shared.active.fetch_add(1, SeqCst);
        assert!(actual <= NUM - 1);

        let actual = shared.active.fetch_sub(1, SeqCst);
        assert!(actual <= NUM);
        shared.semaphore.release(1);
    }

    loom::model(|| {
        let shared = Arc::new(Shared {
            semaphore: Semaphore::new(NUM),
            active: AtomicUsize::new(0),
        });

        for _ in 0..NUM {
            let shared = shared.clone();

            thread::spawn(move || {
                block_on(actor(shared));
            });
        }

        block_on(actor(shared));
    });
}

#[test]
fn release() {
    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(1));

        {
            let semaphore = semaphore.clone();
            thread::spawn(move || {
                block_on(semaphore.acquire(1)).unwrap();
                semaphore.release(1);
            });
        }

        block_on(semaphore.acquire(1)).unwrap();

        semaphore.release(1);
    });
}

#[test]
fn basic_closing() {
    const NUM: usize = 2;

    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(1));

        for _ in 0..NUM {
            let semaphore = semaphore.clone();

            thread::spawn(move || {
                for _ in 0..2 {
                    block_on(semaphore.acquire(1)).map_err(|_| ())?;

                    semaphore.release(1);
                }

                Ok::<(), ()>(())
            });
        }

        semaphore.close();
    });
}

#[test]
fn concurrent_close() {
    const NUM: usize = 3;

    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(1));

        for _ in 0..NUM {
            let semaphore = semaphore.clone();

            thread::spawn(move || {
                block_on(semaphore.acquire(1)).map_err(|_| ())?;
                semaphore.release(1);
                semaphore.close();

                Ok::<(), ()>(())
            });
        }
    });
}

#[test]
fn concurrent_cancel() {
    async fn poll_and_cancel(semaphore: Arc<Semaphore>) {
        let mut acquire1 = Some(semaphore.acquire(1));
        let mut acquire2 = Some(semaphore.acquire(1));
        poll_fn(|cx| {
            // poll the acquire future once, and then immediately throw
            // it away. this simulates a situation where a future is
            // polled and then cancelled, such as by a timeout.
            if let Some(acquire) = acquire1.take() {
                pin!(acquire);
                let _ = acquire.poll(cx);
            }
            if let Some(acquire) = acquire2.take() {
                pin!(acquire);
                let _ = acquire.poll(cx);
            }
            Poll::Ready(())
        })
        .await
    }

    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(0));
        let t1 = {
            let semaphore = semaphore.clone();
            thread::spawn(move || block_on(poll_and_cancel(semaphore)))
        };
        let t2 = {
            let semaphore = semaphore.clone();
            thread::spawn(move || block_on(poll_and_cancel(semaphore)))
        };
        let t3 = {
            let semaphore = semaphore.clone();
            thread::spawn(move || block_on(poll_and_cancel(semaphore)))
        };

        t1.join().unwrap();
        semaphore.release(10);
        t2.join().unwrap();
        t3.join().unwrap();
    });
}

#[test]
fn batch() {
    let mut b = loom::model::Builder::new();
    b.preemption_bound = Some(1);

    b.check(|| {
        let semaphore = Arc::new(Semaphore::new(10));
        let active = Arc::new(AtomicUsize::new(0));
        let mut ths = vec![];

        for _ in 0..2 {
            let semaphore = semaphore.clone();
            let active = active.clone();

            ths.push(thread::spawn(move || {
                for n in &[4, 10, 8] {
                    block_on(semaphore.acquire(*n)).unwrap();

                    active.fetch_add(*n as usize, SeqCst);

                    let num_active = active.load(SeqCst);
                    assert!(num_active <= 10);

                    thread::yield_now();

                    active.fetch_sub(*n as usize, SeqCst);

                    semaphore.release(*n as usize);
                }
            }));
        }

        for th in ths.into_iter() {
            th.join().unwrap();
        }

        assert_eq!(10, semaphore.available_permits());
    });
}

#[test]
fn release_during_acquire() {
    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(10));
        semaphore
            .try_acquire(8)
            .expect("try_acquire should succeed; semaphore uncontended");
        let semaphore2 = semaphore.clone();
        let thread = thread::spawn(move || block_on(semaphore2.acquire(4)).unwrap());

        semaphore.release(8);
        thread.join().unwrap();
        semaphore.release(4);
        assert_eq!(10, semaphore.available_permits());
    })
}