wasi_async_runtime/
reactor.rs

1use super::polling::{EventKey, Poller};
2
3use alloc::rc::Rc;
4use core::cell::RefCell;
5use core::future;
6use core::task::Poll;
7use core::task::Waker;
8#[cfg(not(feature = "std"))]
9use hashbrown::HashMap;
10#[cfg(feature = "std")]
11use std::collections::HashMap;
12use wasi::io::poll::Pollable;
13
14/// Manage async system resources for WASI 0.2
15#[derive(Debug, Clone)]
16pub struct Reactor {
17    inner: Rc<RefCell<InnerReactor>>,
18}
19
20/// The private, internal `Reactor` implementation - factored out so we can take
21/// a lock of the whole.
22#[derive(Debug)]
23struct InnerReactor {
24    poller: Poller,
25    wakers: HashMap<EventKey, Waker>,
26}
27
28impl Reactor {
29    /// Create a new instance of `Reactor`
30    pub(crate) fn new() -> Self {
31        Self {
32            inner: Rc::new(RefCell::new(InnerReactor {
33                poller: Poller::new(),
34                wakers: HashMap::new(),
35            })),
36        }
37    }
38
39    /// Block until new events are ready. Calls the respective wakers once done.
40    ///
41    /// # On Wakers and single-threaded runtimes
42    ///
43    /// At first glance it might seem silly that this goes through the motions
44    /// of calling the wakers. The main waker we create here is a `noop` waker:
45    /// it does nothing. However, it is common and encouraged to use wakers to
46    /// distinguish between events. Concurrency primitives may construct their
47    /// own wakers to keep track of identity and wake more precisely. We do not
48    /// control the wakers construted by other libraries, and it is for this
49    /// reason that we have to call all the wakers - even if by default they
50    /// will do nothing.
51    pub(crate) fn block_until(&self) {
52        let mut reactor = self.inner.borrow_mut();
53        for key in reactor.poller.block_until() {
54            match reactor.wakers.get(&key) {
55                Some(waker) => waker.wake_by_ref(),
56                None => panic!("tried to wake the waker for non-existent `{key:?}`"),
57            }
58        }
59    }
60
61    /// Wait for the pollable to resolve.
62    pub async fn wait_for(&self, pollable: Pollable) {
63        let mut pollable = Some(pollable);
64        let mut key = None;
65
66        // This function is the core loop of our function; it will be called
67        // multiple times as the future is resolving.
68        future::poll_fn(|cx| {
69            // Start by taking a lock on the reactor. This is single-threaded
70            // and short-lived, so it will never be contended.
71            let mut reactor = self.inner.borrow_mut();
72
73            // Schedule interest in the `pollable` on the first iteration. On
74            // every iteration, register the waker with the reactor.
75            let key = key.get_or_insert_with(|| reactor.poller.insert(pollable.take().unwrap()));
76            reactor.wakers.insert(*key, cx.waker().clone());
77
78            // Check whether we're ready or need to keep waiting. If we're
79            // ready, we clean up after ourselves.
80            if reactor.poller.get(key).unwrap().ready() {
81                reactor.poller.remove(*key);
82                reactor.wakers.remove(key);
83                Poll::Ready(())
84            } else {
85                Poll::Pending
86            }
87        })
88        .await
89    }
90}