1use std::{
8 any::TypeId,
9 pin::{Pin, pin},
10 task::{Context, Poll},
11};
12
13use slab::Slab;
14use tempest_io::Io;
15
16use crate::{
17 context::{CURRENT_CONTEXT, RuntimeContext, TaskId, WakeSets, make_waker, parse_op_handle},
18 task::Tasks,
19};
20
21pub struct Runtime<I: Io> {
23 io: I,
24 tasks: Tasks,
25 finished_tasks: Vec<usize>,
26 wake_sets: WakeSets,
27 next_op: u64,
28}
29
30impl<I: Io> Runtime<I> {
31 pub fn new(io: I) -> Self {
32 Self {
33 io,
34 tasks: Slab::new(),
35 finished_tasks: Vec::new(),
36 wake_sets: WakeSets::default(),
37 next_op: 0,
38 }
39 }
40
41 pub fn inspect_io(&mut self) -> &mut I {
43 &mut self.io
44 }
45
46 fn wake_active_by_io_completions(&mut self) {
47 for (handle, _) in self.io.completions() {
48 let (task_id, _) = parse_op_handle(*handle);
49 self.wake_sets.active.insert(task_id);
50 }
51 }
52
53 pub fn tick<F: Future>(&mut self, fut: &mut Pin<&mut F>) -> Poll<F::Output> {
54 self.wake_sets.swap();
55
56 let ctx = RuntimeContext {
57 type_id: TypeId::of::<I>(),
58 io: &mut self.io as *mut I as *mut (),
59 tasks: &mut self.tasks as *mut _,
60 wake_sets: &mut self.wake_sets as *mut _,
62 next_op: &mut self.next_op as *mut _,
63 };
64 CURRENT_CONTEXT.set(Some(ctx));
65
66 self.io.poll().expect("fatal: io poll failed");
68 self.wake_active_by_io_completions();
69
70 assert!(
71 !self.wake_sets.active.is_empty() || self.io.in_flight() > 0,
72 "deadlock: wake set is empty and no I/O in flight"
73 );
74
75 if self.wake_sets.active.is_empty() {
77 self.io.park().expect("fatal: io park failed");
78 self.wake_active_by_io_completions();
79 }
80
81 let mut result = Poll::Pending;
82 let mut active = std::mem::take(&mut self.wake_sets.active);
86 for &task in &active {
87 let waker = make_waker(task);
88 let mut cx = Context::from_waker(&waker);
89 match task {
90 TaskId::Main => {
91 result = fut.as_mut().poll(&mut cx);
92 }
93 TaskId::Task(id) => {
94 let index = id.get() as usize;
95 if let Poll::Ready(()) = self.tasks[index].as_mut().poll(&mut cx) {
96 self.finished_tasks.push(index);
97 }
98 }
99 }
100 }
101 active.clear();
103 self.wake_sets.active = active;
104
105 for key in self.finished_tasks.drain(..) {
107 let _ = self.tasks.remove(key);
109 }
110
111 assert!(self.io.completions().is_empty(), "leaked io completions");
112
113 CURRENT_CONTEXT.set(None);
114
115 result
116 }
117
118 pub fn block_on<F: Future>(&mut self, fut: F) -> F::Output {
120 let mut fut = pin!(fut);
121 self.wake_sets.staging.insert(TaskId::Main);
122
123 loop {
124 if let Poll::Ready(value) = self.tick(&mut fut) {
126 return value;
127 }
128 }
129 }
130}
131
132impl<I> Default for Runtime<I>
133where
134 I: Io + Default,
135{
136 fn default() -> Self {
137 Self::new(I::default())
138 }
139}
140
141pub fn block_on<I: Io, F: Future>(io: I, fut: F) -> F::Output {
143 Runtime::new(io).block_on(fut)
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use tempest_io::VirtualIo;

    /// A future that is ready on its first poll yields its value.
    #[test]
    fn immediate_ready() {
        let value = block_on(VirtualIo::default(), async { 42 });
        assert_eq!(value, 42);
    }

    /// A future that wakes itself and returns `Pending` is polled again until
    /// it finally resolves.
    #[test]
    fn multiple_ticks() {
        let mut poll_count = 0u32;
        let self_waking = std::future::poll_fn(|cx| {
            poll_count += 1;
            if poll_count < 3 {
                // Request another poll before yielding.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready("done")
        });

        let outcome = block_on(VirtualIo::default(), self_waking);

        assert_eq!(outcome, "done");
        assert_eq!(poll_count, 3);
    }
}