tokio 1.13.0

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
use std::future::Future;
use std::panic;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::runtime::Builder;
use crate::sync::oneshot;
use crate::task::JoinHandle;

use futures::future::FutureExt;

// Enums for each option in the combinations being tested

#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiRuntime {
    CurrentThread,
    Multi1,
    Multi2,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiLocalSet {
    Yes,
    No,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiTask {
    PanicOnRun,
    PanicOnDrop,
    PanicOnRunAndDrop,
    NoPanic,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiOutput {
    PanicOnDrop,
    NoPanic,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiJoinInterest {
    Polled,
    NotPolled,
}
#[allow(clippy::enum_variant_names)] // we aren't using glob imports
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiJoinHandle {
    DropImmediately = 1,
    DropFirstPoll = 2,
    DropAfterNoConsume = 3,
    DropAfterConsume = 4,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiAbort {
    NotAborted = 0,
    AbortedImmediately = 1,
    AbortedFirstPoll = 2,
    AbortedAfterFinish = 3,
    AbortedAfterConsumeOutput = 4,
}

#[test]
fn test_combinations() {
    let mut rt = &[
        CombiRuntime::CurrentThread,
        CombiRuntime::Multi1,
        CombiRuntime::Multi2,
    ][..];

    if cfg!(miri) {
        rt = &[CombiRuntime::CurrentThread];
    }

    let ls = [CombiLocalSet::Yes, CombiLocalSet::No];
    let task = [
        CombiTask::NoPanic,
        CombiTask::PanicOnRun,
        CombiTask::PanicOnDrop,
        CombiTask::PanicOnRunAndDrop,
    ];
    let output = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop];
    let ji = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled];
    let jh = [
        CombiJoinHandle::DropImmediately,
        CombiJoinHandle::DropFirstPoll,
        CombiJoinHandle::DropAfterNoConsume,
        CombiJoinHandle::DropAfterConsume,
    ];
    let abort = [
        CombiAbort::NotAborted,
        CombiAbort::AbortedImmediately,
        CombiAbort::AbortedFirstPoll,
        CombiAbort::AbortedAfterFinish,
        CombiAbort::AbortedAfterConsumeOutput,
    ];

    for rt in rt.iter().copied() {
        for ls in ls.iter().copied() {
            for task in task.iter().copied() {
                for output in output.iter().copied() {
                    for ji in ji.iter().copied() {
                        for jh in jh.iter().copied() {
                            for abort in abort.iter().copied() {
                                test_combination(rt, ls, task, output, ji, jh, abort);
                            }
                        }
                    }
                }
            }
        }
    }
}

fn test_combination(
    rt: CombiRuntime,
    ls: CombiLocalSet,
    task: CombiTask,
    output: CombiOutput,
    ji: CombiJoinInterest,
    jh: CombiJoinHandle,
    abort: CombiAbort,
) {
    if (jh as usize) < (abort as usize) {
        // drop before abort not possible
        return;
    }
    if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) {
        // this causes double panic
        return;
    }
    if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) {
        // this causes double panic
        return;
    }

    println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort);

    // A runtime optionally with a LocalSet
    struct Rt {
        rt: crate::runtime::Runtime,
        ls: Option<crate::task::LocalSet>,
    }
    impl Rt {
        fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self {
            let rt = match rt {
                CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(),
                CombiRuntime::Multi1 => Builder::new_multi_thread()
                    .worker_threads(1)
                    .build()
                    .unwrap(),
                CombiRuntime::Multi2 => Builder::new_multi_thread()
                    .worker_threads(2)
                    .build()
                    .unwrap(),
            };

            let ls = match ls {
                CombiLocalSet::Yes => Some(crate::task::LocalSet::new()),
                CombiLocalSet::No => None,
            };

            Self { rt, ls }
        }
        fn block_on<T>(&self, task: T) -> T::Output
        where
            T: Future,
        {
            match &self.ls {
                Some(ls) => ls.block_on(&self.rt, task),
                None => self.rt.block_on(task),
            }
        }
        fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
        where
            T: Future + Send + 'static,
            T::Output: Send + 'static,
        {
            match &self.ls {
                Some(ls) => ls.spawn_local(task),
                None => self.rt.spawn(task),
            }
        }
    }

    // The type used for the output of the future
    struct Output {
        panic_on_drop: bool,
        on_drop: Option<oneshot::Sender<()>>,
    }
    impl Output {
        fn disarm(&mut self) {
            self.panic_on_drop = false;
        }
    }
    impl Drop for Output {
        fn drop(&mut self) {
            let _ = self.on_drop.take().unwrap().send(());
            if self.panic_on_drop {
                panic!("Panicking in Output");
            }
        }
    }

    // A wrapper around the future that is spawned
    struct FutWrapper<F> {
        inner: F,
        on_drop: Option<oneshot::Sender<()>>,
        panic_on_drop: bool,
    }
    impl<F: Future> Future for FutWrapper<F> {
        type Output = F::Output;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
            unsafe {
                let me = Pin::into_inner_unchecked(self);
                let inner = Pin::new_unchecked(&mut me.inner);
                inner.poll(cx)
            }
        }
    }
    impl<F> Drop for FutWrapper<F> {
        fn drop(&mut self) {
            let _: Result<(), ()> = self.on_drop.take().unwrap().send(());
            if self.panic_on_drop {
                panic!("Panicking in FutWrapper");
            }
        }
    }

    // The channels passed to the task
    struct Signals {
        on_first_poll: Option<oneshot::Sender<()>>,
        wait_complete: Option<oneshot::Receiver<()>>,
        on_output_drop: Option<oneshot::Sender<()>>,
    }

    // The task we will spawn
    async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output {
        // Signal that we have been polled once
        let _ = signal.on_first_poll.take().unwrap().send(());

        // Wait for a signal, then complete the future
        let _ = signal.wait_complete.take().unwrap().await;

        // If the task gets past wait_complete without yielding, then aborts
        // may not be caught without this yield_now.
        crate::task::yield_now().await;

        if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop {
            panic!("Panicking in my_task on {:?}", std::thread::current().id());
        }

        Output {
            panic_on_drop: out == CombiOutput::PanicOnDrop,
            on_drop: signal.on_output_drop.take(),
        }
    }

    let rt = Rt::new(rt, ls);

    let (on_first_poll, wait_first_poll) = oneshot::channel();
    let (on_complete, wait_complete) = oneshot::channel();
    let (on_future_drop, wait_future_drop) = oneshot::channel();
    let (on_output_drop, wait_output_drop) = oneshot::channel();
    let signal = Signals {
        on_first_poll: Some(on_first_poll),
        wait_complete: Some(wait_complete),
        on_output_drop: Some(on_output_drop),
    };

    // === Spawn task ===
    let mut handle = Some(rt.spawn(FutWrapper {
        inner: my_task(signal, task, output),
        on_drop: Some(on_future_drop),
        panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop,
    }));

    // Keep track of whether the task has been killed with an abort
    let mut aborted = false;

    // If we want to poll the JoinHandle, do it now
    if ji == CombiJoinInterest::Polled {
        assert!(
            handle.as_mut().unwrap().now_or_never().is_none(),
            "Polling handle succeeded"
        );
    }

    if abort == CombiAbort::AbortedImmediately {
        handle.as_mut().unwrap().abort();
        aborted = true;
    }
    if jh == CombiJoinHandle::DropImmediately {
        drop(handle.take().unwrap());
    }

    // === Wait for first poll ===
    let got_polled = rt.block_on(wait_first_poll).is_ok();
    if !got_polled {
        // it's possible that we are aborted but still got polled
        assert!(
            aborted,
            "Task completed without ever being polled but was not aborted."
        );
    }

    if abort == CombiAbort::AbortedFirstPoll {
        handle.as_mut().unwrap().abort();
        aborted = true;
    }
    if jh == CombiJoinHandle::DropFirstPoll {
        drop(handle.take().unwrap());
    }

    // Signal the future that it can return now
    let _ = on_complete.send(());
    // === Wait for future to be dropped ===
    assert!(
        rt.block_on(wait_future_drop).is_ok(),
        "The future should always be dropped."
    );

    if abort == CombiAbort::AbortedAfterFinish {
        // Don't set aborted to true here as the task already finished
        handle.as_mut().unwrap().abort();
    }
    if jh == CombiJoinHandle::DropAfterNoConsume {
        // The runtime will usually have dropped every ref-count at this point,
        // in which case dropping the JoinHandle drops the output.
        //
        // (But it might race and still hold a ref-count)
        let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            drop(handle.take().unwrap());
        }));
        if panic.is_err() {
            assert!(
                (output == CombiOutput::PanicOnDrop)
                    && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
                    && !aborted,
                "Dropping JoinHandle shouldn't panic here"
            );
        }
    }

    // Check whether we drop after consuming the output
    if jh == CombiJoinHandle::DropAfterConsume {
        // Using as_mut() to not immediately drop the handle
        let result = rt.block_on(handle.as_mut().unwrap());

        match result {
            Ok(mut output) => {
                // Don't panic here.
                output.disarm();
                assert!(!aborted, "Task was aborted but returned output");
            }
            Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"),
            Err(err) if err.is_panic() => {
                assert!(
                    (task == CombiTask::PanicOnRun)
                        || (task == CombiTask::PanicOnDrop)
                        || (task == CombiTask::PanicOnRunAndDrop)
                        || (output == CombiOutput::PanicOnDrop),
                    "Panic but nothing should panic"
                );
            }
            _ => unreachable!(),
        }

        let handle = handle.take().unwrap();
        if abort == CombiAbort::AbortedAfterConsumeOutput {
            handle.abort();
        }
        drop(handle);
    }

    // The output should have been dropped now. Check whether the output
    // object was created at all.
    let output_created = rt.block_on(wait_output_drop).is_ok();
    assert_eq!(
        output_created,
        (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted,
        "Creation of output object"
    );
}