Skip to main content

tempest_rt/
runtime.rs

1//! Single-threaded async runtime that drives futures and I/O to completion.
2//!
3//! The runtime polls all spawned tasks and the root future on every tick, then calls
4//! [`Io::poll`] to process I/O completions. Use [`block_on`] for one-shot execution or
5//! [`Runtime`] directly when you need access to the underlying I/O instance between runs.
6
7use std::{
8    any::TypeId,
9    pin::{Pin, pin},
10    task::{Context, Poll},
11};
12
13use slab::Slab;
14use tempest_io::Io;
15
16use crate::{
17    context::{CURRENT_CONTEXT, RuntimeContext, TaskId, WakeSets, make_waker, parse_op_handle},
18    task::Tasks,
19};
20
21/// Single-threaded async runtime owning an I/O backend and a set of spawned tasks.
22pub struct Runtime<I: Io> {
23    io: I,
24    tasks: Tasks,
25    finished_tasks: Vec<usize>,
26    wake_sets: WakeSets,
27    next_op: u64,
28}
29
30impl<I: Io> Runtime<I> {
31    pub fn new(io: I) -> Self {
32        Self {
33            io,
34            tasks: Slab::new(),
35            finished_tasks: Vec::new(),
36            wake_sets: WakeSets::default(),
37            next_op: 0,
38        }
39    }
40
41    /// Returns a mutable reference to the underlying I/O instance.
42    pub fn inspect_io(&mut self) -> &mut I {
43        &mut self.io
44    }
45
46    fn wake_active_by_io_completions(&mut self) {
47        for (handle, _) in self.io.completions() {
48            let (task_id, _) = parse_op_handle(*handle);
49            self.wake_sets.active.insert(task_id);
50        }
51    }
52
53    pub fn tick<F: Future>(&mut self, fut: &mut Pin<&mut F>) -> Poll<F::Output> {
54        self.wake_sets.swap();
55
56        let ctx = RuntimeContext {
57            type_id: TypeId::of::<I>(),
58            io: &mut self.io as *mut I as *mut (),
59            tasks: &mut self.tasks as *mut _,
60            // TODO: only inject the staging wake set here?
61            wake_sets: &mut self.wake_sets as *mut _,
62            next_op: &mut self.next_op as *mut _,
63        };
64        CURRENT_CONTEXT.set(Some(ctx));
65
66        // wake up anything that was completed after poll
67        self.io.poll().expect("fatal: io poll failed");
68        self.wake_active_by_io_completions();
69
70        assert!(
71            !self.wake_sets.active.is_empty() || self.io.in_flight() > 0,
72            "deadlock: wake set is empty and no I/O in flight"
73        );
74
75        // park runtime until io completes something
76        if self.wake_sets.active.is_empty() {
77            self.io.park().expect("fatal: io park failed");
78            self.wake_active_by_io_completions();
79        }
80
81        let mut result = Poll::Pending;
82        // NB: take out active because it does not support drain, but put it back afterwards
83        // PERF: take() uses BTreeMap::default(), which does not allocate unless inserting, so it's
84        // fine here, as long as we put active back afterwards
85        let mut active = std::mem::take(&mut self.wake_sets.active);
86        for &task in &active {
87            let waker = make_waker(task);
88            let mut cx = Context::from_waker(&waker);
89            match task {
90                TaskId::Main => {
91                    result = fut.as_mut().poll(&mut cx);
92                }
93                TaskId::Task(id) => {
94                    let index = id.get() as usize;
95                    if let Poll::Ready(()) = self.tasks[index].as_mut().poll(&mut cx) {
96                        self.finished_tasks.push(index);
97                    }
98                }
99            }
100        }
101        // clear here, because .drain() is missing
102        active.clear();
103        self.wake_sets.active = active;
104
105        // -- drop all finished tasks --
106        for key in self.finished_tasks.drain(..) {
107            // discard the future, since it resolved
108            let _ = self.tasks.remove(key);
109        }
110
111        assert!(self.io.completions().is_empty(), "leaked io completions");
112
113        CURRENT_CONTEXT.set(None);
114
115        result
116    }
117
118    /// Runs `fut` to completion, driving all spawned tasks and I/O on each tick.
119    pub fn block_on<F: Future>(&mut self, fut: F) -> F::Output {
120        let mut fut = pin!(fut);
121        self.wake_sets.staging.insert(TaskId::Main);
122
123        loop {
124            // drive the main future
125            if let Poll::Ready(value) = self.tick(&mut fut) {
126                return value;
127            }
128        }
129    }
130}
131
132impl<I> Default for Runtime<I>
133where
134    I: Io + Default,
135{
136    fn default() -> Self {
137        Self::new(I::default())
138    }
139}
140
141/// Convenience wrapper that creates a [`Runtime`] and runs `fut` to completion.
142pub fn block_on<I: Io, F: Future>(io: I, fut: F) -> F::Output {
143    Runtime::new(io).block_on(fut)
144}
145
146#[cfg(test)]
147mod tests {
148    use super::*;
149    use tempest_io::VirtualIo;
150
151    #[test]
152    fn immediate_ready() {
153        let io = VirtualIo::default();
154        let result = block_on(io, async { 42 });
155        assert_eq!(result, 42);
156    }
157
158    #[test]
159    fn multiple_ticks() {
160        let io = VirtualIo::default();
161        let mut polls = 0u32;
162        let result = block_on(
163            io,
164            std::future::poll_fn(|cx| {
165                polls += 1;
166                if polls >= 3 {
167                    Poll::Ready("done")
168                } else {
169                    cx.waker().wake_by_ref();
170                    Poll::Pending
171                }
172            }),
173        );
174        assert_eq!(result, "done");
175        assert_eq!(polls, 3);
176    }
177}