use super::REACTOR;
use core::cell::RefCell;
use core::future;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use slab::Slab;
use std::collections::HashMap;
use std::rc::Rc;
use wasi::io::poll::Pollable;
/// Key identifying one registered pollable inside the reactor.
///
/// Wraps the `usize` slot index returned when a pollable is inserted into the
/// reactor's slab. The wrapper is `Copy` and has the same layout as `usize`.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
pub(crate) struct EventKey(pub(crate) usize);
/// Ownership token for a single pollable stored in the reactor.
///
/// The token records which slot the pollable occupies. Its `Drop`
/// implementation removes that slot from the current reactor.
#[derive(Debug, Hash, PartialEq, Eq)]
struct Registration {
    /// Slot of the pollable this registration owns.
    key: EventKey,
}
impl Drop for Registration {
    /// Removes the owned pollable from the current reactor.
    ///
    /// Looks the reactor up through `Reactor::current`, so this panics if no
    /// reactor is available at the time the registration is dropped.
    fn drop(&mut self) {
        let reactor = Reactor::current();
        reactor.deregister_event(self.key);
    }
}
/// Handle to a pollable that has been registered with the reactor.
///
/// Clones share one reference-counted registration, so the pollable stays
/// registered until the last clone has been dropped.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct AsyncPollable(Rc<Registration>);
impl AsyncPollable {
    /// Registers `pollable` with the current reactor and returns its handle.
    ///
    /// # Panics
    ///
    /// Panics if `Reactor::current` cannot find a reactor.
    pub fn new(pollable: Pollable) -> Self {
        let reactor = Reactor::current();
        reactor.schedule(pollable)
    }

    /// Creates a future that completes once this pollable reports ready.
    ///
    /// Every call produces a separately identified wait, so several futures
    /// can wait on the same pollable at once without sharing a waker slot.
    pub fn wait_for(&self) -> WaitFor {
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Counter shared by all calls; each wait takes the next value as its id.
        static NEXT_WAITEE_ID: AtomicUsize = AtomicUsize::new(0);

        let id = NEXT_WAITEE_ID.fetch_add(1, Ordering::Relaxed);
        let waitee = Waitee {
            pollable: self.clone(),
            unique: id,
        };
        WaitFor {
            waitee,
            // Nothing is stored in the reactor until the first pending poll.
            needs_deregistration: false,
        }
    }
}
/// Identity of one pending wait on an `AsyncPollable`.
///
/// Serves as the key of the reactor's waker table.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct Waitee {
    /// The pollable being waited on.
    pollable: AsyncPollable,
    /// Per-wait id taken from a counter, so that two waits on the same
    /// pollable compare (and hash) as different keys.
    unique: usize,
}
/// Future returned by `AsyncPollable::wait_for`.
///
/// Resolves to `()` once the reactor reports the pollable as ready.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled or .awaited"]
pub struct WaitFor {
    /// Which pollable, and which wait on it, this future stands for.
    waitee: Waitee,
    /// Set after a poll returned `Pending`: a waker may then be stored in the
    /// reactor under `waitee` and has to be removed when the future is dropped.
    needs_deregistration: bool,
}
impl future::Future for WaitFor {
    type Output = ();

    /// Asks the current reactor whether the pollable is ready.
    ///
    /// When it is not, the reactor has stored `cx`'s waker for this wait, so
    /// the future remembers that it must deregister itself on drop.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let reactor = Reactor::current();
        if !reactor.ready(&this.waitee, cx.waker()) {
            this.needs_deregistration = true;
            return Poll::Pending;
        }
        Poll::Ready(())
    }
}
impl Drop for WaitFor {
    /// Removes this wait's waker from the current reactor, if one may have
    /// been stored by an earlier pending poll.
    fn drop(&mut self) {
        // A future that never returned `Pending` left nothing in the reactor.
        if !self.needs_deregistration {
            return;
        }
        Reactor::current().deregister_waitee(&self.waitee);
    }
}
/// Handle to the reactor that tracks registered pollables and the wakers
/// waiting on them.
///
/// Cloning yields another handle to the same shared state.
#[derive(Clone, Debug)]
pub struct Reactor {
    /// Shared, interior-mutable reactor state.
    inner: Rc<RefCell<InnerReactor>>,
}
/// State shared by every `Reactor` handle.
#[derive(Debug)]
struct InnerReactor {
    /// Registered pollables, indexed by the `usize` inside their `EventKey`.
    pollables: Slab<Pollable>,
    /// Waker to notify for each wait that is currently pending.
    wakers: HashMap<Waitee, Waker>,
}
impl Reactor {
    /// Returns a handle to the reactor stored in the parent module's `REACTOR`.
    ///
    /// # Panics
    ///
    /// Panics if no reactor is stored there, i.e. when called outside a wstd
    /// runtime.
    pub fn current() -> Self {
        REACTOR.with(|r| {
            r.borrow()
                .as_ref()
                .expect("Reactor::current must be called within a wstd runtime")
                .clone()
        })
    }

    /// Creates a reactor with no registered pollables and no pending wakers.
    pub(crate) fn new() -> Self {
        Self {
            inner: Rc::new(RefCell::new(InnerReactor {
                pollables: Slab::new(),
                wakers: HashMap::new(),
            })),
        }
    }

    /// Polls every pollable that has a pending waker and wakes the wakers of
    /// those reported ready.
    ///
    /// At least one waker must be registered: debug builds assert this, and
    /// per the assertion message `wasi::io::poll::poll` traps on an empty list.
    ///
    /// The wakers are invoked only after the reactor state has been
    /// unborrowed, so a waker that calls back into the reactor cannot trigger
    /// a `RefCell` borrow panic.
    pub(crate) fn block_until(&self) {
        let ready_wakers: Vec<Waker> = {
            let reactor = self.inner.borrow();

            // `poll` reports readiness as positions in the list it was given,
            // so keep the wakers in a parallel list with the same ordering.
            let mut indexed_wakers = Vec::with_capacity(reactor.wakers.len());
            let mut targets = Vec::with_capacity(reactor.wakers.len());
            for (waitee, waker) in reactor.wakers.iter() {
                let pollable_index = waitee.pollable.0.key;
                indexed_wakers.push(waker);
                targets.push(&reactor.pollables[pollable_index.0]);
            }

            debug_assert_ne!(
                targets.len(),
                0,
                "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap"
            );

            let ready_indexes = wasi::io::poll::poll(&targets);

            // Clone only the wakers that fired so they outlive the borrow.
            ready_indexes
                .into_iter()
                .map(|index| indexed_wakers[index as usize].clone())
                .collect()
        };

        for waker in ready_wakers {
            waker.wake();
        }
    }

    /// Stores `pollable` in the reactor and returns the handle that owns it.
    ///
    /// The pollable is removed again when the last clone of the returned
    /// handle is dropped.
    pub fn schedule(&self, pollable: Pollable) -> AsyncPollable {
        let mut reactor = self.inner.borrow_mut();
        let key = EventKey(reactor.pollables.insert(pollable));
        AsyncPollable(Rc::new(Registration { key }))
    }

    /// Removes the pollable stored under `key`.
    ///
    /// Expects `key` to refer to a live entry; `Slab::remove` panics otherwise.
    fn deregister_event(&self, key: EventKey) {
        let mut reactor = self.inner.borrow_mut();
        reactor.pollables.remove(key.0);
    }

    /// Forgets the waker stored for `waitee`, if there is one.
    fn deregister_waitee(&self, waitee: &Waitee) {
        // The `RefMut` temporary ends with this statement, so the removed
        // waker is dropped only after the reactor state has been unborrowed.
        let removed = self.inner.borrow_mut().wakers.remove(waitee);
        drop(removed);
    }

    /// Reports whether the pollable behind `waitee` is ready.
    ///
    /// When it is not ready, `waker` is stored under `waitee` (replacing any
    /// waker stored there before) so that `block_until` can wake it later.
    ///
    /// # Panics
    ///
    /// Panics if the pollable's key is not present in the reactor.
    fn ready(&self, waitee: &Waitee, waker: &Waker) -> bool {
        let mut reactor = self.inner.borrow_mut();
        let ready = reactor
            .pollables
            .get(waitee.pollable.0.key.0)
            .expect("only live EventKey can be checked for readiness")
            .ready();
        let displaced = if ready {
            None
        } else {
            reactor.wakers.insert(waitee.clone(), waker.clone())
        };
        // Release the borrow before the replaced waker's destructor runs.
        drop(reactor);
        drop(displaced);
        ready
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Waiting on a timer subscribed with a duration of zero completes.
    #[test]
    fn subscribe_no_duration() {
        crate::runtime::block_on(async {
            let reactor = Reactor::current();
            let timer = wasi::clocks::monotonic_clock::subscribe_duration(0);
            let handle = reactor.schedule(timer);
            handle.wait_for().await;
        })
    }

    /// Waiting on a timer subscribed with a non-zero duration completes.
    #[test]
    fn subscribe_some_duration() {
        crate::runtime::block_on(async {
            let reactor = Reactor::current();
            let timer = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000);
            let handle = reactor.schedule(timer);
            handle.wait_for().await;
        })
    }

    /// A second registered pollable that is never awaited does not prevent
    /// the awaited one from completing.
    #[test]
    fn subscribe_multiple_durations() {
        crate::runtime::block_on(async {
            let reactor = Reactor::current();
            let immediate_timer = wasi::clocks::monotonic_clock::subscribe_duration(0);
            let delayed_timer = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000);
            let immediate = reactor.schedule(immediate_timer);
            let delayed = reactor.schedule(delayed_timer);
            delayed.wait_for().await;
            // Kept registered until here on purpose; never awaited.
            drop(immediate)
        })
    }

    /// Two waits with different durations can be driven concurrently.
    #[test]
    fn subscribe_multiple_durations_zipped() {
        crate::runtime::block_on(async {
            let reactor = Reactor::current();
            let started_at = wasi::clocks::monotonic_clock::now();
            let short_timer = wasi::clocks::monotonic_clock::subscribe_duration(10_000_000);
            let long_timer = wasi::clocks::monotonic_clock::subscribe_duration(40_000_000);
            let short = reactor.schedule(short_timer);
            let long = reactor.schedule(long_timer);

            let wait_short = async move {
                short.wait_for().await;
                println!(
                    "*** subscribe_duration(soon) ready ({})",
                    wasi::clocks::monotonic_clock::now() - started_at
                );
            };
            let wait_long = async move {
                long.wait_for().await;
                println!(
                    "*** subscribe_duration(later) ready ({})",
                    wasi::clocks::monotonic_clock::now() - started_at
                );
            };

            futures_lite::future::zip(wait_short, wait_long).await;
        })
    }
}