spin_executor/
lib.rs

1use bindings::wasi::io;
2use std::future::Future;
3use std::mem;
4use std::ops::DerefMut;
5use std::sync::{Arc, Mutex};
6use std::task::{Context, Poll, Wake, Waker};
7
/// Module containing the generated WIT bindings.
///
/// The `wit_bindgen::generate!` invocation below is configured with the
/// `imports` world and the `io.wit` path. The rest of this file reaches the
/// generated items through `bindings::wasi::io`.
pub mod bindings {
    wit_bindgen::generate!({
        world: "imports",
        path: "io.wit",
    });
}
15
16impl std::fmt::Display for io::streams::Error {
17    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18        f.write_str(&self.to_debug_string())
19    }
20}
21
22impl std::error::Error for io::streams::Error {}
23
/// Shared slot holding a registered `Pollable`.
///
/// The slot is emptied (set to `None`) when the registration is cancelled via
/// `CancelToken` or `CancelOnDropToken`, and temporarily while `run` has taken
/// the `Pollable` out in order to poll it.
type Wrapped = Arc<Mutex<Option<io::poll::Pollable>>>;

/// Registrations waiting to be polled by `run`: each entry pairs a shared
/// `Pollable` slot with the `Waker` to wake once that `Pollable` is ready.
static WAKERS: Mutex<Vec<(Wrapped, Waker)>> = Mutex::new(Vec::new());
27
28/// Handle to a Pollable registered using `push_waker_and_get_token` which may
29/// be used to cancel and drop the Pollable.
30pub struct CancelToken(Wrapped);
31
32impl CancelToken {
33    /// Cancel and drop the Pollable.
34    pub fn cancel(self) {
35        drop(self.0.lock().unwrap().take())
36    }
37}
38
39/// Handle to a Pollable registered using `push_waker_and_get_token` which, when
40/// dropped, will cancel and drop the Pollable.
41pub struct CancelOnDropToken(Wrapped);
42
43impl From<CancelToken> for CancelOnDropToken {
44    fn from(token: CancelToken) -> Self {
45        Self(token.0)
46    }
47}
48
49impl Drop for CancelOnDropToken {
50    fn drop(&mut self) {
51        drop(self.0.lock().unwrap().take())
52    }
53}
54
55/// Register a `Pollable` and `Waker` to be polled as part of the [`run`] event
56/// loop.
57pub fn push_waker(pollable: io::poll::Pollable, waker: Waker) {
58    _ = push_waker_and_get_token(pollable, waker);
59}
60
61/// Register a `Pollable` and `Waker` to be polled as part of the [`run`] event
62/// loop and retrieve a [`CancelToken`] to cancel the registration later, if
63/// desired.
64pub fn push_waker_and_get_token(pollable: io::poll::Pollable, waker: Waker) -> CancelToken {
65    let wrapped = Arc::new(Mutex::new(Some(pollable)));
66    WAKERS.lock().unwrap().push((wrapped.clone(), waker));
67    CancelToken(wrapped)
68}
69
70/// Run the specified future to completion, blocking until it yields a result.
71///
72/// This will alternate between polling the specified future and polling any
73/// `Pollable`s registered using [`push_waker`] or [`push_waker_and_get_token`]
74/// using `wasi::io/poll/poll-list`.  It will panic if the future returns
75/// `Poll::Pending` without having registered at least one `Pollable`.
76pub fn run<T>(future: impl Future<Output = T>) -> T {
77    futures::pin_mut!(future);
78    struct DummyWaker;
79
80    impl Wake for DummyWaker {
81        fn wake(self: Arc<Self>) {}
82    }
83
84    let waker = Arc::new(DummyWaker).into();
85
86    loop {
87        match future.as_mut().poll(&mut Context::from_waker(&waker)) {
88            Poll::Pending => {
89                let mut new_wakers = Vec::new();
90
91                let wakers = mem::take(WAKERS.lock().unwrap().deref_mut())
92                    .into_iter()
93                    .filter_map(|(wrapped, waker)| {
94                        let pollable = wrapped.lock().unwrap().take();
95                        pollable.map(|pollable| (wrapped, pollable, waker))
96                    })
97                    .collect::<Vec<_>>();
98
99                assert!(!wakers.is_empty());
100
101                let pollables = wakers
102                    .iter()
103                    .map(|(_, pollable, _)| pollable)
104                    .collect::<Vec<_>>();
105
106                let mut ready = vec![false; wakers.len()];
107
108                for index in io::poll::poll(&pollables) {
109                    ready[usize::try_from(index).unwrap()] = true;
110                }
111
112                for (ready, (wrapped, pollable, waker)) in ready.into_iter().zip(wakers) {
113                    if ready {
114                        waker.wake()
115                    } else {
116                        *wrapped.lock().unwrap() = Some(pollable);
117                        new_wakers.push((wrapped, waker));
118                    }
119                }
120
121                *WAKERS.lock().unwrap() = new_wakers;
122            }
123            Poll::Ready(result) => break result,
124        }
125    }
126}