use crate::component::func::{self, Func, Options};
use crate::component::{
Component, ComponentInstanceId, HasData, HasSelf, Instance, Resource, ResourceTable,
ResourceTableError,
};
use crate::fiber::{self, StoreFiber, StoreFiberYield};
use crate::store::{StoreInner, StoreOpaque, StoreToken};
use crate::vm::component::{
CallContext, ComponentInstance, InstanceFlags, ResourceTables, TransmitLocalState,
};
use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
use crate::{AsContext, AsContextMut, StoreContext, StoreContextMut, ValRaw};
use anyhow::{Context as _, Result, anyhow, bail};
use error_contexts::GlobalErrorContextRefCount;
use futures::channel::oneshot;
use futures::future::{self, Either, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
use std::any::Any;
use std::borrow::ToOwned;
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop, MaybeUninit};
use std::ops::DerefMut;
use std::pin::{Pin, pin};
use std::ptr::{self, NonNull};
use std::slice;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::vec::Vec;
use table::{TableDebug, TableId};
use wasmtime_environ::component::{
CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
RuntimeComponentInstanceIndex, StringEncoding, TypeComponentGlobalErrorContextTableIndex,
TypeComponentLocalErrorContextTableIndex, TypeFutureTableIndex, TypeStreamTableIndex,
TypeTupleIndex,
};
pub use abort::JoinHandle;
pub use futures_and_streams::{
Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
};
pub(crate) use futures_and_streams::{
ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,
};
mod abort;
mod error_contexts;
mod futures_and_streams;
mod table;
pub(crate) mod tls;
const BLOCKED: u32 = 0xffff_ffff;
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
Starting = 0,
Started = 1,
Returned = 2,
StartCancelled = 3,
ReturnCancelled = 4,
}
impl Status {
pub fn pack(self, waitable: Option<u32>) -> u32 {
assert!(matches!(self, Status::Returned) == waitable.is_none());
let waitable = waitable.unwrap_or(0);
assert!(waitable < (1 << 28));
(waitable << 4) | (self as u32)
}
}
#[derive(Clone, Copy, Debug)]
enum Event {
None,
Cancelled,
Subtask {
status: Status,
},
StreamRead {
code: ReturnCode,
pending: Option<(TypeStreamTableIndex, u32)>,
},
StreamWrite {
code: ReturnCode,
pending: Option<(TypeStreamTableIndex, u32)>,
},
FutureRead {
code: ReturnCode,
pending: Option<(TypeFutureTableIndex, u32)>,
},
FutureWrite {
code: ReturnCode,
pending: Option<(TypeFutureTableIndex, u32)>,
},
}
impl Event {
fn parts(self) -> (u32, u32) {
const EVENT_NONE: u32 = 0;
const EVENT_SUBTASK: u32 = 1;
const EVENT_STREAM_READ: u32 = 2;
const EVENT_STREAM_WRITE: u32 = 3;
const EVENT_FUTURE_READ: u32 = 4;
const EVENT_FUTURE_WRITE: u32 = 5;
const EVENT_CANCELLED: u32 = 6;
match self {
Event::None => (EVENT_NONE, 0),
Event::Cancelled => (EVENT_CANCELLED, 0),
Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
}
}
}
mod callback_code {
pub const EXIT: u32 = 0;
pub const YIELD: u32 = 1;
pub const WAIT: u32 = 2;
pub const POLL: u32 = 3;
}
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
store: StoreContextMut<'a, T>,
get_data: fn(&mut T) -> D::Data<'_>,
instance: Option<Instance>,
}
impl<'a, T, D> Access<'a, T, D>
where
D: HasData + ?Sized,
T: 'static,
{
pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
Self {
store,
get_data,
instance: None,
}
}
pub fn data_mut(&mut self) -> &mut T {
self.store.data_mut()
}
pub fn get(&mut self) -> D::Data<'_> {
(self.get_data)(self.data_mut())
}
pub fn spawn(&mut self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
where
T: 'static,
{
let accessor = Accessor {
get_data: self.get_data,
instance: self.instance,
token: StoreToken::new(self.store.as_context_mut()),
};
self.instance
.unwrap()
.spawn_with_accessor(self.store.as_context_mut(), accessor, task)
}
pub fn instance(&self) -> Instance {
self.instance.unwrap()
}
}
impl<'a, T, D> AsContext for Access<'a, T, D>
where
D: HasData + ?Sized,
T: 'static,
{
type Data = T;
fn as_context(&self) -> StoreContext<'_, T> {
self.store.as_context()
}
}
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
D: HasData + ?Sized,
T: 'static,
{
fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
self.store.as_context_mut()
}
}
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
D: HasData + ?Sized,
{
token: StoreToken<T>,
get_data: fn(&mut T) -> D::Data<'_>,
instance: Option<Instance>,
}
pub trait AsAccessor {
type Data: 'static;
type AccessorData: HasData + ?Sized;
fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
type Data = T::Data;
type AccessorData = T::AccessorData;
fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
T::as_accessor(self)
}
}
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
type Data = T;
type AccessorData = D;
fn as_accessor(&self) -> &Accessor<T, D> {
self
}
}
const _: () = {
const fn assert<T: Send + Sync>() {}
assert::<Accessor<UnsafeCell<u32>>>();
};
impl<T> Accessor<T> {
pub(crate) fn new(token: StoreToken<T>, instance: Option<Instance>) -> Self {
Self {
token,
get_data: |x| x,
instance,
}
}
}
impl<T, D> Accessor<T, D>
where
D: HasData + ?Sized,
{
pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
tls::get(|vmstore| {
fun(Access {
store: self.token.as_context_mut(vmstore),
get_data: self.get_data,
instance: self.instance,
})
})
}
pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
self.get_data
}
pub fn with_getter<D2: HasData>(
&self,
get_data: fn(&mut T) -> D2::Data<'_>,
) -> Accessor<T, D2> {
Accessor {
token: self.token,
get_data,
instance: self.instance,
}
}
pub fn spawn(&self, task: impl AccessorTask<T, D, Result<()>>) -> JoinHandle
where
T: 'static,
{
let instance = self.instance.unwrap();
let accessor = self.clone_for_spawn();
self.with(|mut access| {
instance.spawn_with_accessor(access.as_context_mut(), accessor, task)
})
}
pub fn instance(&self) -> Instance {
self.instance.unwrap()
}
fn clone_for_spawn(&self) -> Self {
Self {
token: self.token,
get_data: self.get_data,
instance: self.instance,
}
}
}
pub trait AccessorTask<T, D, R>: Send + 'static
where
D: HasData + ?Sized,
{
fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = R> + Send;
}
enum CallerInfo {
Async {
params: Vec<ValRaw>,
has_result: bool,
},
Sync {
params: Vec<ValRaw>,
result_count: u32,
},
}
enum WaitMode {
Fiber(StoreFiber<'static>),
Callback,
}
#[derive(Debug)]
enum SuspendReason {
Waiting {
set: TableId<WaitableSet>,
task: TableId<GuestTask>,
},
NeedWork,
Yielding { task: TableId<GuestTask> },
}
enum GuestCallKind {
DeliverEvent {
set: Option<TableId<WaitableSet>>,
},
Start(Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>),
}
impl fmt::Debug for GuestCallKind {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::DeliverEvent { set } => f.debug_struct("DeliverEvent").field("set", set).finish(),
Self::Start(_) => f.debug_tuple("Start").finish(),
}
}
}
#[derive(Debug)]
struct GuestCall {
task: TableId<GuestTask>,
kind: GuestCallKind,
}
impl GuestCall {
fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
let task_instance = state.get_mut(self.task)?.instance;
let state = state.instance_state(task_instance);
let ready = match &self.kind {
GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
GuestCallKind::Start(_) => !(state.do_not_enter || state.backpressure > 0),
};
log::trace!(
"call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
state.do_not_enter,
state.backpressure
);
Ok(ready)
}
}
enum WorkerItem {
GuestCall(GuestCall),
Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
}
#[derive(Debug)]
struct PollParams {
task: TableId<GuestTask>,
set: TableId<WaitableSet>,
}
enum WorkItem {
PushFuture(AlwaysMut<HostTaskFuture>),
ResumeFiber(StoreFiber<'static>),
GuestCall(GuestCall),
Poll(PollParams),
WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send>>),
}
impl fmt::Debug for WorkItem {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
}
}
}
impl ComponentInstance {
fn handle_callback_code(
mut self: Pin<&mut Self>,
guest_task: TableId<GuestTask>,
runtime_instance: RuntimeComponentInstanceIndex,
code: u32,
initial_call: bool,
) -> Result<()> {
let (code, set) = unpack_callback_code(code);
log::trace!("received callback code from {guest_task:?}: {code} (set: {set})");
let state = self.as_mut().concurrent_state_mut();
let task = state.get_mut(guest_task)?;
if task.lift_result.is_some() {
if code == callback_code::EXIT {
return Err(anyhow!(crate::Trap::NoAsyncResult));
}
if initial_call {
Waitable::Guest(guest_task).set_event(
state,
Some(Event::Subtask {
status: Status::Started,
}),
)?;
}
}
let get_set = |instance: Pin<&mut Self>, handle| {
if handle == 0 {
bail!("invalid waitable-set handle");
}
let set = instance.guest_tables().0[runtime_instance].waitable_set_rep(handle)?;
Ok(TableId::<WaitableSet>::new(set))
};
match code {
callback_code::EXIT => {
let task = state.get_mut(guest_task)?;
match &task.caller {
Caller::Host {
remove_task_automatically,
..
} => {
if *remove_task_automatically {
log::trace!("handle_callback_code will delete task {guest_task:?}");
Waitable::Guest(guest_task).delete_from(state)?;
}
}
Caller::Guest { .. } => {
task.exited = true;
task.callback = None;
}
}
}
callback_code::YIELD => {
let task = state.get_mut(guest_task)?;
assert!(task.event.is_none());
task.event = Some(Event::None);
state.push_low_priority(WorkItem::GuestCall(GuestCall {
task: guest_task,
kind: GuestCallKind::DeliverEvent { set: None },
}));
}
callback_code::WAIT | callback_code::POLL => {
let set = get_set(self.as_mut(), set)?;
let state = self.concurrent_state_mut();
if state.get_mut(guest_task)?.event.is_some()
|| !state.get_mut(set)?.ready.is_empty()
{
state.push_high_priority(WorkItem::GuestCall(GuestCall {
task: guest_task,
kind: GuestCallKind::DeliverEvent { set: Some(set) },
}));
} else {
match code {
callback_code::POLL => {
state.push_low_priority(WorkItem::Poll(PollParams {
task: guest_task,
set,
}));
}
callback_code::WAIT => {
let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
assert!(old.is_none());
let old = state
.get_mut(set)?
.waiting
.insert(guest_task, WaitMode::Callback);
assert!(old.is_none());
}
_ => unreachable!(),
}
}
}
_ => bail!("unsupported callback code: {code}"),
}
Ok(())
}
fn get_event(
mut self: Pin<&mut Self>,
guest_task: TableId<GuestTask>,
set: Option<TableId<WaitableSet>>,
cancellable: bool,
) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
let state = self.as_mut().concurrent_state_mut();
if let Some(event) = state.get_mut(guest_task)?.event.take() {
log::trace!("deliver event {event:?} to {guest_task:?}");
if cancellable || !matches!(event, Event::Cancelled) {
return Ok(Some((event, None)));
} else {
state.get_mut(guest_task)?.event = Some(event);
}
}
Ok(
if let Some((set, waitable)) = set
.and_then(|set| {
state
.get_mut(set)
.map(|v| v.ready.pop_first().map(|v| (set, v)))
.transpose()
})
.transpose()?
{
let common = waitable.common(state)?;
let handle = common.handle.unwrap();
let event = common.event.take().unwrap();
log::trace!(
"deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
);
waitable.on_delivery(self, event);
Some((event, Some((waitable, handle))))
} else {
None
},
)
}
pub(crate) fn waitable_set_new(
mut self: Pin<&mut Self>,
caller_instance: RuntimeComponentInstanceIndex,
) -> Result<u32> {
let set = self
.as_mut()
.concurrent_state_mut()
.push(WaitableSet::default())?;
let handle = self.guest_tables().0[caller_instance].waitable_set_insert(set.rep())?;
log::trace!("new waitable set {set:?} (handle {handle})");
Ok(handle)
}
pub(crate) fn waitable_set_drop(
mut self: Pin<&mut Self>,
caller_instance: RuntimeComponentInstanceIndex,
set: u32,
) -> Result<()> {
let rep = self.as_mut().guest_tables().0[caller_instance].waitable_set_remove(set)?;
log::trace!("drop waitable set {rep} (handle {set})");
let set = self
.concurrent_state_mut()
.delete(TableId::<WaitableSet>::new(rep))?;
if !set.waiting.is_empty() {
bail!("cannot drop waitable set with waiters");
}
Ok(())
}
pub(crate) fn waitable_join(
mut self: Pin<&mut Self>,
caller_instance: RuntimeComponentInstanceIndex,
waitable_handle: u32,
set_handle: u32,
) -> Result<()> {
let waitable = Waitable::from_instance(self.as_mut(), caller_instance, waitable_handle)?;
let set = if set_handle == 0 {
None
} else {
let set =
self.as_mut().guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
Some(TableId::<WaitableSet>::new(set))
};
log::trace!(
"waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
);
waitable.join(self.concurrent_state_mut(), set)
}
pub(crate) fn subtask_drop(
mut self: Pin<&mut Self>,
caller_instance: RuntimeComponentInstanceIndex,
task_id: u32,
) -> Result<()> {
self.as_mut().waitable_join(caller_instance, task_id, 0)?;
let (rep, is_host) =
self.as_mut().guest_tables().0[caller_instance].subtask_remove(task_id)?;
let concurrent_state = self.concurrent_state_mut();
let (waitable, expected_caller_instance, delete) = if is_host {
let id = TableId::<HostTask>::new(rep);
let task = concurrent_state.get_mut(id)?;
if task.join_handle.is_some() {
bail!("cannot drop a subtask which has not yet resolved");
}
(Waitable::Host(id), task.caller_instance, true)
} else {
let id = TableId::<GuestTask>::new(rep);
let task = concurrent_state.get_mut(id)?;
if task.lift_result.is_some() {
bail!("cannot drop a subtask which has not yet resolved");
}
if let Caller::Guest { instance, .. } = &task.caller {
(Waitable::Guest(id), *instance, task.exited)
} else {
unreachable!()
}
};
waitable.common(concurrent_state)?.handle = None;
if waitable.take_event(concurrent_state)?.is_some() {
bail!("cannot drop a subtask with an undelivered event");
}
if delete {
waitable.delete_from(concurrent_state)?;
}
assert_eq!(expected_caller_instance, caller_instance);
log::trace!("subtask_drop {waitable:?} (handle {task_id})");
Ok(())
}
}
impl Instance {
#[doc(hidden)]
pub fn assert_concurrent_state_empty(&self, mut store: impl AsContextMut) {
let mut instance = self.id().get_mut(store.as_context_mut().0);
assert!(
instance
.as_mut()
.guest_tables()
.0
.iter()
.all(|(_, table)| table.is_empty())
);
let state = instance.concurrent_state_mut();
assert!(
state.table.get_mut().is_empty(),
"non-empty table: {:?}",
state.table.get_mut()
);
assert!(state.high_priority.is_empty());
assert!(state.low_priority.is_empty());
assert!(state.guest_task.is_none());
assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
assert!(
state
.instance_states
.iter()
.all(|(_, state)| state.pending.is_empty())
);
assert!(state.global_error_context_ref_counts.is_empty());
}
pub async fn run_concurrent<T, R>(
self,
mut store: impl AsContextMut<Data = T>,
fun: impl AsyncFnOnce(&Accessor<T>) -> R,
) -> Result<R>
where
T: Send + 'static,
{
check_recursive_run();
let mut store = store.as_context_mut();
let token = StoreToken::new(store.as_context_mut());
struct Dropper<'a, T: 'static, V> {
store: StoreContextMut<'a, T>,
value: ManuallyDrop<V>,
}
impl<'a, T, V> Drop for Dropper<'a, T, V> {
fn drop(&mut self) {
tls::set(self.store.0, || {
unsafe { ManuallyDrop::drop(&mut self.value) }
});
}
}
let accessor = &Accessor::new(token, Some(self));
let dropper = &mut Dropper {
store,
value: ManuallyDrop::new(fun(accessor)),
};
let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
self.poll_until(dropper.store.as_context_mut(), future)
.await
}
pub fn spawn<U: 'static>(
self,
mut store: impl AsContextMut<Data = U>,
task: impl AccessorTask<U, HasSelf<U>, Result<()>>,
) -> JoinHandle {
let mut store = store.as_context_mut();
let accessor = Accessor::new(StoreToken::new(store.as_context_mut()), Some(self));
self.spawn_with_accessor(store, accessor, task)
}
fn spawn_with_accessor<T, D>(
self,
mut store: StoreContextMut<T>,
accessor: Accessor<T, D>,
task: impl AccessorTask<T, D, Result<()>>,
) -> JoinHandle
where
T: 'static,
D: HasData + ?Sized,
{
let store = store.as_context_mut();
let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
self.concurrent_state_mut(store.0)
.push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
handle
}
async fn poll_until<T, R>(
self,
mut store: StoreContextMut<'_, T>,
mut future: Pin<&mut impl Future<Output = R>>,
) -> Result<R>
where
T: Send,
{
loop {
let mut futures = self
.concurrent_state_mut(store.0)
.futures
.get_mut()
.take()
.unwrap();
let mut next = pin!(futures.next());
let result = future::poll_fn(|cx| {
if let Poll::Ready(value) = self.set_tls(store.0, || future.as_mut().poll(cx)) {
return Poll::Ready(Ok(Either::Left(value)));
}
let next = match self.set_tls(store.0, || next.as_mut().poll(cx)) {
Poll::Ready(Some(output)) => {
match output {
Err(e) => return Poll::Ready(Err(e)),
Ok(()) => {}
}
Poll::Ready(true)
}
Poll::Ready(None) => Poll::Ready(false),
Poll::Pending => Poll::Pending,
};
let mut instance = self.id().get_mut(store.0);
let state = instance.as_mut().concurrent_state_mut();
let ready = mem::take(&mut state.high_priority);
let ready = if ready.is_empty() {
let ready = mem::take(&mut state.low_priority);
if ready.is_empty() {
return match next {
Poll::Ready(true) => {
Poll::Ready(Ok(Either::Right(Vec::new())))
}
Poll::Ready(false) => {
if let Poll::Ready(value) =
self.set_tls(store.0, || future.as_mut().poll(cx))
{
Poll::Ready(Ok(Either::Left(value)))
} else {
Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
}
}
Poll::Pending => Poll::Pending,
};
} else {
ready
}
} else {
ready
};
Poll::Ready(Ok(Either::Right(ready)))
})
.await;
*self.concurrent_state_mut(store.0).futures.get_mut() = Some(futures);
match result? {
Either::Left(value) => break Ok(value),
Either::Right(ready) => {
struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
store: StoreContextMut<'a, T>,
ready: I,
}
impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
fn drop(&mut self) {
while let Some(item) = self.ready.next() {
match item {
WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
WorkItem::PushFuture(future) => {
tls::set(self.store.0, move || drop(future))
}
_ => {}
}
}
}
}
let mut dispose = Dispose {
store: store.as_context_mut(),
ready: ready.into_iter(),
};
while let Some(item) = dispose.ready.next() {
self.handle_work_item(dispose.store.as_context_mut(), item)
.await?;
}
}
}
}
}
async fn handle_work_item<T: Send>(
self,
store: StoreContextMut<'_, T>,
item: WorkItem,
) -> Result<()> {
log::trace!("handle work item {item:?}");
match item {
WorkItem::PushFuture(future) => {
self.concurrent_state_mut(store.0)
.futures
.get_mut()
.as_mut()
.unwrap()
.push(future.into_inner());
}
WorkItem::ResumeFiber(fiber) => {
self.resume_fiber(store.0, fiber).await?;
}
WorkItem::GuestCall(call) => {
let state = self.concurrent_state_mut(store.0);
if call.is_ready(state)? {
self.run_on_worker(store, WorkerItem::GuestCall(call))
.await?;
} else {
let task = state.get_mut(call.task)?;
if !task.starting_sent {
task.starting_sent = true;
if let GuestCallKind::Start(_) = &call.kind {
Waitable::Guest(call.task).set_event(
state,
Some(Event::Subtask {
status: Status::Starting,
}),
)?;
}
}
let runtime_instance = state.get_mut(call.task)?.instance;
state
.instance_state(runtime_instance)
.pending
.insert(call.task, call.kind);
}
}
WorkItem::Poll(params) => {
let state = self.concurrent_state_mut(store.0);
if state.get_mut(params.task)?.event.is_some()
|| !state.get_mut(params.set)?.ready.is_empty()
{
state.push_high_priority(WorkItem::GuestCall(GuestCall {
task: params.task,
kind: GuestCallKind::DeliverEvent {
set: Some(params.set),
},
}));
} else {
state.get_mut(params.task)?.event = Some(Event::None);
state.push_high_priority(WorkItem::GuestCall(GuestCall {
task: params.task,
kind: GuestCallKind::DeliverEvent {
set: Some(params.set),
},
}));
}
}
WorkItem::WorkerFunction(fun) => {
self.run_on_worker(store, WorkerItem::Function(fun)).await?;
}
}
Ok(())
}
async fn resume_fiber(self, store: &mut StoreOpaque, fiber: StoreFiber<'static>) -> Result<()> {
let old_task = self.concurrent_state_mut(store).guest_task;
log::trace!("resume_fiber: save current task {old_task:?}");
let fiber = fiber::resolve_or_release(store, fiber).await?;
let state = self.concurrent_state_mut(store);
state.guest_task = old_task;
log::trace!("resume_fiber: restore current task {old_task:?}");
if let Some(mut fiber) = fiber {
match state.suspend_reason.take().unwrap() {
SuspendReason::NeedWork => {
if state.worker.is_none() {
state.worker = Some(fiber);
} else {
fiber.dispose(store);
}
}
SuspendReason::Yielding { .. } => {
state.push_low_priority(WorkItem::ResumeFiber(fiber));
}
SuspendReason::Waiting { set, task } => {
let old = state
.get_mut(set)?
.waiting
.insert(task, WaitMode::Fiber(fiber));
assert!(old.is_none());
}
}
}
Ok(())
}
async fn run_on_worker<T: Send>(
self,
store: StoreContextMut<'_, T>,
item: WorkerItem,
) -> Result<()> {
let worker = if let Some(fiber) = self.concurrent_state_mut(store.0).worker.take() {
fiber
} else {
fiber::make_fiber(store.0, move |store| {
loop {
match self.concurrent_state_mut(store).worker_item.take().unwrap() {
WorkerItem::GuestCall(call) => self.handle_guest_call(store, call)?,
WorkerItem::Function(fun) => fun.into_inner()(store, self)?,
}
self.suspend(store, SuspendReason::NeedWork)?;
}
})?
};
let worker_item = &mut self.concurrent_state_mut(store.0).worker_item;
assert!(worker_item.is_none());
*worker_item = Some(item);
self.resume_fiber(store.0, worker).await
}
fn handle_guest_call(self, store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
match call.kind {
GuestCallKind::DeliverEvent { set } => {
let (event, waitable) = self
.id()
.get_mut(store)
.get_event(call.task, set, true)?
.unwrap();
let state = self.concurrent_state_mut(store);
let task = state.get_mut(call.task)?;
let runtime_instance = task.instance;
let handle = waitable.map(|(_, v)| v).unwrap_or(0);
log::trace!(
"use callback to deliver event {event:?} to {:?} for {waitable:?}",
call.task,
);
let old_task = state.guest_task.replace(call.task);
log::trace!(
"GuestCallKind::DeliverEvent: replaced {old_task:?} with {:?} as current task",
call.task
);
self.maybe_push_call_context(store.store_opaque_mut(), call.task)?;
let state = self.concurrent_state_mut(store);
state.enter_instance(runtime_instance);
let callback = state.get_mut(call.task)?.callback.take().unwrap();
let code = callback(store, self, runtime_instance, event, handle)?;
let state = self.concurrent_state_mut(store);
state.get_mut(call.task)?.callback = Some(callback);
state.exit_instance(runtime_instance)?;
self.maybe_pop_call_context(store.store_opaque_mut(), call.task)?;
self.id().get_mut(store).handle_callback_code(
call.task,
runtime_instance,
code,
false,
)?;
self.concurrent_state_mut(store).guest_task = old_task;
log::trace!("GuestCallKind::DeliverEvent: restored {old_task:?} as current task");
}
GuestCallKind::Start(fun) => {
fun(store, self)?;
}
}
Ok(())
}
fn suspend(self, store: &mut dyn VMStore, reason: SuspendReason) -> Result<()> {
log::trace!("suspend fiber: {reason:?}");
let task = match &reason {
SuspendReason::Yielding { task } | SuspendReason::Waiting { task, .. } => Some(*task),
SuspendReason::NeedWork => None,
};
let old_guest_task = if let Some(task) = task {
self.maybe_pop_call_context(store, task)?;
self.concurrent_state_mut(store).guest_task
} else {
None
};
let suspend_reason = &mut self.concurrent_state_mut(store).suspend_reason;
assert!(suspend_reason.is_none());
*suspend_reason = Some(reason);
store.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
if let Some(task) = task {
self.concurrent_state_mut(store).guest_task = old_guest_task;
self.maybe_push_call_context(store, task)?;
}
Ok(())
}
fn maybe_push_call_context(
self,
store: &mut StoreOpaque,
guest_task: TableId<GuestTask>,
) -> Result<()> {
let task = self.concurrent_state_mut(store).get_mut(guest_task)?;
if task.lift_result.is_some() {
log::trace!("push call context for {guest_task:?}");
let call_context = task.call_context.take().unwrap();
store.component_resource_state().0.push(call_context);
}
Ok(())
}
fn maybe_pop_call_context(
self,
store: &mut StoreOpaque,
guest_task: TableId<GuestTask>,
) -> Result<()> {
if self
.concurrent_state_mut(store)
.get_mut(guest_task)?
.lift_result
.is_some()
{
log::trace!("pop call context for {guest_task:?}");
let call_context = Some(store.component_resource_state().0.pop().unwrap());
self.concurrent_state_mut(store)
.get_mut(guest_task)?
.call_context = call_context;
}
Ok(())
}
unsafe fn queue_call<T: 'static>(
self,
mut store: StoreContextMut<T>,
guest_task: TableId<GuestTask>,
callee: SendSyncPtr<VMFuncRef>,
param_count: usize,
result_count: usize,
flags: Option<InstanceFlags>,
async_: bool,
callback: Option<SendSyncPtr<VMFuncRef>>,
post_return: Option<SendSyncPtr<VMFuncRef>>,
) -> Result<()> {
unsafe fn make_call<T: 'static>(
store: StoreContextMut<T>,
guest_task: TableId<GuestTask>,
callee: SendSyncPtr<VMFuncRef>,
param_count: usize,
result_count: usize,
flags: Option<InstanceFlags>,
) -> impl FnOnce(
&mut dyn VMStore,
Instance,
) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
+ Send
+ Sync
+ 'static
+ use<T> {
let token = StoreToken::new(store);
move |store: &mut dyn VMStore, instance: Instance| {
let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;
let may_enter_after_call = task.call_post_return_automatically();
let lower = task.lower_params.take().unwrap();
lower(store, instance, &mut storage[..param_count])?;
let mut store = token.as_context_mut(store);
unsafe {
if let Some(mut flags) = flags {
flags.set_may_enter(false);
}
crate::Func::call_unchecked_raw(
&mut store,
callee.as_non_null(),
NonNull::new(
&mut storage[..param_count.max(result_count)]
as *mut [MaybeUninit<ValRaw>] as _,
)
.unwrap(),
)?;
if let Some(mut flags) = flags {
flags.set_may_enter(may_enter_after_call);
}
}
Ok(storage)
}
}
let call = unsafe {
make_call(
store.as_context_mut(),
guest_task,
callee,
param_count,
result_count,
flags,
)
};
let callee_instance = self
.concurrent_state_mut(store.0)
.get_mut(guest_task)?
.instance;
let fun = if callback.is_some() {
assert!(async_);
Box::new(move |store: &mut dyn VMStore, instance: Instance| {
let old_task = instance
.concurrent_state_mut(store)
.guest_task
.replace(guest_task);
log::trace!(
"stackless call: replaced {old_task:?} with {guest_task:?} as current task"
);
instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;
instance
.concurrent_state_mut(store)
.enter_instance(callee_instance);
let storage = call(store, instance)?;
instance
.concurrent_state_mut(store)
.exit_instance(callee_instance)?;
instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;
let state = instance.concurrent_state_mut(store);
state.guest_task = old_task;
log::trace!("stackless call: restored {old_task:?} as current task");
let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
instance.id().get_mut(store).handle_callback_code(
guest_task,
callee_instance,
code,
true,
)?;
Ok(())
})
as Box<dyn FnOnce(&mut dyn VMStore, Instance) -> Result<()> + Send + Sync>
} else {
let token = StoreToken::new(store.as_context_mut());
Box::new(move |store: &mut dyn VMStore, instance: Instance| {
let old_task = instance
.concurrent_state_mut(store)
.guest_task
.replace(guest_task);
log::trace!(
"stackful call: replaced {old_task:?} with {guest_task:?} as current task",
);
let mut flags = instance.id().get(store).instance_flags(callee_instance);
instance.maybe_push_call_context(store.store_opaque_mut(), guest_task)?;
if !async_ {
instance
.concurrent_state_mut(store)
.enter_instance(callee_instance);
}
let storage = call(store, instance)?;
if async_ {
if instance
.concurrent_state_mut(store)
.get_mut(guest_task)?
.lift_result
.is_some()
{
return Err(anyhow!(crate::Trap::NoAsyncResult));
}
} else {
let lift = {
let state = instance.concurrent_state_mut(store);
state.exit_instance(callee_instance)?;
assert!(state.get_mut(guest_task)?.result.is_none());
state.get_mut(guest_task)?.lift_result.take().unwrap()
};
let result = (lift.lift)(store, instance, unsafe {
mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
&storage[..result_count],
)
})?;
let post_return_arg = match result_count {
0 => ValRaw::i32(0),
1 => unsafe { storage[0].assume_init() },
_ => unreachable!(),
};
if instance
.concurrent_state_mut(store)
.get_mut(guest_task)?
.call_post_return_automatically()
{
unsafe { flags.set_needs_post_return(false) }
if let Some(func) = post_return {
let mut store = token.as_context_mut(store);
unsafe {
crate::Func::call_unchecked_raw(
&mut store,
func.as_non_null(),
slice::from_ref(&post_return_arg).into(),
)?;
}
}
unsafe { flags.set_may_enter(true) }
}
instance.task_complete(
store,
guest_task,
result,
Status::Returned,
post_return_arg,
)?;
}
instance.maybe_pop_call_context(store.store_opaque_mut(), guest_task)?;
let task = instance.concurrent_state_mut(store).get_mut(guest_task)?;
match &task.caller {
Caller::Host {
remove_task_automatically,
..
} => {
if *remove_task_automatically {
Waitable::Guest(guest_task)
.delete_from(instance.concurrent_state_mut(store))?;
}
}
Caller::Guest { .. } => {
task.exited = true;
}
}
Ok(())
})
};
self.concurrent_state_mut(store.0)
.push_high_priority(WorkItem::GuestCall(GuestCall {
task: guest_task,
kind: GuestCallKind::Start(fun),
}));
Ok(())
}
unsafe fn prepare_call<T: 'static>(
self,
mut store: StoreContextMut<T>,
start: *mut VMFuncRef,
return_: *mut VMFuncRef,
caller_instance: RuntimeComponentInstanceIndex,
callee_instance: RuntimeComponentInstanceIndex,
task_return_type: TypeTupleIndex,
memory: *mut VMMemoryDefinition,
string_encoding: u8,
caller_info: CallerInfo,
) -> Result<()> {
enum ResultInfo {
Heap { results: u32 },
Stack { result_count: u32 },
}
let result_info = match &caller_info {
CallerInfo::Async {
has_result: true,
params,
} => ResultInfo::Heap {
results: params.last().unwrap().get_u32(),
},
CallerInfo::Async {
has_result: false, ..
} => ResultInfo::Stack { result_count: 0 },
CallerInfo::Sync {
result_count,
params,
} if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
results: params.last().unwrap().get_u32(),
},
CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
result_count: *result_count,
},
};
let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
let start = SendSyncPtr::new(NonNull::new(start).unwrap());
let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
let token = StoreToken::new(store.as_context_mut());
let state = self.concurrent_state_mut(store.0);
let old_task = state.guest_task.take();
let new_task = GuestTask::new(
state,
Box::new(move |store, instance, dst| {
let mut store = token.as_context_mut(store);
assert!(dst.len() <= MAX_FLAT_PARAMS);
let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
let count = match caller_info {
CallerInfo::Async { params, has_result } => {
let params = ¶ms[..params.len() - usize::from(has_result)];
for (param, src) in params.iter().zip(&mut src) {
src.write(*param);
}
params.len()
}
CallerInfo::Sync { params, .. } => {
for (param, src) in params.iter().zip(&mut src) {
src.write(*param);
}
params.len()
}
};
unsafe {
crate::Func::call_unchecked_raw(
&mut store,
start.as_non_null(),
NonNull::new(
&mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
)
.unwrap(),
)?;
}
dst.copy_from_slice(&src[..dst.len()]);
let state = instance.concurrent_state_mut(store.0);
let task = state.guest_task.unwrap();
Waitable::Guest(task).set_event(
state,
Some(Event::Subtask {
status: Status::Started,
}),
)?;
Ok(())
}),
LiftResult {
lift: Box::new(move |store, instance, src| {
let mut store = token.as_context_mut(store);
let mut my_src = src.to_owned(); if let ResultInfo::Heap { results } = &result_info {
my_src.push(ValRaw::u32(*results));
}
unsafe {
crate::Func::call_unchecked_raw(
&mut store,
return_.as_non_null(),
my_src.as_mut_slice().into(),
)?;
}
let state = instance.concurrent_state_mut(store.0);
let task = state.guest_task.unwrap();
if sync_caller {
state.get_mut(task)?.sync_result =
Some(if let ResultInfo::Stack { result_count } = &result_info {
match result_count {
0 => None,
1 => Some(my_src[0]),
_ => unreachable!(),
}
} else {
None
});
}
Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
}),
ty: task_return_type,
memory: NonNull::new(memory).map(SendSyncPtr::new),
string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
},
Caller::Guest {
task: old_task.unwrap(),
instance: caller_instance,
},
None,
callee_instance,
)?;
let guest_task = state.push(new_task)?;
if let Some(old_task) = old_task {
if !state.may_enter(guest_task) {
bail!(crate::Trap::CannotEnterComponent);
}
state.get_mut(old_task)?.subtasks.insert(guest_task);
};
state.guest_task = Some(guest_task);
log::trace!("pushed {guest_task:?} as current task; old task was {old_task:?}");
Ok(())
}
unsafe fn call_callback<T>(
self,
mut store: StoreContextMut<T>,
callee_instance: RuntimeComponentInstanceIndex,
function: SendSyncPtr<VMFuncRef>,
event: Event,
handle: u32,
may_enter_after_call: bool,
) -> Result<u32> {
let mut flags = self.id().get(store.0).instance_flags(callee_instance);
let (ordinal, result) = event.parts();
let params = &mut [
ValRaw::u32(ordinal),
ValRaw::u32(handle),
ValRaw::u32(result),
];
unsafe {
flags.set_may_enter(false);
crate::Func::call_unchecked_raw(
&mut store,
function.as_non_null(),
params.as_mut_slice().into(),
)?;
flags.set_may_enter(may_enter_after_call);
}
Ok(params[0].get_u32())
}
unsafe fn start_call<T: 'static>(
self,
mut store: StoreContextMut<T>,
callback: *mut VMFuncRef,
post_return: *mut VMFuncRef,
callee: *mut VMFuncRef,
param_count: u32,
result_count: u32,
flags: u32,
storage: Option<&mut [MaybeUninit<ValRaw>]>,
) -> Result<u32> {
let token = StoreToken::new(store.as_context_mut());
let async_caller = storage.is_none();
let state = self.concurrent_state_mut(store.0);
let guest_task = state.guest_task.unwrap();
let may_enter_after_call = state.get_mut(guest_task)?.call_post_return_automatically();
let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
let param_count = usize::try_from(param_count).unwrap();
assert!(param_count <= MAX_FLAT_PARAMS);
let result_count = usize::try_from(result_count).unwrap();
assert!(result_count <= MAX_FLAT_RESULTS);
let task = state.get_mut(guest_task)?;
if !callback.is_null() {
let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
task.callback = Some(Box::new(
move |store, instance, runtime_instance, event, handle| {
let store = token.as_context_mut(store);
unsafe {
instance.call_callback::<T>(
store,
runtime_instance,
callback,
event,
handle,
may_enter_after_call,
)
}
},
));
}
let Caller::Guest {
task: caller,
instance: runtime_instance,
} = &task.caller
else {
unreachable!()
};
let caller = *caller;
let caller_instance = *runtime_instance;
let callee_instance = task.instance;
let instance_flags = if callback.is_null() {
None
} else {
Some(self.id().get(store.0).instance_flags(callee_instance))
};
unsafe {
self.queue_call(
store.as_context_mut(),
guest_task,
callee,
param_count,
result_count,
instance_flags,
(flags & START_FLAG_ASYNC_CALLEE) != 0,
NonNull::new(callback).map(SendSyncPtr::new),
NonNull::new(post_return).map(SendSyncPtr::new),
)?;
}
let state = self.concurrent_state_mut(store.0);
let guest_waitable = Waitable::Guest(guest_task);
let old_set = guest_waitable.common(state)?.set;
let set = state.get_mut(caller)?.sync_call_set;
guest_waitable.join(state, Some(set))?;
let (status, waitable) = loop {
self.suspend(store.0, SuspendReason::Waiting { set, task: caller })?;
let state = self.concurrent_state_mut(store.0);
let event = guest_waitable.take_event(state)?;
let Some(Event::Subtask { status }) = event else {
unreachable!();
};
log::trace!("status {status:?} for {guest_task:?}");
if status == Status::Returned {
break (status, None);
} else if async_caller {
let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
.subtask_insert_guest(guest_task.rep())?;
self.concurrent_state_mut(store.0)
.get_mut(guest_task)?
.common
.handle = Some(handle);
break (status, Some(handle));
} else {
}
};
let state = self.concurrent_state_mut(store.0);
guest_waitable.join(state, old_set)?;
if let Some(storage) = storage {
let task = state.get_mut(guest_task)?;
if let Some(result) = task.sync_result.take() {
if let Some(result) = result {
storage[0] = MaybeUninit::new(result);
}
if task.exited {
Waitable::Guest(guest_task).delete_from(state)?;
}
} else {
return Err(anyhow!(crate::Trap::NoAsyncResult));
}
}
state.guest_task = Some(caller);
log::trace!("popped current task {guest_task:?}; new task is {caller:?}");
Ok(status.pack(waitable))
}
pub(crate) fn wrap_call<T, F, R>(
self,
store: StoreContextMut<T>,
closure: F,
) -> impl Future<Output = Result<R>> + 'static
where
T: 'static,
F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
+ Send
+ Sync
+ 'static,
R: Send + Sync + 'static,
{
let token = StoreToken::new(store);
async move {
let mut accessor = Accessor::new(token, Some(self));
closure(&mut accessor).await
}
}
pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
self,
mut store: StoreContextMut<T>,
future: impl Future<Output = Result<R>> + Send + 'static,
caller_instance: RuntimeComponentInstanceIndex,
lower: impl FnOnce(StoreContextMut<T>, Instance, R) -> Result<()> + Send + 'static,
) -> Result<Option<u32>> {
let token = StoreToken::new(store.as_context_mut());
let state = self.concurrent_state_mut(store.0);
let caller = state.guest_task.unwrap();
let (join_handle, future) = JoinHandle::run(async move {
let mut future = pin!(future);
let mut call_context = None;
future::poll_fn(move |cx| {
tls::get(|store| {
if let Some(call_context) = call_context.take() {
token
.as_context_mut(store)
.0
.component_resource_state()
.0
.push(call_context);
}
});
let result = future.as_mut().poll(cx);
if result.is_pending() {
tls::get(|store| {
call_context = Some(
token
.as_context_mut(store)
.0
.component_resource_state()
.0
.pop()
.unwrap(),
);
});
}
result
})
.await
});
let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
log::trace!("new host task child of {caller:?}: {task:?}");
let mut future = Box::pin(future);
let poll = self.set_tls(store.0, || {
future
.as_mut()
.poll(&mut Context::from_waker(&Waker::noop()))
});
Ok(match poll {
Poll::Ready(None) => unreachable!(),
Poll::Ready(Some(result)) => {
lower(store.as_context_mut(), self, result?)?;
log::trace!("delete host task {task:?} (already ready)");
self.concurrent_state_mut(store.0).delete(task)?;
None
}
Poll::Pending => {
let future = Box::pin(async move {
let result = match future.await {
Some(result) => result?,
None => return Ok(()),
};
tls::get(move |store| {
self.concurrent_state_mut(store).push_high_priority(
WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store, _| {
lower(token.as_context_mut(store), self, result)?;
let state = self.concurrent_state_mut(store);
state.get_mut(task)?.join_handle.take();
Waitable::Host(task).set_event(
state,
Some(Event::Subtask {
status: Status::Returned,
}),
)
}))),
);
Ok(())
})
});
self.concurrent_state_mut(store.0).push_future(future);
let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
.subtask_insert_host(task.rep())?;
self.concurrent_state_mut(store.0)
.get_mut(task)?
.common
.handle = Some(handle);
log::trace!(
"assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
);
Some(handle)
}
})
}
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
self,
store: &mut dyn VMStore,
future: impl Future<Output = Result<R>> + Send + 'static,
caller_instance: RuntimeComponentInstanceIndex,
) -> Result<R> {
let state = self.concurrent_state_mut(store);
let Some(caller) = state.guest_task else {
return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
Poll::Ready(result) => result,
Poll::Pending => {
unreachable!()
}
};
};
let old_result = state
.get_mut(caller)
.with_context(|| format!("bad handle: {caller:?}"))?
.result
.take();
let task = state.push(HostTask::new(caller_instance, None))?;
log::trace!("new host task child of {caller:?}: {task:?}");
let mut future = Box::pin(async move {
let result = future.await?;
tls::get(move |store| {
let state = self.concurrent_state_mut(store);
state.get_mut(caller)?.result = Some(Box::new(result) as _);
Waitable::Host(task).set_event(
state,
Some(Event::Subtask {
status: Status::Returned,
}),
)?;
Ok(())
})
}) as HostTaskFuture;
let poll = self.set_tls(store, || {
future
.as_mut()
.poll(&mut Context::from_waker(&Waker::noop()))
});
match poll {
Poll::Ready(result) => {
result?;
log::trace!("delete host task {task:?} (already ready)");
self.concurrent_state_mut(store).delete(task)?;
}
Poll::Pending => {
let state = self.concurrent_state_mut(store);
state.push_future(future);
let set = state.get_mut(caller)?.sync_call_set;
Waitable::Host(task).join(state, Some(set))?;
self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
}
}
Ok(*mem::replace(
&mut self.concurrent_state_mut(store).get_mut(caller)?.result,
old_result,
)
.unwrap()
.downcast()
.unwrap())
}
pub(crate) fn task_return(
self,
store: &mut dyn VMStore,
ty: TypeTupleIndex,
options: OptionsIndex,
storage: &[ValRaw],
) -> Result<()> {
let state = self.concurrent_state_mut(store);
let CanonicalOptions {
string_encoding,
data_model,
..
} = *state.options(options);
let guest_task = state.guest_task.unwrap();
let lift = state
.get_mut(guest_task)?
.lift_result
.take()
.ok_or_else(|| {
anyhow!("`task.return` or `task.cancel` called more than once for current task")
})?;
assert!(state.get_mut(guest_task)?.result.is_none());
let invalid = ty != lift.ty
|| string_encoding != lift.string_encoding
|| match data_model {
CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
Some(memory) => {
let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
let actual = self.id().get(store).runtime_memory(memory);
expected != actual
}
None => false,
},
CanonicalOptionsDataModel::Gc { .. } => true,
};
if invalid {
bail!("invalid `task.return` signature and/or options for current task");
}
log::trace!("task.return for {guest_task:?}");
let result = (lift.lift)(store, self, storage)?;
self.task_complete(store, guest_task, result, Status::Returned, ValRaw::i32(0))
}
pub(crate) fn task_cancel(
self,
store: &mut dyn VMStore,
_caller_instance: RuntimeComponentInstanceIndex,
) -> Result<()> {
let state = self.concurrent_state_mut(store);
let guest_task = state.guest_task.unwrap();
let task = state.get_mut(guest_task)?;
if !task.cancel_sent {
bail!("`task.cancel` called by task which has not been cancelled")
}
_ = task.lift_result.take().ok_or_else(|| {
anyhow!("`task.return` or `task.cancel` called more than once for current task")
})?;
assert!(task.result.is_none());
log::trace!("task.cancel for {guest_task:?}");
self.task_complete(
store,
guest_task,
Box::new(DummyResult),
Status::ReturnCancelled,
ValRaw::i32(0),
)
}
fn task_complete(
self,
store: &mut dyn VMStore,
guest_task: TableId<GuestTask>,
result: Box<dyn Any + Send + Sync>,
status: Status,
post_return_arg: ValRaw,
) -> Result<()> {
if self
.concurrent_state_mut(store)
.get_mut(guest_task)?
.call_post_return_automatically()
{
let (calls, host_table, _, instance) = store
.store_opaque_mut()
.component_resource_state_with_instance(self);
ResourceTables {
calls,
host_table: Some(host_table),
guest: Some(instance.guest_tables()),
}
.exit_call()?;
} else {
let function_index = self
.concurrent_state_mut(store)
.get_mut(guest_task)?
.function_index
.unwrap();
self.id()
.get_mut(store)
.post_return_arg_set(function_index, post_return_arg);
}
let state = self.concurrent_state_mut(store);
let task = state.get_mut(guest_task)?;
if let Caller::Host { tx, .. } = &mut task.caller {
if let Some(tx) = tx.take() {
_ = tx.send(result);
}
} else {
task.result = Some(result);
Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
}
Ok(())
}
pub(crate) fn waitable_set_wait(
self,
store: &mut dyn VMStore,
options: OptionsIndex,
set: u32,
payload: u32,
) -> Result<u32> {
let opts = self.concurrent_state_mut(store).options(options);
let cancellable = opts.cancellable;
let caller_instance = opts.instance;
let rep =
self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
self.waitable_check(
store,
cancellable,
WaitableCheck::Wait(WaitableCheckParams {
set: TableId::new(rep),
options,
payload,
}),
)
}
pub(crate) fn waitable_set_poll(
self,
store: &mut dyn VMStore,
options: OptionsIndex,
set: u32,
payload: u32,
) -> Result<u32> {
let opts = self.concurrent_state_mut(store).options(options);
let cancellable = opts.cancellable;
let caller_instance = opts.instance;
let rep =
self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
self.waitable_check(
store,
cancellable,
WaitableCheck::Poll(WaitableCheckParams {
set: TableId::new(rep),
options,
payload,
}),
)
}
pub(crate) fn thread_yield(self, store: &mut dyn VMStore, cancellable: bool) -> Result<bool> {
self.waitable_check(store, cancellable, WaitableCheck::Yield)
.map(|_| {
if cancellable {
let state = self.concurrent_state_mut(store);
let task = state.guest_task.unwrap();
if let Some(event) = state.get_mut(task).unwrap().event.take() {
assert!(matches!(event, Event::Cancelled));
true
} else {
false
}
} else {
false
}
})
}
fn waitable_check(
self,
store: &mut dyn VMStore,
cancellable: bool,
check: WaitableCheck,
) -> Result<u32> {
let guest_task = self.concurrent_state_mut(store).guest_task.unwrap();
let (wait, set) = match &check {
WaitableCheck::Wait(params) => (true, Some(params.set)),
WaitableCheck::Poll(params) => (false, Some(params.set)),
WaitableCheck::Yield => (false, None),
};
self.suspend(store, SuspendReason::Yielding { task: guest_task })?;
log::trace!("waitable check for {guest_task:?}; set {set:?}");
let state = self.concurrent_state_mut(store);
let task = state.get_mut(guest_task)?;
if wait && task.callback.is_some() {
bail!("cannot call `task.wait` from async-lifted export with callback");
}
if wait {
let set = set.unwrap();
if (task.event.is_none()
|| (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
&& state.get_mut(set)?.ready.is_empty()
{
if cancellable {
let old = state.get_mut(guest_task)?.wake_on_cancel.replace(set);
assert!(old.is_none());
}
self.suspend(
store,
SuspendReason::Waiting {
set,
task: guest_task,
},
)?;
}
}
log::trace!("waitable check for {guest_task:?}; set {set:?}, part two");
let result = match check {
WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
let event = self.id().get_mut(store).get_event(
guest_task,
Some(params.set),
cancellable,
)?;
let (ordinal, handle, result) = if wait {
let (event, waitable) = event.unwrap();
let handle = waitable.map(|(_, v)| v).unwrap_or(0);
let (ordinal, result) = event.parts();
(ordinal, handle, result)
} else {
if let Some((event, waitable)) = event {
let handle = waitable.map(|(_, v)| v).unwrap_or(0);
let (ordinal, result) = event.parts();
(ordinal, handle, result)
} else {
log::trace!(
"no events ready to deliver via waitable-set.poll to {guest_task:?}; set {:?}",
params.set
);
let (ordinal, result) = Event::None.parts();
(ordinal, 0, result)
}
};
let store = store.store_opaque_mut();
let options = Options::new_index(store, self, params.options);
let ptr = func::validate_inbounds::<(u32, u32)>(
options.memory_mut(store),
&ValRaw::u32(params.payload),
)?;
options.memory_mut(store)[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
options.memory_mut(store)[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
Ok(ordinal)
}
WaitableCheck::Yield => Ok(0),
};
result
}
pub(crate) fn subtask_cancel(
self,
store: &mut dyn VMStore,
caller_instance: RuntimeComponentInstanceIndex,
async_: bool,
task_id: u32,
) -> Result<u32> {
let (rep, is_host) =
self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
let (waitable, expected_caller_instance) = if is_host {
let id = TableId::<HostTask>::new(rep);
(
Waitable::Host(id),
self.concurrent_state_mut(store)
.get_mut(id)?
.caller_instance,
)
} else {
let id = TableId::<GuestTask>::new(rep);
if let Caller::Guest { instance, .. } =
&self.concurrent_state_mut(store).get_mut(id)?.caller
{
(Waitable::Guest(id), *instance)
} else {
unreachable!()
}
};
assert_eq!(expected_caller_instance, caller_instance);
log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
let concurrent_state = self.concurrent_state_mut(store);
if let Waitable::Host(host_task) = waitable {
if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
handle.abort();
return Ok(Status::ReturnCancelled as u32);
}
} else {
let caller = concurrent_state.guest_task.unwrap();
let guest_task = TableId::<GuestTask>::new(rep);
let task = concurrent_state.get_mut(guest_task)?;
if task.lower_params.is_some() {
task.lower_params = None;
task.lift_result = None;
let callee_instance = task.instance;
let kind = concurrent_state
.instance_state(callee_instance)
.pending
.remove(&guest_task);
if kind.is_none() {
bail!("`subtask.cancel` called after terminal status delivered");
}
return Ok(Status::StartCancelled as u32);
} else if task.lift_result.is_some() {
task.cancel_sent = true;
task.event = Some(Event::Cancelled);
if let Some(set) = task.wake_on_cancel.take() {
let item = match concurrent_state
.get_mut(set)?
.waiting
.remove(&guest_task)
.unwrap()
{
WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
WaitMode::Callback => WorkItem::GuestCall(GuestCall {
task: guest_task,
kind: GuestCallKind::DeliverEvent { set: None },
}),
};
concurrent_state.push_high_priority(item);
self.suspend(store, SuspendReason::Yielding { task: caller })?;
}
let concurrent_state = self.concurrent_state_mut(store);
let task = concurrent_state.get_mut(guest_task)?;
if task.lift_result.is_some() {
if async_ {
return Ok(BLOCKED);
} else {
self.wait_for_event(store, Waitable::Guest(guest_task))?;
}
}
}
}
let event = waitable.take_event(self.concurrent_state_mut(store))?;
if let Some(Event::Subtask {
status: status @ (Status::Returned | Status::ReturnCancelled),
}) = event
{
Ok(status as u32)
} else {
bail!("`subtask.cancel` called after terminal status delivered");
}
}
fn wait_for_event(self, store: &mut dyn VMStore, waitable: Waitable) -> Result<()> {
let state = self.concurrent_state_mut(store);
let caller = state.guest_task.unwrap();
let old_set = waitable.common(state)?.set;
let set = state.get_mut(caller)?.sync_call_set;
waitable.join(state, Some(set))?;
self.suspend(store, SuspendReason::Waiting { set, task: caller })?;
let state = self.concurrent_state_mut(store);
waitable.join(state, old_set)
}
fn set_tls<R>(self, store: &mut dyn VMStore, f: impl FnOnce() -> R) -> R {
struct Reset<'a>(&'a mut dyn VMStore, Option<ComponentInstanceId>);
impl Drop for Reset<'_> {
fn drop(&mut self) {
self.0.concurrent_async_state_mut().current_instance = self.1;
}
}
let prev = mem::replace(
&mut store.concurrent_async_state_mut().current_instance,
Some(self.id().instance()),
);
let reset = Reset(store, prev);
tls::set(reset.0, f)
}
pub(crate) fn concurrent_state_mut<'a>(
&self,
store: &'a mut StoreOpaque,
) -> &'a mut ConcurrentState {
self.id().get_mut(store).concurrent_state_mut()
}
}
pub trait VMComponentAsyncStore {
unsafe fn prepare_call(
&mut self,
instance: Instance,
memory: *mut VMMemoryDefinition,
start: *mut VMFuncRef,
return_: *mut VMFuncRef,
caller_instance: RuntimeComponentInstanceIndex,
callee_instance: RuntimeComponentInstanceIndex,
task_return_type: TypeTupleIndex,
string_encoding: u8,
result_count: u32,
storage: *mut ValRaw,
storage_len: usize,
) -> Result<()>;
unsafe fn sync_start(
&mut self,
instance: Instance,
callback: *mut VMFuncRef,
callee: *mut VMFuncRef,
param_count: u32,
storage: *mut MaybeUninit<ValRaw>,
storage_len: usize,
) -> Result<()>;
unsafe fn async_start(
&mut self,
instance: Instance,
callback: *mut VMFuncRef,
post_return: *mut VMFuncRef,
callee: *mut VMFuncRef,
param_count: u32,
result_count: u32,
flags: u32,
) -> Result<u32>;
fn future_write(
&mut self,
instance: Instance,
ty: TypeFutureTableIndex,
options: OptionsIndex,
future: u32,
address: u32,
) -> Result<u32>;
fn future_read(
&mut self,
instance: Instance,
ty: TypeFutureTableIndex,
options: OptionsIndex,
future: u32,
address: u32,
) -> Result<u32>;
fn future_drop_writable(
&mut self,
instance: Instance,
ty: TypeFutureTableIndex,
writer: u32,
) -> Result<()>;
fn stream_write(
&mut self,
instance: Instance,
ty: TypeStreamTableIndex,
options: OptionsIndex,
stream: u32,
address: u32,
count: u32,
) -> Result<u32>;
fn stream_read(
&mut self,
instance: Instance,
ty: TypeStreamTableIndex,
options: OptionsIndex,
stream: u32,
address: u32,
count: u32,
) -> Result<u32>;
fn flat_stream_write(
&mut self,
instance: Instance,
ty: TypeStreamTableIndex,
options: OptionsIndex,
payload_size: u32,
payload_align: u32,
stream: u32,
address: u32,
count: u32,
) -> Result<u32>;
fn flat_stream_read(
&mut self,
instance: Instance,
ty: TypeStreamTableIndex,
options: OptionsIndex,
payload_size: u32,
payload_align: u32,
stream: u32,
address: u32,
count: u32,
) -> Result<u32>;
fn stream_drop_writable(
&mut self,
instance: Instance,
ty: TypeStreamTableIndex,
writer: u32,
) -> Result<()>;
fn error_context_debug_message(
&mut self,
instance: Instance,
ty: TypeComponentLocalErrorContextTableIndex,
options: OptionsIndex,
err_ctx_handle: u32,
debug_msg_address: u32,
) -> Result<()>;
}
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
unsafe fn prepare_call(
&mut self,
instance: Instance,
memory: *mut VMMemoryDefinition,
start: *mut VMFuncRef,
return_: *mut VMFuncRef,
caller_instance: RuntimeComponentInstanceIndex,
callee_instance: RuntimeComponentInstanceIndex,
task_return_type: TypeTupleIndex,
string_encoding: u8,
result_count_or_max_if_async: u32,
storage: *mut ValRaw,
storage_len: usize,
) -> Result<()> {
let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
unsafe {
instance.prepare_call(
StoreContextMut(self),
start,
return_,
caller_instance,
callee_instance,
task_return_type,
memory,
string_encoding,
match result_count_or_max_if_async {
PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
params,
has_result: false,
},
PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
params,
has_result: true,
},
result_count => CallerInfo::Sync {
params,
result_count,
},
},
)
}
}
unsafe fn sync_start(
&mut self,
instance: Instance,
callback: *mut VMFuncRef,
callee: *mut VMFuncRef,
param_count: u32,
storage: *mut MaybeUninit<ValRaw>,
storage_len: usize,
) -> Result<()> {
unsafe {
instance
.start_call(
StoreContextMut(self),
callback,
ptr::null_mut(),
callee,
param_count,
1,
START_FLAG_ASYNC_CALLEE,
Some(std::slice::from_raw_parts_mut(storage, storage_len)),
)
.map(drop)
}
}
unsafe fn async_start(
&mut self,
instance: Instance,
callback: *mut VMFuncRef,
post_return: *mut VMFuncRef,
callee: *mut VMFuncRef,
param_count: u32,
result_count: u32,
flags: u32,
) -> Result<u32> {
unsafe {
instance.start_call(
StoreContextMut(self),
callback,
post_return,
callee,
param_count,
result_count,
flags,
None,
)
}
}
fn future_write(
&mut self,
instance: Instance,
ty: TypeFutureTableIndex,
options: OptionsIndex,
future: u32,
address: u32,
) -> Result<u32> {
instance
.guest_write(
StoreContextMut(self),
TransmitIndex::Future(ty),
options,
None,
future,
address,
1,
)
.map(|result| result.encode())
}
fn future_read(
&mut self,
instance: Instance,
ty: TypeFutureTableIndex,
options: OptionsIndex,
future: u32,
address: u32,
) -> Result<u32> {
instance
.guest_read(
StoreContextMut(self),
TransmitIndex::Future(ty),
options,
None,
future,
address,
1,
)
.map(|result| result.encode())
}
fn stream_write(
&mut self,
instance: Instance,
ty: TypeStreamTableIndex,
options: OptionsIndex,
stream: u32,
address: u32,
count: u32,
) -> Result<u32> {
instance
.guest_write(
StoreContextMut(self),
TransmitIndex::Stream(ty),
options,
None,
stream,
address,
count,
)
.map(|result| result.encode())
}
fn stream_read(
&mut self,
instance: Instance,
ty: TypeStreamTableIndex,
options: OptionsIndex,
stream: u32,
address: u32,
count: u32,
) -> Result<u32> {
instance
.guest_read(
StoreContextMut(self),
TransmitIndex::Stream(ty),
options,
None,
stream,
address,
count,
)
.map(|result| result.encode())
}
fn future_drop_writable(
&mut self,
instance: Instance,
ty: TypeFutureTableIndex,
writer: u32,
) -> Result<()> {
instance.guest_drop_writable(StoreContextMut(self), TransmitIndex::Future(ty), writer)
}
fn flat_stream_write(
&mut self,
instance: Instance,
ty: TypeStreamTableIndex,
options: OptionsIndex,
payload_size: u32,
payload_align: u32,
stream: u32,
address: u32,
count: u32,
) -> Result<u32> {
instance
.guest_write(
StoreContextMut(self),
TransmitIndex::Stream(ty),
options,
Some(FlatAbi {
size: payload_size,
align: payload_align,
}),
stream,
address,
count,
)
.map(|result| result.encode())
}
fn flat_stream_read(
&mut self,
instance: Instance,
ty: TypeStreamTableIndex,
options: OptionsIndex,
payload_size: u32,
payload_align: u32,
stream: u32,
address: u32,
count: u32,
) -> Result<u32> {
instance
.guest_read(
StoreContextMut(self),
TransmitIndex::Stream(ty),
options,
Some(FlatAbi {
size: payload_size,
align: payload_align,
}),
stream,
address,
count,
)
.map(|result| result.encode())
}
fn stream_drop_writable(
&mut self,
instance: Instance,
ty: TypeStreamTableIndex,
writer: u32,
) -> Result<()> {
instance.guest_drop_writable(StoreContextMut(self), TransmitIndex::Stream(ty), writer)
}
fn error_context_debug_message(
&mut self,
instance: Instance,
ty: TypeComponentLocalErrorContextTableIndex,
options: OptionsIndex,
err_ctx_handle: u32,
debug_msg_address: u32,
) -> Result<()> {
instance.error_context_debug_message(
StoreContextMut(self),
ty,
options,
err_ctx_handle,
debug_msg_address,
)
}
}
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
struct HostTask {
common: WaitableCommon,
caller_instance: RuntimeComponentInstanceIndex,
join_handle: Option<JoinHandle>,
}
impl HostTask {
fn new(
caller_instance: RuntimeComponentInstanceIndex,
join_handle: Option<JoinHandle>,
) -> Self {
Self {
common: WaitableCommon::default(),
caller_instance,
join_handle,
}
}
}
impl TableDebug for HostTask {
fn type_name() -> &'static str {
"HostTask"
}
}
type CallbackFn = Box<
dyn Fn(&mut dyn VMStore, Instance, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
+ Send
+ Sync
+ 'static,
>;
enum Caller {
Host {
tx: Option<oneshot::Sender<LiftedResult>>,
exit_tx: Arc<oneshot::Sender<()>>,
remove_task_automatically: bool,
call_post_return_automatically: bool,
},
Guest {
task: TableId<GuestTask>,
instance: RuntimeComponentInstanceIndex,
},
}
struct LiftResult {
lift: RawLift,
ty: TypeTupleIndex,
memory: Option<SendSyncPtr<VMMemoryDefinition>>,
string_encoding: StringEncoding,
}
struct GuestTask {
common: WaitableCommon,
lower_params: Option<RawLower>,
lift_result: Option<LiftResult>,
result: Option<LiftedResult>,
callback: Option<CallbackFn>,
caller: Caller,
call_context: Option<CallContext>,
sync_result: Option<Option<ValRaw>>,
cancel_sent: bool,
starting_sent: bool,
context: [u32; 2],
subtasks: HashSet<TableId<GuestTask>>,
sync_call_set: TableId<WaitableSet>,
instance: RuntimeComponentInstanceIndex,
event: Option<Event>,
wake_on_cancel: Option<TableId<WaitableSet>>,
function_index: Option<ExportIndex>,
exited: bool,
}
impl GuestTask {
fn new(
state: &mut ConcurrentState,
lower_params: RawLower,
lift_result: LiftResult,
caller: Caller,
callback: Option<CallbackFn>,
component_instance: RuntimeComponentInstanceIndex,
) -> Result<Self> {
let sync_call_set = state.push(WaitableSet::default())?;
Ok(Self {
common: WaitableCommon::default(),
lower_params: Some(lower_params),
lift_result: Some(lift_result),
result: None,
callback,
caller,
call_context: Some(CallContext::default()),
sync_result: None,
cancel_sent: false,
starting_sent: false,
context: [0u32; 2],
subtasks: HashSet::new(),
sync_call_set,
instance: component_instance,
event: None,
wake_on_cancel: None,
function_index: None,
exited: false,
})
}
fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
if let Some(Event::Subtask {
status: Status::Returned | Status::ReturnCancelled,
}) = waitable.common(state)?.event
{
waitable.delete_from(state)?;
}
}
state.delete(self.sync_call_set)?;
match &self.caller {
Caller::Guest {
task,
instance: runtime_instance,
} => {
let task_mut = state.get_mut(*task)?;
let present = task_mut.subtasks.remove(&me);
assert!(present);
for subtask in &self.subtasks {
task_mut.subtasks.insert(*subtask);
}
for subtask in &self.subtasks {
state.get_mut(*subtask)?.caller = Caller::Guest {
task: *task,
instance: *runtime_instance,
};
}
}
Caller::Host { exit_tx, .. } => {
for subtask in &self.subtasks {
state.get_mut(*subtask)?.caller = Caller::Host {
tx: None,
exit_tx: exit_tx.clone(),
remove_task_automatically: true,
call_post_return_automatically: true,
};
}
}
}
Ok(())
}
fn call_post_return_automatically(&self) -> bool {
matches!(
self.caller,
Caller::Guest { .. }
| Caller::Host {
call_post_return_automatically: true,
..
}
)
}
}
impl TableDebug for GuestTask {
fn type_name() -> &'static str {
"GuestTask"
}
}
#[derive(Default)]
struct WaitableCommon {
event: Option<Event>,
set: Option<TableId<WaitableSet>>,
handle: Option<u32>,
}
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
Host(TableId<HostTask>),
Guest(TableId<GuestTask>),
Transmit(TableId<TransmitHandle>),
}
impl Waitable {
fn from_instance(
state: Pin<&mut ComponentInstance>,
caller_instance: RuntimeComponentInstanceIndex,
waitable: u32,
) -> Result<Self> {
use crate::runtime::vm::component::Waitable;
let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
Ok(match kind {
Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
})
}
fn rep(&self) -> u32 {
match self {
Self::Host(id) => id.rep(),
Self::Guest(id) => id.rep(),
Self::Transmit(id) => id.rep(),
}
}
fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
log::trace!("waitable {self:?} join set {set:?}",);
let old = mem::replace(&mut self.common(state)?.set, set);
if let Some(old) = old {
match *self {
Waitable::Host(id) => state.remove_child(id, old),
Waitable::Guest(id) => state.remove_child(id, old),
Waitable::Transmit(id) => state.remove_child(id, old),
}?;
state.get_mut(old)?.ready.remove(self);
}
if let Some(set) = set {
match *self {
Waitable::Host(id) => state.add_child(id, set),
Waitable::Guest(id) => state.add_child(id, set),
Waitable::Transmit(id) => state.add_child(id, set),
}?;
if self.common(state)?.event.is_some() {
self.mark_ready(state)?;
}
}
Ok(())
}
fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
Ok(match self {
Self::Host(id) => &mut state.get_mut(*id)?.common,
Self::Guest(id) => &mut state.get_mut(*id)?.common,
Self::Transmit(id) => &mut state.get_mut(*id)?.common,
})
}
fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
log::trace!("set event for {self:?}: {event:?}");
self.common(state)?.event = event;
self.mark_ready(state)
}
fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
let common = self.common(state)?;
let event = common.event.take();
if let Some(set) = self.common(state)?.set {
state.get_mut(set)?.ready.remove(self);
}
Ok(event)
}
fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
if let Some(set) = self.common(state)?.set {
state.get_mut(set)?.ready.insert(*self);
if let Some((task, mode)) = state.get_mut(set)?.waiting.pop_first() {
let wake_on_cancel = state.get_mut(task)?.wake_on_cancel.take();
assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
let item = match mode {
WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
WaitMode::Callback => WorkItem::GuestCall(GuestCall {
task,
kind: GuestCallKind::DeliverEvent { set: Some(set) },
}),
};
state.push_high_priority(item);
}
}
Ok(())
}
fn on_delivery(&self, instance: Pin<&mut ComponentInstance>, event: Event) {
match event {
Event::FutureRead {
pending: Some((ty, handle)),
..
}
| Event::FutureWrite {
pending: Some((ty, handle)),
..
} => {
let runtime_instance = instance.component().types()[ty].instance;
let (rep, state) = instance.guest_tables().0[runtime_instance]
.future_rep(ty, handle)
.unwrap();
assert_eq!(rep, self.rep());
assert_eq!(*state, TransmitLocalState::Busy);
*state = match event {
Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
_ => unreachable!(),
};
}
Event::StreamRead {
pending: Some((ty, handle)),
code,
}
| Event::StreamWrite {
pending: Some((ty, handle)),
code,
} => {
let runtime_instance = instance.component().types()[ty].instance;
let (rep, state) = instance.guest_tables().0[runtime_instance]
.stream_rep(ty, handle)
.unwrap();
assert_eq!(rep, self.rep());
assert_eq!(*state, TransmitLocalState::Busy);
let done = matches!(code, ReturnCode::Dropped(_));
*state = match event {
Event::StreamRead { .. } => TransmitLocalState::Read { done },
Event::StreamWrite { .. } => TransmitLocalState::Write { done },
_ => unreachable!(),
};
}
_ => {}
}
}
fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
match self {
Self::Host(task) => {
log::trace!("delete host task {task:?}");
state.delete(*task)?;
}
Self::Guest(task) => {
log::trace!("delete guest task {task:?}");
state.delete(*task)?.dispose(state, *task)?;
}
Self::Transmit(task) => {
state.delete(*task)?;
}
}
Ok(())
}
}
impl fmt::Debug for Waitable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Host(id) => write!(f, "{id:?}"),
Self::Guest(id) => write!(f, "{id:?}"),
Self::Transmit(id) => write!(f, "{id:?}"),
}
}
}
#[derive(Default)]
struct WaitableSet {
ready: BTreeSet<Waitable>,
waiting: BTreeMap<TableId<GuestTask>, WaitMode>,
}
impl TableDebug for WaitableSet {
fn type_name() -> &'static str {
"WaitableSet"
}
}
type RawLower = Box<
dyn FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
>;
type RawLift = Box<
dyn FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
+ Send
+ Sync,
>;
type LiftedResult = Box<dyn Any + Send + Sync>;
struct DummyResult;
pub(crate) struct AsyncState {
current_instance: Option<ComponentInstanceId>,
}
impl Default for AsyncState {
fn default() -> Self {
Self {
current_instance: None,
}
}
}
#[derive(Default)]
struct InstanceState {
backpressure: u16,
do_not_enter: bool,
pending: BTreeMap<TableId<GuestTask>, GuestCallKind>,
}
pub struct ConcurrentState {
guest_task: Option<TableId<GuestTask>>,
futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
table: AlwaysMut<ResourceTable>,
instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
high_priority: Vec<WorkItem>,
low_priority: Vec<WorkItem>,
suspend_reason: Option<SuspendReason>,
worker: Option<StoreFiber<'static>>,
worker_item: Option<WorkerItem>,
global_error_context_ref_counts:
BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
component: Component,
}
impl ConcurrentState {
pub(crate) fn new(component: &Component) -> Self {
Self {
guest_task: None,
table: AlwaysMut::new(ResourceTable::new()),
futures: AlwaysMut::new(Some(FuturesUnordered::new())),
instance_states: HashMap::new(),
high_priority: Vec::new(),
low_priority: Vec::new(),
suspend_reason: None,
worker: None,
worker_item: None,
global_error_context_ref_counts: BTreeMap::new(),
component: component.clone(),
}
}
pub(crate) fn take_fibers_and_futures(
&mut self,
fibers: &mut Vec<StoreFiber<'static>>,
futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
) {
for entry in self.table.get_mut().iter_mut() {
if let Some(set) = entry.downcast_mut::<WaitableSet>() {
for mode in mem::take(&mut set.waiting).into_values() {
if let WaitMode::Fiber(fiber) = mode {
fibers.push(fiber);
}
}
}
}
if let Some(fiber) = self.worker.take() {
fibers.push(fiber);
}
let mut take_items = |list| {
for item in mem::take(list) {
match item {
WorkItem::ResumeFiber(fiber) => {
fibers.push(fiber);
}
WorkItem::PushFuture(future) => {
self.futures
.get_mut()
.as_mut()
.unwrap()
.push(future.into_inner());
}
_ => {}
}
}
};
take_items(&mut self.high_priority);
take_items(&mut self.low_priority);
if let Some(them) = self.futures.get_mut().take() {
futures.push(them);
}
}
fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
self.instance_states.entry(instance).or_default()
}
fn push<V: Send + Sync + 'static>(
&mut self,
value: V,
) -> Result<TableId<V>, ResourceTableError> {
self.table.get_mut().push(value).map(TableId::from)
}
fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
self.table.get_mut().get_mut(&Resource::from(id))
}
pub fn add_child<T: 'static, U: 'static>(
&mut self,
child: TableId<T>,
parent: TableId<U>,
) -> Result<(), ResourceTableError> {
self.table
.get_mut()
.add_child(Resource::from(child), Resource::from(parent))
}
pub fn remove_child<T: 'static, U: 'static>(
&mut self,
child: TableId<T>,
parent: TableId<U>,
) -> Result<(), ResourceTableError> {
self.table
.get_mut()
.remove_child(Resource::from(child), Resource::from(parent))
}
fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
self.table.get_mut().delete(Resource::from(id))
}
fn push_future(&mut self, future: HostTaskFuture) {
self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
}
fn push_high_priority(&mut self, item: WorkItem) {
log::trace!("push high priority: {item:?}");
self.high_priority.push(item);
}
fn push_low_priority(&mut self, item: WorkItem) {
log::trace!("push low priority: {item:?}");
self.low_priority.push(item);
}
fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
let guest_instance = self.get_mut(guest_task).unwrap().instance;
loop {
match &self.get_mut(guest_task).unwrap().caller {
Caller::Host { .. } => break true,
Caller::Guest { task, instance } => {
if *instance == guest_instance {
break false;
} else {
guest_task = *task;
}
}
}
}
}
fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
self.instance_state(instance).do_not_enter = true;
}
fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
self.instance_state(instance).do_not_enter = false;
self.partition_pending(instance)
}
fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
for (task, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
let call = GuestCall { task, kind };
if call.is_ready(self)? {
self.push_high_priority(WorkItem::GuestCall(call));
} else {
self.instance_state(instance)
.pending
.insert(call.task, call.kind);
}
}
Ok(())
}
pub(crate) fn backpressure_modify(
&mut self,
caller_instance: RuntimeComponentInstanceIndex,
modify: impl FnOnce(u16) -> Option<u16>,
) -> Result<()> {
let state = self.instance_state(caller_instance);
let old = state.backpressure;
let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
state.backpressure = new;
if old > 0 && new == 0 {
self.partition_pending(caller_instance)?;
}
Ok(())
}
pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
let task = self.guest_task.unwrap();
let val = self.get_mut(task)?.context[usize::try_from(slot).unwrap()];
log::trace!("context_get {task:?} slot {slot} val {val:#x}");
Ok(val)
}
pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
let task = self.guest_task.unwrap();
log::trace!("context_set {task:?} slot {slot} val {val:#x}");
self.get_mut(task)?.context[usize::try_from(slot).unwrap()] = val;
Ok(())
}
fn options(&self, options: OptionsIndex) -> &CanonicalOptions {
&self.component.env_component().options[options]
}
}
fn for_any_lower<
F: FnOnce(&mut dyn VMStore, Instance, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
>(
fun: F,
) -> F {
fun
}
fn for_any_lift<
F: FnOnce(&mut dyn VMStore, Instance, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
+ Send
+ Sync,
>(
fun: F,
) -> F {
fun
}
fn checked<F: Future + Send + 'static>(
instance: Instance,
fut: F,
) -> impl Future<Output = F::Output> + Send + 'static {
async move {
let mut fut = pin!(fut);
future::poll_fn(move |cx| {
let message = "\
`Future`s which depend on asynchronous component tasks, streams, or \
futures to complete may only be polled from the event loop of the \
instance from which they originated. Please use \
`Instance::{run_concurrent,spawn}` to poll or await them.\
";
tls::try_get(|store| {
let matched = match store {
tls::TryGet::Some(store) => {
let a = store.concurrent_async_state_mut().current_instance;
a == Some(instance.id().instance())
}
tls::TryGet::Taken | tls::TryGet::None => false,
};
if !matched {
panic!("{message}")
}
});
fut.as_mut().poll(cx)
})
.await
}
}
fn check_recursive_run() {
tls::try_get(|store| {
if !matches!(store, tls::TryGet::None) {
panic!("Recursive `Instance::run_concurrent` calls not supported")
}
});
}
fn unpack_callback_code(code: u32) -> (u32, u32) {
(code & 0xF, code >> 4)
}
struct WaitableCheckParams {
set: TableId<WaitableSet>,
options: OptionsIndex,
payload: u32,
}
enum WaitableCheck {
Wait(WaitableCheckParams),
Poll(WaitableCheckParams),
Yield,
}
pub(crate) struct PreparedCall<R> {
handle: Func,
task: TableId<GuestTask>,
param_count: usize,
rx: oneshot::Receiver<LiftedResult>,
exit_rx: oneshot::Receiver<()>,
_phantom: PhantomData<R>,
}
impl<R> PreparedCall<R> {
pub(crate) fn task_id(&self) -> TaskId {
TaskId {
handle: self.handle,
task: self.task,
}
}
}
pub(crate) struct TaskId {
handle: Func,
task: TableId<GuestTask>,
}
impl TaskId {
pub(crate) fn remove<T>(&self, store: StoreContextMut<T>) -> Result<()> {
Waitable::Guest(self.task).delete_from(self.handle.instance().concurrent_state_mut(store.0))
}
}
pub(crate) fn prepare_call<T, R>(
mut store: StoreContextMut<T>,
handle: Func,
param_count: usize,
remove_task_automatically: bool,
call_post_return_automatically: bool,
lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
+ Send
+ Sync
+ 'static,
lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
+ Send
+ Sync
+ 'static,
) -> Result<PreparedCall<R>> {
let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
let instance = handle.instance().id().get(store.0);
let task_return_type = instance.component().types()[ty].results;
let component_instance = raw_options.instance;
let callback = options.callback();
let memory = options.memory_raw().map(SendSyncPtr::new);
let string_encoding = options.string_encoding();
let token = StoreToken::new(store.as_context_mut());
let state = handle.instance().concurrent_state_mut(store.0);
assert!(state.guest_task.is_none());
let (tx, rx) = oneshot::channel();
let (exit_tx, exit_rx) = oneshot::channel();
let mut task = GuestTask::new(
state,
Box::new(for_any_lower(move |store, instance, params| {
debug_assert!(instance.id() == handle.instance().id());
lower_params(handle, token.as_context_mut(store), params)
})),
LiftResult {
lift: Box::new(for_any_lift(move |store, instance, result| {
debug_assert!(instance.id() == handle.instance().id());
lift_result(handle, store, result)
})),
ty: task_return_type,
memory,
string_encoding,
},
Caller::Host {
tx: Some(tx),
exit_tx: Arc::new(exit_tx),
remove_task_automatically,
call_post_return_automatically,
},
callback.map(|callback| {
let callback = SendSyncPtr::new(callback);
Box::new(
move |store: &mut dyn VMStore,
instance: Instance,
runtime_instance,
event,
handle| {
let store = token.as_context_mut(store);
unsafe {
instance.call_callback(
store,
runtime_instance,
callback,
event,
handle,
call_post_return_automatically,
)
}
},
) as CallbackFn
}),
component_instance,
)?;
task.function_index = Some(handle.index());
let task = state.push(task)?;
Ok(PreparedCall {
handle,
task,
param_count,
rx,
exit_rx,
_phantom: PhantomData,
})
}
pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
mut store: StoreContextMut<T>,
prepared: PreparedCall<R>,
) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
let PreparedCall {
handle,
task,
param_count,
rx,
exit_rx,
..
} = prepared;
queue_call0(store.as_context_mut(), handle, task, param_count)?;
Ok(checked(
handle.instance(),
rx.map(move |result| {
result
.map(|v| (*v.downcast().unwrap(), exit_rx))
.map_err(anyhow::Error::from)
}),
))
}
fn queue_call0<T: 'static>(
store: StoreContextMut<T>,
handle: Func,
guest_task: TableId<GuestTask>,
param_count: usize,
) -> Result<()> {
let (options, flags, _ty, raw_options) = handle.abi_info(store.0);
let is_concurrent = raw_options.async_;
let instance = handle.instance();
let callee = handle.lifted_core_func(store.0);
let callback = options.callback();
let post_return = handle.post_return_core_func(store.0);
log::trace!("queueing call {guest_task:?}");
let instance_flags = if callback.is_none() {
None
} else {
Some(flags)
};
unsafe {
instance.queue_call(
store,
guest_task,
SendSyncPtr::new(callee),
param_count,
1,
instance_flags,
is_concurrent,
callback.map(SendSyncPtr::new),
post_return.map(SendSyncPtr::new),
)
}
}