vexide-async 0.2.0

Tiny async executor for vexide.
Documentation
use std::{
    cell::RefCell,
    collections::VecDeque,
    future::Future,
    rc::Rc,
    sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    },
    task::{Context, Poll},
    time::Duration,
};

use waker_fn::waker_fn;

use super::reactor::Reactor;
use crate::{
    local::TaskLocalStorage,
    task::{Task, TaskMetadata},
};

/// A schedulable handle to a spawned task: an `async_task` runnable whose attached
/// metadata is [`TaskMetadata`].
type Runnable = async_task::Runnable<TaskMetadata>;

thread_local! {
    /// The executor instance belonging to the current thread, const-initialized with
    /// `Executor::new`.
    pub(crate) static EXECUTOR: Executor = const { Executor::new() };
}

/// A task executor made of a FIFO run queue and a reactor.
///
/// All state lives behind `RefCell`s, so every method takes `&self`.
pub(crate) struct Executor {
    /// Runnables waiting to be run. Scheduling a task pushes it to the back; `tick` pops
    /// from the front.
    queue: RefCell<VecDeque<Runnable>>,
    /// The reactor, ticked at the start of every `tick` and exposed to callers through
    /// `with_reactor`.
    reactor: RefCell<Reactor>,
    /// An optional handle to a task-local storage.
    ///
    /// NOTE(review): presumably the storage of the task that is currently running, managed
    /// by `TaskLocalStorage::scope` — confirm; nothing visible in this file reads or writes it.
    pub(crate) tls: RefCell<Option<Rc<TaskLocalStorage>>>,
}

impl Executor {
    /// Creates an executor with an empty run queue, a new reactor, and no task-local
    /// storage set.
    ///
    /// This is a `const fn` so that it can be used in the `const` initializer of the
    /// `EXECUTOR` thread-local.
    pub const fn new() -> Self {
        Self {
            queue: RefCell::new(VecDeque::new()),
            reactor: RefCell::new(Reactor::new()),
            tls: RefCell::new(None),
        }
    }

    pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
        let metadata = TaskMetadata {
            tls: Rc::new(TaskLocalStorage::new()),
        };

        // SAFETY: `runnable` will never be moved off this thread or shared with another thread
        // because of the `!Send + !Sync` bounds on `Self`. Both `future` and `schedule` are
        // `'static` so they cannot be used after being freed.
        //
        // TODO: Makesure that the waker can never be sent off the thread.
        let (runnable, task) = unsafe {
            async_task::Builder::new()
                .metadata(metadata)
                .spawn_unchecked(
                    move |_| future,
                    |runnable| {
                        self.queue.borrow_mut().push_back(runnable);
                    },
                )
        };

        runnable.schedule();

        task
    }

    /// Calls `f` with exclusive access to this executor's reactor.
    ///
    /// The reactor is mutably borrowed from its `RefCell` for the whole call, so `f` must
    /// not re-enter anything that borrows the reactor again (such as `with_reactor` itself
    /// or `tick`); doing so would panic on the double borrow.
    pub(crate) fn with_reactor(&self, f: impl FnOnce(&mut Reactor)) {
        let mut reactor = self.reactor.borrow_mut();
        f(&mut reactor);
    }

    /// Advances the executor by a single step.
    ///
    /// The reactor is ticked first, waking any expired sleepers. Then at most one runnable
    /// is taken from the front of the queue and run inside the scope of its own task-local
    /// storage.
    ///
    /// Returns `None` if a task was run. If the queue was empty (all tasks are sleeping),
    /// returns `Some` with the duration reported by the reactor: how long it will be until
    /// one of them is awake.
    pub(crate) fn tick(&self) -> Option<Duration> {
        let next_wake = self.reactor.borrow_mut().tick();

        // Pop in a statement of its own so the queue borrow is released before the task
        // runs; running it can schedule tasks, which pushes onto the queue.
        let next_runnable = self.queue.borrow_mut().pop_front();

        let Some(runnable) = next_runnable else {
            return Some(next_wake);
        };

        let tls = runnable.metadata().tls.clone();
        TaskLocalStorage::scope(tls, || {
            runnable.run();
        });

        None
    }

    /// Drives `future` to completion on the current thread and returns its output.
    ///
    /// Each loop iteration polls `future` if it has been woken since its last poll (or has
    /// not been polled yet), calls `vexTasksRun`, and then performs one executor `tick` so
    /// that spawned tasks make progress alongside it.
    pub fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
        // Set by the waker whenever `future` is woken; cleared each time it is read below.
        let wake_flag = Arc::new(AtomicBool::new(false));
        let waker = {
            let wake_flag = Arc::clone(&wake_flag);
            waker_fn(move || wake_flag.store(true, Ordering::Relaxed))
        };
        let mut cx = Context::from_waker(&waker);

        futures_util::pin_mut!(future);

        // Starts out `true` so that the future is polled on the very first iteration.
        let mut should_poll = true;
        loop {
            if should_poll && let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
                return output;
            }

            // SAFETY: NOTE(review): the requirements of this FFI call are not visible from
            // this file; presumably it may be called at any point from this thread — confirm
            // against the SDK.
            unsafe {
                vex_sdk::vexTasksRun();
            }

            let next_wake = self.tick();
            // Read the flag only after the tick: a task that ran during the tick may have
            // woken the future in the meantime.
            should_poll = wake_flag.swap(false, Ordering::Relaxed);

            // On desktop platforms, yield to the OS while everything is asleep to avoid
            // burning CPU. On VEXos this is disabled so that devices are updated as fast as
            // possible.
            let may_yield = cfg!(not(target_os = "vexos")) && !should_poll;
            if may_yield && let Some(until_next_wake) = next_wake {
                // Cap the nap so that `vexTasksRun` is still called fairly often.
                const MAX_YIELD: Duration = Duration::from_millis(5);

                // N.B. `until_next_wake` already accounts for when the future passed into
                // this function will wake, if it is sleeping.
                std::thread::sleep(until_next_wake.min(MAX_YIELD));
            }
        }
    }
}

#[cfg(test)]
mod test {
    // NOTE(review): imported only for its side effects — presumably it supplies mock
    // implementations of the SDK symbols that `block_on` calls; confirm.
    use vex_sdk_mock as _;

    use super::*;

    /// A task spawned on a fresh executor is driven to completion by `block_on`, which
    /// returns the task's output.
    #[test]
    fn spawns_task() {
        let executor = Executor::new();

        let task = executor.spawn(async { 1 });
        let output = executor.block_on(task);

        assert_eq!(output, 1);
    }
}