tokio 1.22.0

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
#![warn(rust_2018_idioms)]
#![cfg(feature = "sync")]

#[cfg(tokio_wasm_not_wasi)]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[cfg(tokio_wasm_not_wasi)]
use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;

#[cfg(not(tokio_wasm_not_wasi))]
use tokio::test as maybe_tokio_test;

use std::task::Poll;

use futures::future::FutureExt;

use tokio::sync::RwLock;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};

#[test]
fn into_inner() {
    let rwlock = RwLock::new(42);
    assert_eq!(rwlock.into_inner(), 42);
}

// multiple reads should be Ready
#[test]
fn read_shared() {
    let rwlock = RwLock::new(100);

    let mut t1 = spawn(rwlock.read());
    let _g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.read());
    let _g2 = assert_ready!(t2.poll());
}

// When there is an active shared owner, exclusive access should not be possible
#[test]
fn write_shared_pending() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.read());

    let _g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.write());
    assert_pending!(t2.poll());
}

// When there is an active exclusive owner, subsequent exclusive access should not be possible
#[test]
fn read_exclusive_pending() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.write());

    let _g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.read());
    assert_pending!(t2.poll());
}

// If the max shared access is reached and subsequent shared access is pending
// should be made available when one of the shared accesses is dropped
#[test]
fn exhaust_reading() {
    let rwlock = RwLock::with_max_readers(100, 1024);
    let mut reads = Vec::new();
    loop {
        let mut t = spawn(rwlock.read());
        match t.poll() {
            Poll::Ready(guard) => reads.push(guard),
            Poll::Pending => break,
        }
    }

    let mut t1 = spawn(rwlock.read());
    assert_pending!(t1.poll());
    let g2 = reads.pop().unwrap();
    drop(g2);
    assert!(t1.is_woken());
    let _g1 = assert_ready!(t1.poll());
}

// When there is an active exclusive owner, subsequent exclusive access should not be possible
#[test]
fn write_exclusive_pending() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.write());

    let _g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.write());
    assert_pending!(t2.poll());
}

// When there is an active shared owner, exclusive access should be possible after shared is dropped
#[test]
fn write_shared_drop() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.read());

    let g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.write());
    assert_pending!(t2.poll());
    drop(g1);
    assert!(t2.is_woken());
    let _g2 = assert_ready!(t2.poll());
}

// when there is an active shared owner, and exclusive access is triggered,
// subsequent shared access should not be possible as write gathers all the available semaphore permits
#[test]
fn write_read_shared_pending() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.read());
    let _g1 = assert_ready!(t1.poll());

    let mut t2 = spawn(rwlock.read());
    let _g2 = assert_ready!(t2.poll());

    let mut t3 = spawn(rwlock.write());
    assert_pending!(t3.poll());

    let mut t4 = spawn(rwlock.read());
    assert_pending!(t4.poll());
}

// when there is an active shared owner, and exclusive access is triggered,
// reading should be possible after pending exclusive access is dropped
#[test]
fn write_read_shared_drop_pending() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.read());
    let _g1 = assert_ready!(t1.poll());

    let mut t2 = spawn(rwlock.write());
    assert_pending!(t2.poll());

    let mut t3 = spawn(rwlock.read());
    assert_pending!(t3.poll());
    drop(t2);

    assert!(t3.is_woken());
    let _t3 = assert_ready!(t3.poll());
}

// Acquire an RwLock nonexclusively by a single task
#[maybe_tokio_test]
async fn read_uncontested() {
    let rwlock = RwLock::new(100);
    let result = *rwlock.read().await;

    assert_eq!(result, 100);
}

// Acquire an uncontested RwLock in exclusive mode
#[maybe_tokio_test]
async fn write_uncontested() {
    let rwlock = RwLock::new(100);
    let mut result = rwlock.write().await;
    *result += 50;
    assert_eq!(*result, 150);
}

// RwLocks should be acquired in the order that their Futures are waited upon.
#[maybe_tokio_test]
async fn write_order() {
    let rwlock = RwLock::<Vec<u32>>::new(vec![]);
    let fut2 = rwlock.write().map(|mut guard| guard.push(2));
    let fut1 = rwlock.write().map(|mut guard| guard.push(1));
    fut1.await;
    fut2.await;

    let g = rwlock.read().await;
    assert_eq!(*g, vec![1, 2]);
}

// A single RwLock is contested by tasks in multiple threads
#[cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn multithreaded() {
    use futures::stream::{self, StreamExt};
    use std::sync::Arc;
    use tokio::sync::Barrier;

    let barrier = Arc::new(Barrier::new(5));
    let rwlock = Arc::new(RwLock::<u32>::new(0));
    let rwclone1 = rwlock.clone();
    let rwclone2 = rwlock.clone();
    let rwclone3 = rwlock.clone();
    let rwclone4 = rwlock.clone();

    let b1 = barrier.clone();
    tokio::spawn(async move {
        stream::iter(0..1000)
            .for_each(move |_| {
                let rwlock = rwclone1.clone();
                async move {
                    let mut guard = rwlock.write().await;
                    *guard += 2;
                }
            })
            .await;
        b1.wait().await;
    });

    let b2 = barrier.clone();
    tokio::spawn(async move {
        stream::iter(0..1000)
            .for_each(move |_| {
                let rwlock = rwclone2.clone();
                async move {
                    let mut guard = rwlock.write().await;
                    *guard += 3;
                }
            })
            .await;
        b2.wait().await;
    });

    let b3 = barrier.clone();
    tokio::spawn(async move {
        stream::iter(0..1000)
            .for_each(move |_| {
                let rwlock = rwclone3.clone();
                async move {
                    let mut guard = rwlock.write().await;
                    *guard += 5;
                }
            })
            .await;
        b3.wait().await;
    });

    let b4 = barrier.clone();
    tokio::spawn(async move {
        stream::iter(0..1000)
            .for_each(move |_| {
                let rwlock = rwclone4.clone();
                async move {
                    let mut guard = rwlock.write().await;
                    *guard += 7;
                }
            })
            .await;
        b4.wait().await;
    });

    barrier.wait().await;
    let g = rwlock.read().await;
    assert_eq!(*g, 17_000);
}

#[maybe_tokio_test]
async fn try_write() {
    let lock = RwLock::new(0);
    let read_guard = lock.read().await;
    assert!(lock.try_write().is_err());
    drop(read_guard);
    assert!(lock.try_write().is_ok());
}

#[test]
fn try_read_try_write() {
    let lock: RwLock<usize> = RwLock::new(15);

    {
        let rg1 = lock.try_read().unwrap();
        assert_eq!(*rg1, 15);

        assert!(lock.try_write().is_err());

        let rg2 = lock.try_read().unwrap();
        assert_eq!(*rg2, 15)
    }

    {
        let mut wg = lock.try_write().unwrap();
        *wg = 1515;

        assert!(lock.try_read().is_err())
    }

    assert_eq!(*lock.try_read().unwrap(), 1515);
}