use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant, SystemTime};
use futures::task::noop_waker;
use wasi::clocks::monotonic_clock;
use wasi::io::poll;
use super::{BoxFuture, Runtime, SpawnHandle, SpawnHandleInner};
#[derive(Clone, Default)]
pub struct WasiRuntime {
inner: Arc<WasiInner>,
}
#[derive(Default)]
struct WasiInner {
tasks: Mutex<Vec<TaskSlot>>,
}
struct TaskSlot {
fut: BoxFuture<()>,
aborted: Arc<AtomicBool>,
finished: Arc<AtomicBool>,
}
impl WasiRuntime {
pub fn new() -> Self {
Self::default()
}
pub fn drive(&self) -> usize {
#[allow(clippy::expect_used)]
let mut tasks = self
.inner
.tasks
.lock()
.expect("WasiRuntime task queue mutex poisoned");
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
tasks.retain_mut(|task| {
if task.aborted.load(Ordering::SeqCst) {
task.finished.store(true, Ordering::SeqCst);
return false;
}
match task.fut.as_mut().poll(&mut cx) {
Poll::Ready(()) => {
task.finished.store(true, Ordering::SeqCst);
false
}
Poll::Pending => true,
}
});
tasks.len()
}
pub fn poll_until_progress(&self, max_wait: Duration) {
let nanos = u64::try_from(max_wait.as_nanos()).unwrap_or(u64::MAX);
let deadline = monotonic_clock::subscribe_duration(nanos);
let pollables = [&deadline];
let _ready = poll::poll(&pollables);
}
pub fn tasks_pending(&self) -> usize {
#[allow(clippy::expect_used)]
self.inner
.tasks
.lock()
.expect("WasiRuntime task queue mutex poisoned")
.len()
}
}
impl Runtime for WasiRuntime {
fn spawn(&self, fut: BoxFuture<()>) -> SpawnHandle {
let aborted = Arc::new(AtomicBool::new(false));
let finished = Arc::new(AtomicBool::new(false));
#[allow(clippy::expect_used)]
let mut tasks = self
.inner
.tasks
.lock()
.expect("WasiRuntime task queue mutex poisoned");
tasks.push(TaskSlot {
fut,
aborted: Arc::clone(&aborted),
finished: Arc::clone(&finished),
});
SpawnHandle::from_inner(WasiSpawnHandle { aborted, finished })
}
fn sleep(&self, duration: Duration) -> BoxFuture<()> {
Box::pin(WasiSleep::new(duration))
}
fn now_monotonic(&self) -> Instant {
Instant::now()
}
fn now_wall_clock(&self) -> SystemTime {
SystemTime::now()
}
}
pub(super) struct WasiSpawnHandle {
aborted: Arc<AtomicBool>,
finished: Arc<AtomicBool>,
}
impl SpawnHandleInner for WasiSpawnHandle {
fn abort(&self) {
self.aborted.store(true, Ordering::SeqCst);
}
fn is_finished(&self) -> bool {
self.finished.load(Ordering::SeqCst)
}
}
struct WasiSleep {
deadline: Instant,
}
impl WasiSleep {
fn new(duration: Duration) -> Self {
Self {
deadline: Instant::now() + duration,
}
}
}
impl Future for WasiSleep {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.deadline {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn wasi_runtime_is_object_safe() {
fn assert_runtime_obj_safe(_: &dyn Runtime) {}
let rt = WasiRuntime::new();
assert_runtime_obj_safe(&rt);
}
#[test]
fn monotonic_clock_does_not_go_backwards() {
let rt = WasiRuntime::new();
let a = rt.now_monotonic();
for _ in 0..1_000 {
std::hint::black_box(a);
}
let b = rt.now_monotonic();
assert!(b >= a, "monotonic clock went backwards: {:?} → {:?}", a, b);
}
#[test]
fn wall_clock_is_after_unix_epoch() {
let rt = WasiRuntime::new();
let now = rt.now_wall_clock();
assert!(now > SystemTime::UNIX_EPOCH);
}
#[test]
fn spawn_noop_completes_on_first_drive() {
let rt = WasiRuntime::new();
let handle = rt.spawn(Box::pin(async {}));
assert_eq!(rt.tasks_pending(), 1);
rt.drive();
assert_eq!(rt.tasks_pending(), 0);
assert!(handle.is_finished());
}
#[test]
fn abort_short_circuits_pending_future() {
let rt = WasiRuntime::new();
let handle = rt.spawn(Box::pin(WasiSleep::new(Duration::from_secs(60))));
rt.drive(); assert_eq!(rt.tasks_pending(), 1);
handle.abort();
rt.drive(); assert_eq!(rt.tasks_pending(), 0);
assert!(handle.is_finished());
}
}