#![deny(missing_docs)]
#![allow(static_mut_refs)]
extern crate std;
use core::sync::atomic::{AtomicBool, Ordering};
use std::boxed::Box;
use std::collections::BTreeMap;
use std::ffi::c_void;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::ptr;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::vec::Vec;
use futures::channel::oneshot;
use futures::future::FutureExt;
use futures::stream::{FuturesUnordered, StreamExt};
macro_rules! rtdebug {
($($f:tt)*) => {
if false {
std::eprintln!($($f)*);
}
}
}
mod abi_buffer;
mod cabi;
mod error_context;
mod future_support;
mod stream_support;
mod subtask;
mod waitable;
mod waitable_set;
use self::waitable_set::WaitableSet;
pub use abi_buffer::*;
pub use error_context::*;
pub use future_support::*;
pub use stream_support::*;
#[doc(hidden)]
pub use subtask::Subtask;
pub use futures;
type BoxFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
struct FutureState {
tasks: FuturesUnordered<BoxFuture>,
waitable_set: Option<WaitableSet>,
waitables: BTreeMap<u32, (*mut c_void, unsafe extern "C" fn(*mut c_void, u32))>,
wasip3_task: cabi::wasip3_task,
waker: Arc<FutureWaker>,
waker_clone: Waker,
}
impl FutureState {
fn new(future: BoxFuture) -> FutureState {
let waker = Arc::new(FutureWaker::default());
FutureState {
waker_clone: waker.clone().into(),
waker,
tasks: [future].into_iter().collect(),
waitable_set: None,
waitables: BTreeMap::new(),
wasip3_task: cabi::wasip3_task {
ptr: ptr::null_mut(),
version: cabi::WASIP3_TASK_V1,
waitable_register,
waitable_unregister,
},
}
}
fn get_or_create_waitable_set(&mut self) -> &WaitableSet {
self.waitable_set.get_or_insert_with(WaitableSet::new)
}
fn add_waitable(&mut self, waitable: u32) {
self.get_or_create_waitable_set().join(waitable)
}
fn remove_waitable(&mut self, waitable: u32) {
WaitableSet::remove_waitable_from_all_sets(waitable)
}
fn remaining_work(&self) -> bool {
!self.waitables.is_empty()
}
fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> (u32, bool) {
match event0 {
EVENT_NONE => rtdebug!("EVENT_NONE"),
EVENT_SUBTASK => rtdebug!("EVENT_SUBTASK({event1:#x}, {event2:#x})"),
EVENT_STREAM_READ => rtdebug!("EVENT_STREAM_READ({event1:#x}, {event2:#x})"),
EVENT_STREAM_WRITE => rtdebug!("EVENT_STREAM_WRITE({event1:#x}, {event2:#x})"),
EVENT_FUTURE_READ => rtdebug!("EVENT_FUTURE_READ({event1:#x}, {event2:#x})"),
EVENT_FUTURE_WRITE => rtdebug!("EVENT_FUTURE_WRITE({event1:#x}, {event2:#x})"),
EVENT_CANCEL => {
rtdebug!("EVENT_CANCEL");
return (CALLBACK_CODE_EXIT, true);
}
_ => unreachable!(),
}
if event0 != EVENT_NONE {
self.deliver_waitable_event(event1, event2)
}
self.poll()
}
fn deliver_waitable_event(&mut self, waitable: u32, code: u32) {
self.remove_waitable(waitable);
let (ptr, callback) = self.waitables.remove(&waitable).unwrap();
unsafe {
callback(ptr, code);
}
}
fn poll(&mut self) -> (u32, bool) {
self.with_p3_task_set(|me| {
let mut context = Context::from_waker(&me.waker_clone);
loop {
me.waker.0.store(false, Ordering::Relaxed);
let poll;
unsafe {
poll = me.tasks.poll_next_unpin(&mut context);
if !SPAWNED.is_empty() {
me.tasks.extend(SPAWNED.drain(..));
}
}
match poll {
Poll::Ready(Some(())) => (),
Poll::Ready(None) => {
assert!(!me.remaining_work());
assert!(me.tasks.is_empty());
break (CALLBACK_CODE_EXIT, true);
}
Poll::Pending => {
assert!(!me.tasks.is_empty());
if me.waker.0.load(Ordering::Relaxed) {
break (CALLBACK_CODE_YIELD, false);
}
assert!(me.remaining_work());
let waitable = me.waitable_set.as_ref().unwrap().as_raw();
break (CALLBACK_CODE_WAIT | (waitable << 4), false);
}
}
}
})
}
fn with_p3_task_set<R>(&mut self, f: impl FnOnce(&mut Self) -> R) -> R {
struct ResetTask(*mut cabi::wasip3_task);
impl Drop for ResetTask {
fn drop(&mut self) {
unsafe {
cabi::wasip3_task_set(self.0);
}
}
}
let self_raw = self as *mut FutureState;
self.wasip3_task.ptr = self_raw.cast();
let prev = unsafe { cabi::wasip3_task_set(&mut self.wasip3_task) };
let _reset = ResetTask(prev);
f(self)
}
}
impl Drop for FutureState {
fn drop(&mut self) {
if !self.tasks.is_empty() {
self.with_p3_task_set(|me| {
me.tasks = Default::default();
})
}
}
}
unsafe extern "C" fn waitable_register(
ptr: *mut c_void,
waitable: u32,
callback: unsafe extern "C" fn(*mut c_void, u32),
callback_ptr: *mut c_void,
) -> *mut c_void {
let ptr = ptr.cast::<FutureState>();
assert!(!ptr.is_null());
(*ptr).add_waitable(waitable);
match (*ptr).waitables.insert(waitable, (callback_ptr, callback)) {
Some((prev, _)) => prev,
None => ptr::null_mut(),
}
}
unsafe extern "C" fn waitable_unregister(ptr: *mut c_void, waitable: u32) -> *mut c_void {
let ptr = ptr.cast::<FutureState>();
assert!(!ptr.is_null());
(*ptr).remove_waitable(waitable);
match (*ptr).waitables.remove(&waitable) {
Some((prev, _)) => prev,
None => ptr::null_mut(),
}
}
#[derive(Default)]
struct FutureWaker(AtomicBool);
impl Wake for FutureWaker {
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}
fn wake_by_ref(self: &Arc<Self>) {
self.0.store(true, Ordering::Relaxed)
}
}
static mut SPAWNED: Vec<BoxFuture> = Vec::new();
const EVENT_NONE: u32 = 0;
const EVENT_SUBTASK: u32 = 1;
const EVENT_STREAM_READ: u32 = 2;
const EVENT_STREAM_WRITE: u32 = 3;
const EVENT_FUTURE_READ: u32 = 4;
const EVENT_FUTURE_WRITE: u32 = 5;
const EVENT_CANCEL: u32 = 6;
const CALLBACK_CODE_EXIT: u32 = 0;
const CALLBACK_CODE_YIELD: u32 = 1;
const CALLBACK_CODE_WAIT: u32 = 2;
const _CALLBACK_CODE_POLL: u32 = 3;
const STATUS_STARTING: u32 = 0;
const STATUS_STARTED: u32 = 1;
const STATUS_RETURNED: u32 = 2;
const STATUS_STARTED_CANCELLED: u32 = 3;
const STATUS_RETURNED_CANCELLED: u32 = 4;
const BLOCKED: u32 = 0xffff_ffff;
const COMPLETED: u32 = 0x0;
const DROPPED: u32 = 0x1;
const CANCELLED: u32 = 0x2;
#[derive(PartialEq, Debug, Copy, Clone)]
enum ReturnCode {
Blocked,
Completed(u32),
Dropped(u32),
Cancelled(u32),
}
impl ReturnCode {
fn decode(val: u32) -> ReturnCode {
if val == BLOCKED {
return ReturnCode::Blocked;
}
let amt = val >> 4;
match val & 0xf {
COMPLETED => ReturnCode::Completed(amt),
DROPPED => ReturnCode::Dropped(amt),
CANCELLED => ReturnCode::Cancelled(amt),
_ => panic!("unknown return code {val:#x}"),
}
}
}
#[doc(hidden)]
pub fn start_task(task: impl Future<Output = ()> + 'static) -> i32 {
let state = Box::into_raw(Box::new(FutureState::new(Box::pin(task))));
unsafe {
assert!(context_get().is_null());
context_set(state.cast());
callback(EVENT_NONE, 0, 0) as i32
}
}
#[doc(hidden)]
pub unsafe fn callback(event0: u32, event1: u32, event2: u32) -> u32 {
let state = context_get().cast::<FutureState>();
assert!(!state.is_null());
unsafe {
context_set(ptr::null_mut());
}
unsafe {
let (rc, done) = (*state).callback(event0, event1, event2);
if done {
drop(Box::from_raw(state));
} else {
context_set(state.cast());
}
rtdebug!(" => (cb) {rc:#x}");
rc
}
}
pub fn spawn(future: impl Future<Output = ()> + 'static) {
unsafe { SPAWNED.push(Box::pin(future)) }
}
pub fn block_on<T: 'static>(future: impl Future<Output = T> + 'static) -> T {
let (tx, mut rx) = oneshot::channel();
let state = &mut FutureState::new(Box::pin(future.map(move |v| drop(tx.send(v)))) as BoxFuture);
let mut event = (EVENT_NONE, 0, 0);
loop {
match state.callback(event.0, event.1, event.2) {
(_, true) => break rx.try_recv().unwrap().unwrap(),
(CALLBACK_CODE_YIELD, false) => event = state.waitable_set.as_ref().unwrap().poll(),
_ => event = state.waitable_set.as_ref().unwrap().wait(),
}
}
}
pub fn yield_blocking() -> bool {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn yield_() -> bool {
unreachable!();
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[thread-yield]"]
fn yield_() -> bool;
}
unsafe { !yield_() }
}
pub async fn yield_async() {
#[derive(Default)]
struct Yield {
yielded: bool,
}
impl Future for Yield {
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<()> {
if self.yielded {
Poll::Ready(())
} else {
self.yielded = true;
context.waker().wake_by_ref();
Poll::Pending
}
}
}
Yield::default().await;
}
#[deprecated = "use backpressure_{inc,dec} instead"]
pub fn backpressure_set(enabled: bool) {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn backpressure_set(_: i32) {
unreachable!();
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[backpressure-set]"]
fn backpressure_set(_: i32);
}
unsafe { backpressure_set(if enabled { 1 } else { 0 }) }
}
pub fn backpressure_inc() {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn backpressure_inc() {
unreachable!();
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[backpressure-inc]"]
fn backpressure_inc();
}
unsafe { backpressure_inc() }
}
pub fn backpressure_dec() {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn backpressure_dec() {
unreachable!();
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[backpressure-dec]"]
fn backpressure_dec();
}
unsafe { backpressure_dec() }
}
fn context_get() -> *mut u8 {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn get() -> *mut u8 {
unreachable!()
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[context-get-0]"]
fn get() -> *mut u8;
}
unsafe { get() }
}
unsafe fn context_set(value: *mut u8) {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn set(_: *mut u8) {
unreachable!()
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "$root")]
extern "C" {
#[link_name = "[context-set-0]"]
fn set(value: *mut u8);
}
unsafe { set(value) }
}
#[doc(hidden)]
pub struct TaskCancelOnDrop {
_priv: (),
}
impl TaskCancelOnDrop {
#[doc(hidden)]
pub fn new() -> TaskCancelOnDrop {
TaskCancelOnDrop { _priv: () }
}
#[doc(hidden)]
pub fn forget(self) {
mem::forget(self);
}
}
impl Drop for TaskCancelOnDrop {
fn drop(&mut self) {
#[cfg(not(target_arch = "wasm32"))]
unsafe fn cancel() {
unreachable!()
}
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]$root")]
extern "C" {
#[link_name = "[task-cancel]"]
fn cancel();
}
unsafe { cancel() }
}
}