tokio 1.37.0

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads

use tokio::{runtime, task, time};
use tokio_test::assert_ok;

use std::thread;
use std::time::Duration;

mod support {
    pub(crate) mod mpsc_stream;
}

#[tokio::test]
async fn basic_blocking() {
    // Run a few times
    for _ in 0..100 {
        let out = assert_ok!(
            tokio::spawn(async {
                assert_ok!(
                    task::spawn_blocking(|| {
                        thread::sleep(Duration::from_millis(5));
                        "hello"
                    })
                    .await
                )
            })
            .await
        );

        assert_eq!(out, "hello");
    }
}

#[tokio::test(flavor = "multi_thread")]
async fn block_in_blocking() {
    // Run a few times
    for _ in 0..100 {
        let out = assert_ok!(
            tokio::spawn(async {
                assert_ok!(
                    task::spawn_blocking(|| {
                        task::block_in_place(|| {
                            thread::sleep(Duration::from_millis(5));
                        });
                        "hello"
                    })
                    .await
                )
            })
            .await
        );

        assert_eq!(out, "hello");
    }
}

#[tokio::test(flavor = "multi_thread")]
async fn block_in_block() {
    // Run a few times
    for _ in 0..100 {
        let out = assert_ok!(
            tokio::spawn(async {
                task::block_in_place(|| {
                    task::block_in_place(|| {
                        thread::sleep(Duration::from_millis(5));
                    });
                    "hello"
                })
            })
            .await
        );

        assert_eq!(out, "hello");
    }
}

#[tokio::test(flavor = "current_thread")]
#[should_panic]
async fn no_block_in_current_thread_scheduler() {
    task::block_in_place(|| {});
}

#[test]
fn yes_block_in_threaded_block_on() {
    let rt = runtime::Runtime::new().unwrap();
    rt.block_on(async {
        task::block_in_place(|| {});
    });
}

#[test]
#[should_panic]
fn no_block_in_current_thread_block_on() {
    let rt = runtime::Builder::new_current_thread().build().unwrap();
    rt.block_on(async {
        task::block_in_place(|| {});
    });
}

#[test]
fn can_enter_current_thread_rt_from_within_block_in_place() {
    let outer = tokio::runtime::Runtime::new().unwrap();

    outer.block_on(async {
        tokio::task::block_in_place(|| {
            let inner = tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap();

            inner.block_on(async {})
        })
    });
}

#[test]
#[cfg(panic = "unwind")]
fn useful_panic_message_when_dropping_rt_in_rt() {
    use std::panic::{catch_unwind, AssertUnwindSafe};

    let outer = tokio::runtime::Runtime::new().unwrap();

    let result = catch_unwind(AssertUnwindSafe(|| {
        outer.block_on(async {
            let _ = tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap();
        });
    }));

    assert!(result.is_err());
    let err = result.unwrap_err();
    let err: &'static str = err.downcast_ref::<&'static str>().unwrap();

    assert!(
        err.contains("Cannot drop a runtime"),
        "Wrong panic message: {:?}",
        err
    );
}

#[test]
fn can_shutdown_with_zero_timeout_in_runtime() {
    let outer = tokio::runtime::Runtime::new().unwrap();

    outer.block_on(async {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        rt.shutdown_timeout(Duration::from_nanos(0));
    });
}

#[test]
fn can_shutdown_now_in_runtime() {
    let outer = tokio::runtime::Runtime::new().unwrap();

    outer.block_on(async {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        rt.shutdown_background();
    });
}

#[test]
fn coop_disabled_in_block_in_place() {
    let outer = tokio::runtime::Builder::new_multi_thread()
        .enable_time()
        .build()
        .unwrap();

    let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();

    for i in 0..200 {
        tx.send(i).unwrap();
    }
    drop(tx);

    outer.block_on(async move {
        let jh = tokio::spawn(async move {
            tokio::task::block_in_place(move || {
                futures::executor::block_on(async move {
                    use tokio_stream::StreamExt;
                    assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
                })
            })
        });

        tokio::time::timeout(Duration::from_secs(1), jh)
            .await
            .expect("timed out (probably hanging)")
            .unwrap()
    });
}

#[test]
fn coop_disabled_in_block_in_place_in_block_on() {
    let (done_tx, done_rx) = std::sync::mpsc::channel();
    let done = done_tx.clone();
    thread::spawn(move || {
        let outer = tokio::runtime::Runtime::new().unwrap();

        let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();

        for i in 0..200 {
            tx.send(i).unwrap();
        }
        drop(tx);

        outer.block_on(async move {
            tokio::task::block_in_place(move || {
                futures::executor::block_on(async move {
                    use tokio_stream::StreamExt;
                    assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
                })
            })
        });

        let _ = done.send(Ok(()));
    });

    thread::spawn(move || {
        thread::sleep(Duration::from_secs(1));
        let _ = done_tx.send(Err("timed out (probably hanging)"));
    });

    done_rx.recv().unwrap().unwrap();
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_when_paused() {
    // Do not auto-advance time when we have started a blocking task that has
    // not yet finished.
    time::timeout(
        Duration::from_secs(3),
        task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
    )
    .await
    .expect("timeout should not trigger")
    .expect("blocking task should finish");

    // Really: Do not auto-advance time, even if the timeout is short and the
    // blocking task runs for longer than that. It doesn't matter: Tokio time
    // is paused; system time is not.
    time::timeout(
        Duration::from_millis(1),
        task::spawn_blocking(|| thread::sleep(Duration::from_millis(50))),
    )
    .await
    .expect("timeout should not trigger")
    .expect("blocking task should finish");
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_task_wakes_paused_runtime() {
    let t0 = std::time::Instant::now();
    time::timeout(
        Duration::from_secs(15),
        task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
    )
    .await
    .expect("timeout should not trigger")
    .expect("blocking task should finish");
    assert!(
        t0.elapsed() < Duration::from_secs(10),
        "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
    );
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn unawaited_blocking_task_wakes_paused_runtime() {
    let t0 = std::time::Instant::now();

    // When this task finishes, time should auto-advance, even though the
    // JoinHandle has not been awaited yet.
    let a = task::spawn_blocking(|| {
        thread::sleep(Duration::from_millis(1));
    });

    crate::time::sleep(Duration::from_secs(15)).await;
    a.await.expect("blocking task should finish");
    assert!(
        t0.elapsed() < Duration::from_secs(10),
        "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
    );
}

#[cfg(panic = "unwind")]
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn panicking_blocking_task_wakes_paused_runtime() {
    let t0 = std::time::Instant::now();
    let result = time::timeout(
        Duration::from_secs(15),
        task::spawn_blocking(|| {
            thread::sleep(Duration::from_millis(1));
            panic!("blocking task panicked");
        }),
    )
    .await
    .expect("timeout should not trigger");
    assert!(result.is_err(), "blocking task should have panicked");
    assert!(
        t0.elapsed() < Duration::from_secs(10),
        "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
    );
}