// wasi_async_runtime/reactor.rs
use super::polling::{EventKey, Poller};

use alloc::rc::Rc;
use alloc::vec::Vec;
use core::cell::RefCell;
use core::future;
use core::task::Poll;
use core::task::Waker;
#[cfg(not(feature = "std"))]
use hashbrown::HashMap;
#[cfg(feature = "std")]
use std::collections::HashMap;
use wasi::io::poll::Pollable;
13
14/// Manage async system resources for WASI 0.2
15#[derive(Debug, Clone)]
16pub struct Reactor {
17 inner: Rc<RefCell<InnerReactor>>,
18}
19
20/// The private, internal `Reactor` implementation - factored out so we can take
21/// a lock of the whole.
22#[derive(Debug)]
23struct InnerReactor {
24 poller: Poller,
25 wakers: HashMap<EventKey, Waker>,
26}
27
28impl Reactor {
29 /// Create a new instance of `Reactor`
30 pub(crate) fn new() -> Self {
31 Self {
32 inner: Rc::new(RefCell::new(InnerReactor {
33 poller: Poller::new(),
34 wakers: HashMap::new(),
35 })),
36 }
37 }
38
39 /// Block until new events are ready. Calls the respective wakers once done.
40 ///
41 /// # On Wakers and single-threaded runtimes
42 ///
43 /// At first glance it might seem silly that this goes through the motions
44 /// of calling the wakers. The main waker we create here is a `noop` waker:
45 /// it does nothing. However, it is common and encouraged to use wakers to
46 /// distinguish between events. Concurrency primitives may construct their
47 /// own wakers to keep track of identity and wake more precisely. We do not
48 /// control the wakers construted by other libraries, and it is for this
49 /// reason that we have to call all the wakers - even if by default they
50 /// will do nothing.
51 pub(crate) fn block_until(&self) {
52 let mut reactor = self.inner.borrow_mut();
53 for key in reactor.poller.block_until() {
54 match reactor.wakers.get(&key) {
55 Some(waker) => waker.wake_by_ref(),
56 None => panic!("tried to wake the waker for non-existent `{key:?}`"),
57 }
58 }
59 }
60
61 /// Wait for the pollable to resolve.
62 pub async fn wait_for(&self, pollable: Pollable) {
63 let mut pollable = Some(pollable);
64 let mut key = None;
65
66 // This function is the core loop of our function; it will be called
67 // multiple times as the future is resolving.
68 future::poll_fn(|cx| {
69 // Start by taking a lock on the reactor. This is single-threaded
70 // and short-lived, so it will never be contended.
71 let mut reactor = self.inner.borrow_mut();
72
73 // Schedule interest in the `pollable` on the first iteration. On
74 // every iteration, register the waker with the reactor.
75 let key = key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap()));
76 reactor.wakers.insert(*key, cx.waker().clone());
77
78 // Check whether we're ready or need to keep waiting. If we're
79 // ready, we clean up after ourselves.
80 if reactor.poller.get(key).unwrap().ready() {
81 reactor.poller.remove(*key);
82 reactor.wakers.remove(key);
83 Poll::Ready(())
84 } else {
85 Poll::Pending
86 }
87 })
88 .await
89 }
90}