use super::REACTOR;
use async_task::{Runnable, Task};
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use slab::Slab;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use wasip2::io::poll::Pollable;
/// Key identifying one pollable registered with the reactor.
///
/// Wraps the `usize` index handed out by the reactor's pollable slab when a
/// pollable is inserted.
#[repr(transparent)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub(crate) struct EventKey(pub(crate) usize);
/// Ownership token for one pollable slot in the reactor.
///
/// Equality and hashing are derived from the key alone.
#[derive(Debug, PartialEq, Eq, Hash)]
struct Registration {
/// Key of the pollable this registration refers to.
key: EventKey,
}
impl Drop for Registration {
    /// Removes the pollable identified by this registration's key from the
    /// current reactor.
    ///
    /// Panics if no reactor is available, because `Reactor::current` does.
    fn drop(&mut self) {
        let reactor = Reactor::current();
        reactor.deregister_event(self.key);
    }
}
/// Cloneable handle to a pollable that has been registered with the reactor.
///
/// All clones share a single `Registration` through the inner `Arc`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AsyncPollable(Arc<Registration>);
impl AsyncPollable {
    /// Registers `pollable` with the current reactor and returns its handle.
    ///
    /// Panics if no reactor is available, because `Reactor::current` does.
    pub fn new(pollable: Pollable) -> Self {
        let reactor = Reactor::current();
        reactor.schedule(pollable)
    }

    /// Creates a future that waits for this pollable.
    ///
    /// Every call yields a future with its own identity: a process-wide
    /// counter supplies a fresh number for each `Waitee`, so several waits on
    /// the same pollable are tracked as distinct entries.
    pub fn wait_for(&self) -> WaitFor {
        use std::sync::atomic::{AtomicU64, Ordering};

        // Only distinctness of the values matters, hence `Relaxed`.
        static NEXT_WAIT_ID: AtomicU64 = AtomicU64::new(0);

        let waitee = Waitee {
            pollable: self.clone(),
            unique: NEXT_WAIT_ID.fetch_add(1, Ordering::Relaxed),
        };
        WaitFor {
            waitee,
            needs_deregistration: false,
        }
    }
}
/// Identity of a single wait on a pollable.
///
/// Used as the key of the reactor's waker map.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct Waitee {
/// Handle of the pollable being waited on.
pollable: AsyncPollable,
/// Number taken from a global counter in `AsyncPollable::wait_for`; it
/// distinguishes separate waits on the same pollable.
unique: u64,
}
/// Future returned by `AsyncPollable::wait_for`; its output is `()`.
#[must_use = "futures do nothing unless polled or .awaited"]
#[derive(Debug)]
pub struct WaitFor {
/// Identity under which this future's waker is stored in the reactor.
waitee: Waitee,
/// Set once `poll` has returned `Pending`; tells `Drop` that the waitee
/// has to be removed from the reactor's waker map.
needs_deregistration: bool,
}
impl Future for WaitFor {
    type Output = ();

    /// Asks the current reactor whether the pollable is ready.
    ///
    /// When it is not, the reactor has stored the task's waker under this
    /// future's `Waitee`, so the future remembers that it must deregister
    /// itself when dropped.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Reactor::current().ready(&self.waitee, cx.waker()) {
            return Poll::Ready(());
        }
        self.needs_deregistration = true;
        Poll::Pending
    }
}
impl Drop for WaitFor {
    /// Removes this future's entry from the reactor's waker map, but only if
    /// a poll has ever returned `Pending` (otherwise nothing was registered).
    fn drop(&mut self) {
        if !self.needs_deregistration {
            return;
        }
        Reactor::current().deregister_waitee(&self.waitee);
    }
}
/// Handle to the reactor state.
///
/// Cloning copies the `Arc`, so every clone refers to the same
/// `InnerReactor`.
#[derive(Debug, Clone)]
pub struct Reactor {
/// Shared state behind the handle.
inner: Arc<InnerReactor>,
}
/// State shared by all clones of a `Reactor`.
#[derive(Debug)]
struct InnerReactor {
/// Registered pollables, indexed by the `usize` inside an `EventKey`.
pollables: Mutex<Slab<Pollable>>,
/// Wakers of waits that were polled and found not ready, keyed by waitee.
wakers: Mutex<HashMap<Waitee, Waker>>,
/// Queue of runnables waiting to be run; filled by `spawn` and by the
/// schedule callback of spawned tasks.
ready_list: Mutex<VecDeque<Runnable>>,
}
impl Reactor {
    /// Returns a clone of the reactor handle currently stored in `REACTOR`.
    ///
    /// # Panics
    ///
    /// Panics when `REACTOR` holds no reactor, i.e. when called outside of a
    /// wstd runtime.
    pub fn current() -> Self {
        REACTOR.with(|r| {
            r.borrow()
                .as_ref()
                .expect("Reactor::current must be called within a wstd runtime")
                .clone()
        })
    }

    /// Creates a reactor with no pollables, no wakers and an empty ready list.
    pub(crate) fn new() -> Self {
        Self {
            inner: Arc::new(InnerReactor {
                pollables: Mutex::new(Slab::new()),
                wakers: Mutex::new(HashMap::new()),
                ready_list: Mutex::new(VecDeque::new()),
            }),
        }
    }

    /// Returns `true` when no wait currently has a waker registered.
    pub(crate) fn pending_pollables_is_empty(&self) -> bool {
        self.inner.wakers.lock().unwrap().is_empty()
    }

    /// Polls every pollable that has a registered waker through
    /// `wasip2::io::poll::poll` and wakes the wakers of those reported ready.
    ///
    /// Must only be called while at least one waker is registered: debug
    /// builds assert this, and an empty list makes the poll call trap.
    pub(crate) fn block_on_pollables(&self) {
        self.check_pollables(|targets| {
            debug_assert_ne!(
                targets.len(),
                0,
                "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasip2::io::poll::poll will trap"
            );
            wasip2::io::poll::poll(targets)
        })
    }

    /// Checks the registered pollables without waiting for any of them.
    ///
    /// A zero-duration monotonic-clock pollable is appended to the poll list
    /// so that the list always contains one extra entry besides the
    /// registered ones; that entry's index is filtered out of the result
    /// before wakers are woken. Does nothing when no waker is registered.
    pub(crate) fn nonblock_check_pollables(&self) {
        use std::sync::LazyLock;

        // Created on first use and reused for every subsequent check.
        static READY_POLLABLE: LazyLock<Pollable> =
            LazyLock::new(|| wasip2::clocks::monotonic_clock::subscribe_duration(0));

        if self.pending_pollables_is_empty() {
            return;
        }

        self.check_pollables(|targets| {
            // The extra pollable goes last, so its index equals the number of
            // real targets.
            let ready_index = targets.len();
            let mut new_targets = Vec::with_capacity(ready_index + 1);
            new_targets.extend_from_slice(targets);
            new_targets.push(&*READY_POLLABLE);
            let mut ready_list = wasip2::io::poll::poll(&new_targets);
            // Poll results are `u32` indices, so the comparison is done in
            // that type.
            ready_list.retain(|e| *e != ready_index as u32);
            ready_list
        })
    }

    /// Runs `check_ready` over the pollables of all registered waitees and
    /// wakes the waker of every index it returns.
    ///
    /// `check_ready` receives one pollable per registered waker and must
    /// return indices into that slice.
    ///
    /// # Panics
    ///
    /// Panics if a returned index is out of range or a waitee refers to a
    /// key that is missing from the pollable slab.
    fn check_pollables<F>(&self, check_ready: F)
    where
        F: FnOnce(&[&Pollable]) -> Vec<u32>,
    {
        // Collect owned clones of the ready wakers while the locks are held,
        // and release both locks before invoking them. A waker is arbitrary
        // code; if it called back into the reactor while the mutexes were
        // still held, re-locking them on the same thread would deadlock or
        // panic.
        let ready_wakers: Vec<Waker> = {
            let wakers = self.inner.wakers.lock().unwrap();
            let pollables = self.inner.pollables.lock().unwrap();
            let mut indexed_wakers = Vec::with_capacity(wakers.len());
            let mut targets = Vec::with_capacity(wakers.len());
            // Both vectors are filled in the same iteration so that position
            // `i` in `targets` belongs to waker `i` in `indexed_wakers`.
            for (waitee, waker) in wakers.iter() {
                let pollable_index = waitee.pollable.0.key;
                indexed_wakers.push(waker);
                targets.push(&pollables[pollable_index.0]);
            }
            let woken: Vec<Waker> = check_ready(&targets)
                .into_iter()
                .map(|index| Waker::clone(indexed_wakers[index as usize]))
                .collect();
            woken
        };
        for waker in ready_wakers {
            waker.wake();
        }
    }

    /// Stores `pollable` in the reactor and returns a handle to it.
    pub fn schedule(&self, pollable: Pollable) -> AsyncPollable {
        let mut pollables = self.inner.pollables.lock().unwrap();
        let key = EventKey(pollables.insert(pollable));
        AsyncPollable(Arc::new(Registration { key }))
    }

    /// Removes the pollable stored under `key`.
    ///
    /// Panics if `key` is not present, as `Slab::remove` does.
    fn deregister_event(&self, key: EventKey) {
        let mut pollables = self.inner.pollables.lock().unwrap();
        pollables.remove(key.0);
    }

    /// Removes the waker registered for `waitee`, if any.
    fn deregister_waitee(&self, waitee: &Waitee) {
        let mut wakers = self.inner.wakers.lock().unwrap();
        wakers.remove(waitee);
    }

    /// Returns whether the waitee's pollable is ready right now.
    ///
    /// When it is not ready, a clone of `waker` is stored under `waitee`
    /// (replacing any previous one) so that a later poll pass can wake the
    /// task.
    ///
    /// # Panics
    ///
    /// Panics if the waitee's key is not present in the pollable slab.
    fn ready(&self, waitee: &Waitee, waker: &Waker) -> bool {
        // The pollables guard is a temporary of this statement, so it is
        // released before the wakers mutex is taken below.
        let ready = self
            .inner
            .pollables
            .lock()
            .unwrap()
            .get(waitee.pollable.0.key.0)
            .expect("only live EventKey can be checked for readiness")
            .ready();
        if !ready {
            self.inner
                .wakers
                .lock()
                .unwrap()
                .insert(waitee.clone(), waker.clone());
        }
        ready
    }

    /// Spawns `fut` as a task and queues it on the ready list.
    ///
    /// The returned `Task` is the handle to the future's output. Whenever the
    /// task is woken, its runnable is pushed to the back of the ready list.
    pub fn spawn<F, T>(&self, fut: F) -> Task<T>
    where
        F: Future<Output = T> + 'static,
        T: 'static,
    {
        let this = self.clone();
        let schedule = move |runnable| this.inner.ready_list.lock().unwrap().push_back(runnable);
        // SAFETY: `spawn_unchecked` skips the `Send`/`Sync` checks on the
        // future and on the schedule closure, so every `Runnable` and `Waker`
        // of this task must be used and dropped on the thread that spawned
        // it. The `'static` requirements are enforced by this function's
        // bounds and by `schedule` owning its captured reactor handle.
        // NOTE(review): the thread-affinity requirement presumably holds
        // because the runtime drives this reactor from a single thread -
        // confirm for every supported target.
        #[allow(unsafe_code)]
        let (runnable, task) = unsafe { async_task::spawn_unchecked(fut, schedule) };
        self.inner.ready_list.lock().unwrap().push_back(runnable);
        task
    }

    /// Removes and returns the runnable at the front of the ready list.
    pub(super) fn pop_ready_list(&self) -> Option<Runnable> {
        self.inner.ready_list.lock().unwrap().pop_front()
    }

    /// Returns `true` when no runnable is queued.
    pub(super) fn ready_list_is_empty(&self) -> bool {
        self.inner.ready_list.lock().unwrap().is_empty()
    }
}
// Reactor tests. Each test drives its futures with `crate::runtime::block_on`
// and uses monotonic-clock pollables as event sources.
#[cfg(test)]
mod test {
use super::*;
// A pollable subscribed with a duration of 0 can be scheduled and awaited.
#[test]
fn subscribe_no_duration() {
crate::runtime::block_on(async {
let reactor = Reactor::current();
let pollable = wasip2::clocks::monotonic_clock::subscribe_duration(0);
let sched = reactor.schedule(pollable);
sched.wait_for().await;
})
}
// A pollable subscribed with a non-zero duration can be scheduled and awaited.
#[test]
fn subscribe_some_duration() {
crate::runtime::block_on(async {
let reactor = Reactor::current();
let pollable = wasip2::clocks::monotonic_clock::subscribe_duration(10_000_000);
let sched = reactor.schedule(pollable);
sched.wait_for().await;
})
}
// Two pollables are registered but only `soon` is awaited; `now` stays
// registered without a waiter and is dropped explicitly afterwards.
#[test]
fn subscribe_multiple_durations() {
crate::runtime::block_on(async {
let reactor = Reactor::current();
let now = wasip2::clocks::monotonic_clock::subscribe_duration(0);
let soon = wasip2::clocks::monotonic_clock::subscribe_duration(10_000_000);
let now = reactor.schedule(now);
let soon = reactor.schedule(soon);
soon.wait_for().await;
drop(now)
})
}
// Two pollables with different durations are awaited concurrently through
// `zip`; each branch prints the clock delta since `start` when it completes.
#[test]
fn subscribe_multiple_durations_zipped() {
crate::runtime::block_on(async {
let reactor = Reactor::current();
// Taken before subscribing so the printed deltas include the full wait.
let start = wasip2::clocks::monotonic_clock::now();
let soon = wasip2::clocks::monotonic_clock::subscribe_duration(10_000_000);
let later = wasip2::clocks::monotonic_clock::subscribe_duration(40_000_000);
let soon = reactor.schedule(soon);
let later = reactor.schedule(later);
futures_lite::future::zip(
async move {
soon.wait_for().await;
println!(
"*** subscribe_duration(soon) ready ({})",
wasip2::clocks::monotonic_clock::now() - start
);
},
async move {
later.wait_for().await;
println!(
"*** subscribe_duration(later) ready ({})",
wasip2::clocks::monotonic_clock::now() - start
);
},
)
.await;
})
}
// A future that wakes itself without involving any pollable must make
// progress even while a long pollable wait is pending, and that abandoned
// long wait must not delay a later, shorter wait.
#[test]
fn progresses_wasi_independent_futures() {
crate::runtime::block_on(async {
let start = wasip2::clocks::monotonic_clock::now();
let reactor = Reactor::current();
const LONG_DURATION: u64 = 1_000_000_000;
let later = wasip2::clocks::monotonic_clock::subscribe_duration(LONG_DURATION);
let later = reactor.schedule(later);
let mut polled_before = false;
// Pending on the first poll (after waking itself), ready on the second.
let wasi_independent_future = futures_lite::future::poll_fn(|cx| {
if polled_before {
std::task::Poll::Ready(true)
} else {
polled_before = true;
cx.waker().wake_by_ref();
std::task::Poll::Pending
}
});
let later = async {
later.wait_for().await;
false
};
let wasi_independent_future_won =
futures_lite::future::race(wasi_independent_future, later).await;
assert!(
wasi_independent_future_won,
"wasi_independent_future should win the race"
);
const SHORT_DURATION: u64 = LONG_DURATION / 100;
let soon = wasip2::clocks::monotonic_clock::subscribe_duration(SHORT_DURATION);
let soon = reactor.schedule(soon);
soon.wait_for().await;
let end = wasip2::clocks::monotonic_clock::now();
let duration = end - start;
// Lower bound: the short wait really happened.
assert!(
duration > SHORT_DURATION,
"{duration} greater than short duration shows awaited for `soon` properly"
);
// Upper bound: the long wait from the lost race was not waited for.
assert!(
duration < (5 * SHORT_DURATION),
"{duration} less than a reasonable multiple of short duration {SHORT_DURATION} shows did not await for `later`"
);
})
}
// A task that blocks the thread in 100 ms slices and yields between them
// must not prevent a 200 ms timer from finishing first.
#[test]
fn cooperative_concurrency() {
crate::runtime::block_on(async {
// The blocking sleep is deliberate: it stands in for CPU-bound work.
let cpu_heavy = async move {
for _ in 0..10 {
std::thread::sleep(std::time::Duration::from_millis(100));
futures_lite::future::yield_now().await;
}
true
};
let timeout = async move {
crate::time::Timer::after(crate::time::Duration::from_millis(200))
.wait()
.await;
false
};
let mut future_group = futures_concurrency::future::FutureGroup::<
Pin<Box<dyn std::future::Future<Output = bool>>>,
>::new();
future_group.insert(Box::pin(cpu_heavy));
future_group.insert(Box::pin(timeout));
// The first future to complete decides the result.
let result = futures_lite::StreamExt::next(&mut future_group).await;
assert_eq!(result, Some(false), "cpu_heavy task should have timed out");
});
}
}