spin_executor/
lib.rs

1use std::future::Future;
2use std::mem;
3use std::ops::DerefMut;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll, Wake, Waker};
6use wasi::io;
7
8type Wrapped = Arc<Mutex<Option<io::poll::Pollable>>>;
9
10static WAKERS: Mutex<Vec<(Wrapped, Waker)>> = Mutex::new(Vec::new());
11
12/// Handle to a Pollable registered using `push_waker_and_get_token` which may
13/// be used to cancel and drop the Pollable.
14pub struct CancelToken(Wrapped);
15
16impl CancelToken {
17    /// Cancel and drop the Pollable.
18    pub fn cancel(self) {
19        drop(self.0.lock().unwrap().take())
20    }
21}
22
23/// Handle to a Pollable registered using `push_waker_and_get_token` which, when
24/// dropped, will cancel and drop the Pollable.
25pub struct CancelOnDropToken(Wrapped);
26
27impl From<CancelToken> for CancelOnDropToken {
28    fn from(token: CancelToken) -> Self {
29        Self(token.0)
30    }
31}
32
33impl Drop for CancelOnDropToken {
34    fn drop(&mut self) {
35        drop(self.0.lock().unwrap().take())
36    }
37}
38
39/// Register a `Pollable` and `Waker` to be polled as part of the [`run`] event
40/// loop.
41pub fn push_waker(pollable: io::poll::Pollable, waker: Waker) {
42    _ = push_waker_and_get_token(pollable, waker);
43}
44
45/// Register a `Pollable` and `Waker` to be polled as part of the [`run`] event
46/// loop and retrieve a [`CancelToken`] to cancel the registration later, if
47/// desired.
48pub fn push_waker_and_get_token(pollable: io::poll::Pollable, waker: Waker) -> CancelToken {
49    let wrapped = Arc::new(Mutex::new(Some(pollable)));
50    WAKERS.lock().unwrap().push((wrapped.clone(), waker));
51    CancelToken(wrapped)
52}
53
54/// Run the specified future to completion, blocking until it yields a result.
55///
56/// This will alternate between polling the specified future and polling any
57/// `Pollable`s registered using [`push_waker`] or [`push_waker_and_get_token`]
58/// using `wasi::io/poll/poll-list`.  It will panic if the future returns
59/// `Poll::Pending` without having registered at least one `Pollable`.
60pub fn run<T>(future: impl Future<Output = T>) -> T {
61    futures::pin_mut!(future);
62    struct DummyWaker;
63
64    impl Wake for DummyWaker {
65        fn wake(self: Arc<Self>) {}
66    }
67
68    let waker = Arc::new(DummyWaker).into();
69
70    loop {
71        match future.as_mut().poll(&mut Context::from_waker(&waker)) {
72            Poll::Pending => {
73                let mut new_wakers = Vec::new();
74
75                let wakers = mem::take(WAKERS.lock().unwrap().deref_mut())
76                    .into_iter()
77                    .filter_map(|(wrapped, waker)| {
78                        let pollable = wrapped.lock().unwrap().take();
79                        pollable.map(|pollable| (wrapped, pollable, waker))
80                    })
81                    .collect::<Vec<_>>();
82
83                assert!(!wakers.is_empty());
84
85                let pollables = wakers
86                    .iter()
87                    .map(|(_, pollable, _)| pollable)
88                    .collect::<Vec<_>>();
89
90                let mut ready = vec![false; wakers.len()];
91
92                for index in io::poll::poll(&pollables) {
93                    ready[usize::try_from(index).unwrap()] = true;
94                }
95
96                for (ready, (wrapped, pollable, waker)) in ready.into_iter().zip(wakers) {
97                    if ready {
98                        waker.wake()
99                    } else {
100                        *wrapped.lock().unwrap() = Some(pollable);
101                        new_wakers.push((wrapped, waker));
102                    }
103                }
104
105                *WAKERS.lock().unwrap() = new_wakers;
106            }
107            Poll::Ready(result) => break result,
108        }
109    }
110}