tempest-rt 0.0.1

TempestDB Deterministic Async Runtime
Documentation
//! Single-threaded async runtime that drives futures and I/O to completion.
//!
//! On every tick the runtime first calls [`Io::poll`] to collect I/O completions, then polls
//! the root future and those spawned tasks that have been woken. Use [`block_on`] for one-shot
//! execution or [`Runtime`] directly when you need access to the underlying I/O instance
//! between runs.

use std::{
    any::TypeId,
    pin::{Pin, pin},
    task::{Context, Poll},
};

use slab::Slab;
use tempest_io::Io;

use crate::{
    context::{CURRENT_CONTEXT, RuntimeContext, TaskId, WakeSets, make_waker, parse_op_handle},
    task::Tasks,
};

/// Single-threaded async runtime owning an I/O backend and a set of spawned tasks.
pub struct Runtime<I: Io> {
    /// I/O backend; polled once per tick and parked on when no task is runnable.
    io: I,
    /// Storage for spawned task futures, indexed by the numeric id carried in `TaskId::Task`.
    tasks: Tasks,
    /// Keys of tasks that resolved during the current tick; drained (and the tasks removed)
    /// at the end of that tick.
    finished_tasks: Vec<usize>,
    /// Wake-up bookkeeping: `active` holds the tasks polled by the current tick, `staging` is
    /// where `block_on` queues the root future before the first tick.
    wake_sets: WakeSets,
    /// Counter handed to futures through the runtime context; starts at 0.
    /// NOTE(review): presumably used to mint I/O operation handles — confirm in `context`.
    next_op: u64,
}

impl<I: Io> Runtime<I> {
    /// Creates a runtime that owns `io`, with no spawned tasks, empty wake sets and the
    /// operation counter at zero.
    pub fn new(io: I) -> Self {
        Self {
            io,
            tasks: Slab::new(),
            finished_tasks: Vec::new(),
            wake_sets: WakeSets::default(),
            next_op: 0,
        }
    }

    /// Returns a mutable reference to the underlying I/O instance.
    pub fn inspect_io(&mut self) -> &mut I {
        &mut self.io
    }

    /// Inserts the task owning each pending I/O completion into the active wake set.
    ///
    /// The owning task is decoded from the completion's operation handle via
    /// `parse_op_handle`; the completion itself is left in place for the task to consume.
    fn wake_active_by_io_completions(&mut self) {
        for (handle, _) in self.io.completions() {
            let (task_id, _) = parse_op_handle(*handle);
            self.wake_sets.active.insert(task_id);
        }
    }

    /// Runs one scheduling round and returns the root future's poll result.
    ///
    /// A tick performs, in order:
    /// 1. `WakeSets::swap` (presumably promoting staged wake-ups to the active set),
    /// 2. publication of the runtime context for futures running on this thread,
    /// 3. [`Io::poll`], waking every task that owns a completion,
    /// 4. a park on the I/O backend if nothing is runnable yet,
    /// 5. one poll of every task in the active set (`fut` for `TaskId::Main`),
    /// 6. removal of all spawned tasks that resolved during this tick.
    ///
    /// Returns `Poll::Pending` when the root future was not polled in this tick or is not
    /// ready yet. Wake-ups that refer to a spawned task which no longer exists are ignored.
    ///
    /// # Panics
    ///
    /// Panics if polling or parking the I/O backend fails, if no task is runnable while no
    /// I/O is in flight (deadlock), or if I/O completions are left unconsumed after all
    /// active tasks were polled. The runtime context is cleared even when a panic unwinds
    /// out of this function.
    pub fn tick<F: Future>(&mut self, fut: &mut Pin<&mut F>) -> Poll<F::Output> {
        /// Clears the published runtime context when dropped, so the raw pointers stored in
        /// it never outlive this call — including when a panic unwinds through `tick`.
        struct ContextReset;

        impl Drop for ContextReset {
            fn drop(&mut self) {
                CURRENT_CONTEXT.set(None);
            }
        }

        self.wake_sets.swap();

        let ctx = RuntimeContext {
            type_id: TypeId::of::<I>(),
            io: &mut self.io as *mut I as *mut (),
            tasks: &mut self.tasks as *mut _,
            // TODO: only inject the staging wake set here?
            wake_sets: &mut self.wake_sets as *mut _,
            next_op: &mut self.next_op as *mut _,
        };
        CURRENT_CONTEXT.set(Some(ctx));
        let _context_reset = ContextReset;

        // wake up anything that was completed after poll
        self.io.poll().expect("fatal: io poll failed");
        self.wake_active_by_io_completions();

        assert!(
            !self.wake_sets.active.is_empty() || self.io.in_flight() > 0,
            "deadlock: wake set is empty and no I/O in flight"
        );

        // park runtime until io completes something
        if self.wake_sets.active.is_empty() {
            self.io.park().expect("fatal: io park failed");
            self.wake_active_by_io_completions();
        }

        let mut result = Poll::Pending;
        // NB: take out active because it does not support drain, but put it back afterwards
        // PERF: take() uses BTreeMap::default(), which does not allocate unless inserting, so it's
        // fine here, as long as we put active back afterwards
        let mut active = std::mem::take(&mut self.wake_sets.active);
        for &task in &active {
            let waker = make_waker(task);
            let mut cx = Context::from_waker(&waker);
            match task {
                TaskId::Main => {
                    result = fut.as_mut().poll(&mut cx);
                }
                TaskId::Task(id) => {
                    let index = id.get() as usize;
                    // A wake-up can outlive its task (e.g. a waker cloned elsewhere fires
                    // after the task resolved and was removed); indexing the slab directly
                    // would panic on the vacant slot, so such wake-ups are skipped.
                    let Some(task_fut) = self.tasks.get_mut(index) else {
                        continue;
                    };
                    if task_fut.as_mut().poll(&mut cx).is_ready() {
                        self.finished_tasks.push(index);
                    }
                }
            }
        }
        // clear here, because .drain() is missing
        active.clear();
        self.wake_sets.active = active;

        // -- drop all finished tasks --
        for key in self.finished_tasks.drain(..) {
            // discard the future, since it resolved
            let _ = self.tasks.remove(key);
        }

        assert!(self.io.completions().is_empty(), "leaked io completions");

        // `_context_reset` clears CURRENT_CONTEXT when it goes out of scope here.
        result
    }

    /// Runs `fut` to completion, driving all spawned tasks and I/O on each tick.
    ///
    /// # Panics
    ///
    /// Propagates every panic documented on [`Runtime::tick`].
    pub fn block_on<F: Future>(&mut self, fut: F) -> F::Output {
        let mut fut = pin!(fut);
        // Queue the root future so that the first tick polls it (tick() calls
        // `WakeSets::swap` before polling — presumably moving staging into active).
        self.wake_sets.staging.insert(TaskId::Main);

        loop {
            // drive the main future
            if let Poll::Ready(value) = self.tick(&mut fut) {
                return value;
            }
        }
    }
}

impl<I> Default for Runtime<I>
where
    I: Io + Default,
{
    fn default() -> Self {
        Self::new(I::default())
    }
}

/// Runs `fut` to completion on a freshly created [`Runtime`] that takes ownership of `io`.
///
/// The runtime, and `io` with it, is dropped once the future has resolved; use
/// [`Runtime::block_on`] directly to keep the I/O instance around.
pub fn block_on<I: Io, F: Future>(io: I, fut: F) -> F::Output {
    let mut runtime = Runtime::new(io);
    runtime.block_on(fut)
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempest_io::VirtualIo;

    /// A future that is ready on its first poll yields its value through `block_on`.
    #[test]
    fn immediate_ready() {
        let value = block_on(VirtualIo::default(), async { 42 });
        assert_eq!(value, 42);
    }

    /// A future that wakes itself while pending is polled again until it reports ready.
    #[test]
    fn multiple_ticks() {
        let mut poll_count = 0u32;
        let self_waking = std::future::poll_fn(|cx| {
            poll_count += 1;
            if poll_count < 3 {
                // request another poll before yielding
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready("done")
        });

        let outcome = block_on(VirtualIo::default(), self_waking);

        assert_eq!(outcome, "done");
        assert_eq!(poll_count, 3);
    }
}