tokio 1.24.1

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::tests::NoopSchedule;
use crate::util::TryLock;

use std::collections::VecDeque;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

struct AssertDropHandle {
    is_dropped: Arc<AtomicBool>,
}
impl AssertDropHandle {
    #[track_caller]
    fn assert_dropped(&self) {
        assert!(self.is_dropped.load(Ordering::SeqCst));
    }

    #[track_caller]
    fn assert_not_dropped(&self) {
        assert!(!self.is_dropped.load(Ordering::SeqCst));
    }
}

struct AssertDrop {
    is_dropped: Arc<AtomicBool>,
}
impl AssertDrop {
    fn new() -> (Self, AssertDropHandle) {
        let shared = Arc::new(AtomicBool::new(false));
        (
            AssertDrop {
                is_dropped: shared.clone(),
            },
            AssertDropHandle {
                is_dropped: shared.clone(),
            },
        )
    }
}
impl Drop for AssertDrop {
    fn drop(&mut self) {
        self.is_dropped.store(true, Ordering::SeqCst);
    }
}

// A Notified does not shut down on drop, but it is dropped once the ref-count
// hits zero.
#[test]
fn create_drop1() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    drop(notified);
    handle.assert_not_dropped();
    drop(join);
    handle.assert_dropped();
}

#[test]
fn create_drop2() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    drop(join);
    handle.assert_not_dropped();
    drop(notified);
    handle.assert_dropped();
}

#[test]
fn drop_abort_handle1() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    let abort = join.abort_handle();
    drop(join);
    handle.assert_not_dropped();
    drop(notified);
    handle.assert_not_dropped();
    drop(abort);
    handle.assert_dropped();
}

#[test]
fn drop_abort_handle2() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    let abort = join.abort_handle();
    drop(notified);
    handle.assert_not_dropped();
    drop(abort);
    handle.assert_not_dropped();
    drop(join);
    handle.assert_dropped();
}

// Shutting down through Notified works
#[test]
fn create_shutdown1() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    drop(join);
    handle.assert_not_dropped();
    notified.shutdown();
    handle.assert_dropped();
}

#[test]
fn create_shutdown2() {
    let (ad, handle) = AssertDrop::new();
    let (notified, join) = unowned(
        async {
            drop(ad);
            unreachable!()
        },
        NoopSchedule,
        Id::next(),
    );
    handle.assert_not_dropped();
    notified.shutdown();
    handle.assert_dropped();
    drop(join);
}

#[test]
fn unowned_poll() {
    let (task, _) = unowned(async {}, NoopSchedule, Id::next());
    task.run();
}

#[test]
fn schedule() {
    with(|rt| {
        rt.spawn(async {
            crate::task::yield_now().await;
        });

        assert_eq!(2, rt.tick());
        rt.shutdown();
    })
}

#[test]
fn shutdown() {
    with(|rt| {
        rt.spawn(async {
            loop {
                crate::task::yield_now().await;
            }
        });

        rt.tick_max(1);

        rt.shutdown();
    })
}

#[test]
fn shutdown_immediately() {
    with(|rt| {
        rt.spawn(async {
            loop {
                crate::task::yield_now().await;
            }
        });

        rt.shutdown();
    })
}

#[test]
fn spawn_during_shutdown() {
    static DID_SPAWN: AtomicBool = AtomicBool::new(false);

    struct SpawnOnDrop(Runtime);
    impl Drop for SpawnOnDrop {
        fn drop(&mut self) {
            DID_SPAWN.store(true, Ordering::SeqCst);
            self.0.spawn(async {});
        }
    }

    with(|rt| {
        let rt2 = rt.clone();
        rt.spawn(async move {
            let _spawn_on_drop = SpawnOnDrop(rt2);

            loop {
                crate::task::yield_now().await;
            }
        });

        rt.tick_max(1);
        rt.shutdown();
    });

    assert!(DID_SPAWN.load(Ordering::SeqCst));
}

fn with(f: impl FnOnce(Runtime)) {
    struct Reset;

    impl Drop for Reset {
        fn drop(&mut self) {
            let _rt = CURRENT.try_lock().unwrap().take();
        }
    }

    let _reset = Reset;

    let rt = Runtime(Arc::new(Inner {
        owned: OwnedTasks::new(),
        core: TryLock::new(Core {
            queue: VecDeque::new(),
        }),
    }));

    *CURRENT.try_lock().unwrap() = Some(rt.clone());
    f(rt)
}

#[derive(Clone)]
struct Runtime(Arc<Inner>);

struct Inner {
    core: TryLock<Core>,
    owned: OwnedTasks<Runtime>,
}

struct Core {
    queue: VecDeque<task::Notified<Runtime>>,
}

static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);

impl Runtime {
    fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
    where
        T: 'static + Send + Future,
        T::Output: 'static + Send,
    {
        let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next());

        if let Some(notified) = notified {
            self.schedule(notified);
        }

        handle
    }

    fn tick(&self) -> usize {
        self.tick_max(usize::MAX)
    }

    fn tick_max(&self, max: usize) -> usize {
        let mut n = 0;

        while !self.is_empty() && n < max {
            let task = self.next_task();
            n += 1;
            let task = self.0.owned.assert_owner(task);
            task.run();
        }

        n
    }

    fn is_empty(&self) -> bool {
        self.0.core.try_lock().unwrap().queue.is_empty()
    }

    fn next_task(&self) -> task::Notified<Runtime> {
        self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
    }

    fn shutdown(&self) {
        let mut core = self.0.core.try_lock().unwrap();

        self.0.owned.close_and_shutdown_all();

        while let Some(task) = core.queue.pop_back() {
            drop(task);
        }

        drop(core);

        assert!(self.0.owned.is_empty());
    }
}

impl Schedule for Runtime {
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        self.0.owned.remove(task)
    }

    fn schedule(&self, task: task::Notified<Self>) {
        self.0.core.try_lock().unwrap().queue.push_back(task);
    }
}