#![deny(missing_docs)]
extern crate std;
use core::sync::atomic::{AtomicU32, Ordering};
use std::boxed::Box;
use std::collections::BTreeMap;
use std::ffi::c_void;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::ptr;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
// Debug-tracing helper for this module.
//
// The arguments are forwarded to `std::eprintln!`, but the call sits behind
// `if false`, so the format string and its arguments are still type-checked
// while nothing is ever printed. Change the condition locally to enable the
// trace output.
macro_rules! rtdebug {
($($f:tt)*) => {
if false {
std::eprintln!($($f)*);
}
}
}
// Declares foreign functions in a way that also compiles on non-wasm targets.
//
// The macro accepts a single attributed `unsafe extern "C" { ... }` block of
// function declarations and expands to two `cfg`-gated alternatives:
//
// * when `target_family` is not "wasm": one local `unsafe fn` per declaration
//   with the same name and signature whose body is `unreachable!()`, so the
//   crate builds but any call panics;
// * when `target_family` is "wasm": the original `unsafe extern "C"` block,
//   with the block-level and per-function attributes re-applied.
macro_rules! extern_wasm {
(
$(#[$extern_attr:meta])*
unsafe extern "C" {
$(
$(#[$func_attr:meta])*
$vis:vis fn $func_name:ident ( $($args:tt)* ) $(-> $ret:ty)?;
)*
}
) => {
// Non-wasm shims: the per-function attributes (e.g. `link_name`) are
// intentionally not applied here.
$(
#[cfg(not(target_family = "wasm"))]
#[allow(unused, reason = "dummy shim for non-wasm compilation, never invoked")]
$vis unsafe fn $func_name($($args)*) $(-> $ret)? {
unreachable!();
}
)*
// Real imports on wasm targets.
#[cfg(target_family = "wasm")]
$(#[$extern_attr])*
unsafe extern "C" {
$(
$(#[$func_attr])*
$vis fn $func_name($($args)*) $(-> $ret)?;
)*
}
};
}
mod abi_buffer;
mod cabi;
mod error_context;
mod future_support;
#[cfg(feature = "futures-stream")]
mod futures_stream;
#[cfg(feature = "inter-task-wakeup")]
mod inter_task_wakeup;
mod stream_support;
mod subtask;
#[cfg(feature = "inter-task-wakeup")]
mod unit_stream;
mod waitable;
mod waitable_set;
#[cfg(not(feature = "inter-task-wakeup"))]
use inter_task_wakeup_disabled as inter_task_wakeup;
#[cfg(not(feature = "inter-task-wakeup"))]
mod inter_task_wakeup_disabled;
use self::waitable_set::WaitableSet;
pub use abi_buffer::*;
pub use error_context::*;
pub use future_support::*;
#[cfg(feature = "futures-stream")]
pub use futures_stream::*;
pub use stream_support::*;
#[doc(hidden)]
pub use subtask::Subtask;
#[cfg(feature = "inter-task-wakeup")]
pub use unit_stream::*;
/// A pinned, boxed, type-erased future with no output that may borrow data for `'a`.
type BoxFuture<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>;
#[cfg(feature = "async-spawn")]
mod spawn;
#[cfg(feature = "async-spawn")]
pub use spawn::spawn;
#[cfg(not(feature = "async-spawn"))]
mod spawn_disabled;
#[cfg(not(feature = "async-spawn"))]
use spawn_disabled as spawn;
/// Per-task state driving a root future (and the tasks tracked alongside it)
/// through the event-driven `callback` loop.
struct FutureState<'a> {
/// Task collection created from the root future; polled with `poll_next`.
tasks: spawn::Tasks<'a>,
/// Waitable set for this task, created lazily on first use.
waitable_set: Option<WaitableSet>,
/// Registered waitables: handle -> (opaque callback data, callback to invoke
/// with that data and the event code when an event for the handle arrives).
waitables: BTreeMap<u32, (*mut c_void, unsafe extern "C" fn(*mut c_void, u32))>,
/// C ABI task descriptor handed to `cabi::wasip3_task_set`; its `ptr` field
/// is pointed back at this state while the task is installed.
wasip3_task: cabi::wasip3_task,
/// Shared wake-up state inspected after each poll.
waker: Arc<FutureWaker>,
/// `Waker` built from `waker`, used to construct the poll `Context`.
waker_clone: Waker,
/// State owned by the `inter_task_wakeup` module (or its disabled stand-in).
inter_task_wakeup: inter_task_wakeup::State,
}
impl FutureState<'_> {
/// Builds the state for a new task whose root future is `future`.
///
/// No waitable set exists yet and `wasip3_task.ptr` starts out null.
fn new(future: BoxFuture<'_>) -> FutureState<'_> {
let waker = Arc::new(FutureWaker::default());
FutureState {
waker_clone: waker.clone().into(),
waker,
tasks: spawn::Tasks::new(future),
waitable_set: None,
waitables: BTreeMap::new(),
wasip3_task: cabi::wasip3_task {
// Filled in by `with_p3_task_set` right before the task is installed.
ptr: ptr::null_mut(),
version: cabi::WASIP3_TASK_V1,
waitable_register,
waitable_unregister,
},
inter_task_wakeup: Default::default(),
}
}
/// Returns this task's waitable set, creating it on first use.
fn get_or_create_waitable_set(&mut self) -> &WaitableSet {
self.waitable_set.get_or_insert_with(WaitableSet::new)
}
/// Joins `waitable` to this task's waitable set (creating the set if needed).
fn add_waitable(&mut self, waitable: u32) {
self.get_or_create_waitable_set().join(waitable)
}
/// Detaches `waitable` via `WaitableSet::remove_waitable_from_all_sets`.
///
/// Does not touch `self.waitables`; callers remove the map entry themselves.
fn remove_waitable(&mut self, waitable: u32) {
WaitableSet::remove_waitable_from_all_sets(waitable)
}
/// Returns `true` while at least one waitable is still registered.
fn remaining_work(&self) -> bool {
!self.waitables.is_empty()
}
/// Handles one event `(event0, event1, event2)` and polls the tasks until
/// they either finish or need to wait/yield.
///
/// `event0` is one of the `EVENT_*` codes; for non-`EVENT_NONE` events,
/// `event1` is the waitable handle and `event2` the code passed on to the
/// registered callback. Returns what the caller should do next.
///
/// # Panics
///
/// Panics (`unreachable!`) on an unknown `event0`, and on the internal
/// `assert!`/`unwrap` invariants below.
fn callback(&mut self, event0: u32, event1: u32, event2: u32) -> CallbackCode {
match event0 {
EVENT_NONE => rtdebug!("EVENT_NONE"),
EVENT_SUBTASK => rtdebug!("EVENT_SUBTASK({event1:#x}, {event2:#x})"),
EVENT_STREAM_READ => rtdebug!("EVENT_STREAM_READ({event1:#x}, {event2:#x})"),
EVENT_STREAM_WRITE => rtdebug!("EVENT_STREAM_WRITE({event1:#x}, {event2:#x})"),
EVENT_FUTURE_READ => rtdebug!("EVENT_FUTURE_READ({event1:#x}, {event2:#x})"),
EVENT_FUTURE_WRITE => rtdebug!("EVENT_FUTURE_WRITE({event1:#x}, {event2:#x})"),
EVENT_CANCEL => {
rtdebug!("EVENT_CANCEL");
// Cancellation ends the task immediately without polling anything;
// `Exit` tells the caller it is done with this state.
return CallbackCode::Exit;
}
_ => unreachable!(),
}
self.with_p3_task_set(|me| {
// Mark the waker as already woken so that any `wake_by_ref` during
// event delivery takes the no-op branch instead of signalling the
// inter-task stream.
me.waker
.sleep_state
.store(SLEEP_STATE_WOKEN, Ordering::Relaxed);
// Run the registered callback for the waitable this event refers to.
if event0 != EVENT_NONE {
me.deliver_waitable_event(event1, event2)
}
// Defined in another module; called on every callback before polling.
me.cancel_inter_task_stream_read();
loop {
let mut context = Context::from_waker(&me.waker_clone);
// Reset to "polling" each iteration so a wake-up during this poll is
// distinguishable afterwards (the waker flips it to `WOKEN`).
me.waker
.sleep_state
.store(SLEEP_STATE_POLLING, Ordering::Relaxed);
let poll = me.tasks.poll_next(&mut context);
match poll {
// One task finished; poll again to see whether more can progress.
Poll::Ready(Some(())) => (),
// No tasks left. Keep waiting while waitables are still registered,
// otherwise the whole task is done.
Poll::Ready(None) => {
assert!(me.tasks.is_empty());
if me.remaining_work() {
let waitable = me.waitable_set.as_ref().unwrap().as_raw();
break CallbackCode::Wait(waitable);
} else {
break CallbackCode::Exit;
}
}
Poll::Pending => {
assert!(!me.tasks.is_empty());
// The waker fired during the poll: treat this as a yield, but
// first drain an already-pending waitable event, if any, and
// re-poll right away.
if me.waker.sleep_state.load(Ordering::Relaxed) == SLEEP_STATE_WOKEN {
if me.remaining_work() {
let (event0, event1, event2) =
me.waitable_set.as_ref().unwrap().poll();
if event0 != EVENT_NONE {
me.deliver_waitable_event(event1, event2);
continue;
}
}
break CallbackCode::Yield;
}
// Nothing woke us: record that we are going to sleep so a later
// wake-up signals the inter-task stream, then wait on the set.
me.waker
.sleep_state
.store(SLEEP_STATE_SLEEPING, Ordering::Relaxed);
me.read_inter_task_stream();
let waitable = me.waitable_set.as_ref().unwrap().as_raw();
break CallbackCode::Wait(waitable);
}
}
}
})
}
/// Dispatches an event with code `code` for `waitable`.
///
/// The waitable is first detached from its set. If the inter-task wake-up
/// state consumes the event nothing else happens; otherwise the registered
/// callback is removed from `waitables` and invoked once.
///
/// # Panics
///
/// Panics if the event is not consumed and `waitable` has no registration.
fn deliver_waitable_event(&mut self, waitable: u32, code: u32) {
self.remove_waitable(waitable);
if self
.inter_task_wakeup
.consume_waitable_event(waitable, code)
{
return;
}
let (ptr, callback) = self.waitables.remove(&waitable).unwrap();
unsafe {
callback(ptr, code);
}
}
/// Runs `f` with this state installed as the current task via
/// `cabi::wasip3_task_set`, restoring the previously installed task pointer
/// afterwards (also on unwind, since the restore happens in a `Drop` guard).
fn with_p3_task_set<R>(&mut self, f: impl FnOnce(&mut Self) -> R) -> R {
// Guard that reinstalls the previous task pointer when dropped.
struct ResetTask(*mut cabi::wasip3_task);
impl Drop for ResetTask {
fn drop(&mut self) {
unsafe {
cabi::wasip3_task_set(self.0);
}
}
}
// Point the descriptor back at `self` so the `waitable_register` /
// `waitable_unregister` hooks can recover the `FutureState`.
let self_raw = self as *mut FutureState<'_>;
self.wasip3_task.ptr = self_raw.cast();
let prev = unsafe { cabi::wasip3_task_set(&mut self.wasip3_task) };
let _reset = ResetTask(prev);
f(self)
}
}
impl Drop for FutureState<'_> {
    /// Cancels any inter-task stream read and tears down remaining tasks.
    ///
    /// Remaining tasks are dropped while this state is installed as the
    /// current task (presumably so destructors that reach the
    /// `waitable_register`/`waitable_unregister` hooks find it — confirm).
    fn drop(&mut self) {
        self.cancel_inter_task_stream_read();
        if self.tasks.is_empty() {
            return;
        }
        // Replacing the collection drops every task it still holds.
        self.with_p3_task_set(|state| state.tasks = Default::default());
    }
}
/// Task hook: registers `callback`/`callback_ptr` for `waitable`.
///
/// The waitable is joined to the task's waitable set and recorded in the
/// `waitables` map. Returns the callback data pointer of a previous
/// registration for the same handle, or null if there was none.
///
/// # Safety
///
/// `ptr` must be a non-null pointer to a live `FutureState` (it is asserted to
/// be non-null and then dereferenced).
unsafe extern "C" fn waitable_register(
    ptr: *mut c_void,
    waitable: u32,
    callback: unsafe extern "C" fn(*mut c_void, u32),
    callback_ptr: *mut c_void,
) -> *mut c_void {
    let state = ptr.cast::<FutureState<'static>>();
    assert!(!state.is_null());
    unsafe {
        (*state).add_waitable(waitable);
        (*state)
            .waitables
            .insert(waitable, (callback_ptr, callback))
            .map_or(ptr::null_mut(), |(previous, _)| previous)
    }
}
/// Task hook: removes the registration for `waitable`.
///
/// The waitable is detached from its waitable set and its entry is removed
/// from the `waitables` map. Returns the callback data pointer that was
/// registered, or null if the handle was not registered.
///
/// # Safety
///
/// `ptr` must be a non-null pointer to a live `FutureState` (it is asserted to
/// be non-null and then dereferenced).
unsafe extern "C" fn waitable_unregister(ptr: *mut c_void, waitable: u32) -> *mut c_void {
    let state = ptr.cast::<FutureState<'static>>();
    assert!(!state.is_null());
    unsafe {
        (*state).remove_waitable(waitable);
        (*state)
            .waitables
            .remove(&waitable)
            .map_or(ptr::null_mut(), |(previous, _)| previous)
    }
}
// Values stored in `FutureWaker::sleep_state`.
//
// Stored immediately before each poll of the task set.
const SLEEP_STATE_POLLING: u32 = 0;
// Stored by the waker on every wake-up, and at the start of each callback.
const SLEEP_STATE_WOKEN: u32 = 1;
// Stored right before the callback returns a `Wait` code from a pending poll;
// a wake-up that observes this state also signals the inter-task stream.
const SLEEP_STATE_SLEEPING: u32 = 2;
/// Shared state behind the `Waker` handed to polled tasks.
#[derive(Default)]
struct FutureWaker {
    /// One of the `SLEEP_STATE_*` values; defaults to 0 (`SLEEP_STATE_POLLING`).
    sleep_state: AtomicU32,
    /// Inter-task wake-up hook, signalled only when a wake-up arrives while
    /// the state is `SLEEP_STATE_SLEEPING`.
    inter_task_stream: inter_task_wakeup::WakerState,
}

impl Wake for FutureWaker {
    /// Same as [`Wake::wake_by_ref`]; the owned `Arc` is dropped afterwards.
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self)
    }

    /// Marks the task as woken and, if it was sleeping, signals the
    /// inter-task stream.
    ///
    /// # Panics
    ///
    /// Panics if the previous state was none of the `SLEEP_STATE_*` values.
    fn wake_by_ref(self: &Arc<Self>) {
        let previous = self.sleep_state.swap(SLEEP_STATE_WOKEN, Ordering::Relaxed);
        // While polling (or already woken) the callback loop will notice the
        // state change by itself; nothing else to do.
        if matches!(previous, SLEEP_STATE_POLLING | SLEEP_STATE_WOKEN) {
            return;
        }
        assert_eq!(previous, SLEEP_STATE_SLEEPING);
        self.inter_task_stream.wake();
    }
}
// Event codes accepted as `event0` by `FutureState::callback`.
//
// No event: just (re)poll the tasks.
const EVENT_NONE: u32 = 0;
// Events for a waitable; each is delivered to the callback registered for the
// waitable handle carried in `event1`, with `event2` as the code.
const EVENT_SUBTASK: u32 = 1;
const EVENT_STREAM_READ: u32 = 2;
const EVENT_STREAM_WRITE: u32 = 3;
const EVENT_FUTURE_READ: u32 = 4;
const EVENT_FUTURE_WRITE: u32 = 5;
// Cancellation: the callback returns `CallbackCode::Exit` without polling.
const EVENT_CANCEL: u32 = 6;
/// What the caller of `FutureState::callback` should do next.
#[derive(PartialEq, Debug)]
enum CallbackCode {
    /// The task is finished (or was cancelled) and its state can be destroyed.
    Exit,
    /// The task was woken while being polled and wants to be called again.
    Yield,
    /// The task is blocked; the payload is the raw waitable-set handle to wait on.
    Wait(u32),
}

impl CallbackCode {
    /// Packs the code into a single `u32`: the tag (`0` exit, `1` yield,
    /// `2` wait) in the low bits and, for `Wait`, the handle shifted left by 4.
    fn encode(self) -> u32 {
        let (tag, payload) = match self {
            Self::Exit => (0, 0),
            Self::Yield => (1, 0),
            Self::Wait(set) => (2, set),
        };
        tag | (payload << 4)
    }
}
// Status codes. NOTE(review): not referenced in this part of the file;
// presumably subtask states consumed by sibling modules (e.g. `subtask`) —
// confirm against their users before changing any value.
const STATUS_STARTING: u32 = 0;
const STATUS_STARTED: u32 = 1;
const STATUS_RETURNED: u32 = 2;
const STATUS_STARTED_CANCELLED: u32 = 3;
const STATUS_RETURNED_CANCELLED: u32 = 4;
/// Raw value meaning "the operation blocked"; it carries no payload.
const BLOCKED: u32 = 0xffff_ffff;
/// Low-nibble tags of a raw return code; the payload lives in the upper 28 bits.
const COMPLETED: u32 = 0x0;
const DROPPED: u32 = 0x1;
const CANCELLED: u32 = 0x2;

/// Decoded form of a raw `u32` return code.
#[derive(PartialEq, Debug, Copy, Clone)]
enum ReturnCode {
    /// The raw value was `BLOCKED`.
    Blocked,
    /// Tag `COMPLETED` with its payload (`raw >> 4`).
    Completed(u32),
    /// Tag `DROPPED` with its payload (`raw >> 4`).
    Dropped(u32),
    /// Tag `CANCELLED` with its payload (`raw >> 4`).
    Cancelled(u32),
}

impl ReturnCode {
    /// Decodes a raw return code.
    ///
    /// `BLOCKED` is checked first because its low nibble (`0xf`) is not a
    /// valid tag. Otherwise the low four bits select the variant and the
    /// remaining bits are the payload.
    ///
    /// # Panics
    ///
    /// Panics on any other value whose low nibble is not a known tag.
    fn decode(val: u32) -> ReturnCode {
        match (val, val & 0xf) {
            (BLOCKED, _) => ReturnCode::Blocked,
            (_, COMPLETED) => ReturnCode::Completed(val >> 4),
            (_, DROPPED) => ReturnCode::Dropped(val >> 4),
            (_, CANCELLED) => ReturnCode::Cancelled(val >> 4),
            _ => panic!("unknown return code {val:#x}"),
        }
    }
}
#[doc(hidden)]
pub fn start_task(task: impl Future<Output = ()> + 'static) -> i32 {
let state = Box::into_raw(Box::new(FutureState::new(Box::pin(task))));
unsafe {
assert!(context_get().is_null());
context_set(state.cast());
callback(EVENT_NONE, 0, 0) as i32
}
}
/// Delivers one event to the task stored in the context slot and returns the
/// encoded callback code.
///
/// The slot is cleared while the task runs. It is restored afterwards unless
/// the task reported `Exit`, in which case the task state is freed instead.
///
/// # Safety
///
/// The context slot must hold a pointer obtained from `start_task` that has
/// not been freed yet; it is dereferenced and possibly deallocated here.
///
/// # Panics
///
/// Panics if the context slot is null.
#[doc(hidden)]
pub unsafe fn callback(event0: u32, event1: u32, event2: u32) -> u32 {
    let task = context_get().cast::<FutureState<'static>>();
    assert!(!task.is_null());
    unsafe {
        context_set(ptr::null_mut());
        let rc = (*task).callback(event0, event1, event2);
        match rc {
            // Finished: reclaim the allocation made by `start_task`.
            CallbackCode::Exit => drop(Box::from_raw(task)),
            // Still running: put the state back for the next callback.
            _ => context_set(task.cast()),
        }
        rtdebug!(" => (cb) {rc:?}");
        rc.encode()
    }
}
/// Runs `future` to completion on the current thread and returns its output.
///
/// The future is driven through the same callback state machine as exported
/// async tasks: after every step the returned code decides whether to poll
/// the task's waitable set (`Yield`) or block on it (`Wait`) to obtain the
/// next event.
///
/// # Panics
///
/// Panics if the state machine exits before the future produced a value, or
/// if a `Wait` is requested while no waitable set exists.
pub fn block_on<T: 'static>(future: impl Future<Output = T>) -> T {
    let mut result = None;
    let mut state = FutureState::new(Box::pin(async {
        result = Some(future.await);
    }));
    let mut event = (EVENT_NONE, 0, 0);
    loop {
        match state.callback(event.0, event.1, event.2) {
            CallbackCode::Exit => {
                // Drop the state first: it borrows `result` through the future.
                drop(state);
                break result.unwrap();
            }
            // A task may yield (e.g. through `yield_async`) before anything
            // registered a waitable, in which case no waitable set exists yet.
            // Create it on demand instead of unwrapping `None`.
            CallbackCode::Yield => event = state.get_or_create_waitable_set().poll(),
            CallbackCode::Wait(_) => event = state.waitable_set.as_ref().unwrap().wait(),
        }
    }
}
pub fn yield_blocking() -> bool {
extern_wasm! {
#[link(wasm_import_module = "$root")]
unsafe extern "C" {
#[link_name = "[thread-yield]"]
fn yield_() -> bool;
}
}
unsafe { !yield_() }
}
/// Yields to the executor exactly once.
///
/// The first poll wakes the current task's waker and returns `Pending`; the
/// next poll completes.
pub async fn yield_async() {
    let mut already_yielded = false;
    std::future::poll_fn(move |cx| {
        if already_yielded {
            return Poll::Ready(());
        }
        already_yielded = true;
        // Request an immediate re-poll so the task is not left sleeping.
        cx.waker().wake_by_ref();
        Poll::Pending
    })
    .await
}
pub fn backpressure_inc() {
extern_wasm! {
#[link(wasm_import_module = "$root")]
unsafe extern "C" {
#[link_name = "[backpressure-inc]"]
fn backpressure_inc();
}
}
unsafe { backpressure_inc() }
}
pub fn backpressure_dec() {
extern_wasm! {
#[link(wasm_import_module = "$root")]
unsafe extern "C" {
#[link_name = "[backpressure-dec]"]
fn backpressure_dec();
}
}
unsafe { backpressure_dec() }
}
fn context_get() -> *mut u8 {
extern_wasm! {
#[link(wasm_import_module = "$root")]
unsafe extern "C" {
#[link_name = "[context-get-0]"]
fn get() -> *mut u8;
}
}
unsafe { get() }
}
unsafe fn context_set(value: *mut u8) {
extern_wasm! {
#[link(wasm_import_module = "$root")]
unsafe extern "C" {
#[link_name = "[context-set-0]"]
fn set(value: *mut u8);
}
}
unsafe { set(value) }
}
#[doc(hidden)]
pub struct TaskCancelOnDrop {
_priv: (),
}
impl TaskCancelOnDrop {
#[doc(hidden)]
pub fn new() -> TaskCancelOnDrop {
TaskCancelOnDrop { _priv: () }
}
#[doc(hidden)]
pub fn forget(self) {
mem::forget(self);
}
}
impl Drop for TaskCancelOnDrop {
fn drop(&mut self) {
extern_wasm! {
#[link(wasm_import_module = "[export]$root")]
unsafe extern "C" {
#[link_name = "[task-cancel]"]
fn cancel();
}
}
unsafe { cancel() }
}
}