tokio 0.2.19

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
use crate::sync::semaphore_ll::*;

use futures::future::poll_fn;
use loom::future::block_on;
use loom::thread;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::task::Poll::Ready;
use std::task::{Context, Poll};

#[test]
fn basic_usage() {
    const NUM: usize = 2;

    struct Actor {
        waiter: Permit,
        shared: Arc<Shared>,
    }

    struct Shared {
        semaphore: Semaphore,
        active: AtomicUsize,
    }

    impl Future for Actor {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let me = &mut *self;

            ready!(me.waiter.poll_acquire(cx, 1, &me.shared.semaphore)).unwrap();

            let actual = me.shared.active.fetch_add(1, SeqCst);
            assert!(actual <= NUM - 1);

            let actual = me.shared.active.fetch_sub(1, SeqCst);
            assert!(actual <= NUM);

            me.waiter.release(1, &me.shared.semaphore);

            Ready(())
        }
    }

    loom::model(|| {
        let shared = Arc::new(Shared {
            semaphore: Semaphore::new(NUM),
            active: AtomicUsize::new(0),
        });

        for _ in 0..NUM {
            let shared = shared.clone();

            thread::spawn(move || {
                block_on(Actor {
                    waiter: Permit::new(),
                    shared,
                });
            });
        }

        block_on(Actor {
            waiter: Permit::new(),
            shared,
        });
    });
}

#[test]
fn release() {
    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(1));

        {
            let semaphore = semaphore.clone();
            thread::spawn(move || {
                let mut permit = Permit::new();

                block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();

                permit.release(1, &semaphore);
            });
        }

        let mut permit = Permit::new();

        block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();

        permit.release(1, &semaphore);
    });
}

#[test]
fn basic_closing() {
    const NUM: usize = 2;

    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(1));

        for _ in 0..NUM {
            let semaphore = semaphore.clone();

            thread::spawn(move || {
                let mut permit = Permit::new();

                for _ in 0..2 {
                    block_on(poll_fn(|cx| {
                        permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
                    }))?;

                    permit.release(1, &semaphore);
                }

                Ok::<(), ()>(())
            });
        }

        semaphore.close();
    });
}

#[test]
fn concurrent_close() {
    const NUM: usize = 3;

    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(1));

        for _ in 0..NUM {
            let semaphore = semaphore.clone();

            thread::spawn(move || {
                let mut permit = Permit::new();

                block_on(poll_fn(|cx| {
                    permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
                }))?;

                permit.release(1, &semaphore);

                semaphore.close();

                Ok::<(), ()>(())
            });
        }
    });
}

#[test]
fn batch() {
    let mut b = loom::model::Builder::new();
    b.preemption_bound = Some(1);

    b.check(|| {
        let semaphore = Arc::new(Semaphore::new(10));
        let active = Arc::new(AtomicUsize::new(0));
        let mut ths = vec![];

        for _ in 0..2 {
            let semaphore = semaphore.clone();
            let active = active.clone();

            ths.push(thread::spawn(move || {
                let mut permit = Permit::new();

                for n in &[4, 10, 8] {
                    block_on(poll_fn(|cx| permit.poll_acquire(cx, *n, &semaphore))).unwrap();

                    active.fetch_add(*n as usize, SeqCst);

                    let num_active = active.load(SeqCst);
                    assert!(num_active <= 10);

                    thread::yield_now();

                    active.fetch_sub(*n as usize, SeqCst);

                    permit.release(*n, &semaphore);
                }
            }));
        }

        for th in ths.into_iter() {
            th.join().unwrap();
        }

        assert_eq!(10, semaphore.available_permits());
    });
}