use crate::component::func::{self, Func, call_post_return};
use crate::component::{
HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
};
use crate::fiber::{self, StoreFiber, StoreFiberYield};
use crate::prelude::*;
use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
use crate::vm::component::{CallContext, ComponentInstance, InstanceState, ResourceTables};
use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
use crate::{
AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
bail,
error::{Context as _, format_err},
};
use error_contexts::GlobalErrorContextRefCount;
use futures::channel::oneshot;
use futures::future::{self, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
use std::any::Any;
use std::borrow::ToOwned;
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop, MaybeUninit};
use std::ops::DerefMut;
use std::pin::{Pin, pin};
use std::ptr::{self, NonNull};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::vec::Vec;
use table::{TableDebug, TableId};
use wasmtime_environ::Trap;
use wasmtime_environ::component::{
CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
};
use wasmtime_environ::packed_option::ReservedValue;
pub use abort::JoinHandle;
pub use future_stream_any::{FutureAny, StreamAny};
pub use futures_and_streams::{
Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
};
pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
mod abort;
mod error_contexts;
mod future_stream_any;
mod futures_and_streams;
pub(crate) mod table;
pub(crate) mod tls;
/// Sentinel value returned by async canonical built-ins to indicate that the
/// operation did not complete immediately and the caller is blocked.
const BLOCKED: u32 = 0xffff_ffff;
/// The status of a subtask as reported to the guest.
///
/// NOTE(review): the discriminant values are part of the canonical ABI
/// encoding (see `pack`) and must not be changed.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status together with an optional waitable handle into a
    /// single `u32`: the status occupies the low four bits and the handle the
    /// upper 28 bits.
    ///
    /// A handle must be supplied exactly when the status is *not*
    /// [`Status::Returned`].
    ///
    /// # Panics
    ///
    /// Panics if the presence of `waitable` is inconsistent with the status,
    /// or if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        assert!(waitable.is_none() == matches!(self, Status::Returned));
        let handle = waitable.unwrap_or_default();
        assert!(handle >> 28 == 0);
        (handle << 4) | (self as u32)
    }
}
/// An event deliverable to a guest task, either as the argument to its
/// callback function or as the result of a wait on a waitable-set.
#[derive(Clone, Copy, Debug)]
enum Event {
    /// No event; used to re-poll a task that previously yielded.
    None,
    /// The task has been cancelled by its caller.
    Cancelled,
    /// A subtask of the receiving task changed status.
    Subtask {
        status: Status,
    },
    /// A stream read completed with the given code.
    ///
    /// NOTE(review): `pending` appears to carry a (stream-type, handle) pair
    /// for a still-pending operation — confirm against `futures_and_streams`.
    StreamRead {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A stream write completed with the given code.
    StreamWrite {
        code: ReturnCode,
        pending: Option<(TypeStreamTableIndex, u32)>,
    },
    /// A future read completed with the given code.
    FutureRead {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
    /// A future write completed with the given code.
    FutureWrite {
        code: ReturnCode,
        pending: Option<(TypeFutureTableIndex, u32)>,
    },
}
impl Event {
    /// Splits this event into the `(event-code, payload)` pair used when
    /// delivering it to the guest.
    fn parts(self) -> (u32, u32) {
        // ABI event discriminants; these values are fixed by the Component
        // Model async ABI and must not change.
        const EVENT_NONE: u32 = 0;
        const EVENT_SUBTASK: u32 = 1;
        const EVENT_STREAM_READ: u32 = 2;
        const EVENT_STREAM_WRITE: u32 = 3;
        const EVENT_FUTURE_READ: u32 = 4;
        const EVENT_FUTURE_WRITE: u32 = 5;
        const EVENT_CANCELLED: u32 = 6;
        match self {
            Event::None => (EVENT_NONE, 0),
            Event::Cancelled => (EVENT_CANCELLED, 0),
            // The payload for a subtask event is the new status.
            Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
            // Stream/future events carry the encoded return code; the
            // `pending` field is bookkeeping only and is not delivered here.
            Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
            Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
            Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
            Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
        }
    }
}
/// Codes returned by a guest task's callback function indicating what the
/// task wants to do next (see `Instance::handle_callback_code`).
mod callback_code {
    /// The task has finished executing.
    pub const EXIT: u32 = 0;
    /// The task wishes to yield and be re-polled later.
    pub const YIELD: u32 = 1;
    /// The task wishes to wait on a waitable-set (the set handle is packed
    /// into the same `u32`; see `unpack_callback_code`).
    pub const WAIT: u32 = 2;
}
/// Flag bit indicating the callee uses the async lowering, widened from the
/// `wasmtime_environ` definition to `u32` for use with 32-bit flag words.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
/// Scoped, exclusive access to a store's host data from within a concurrent
/// task.
///
/// `D` selects the view of `T` handed out by [`Access::get`]; it defaults to
/// all of `T` via [`HasSelf`].
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
    // The exclusive store handle this access wraps.
    store: StoreContextMut<'a, T>,
    // Projection from the store's host data to the user-selected view.
    get_data: fn(&mut T) -> D::Data<'_>,
}
impl<'a, T, D> Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// Creates a new `Access` from a store context and a projection from the
    /// store's host data to the view `D::Data`.
    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
        Self { store, get_data }
    }

    /// Returns a mutable reference to the store's underlying host data.
    pub fn data_mut(&mut self) -> &mut T {
        self.store.data_mut()
    }

    /// Applies the projection function to the host data, yielding `D::Data`.
    pub fn get(&mut self) -> D::Data<'_> {
        (self.get_data)(self.data_mut())
    }

    /// Spawns `task` as a background task on this store's event loop,
    /// returning a handle that can be used to join or abort it.
    pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
    where
        T: 'static,
    {
        // Build an accessor with the same projection so the task can reach
        // store data later, without borrowing the store now.
        let accessor = Accessor {
            get_data: self.get_data,
            token: StoreToken::new(self.store.as_context_mut()),
        };
        self.store
            .as_context_mut()
            .spawn_with_accessor(accessor, task)
    }

    /// Returns the projection function used by this `Access`.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }
}
// Allow an `Access` to be used anywhere a shared store context is expected.
impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;
    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}
// Allow an `Access` to be used anywhere a mutable store context is expected.
impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}
/// A handle with which a concurrent task can re-acquire access to a store's
/// data on demand (see [`Accessor::with`]).
///
/// Unlike [`Access`], this type does not borrow the store; it reaches the
/// store through task-local state (`tls`) each time it is used.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    // Proof token tying this accessor to a specific store with data `T`.
    token: StoreToken<T>,
    // Projection from the store's host data to the user-selected view.
    get_data: fn(&mut T) -> D::Data<'_>,
}
/// Conversion trait for types that can be viewed as an [`Accessor`].
pub trait AsAccessor {
    /// The store data type of the underlying accessor.
    type Data: 'static;
    /// The data-projection type of the underlying accessor.
    type AccessorData: HasData + ?Sized;
    /// Returns the underlying accessor.
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
}
// Blanket impl: a reference to anything accessor-like is itself
// accessor-like.
impl<T: AsAccessor + ?Sized> AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;
    fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
        T::as_accessor(self)
    }
}
// An `Accessor` trivially converts to itself.
impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
    type Data = T;
    type AccessorData = D;
    fn as_accessor(&self) -> &Accessor<T, D> {
        self
    }
}
// Compile-time proof that `Accessor` is `Send + Sync` even when `T` itself is
// not `Sync` (here: contains an `UnsafeCell`), since an `Accessor` only holds
// a `StoreToken` and a function pointer.
const _: () = {
    const fn assert<T: Send + Sync>() {}
    assert::<Accessor<UnsafeCell<u32>>>();
};
impl<T> Accessor<T> {
    /// Creates an accessor whose projection is the identity, i.e. it hands
    /// out all of `T`.
    pub(crate) fn new(token: StoreToken<T>) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}
impl<T, D> Accessor<T, D>
where
    D: HasData + ?Sized,
{
    /// Re-acquires exclusive store access from task-local state and runs
    /// `fun` with it.
    ///
    /// NOTE(review): this relies on `tls::get` providing the store; calling
    /// it outside the store's event loop presumably fails there — confirm
    /// against the `tls` module.
    pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
        tls::get(|vmstore| {
            fun(Access {
                store: self.token.as_context_mut(vmstore),
                get_data: self.get_data,
            })
        })
    }

    /// Returns the projection function used by this accessor.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }

    /// Returns a new accessor for the same store but with a different data
    /// projection.
    pub fn with_getter<D2: HasData>(
        &self,
        get_data: fn(&mut T) -> D2::Data<'_>,
    ) -> Accessor<T, D2> {
        Accessor {
            token: self.token,
            get_data,
        }
    }

    /// Spawns `task` as a background task on this accessor's store, returning
    /// a handle that can be used to join or abort it.
    pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = self.clone_for_spawn();
        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
    }

    /// Duplicates this accessor for handoff to a spawned task.
    ///
    /// Private rather than a `Clone` impl, presumably to keep accessor
    /// duplication an internal capability.
    fn clone_for_spawn(&self) -> Self {
        Self {
            token: self.token,
            get_data: self.get_data,
        }
    }
}
/// A task that can be spawned to run in the background of a store's event
/// loop, given an [`Accessor`] for reaching the store's data.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Runs the task to completion.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
/// Parameters and result-shape information for a call, distinguishing async-
/// from sync-lowered callers.
enum CallerInfo {
    /// The caller used an async lowering.
    Async {
        params: Vec<ValRaw>,
        has_result: bool,
    },
    /// The caller used a sync lowering with the given number of flat results.
    Sync {
        params: Vec<ValRaw>,
        result_count: u32,
    },
}
/// How a waiting guest thread is resumed when an event arrives: by resuming
/// its suspended fiber, or by invoking the task's callback on the given
/// instance.
enum WaitMode {
    Fiber(StoreFiber<'static>),
    Callback(Instance),
}
/// Why a fiber suspended itself back to the event loop.
///
/// Stored in the concurrent state's `suspend_reason` slot immediately before
/// suspending (see `StoreOpaque::suspend`) and taken by
/// `StoreOpaque::resume_fiber` to decide what to do with the fiber next.
#[derive(Debug)]
enum SuspendReason {
    /// `thread` is waiting for an event on the given waitable-set.
    Waiting {
        set: TableId<WaitableSet>,
        thread: QualifiedThreadId,
        // When true, skip the assertion that the thread may block.
        skip_may_block_check: bool,
    },
    /// The worker fiber is idle and needs a new work item.
    NeedWork,
    /// `thread` yielded voluntarily and should be rescheduled at low
    /// priority.
    Yielding {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// `thread` suspended itself explicitly; its fiber is parked in the
    /// thread's state until resumed.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
/// The kind of work a pending guest call represents.
enum GuestCallKind {
    /// Deliver an event to an existing guest task via its callback, taking
    /// the event from the task itself or from `set` if provided.
    DeliverEvent {
        instance: Instance,
        set: Option<TableId<WaitableSet>>,
    },
    /// Start an implicitly-created task; may produce a follow-up call to run
    /// next (see `handle_guest_call`).
    StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
    /// Start an explicitly-created task.
    StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
}
// Manual `Debug` impl because the `Start*` variants hold unnameable closures;
// those variants are rendered without their payloads.
impl fmt::Debug for GuestCallKind {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::DeliverEvent { instance, set } => f
                .debug_struct("DeliverEvent")
                .field("instance", instance)
                .field("set", set)
                .finish(),
            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
        }
    }
}
/// A unit of guest work: which thread to run and what to do on it.
#[derive(Debug)]
struct GuestCall {
    thread: QualifiedThreadId,
    kind: GuestCallKind,
}
impl GuestCall {
    /// Reports whether this call may run right now given the target
    /// instance's reentrance (`do_not_enter`) and backpressure state.
    ///
    /// Event delivery is blocked only by reentrance; starting an implicit
    /// task is additionally blocked by backpressure; explicit starts are
    /// always ready.
    fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
        let instance = store
            .concurrent_state_mut()
            .get_mut(self.thread.task)?
            .instance;
        let state = store.instance_state(instance).concurrent_state();
        let ready = match &self.kind {
            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
            GuestCallKind::StartExplicit(_) => true,
        };
        log::trace!(
            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
            state.do_not_enter,
            state.backpressure
        );
        Ok(ready)
    }
}
/// An item executed on the shared worker fiber (see
/// `StoreContextMut::run_on_worker`).
enum WorkerItem {
    GuestCall(GuestCall),
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
/// An item of deferred work processed by the event loop in
/// `StoreContextMut::poll_until`.
enum WorkItem {
    /// Add a host-task future to the store's pending-futures collection.
    PushFuture(AlwaysMut<HostTaskFuture>),
    /// Resume a previously-suspended fiber.
    ResumeFiber(StoreFiber<'static>),
    /// Run a guest call (on the worker fiber).
    GuestCall(GuestCall),
    /// Run an arbitrary host function on the worker fiber.
    WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
// Manual `Debug` impl: only `GuestCall` has a debuggable payload; the other
// variants hold fibers/futures/closures and are rendered by name only.
impl fmt::Debug for WorkItem {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
            Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
        }
    }
}
/// The outcome of a wait: either the wait itself was cancelled, or the
/// awaited operation completed.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    Cancelled,
    Completed,
}
/// Polls `future` once on behalf of the current guest thread and, if it is
/// not immediately ready, registers it as a host task and suspends the
/// current fiber until the task completes.
///
/// The completed value travels through the caller task's `result` slot (whose
/// previous contents are saved and restored around the call) and is downcast
/// back to `R` on return.
///
/// # Panics
///
/// Panics if there is no current guest thread, or if the result slot is empty
/// after the wait completes (an invariant violation).
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeInstance,
) -> Result<R> {
    let state = store.concurrent_state_mut();
    let caller = state.guest_thread.unwrap();
    // Save any in-progress result so we can restore it once we've taken ours.
    let old_result = state
        .get_mut(caller.task)
        .with_context(|| format!("bad handle: {caller:?}"))?
        .result
        .take();
    let task = state.push(HostTask::new(caller_instance, None))?;
    log::trace!("new host task child of {caller:?}: {task:?}");
    // Wrap the future so that, on completion, it stashes its result in the
    // caller task and signals the host task's waitable.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;
            Ok(())
        })
    }) as HostTaskFuture;
    // First, poll eagerly with a no-op waker; the future may already be done.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });
    match poll {
        Poll::Ready(result) => {
            result?;
            log::trace!("delete host task {task:?} (already ready)");
            store.concurrent_state_mut().delete(task)?;
        }
        Poll::Pending => {
            // Not ready: hand the future to the event loop, join the host
            // task to the caller's dedicated sync-call waitable-set, and
            // suspend this fiber until the set fires.
            let state = store.concurrent_state_mut();
            state.push_future(future);
            let set = state.get_mut(caller.task)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;
            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;
        }
    }
    // Swap the saved result back in and return ours, downcasting from the
    // type-erased box.
    Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}
/// Executes a guest call, looping as long as each call produces a follow-up
/// call (e.g. a callback returning a code that requires immediate
/// redelivery).
///
/// For `DeliverEvent` the current-thread, call-context, and instance
/// enter/exit bookkeeping is carefully saved and restored around the callback
/// invocation; the statement order here is load-bearing.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                // Fetch the event to deliver; `unwrap` relies on an event
                // being available whenever this call kind is scheduled.
                let (event, waitable) = instance
                    .get_event(store, call.thread.task, set, true)?
                    .unwrap();
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // Zero is the "no waitable" handle in the ABI.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );
                // Make the target thread current for the duration of the
                // callback, saving whatever was current before.
                let old_thread = store.set_thread(Some(call.thread));
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );
                store.maybe_push_call_context(call.thread.task)?;
                store.enter_instance(runtime_instance);
                // Temporarily take the callback out of the task so the task
                // isn't borrowed while the callback runs.
                let callback = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                    .unwrap();
                let code = callback(store, event, handle)?;
                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);
                store.exit_instance(runtime_instance)?;
                store.maybe_pop_call_context(call.thread.task)?;
                store.set_thread(old_thread);
                // The callback's return code may schedule or demand another
                // call immediately.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;
                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }
    Ok(())
}
impl<T> Store<T> {
    /// Runs `fun` inside this store's concurrent event loop, driving host
    /// tasks and guest work until `fun` completes.
    ///
    /// # Errors
    ///
    /// Fails if the store's config does not enable concurrency support.
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.as_context().0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Test/debug helper: asserts all concurrent bookkeeping is drained.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Spawns `task` as a background task on this store's event loop.
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}
impl<T> StoreContextMut<'_, T> {
    /// Asserts that all concurrent bookkeeping in this store has been fully
    /// drained: no table entries, no queued work, no current thread, no
    /// pending host futures, and no error-context refcounts.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.guest_thread.is_none());
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        assert!(state.global_error_context_ref_counts.is_empty());
    }

    /// Spawns `task` as a background task, using the identity projection for
    /// store data access.
    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
        self.spawn_with_accessor(accessor, task)
    }

    /// Spawns `task` with the given accessor, registering its future with the
    /// event loop. A join error (e.g. the task was aborted) is mapped to
    /// `Ok(())` so it doesn't tear down the loop.
    fn spawn_with_accessor<D>(
        self,
        accessor: Accessor<T, D>,
        task: impl AccessorTask<T, D>,
    ) -> JoinHandle
    where
        T: 'static,
        D: HasData + ?Sized,
    {
        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
        self.0
            .concurrent_state_mut()
            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
        handle
    }

    /// Runs `fun` inside this store's concurrent event loop.
    ///
    /// # Errors
    ///
    /// Fails if the store's config does not enable concurrency support.
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.do_run_concurrent(fun, false).await
    }

    /// Like `run_concurrent`, but traps (instead of pending forever) when the
    /// event loop goes idle with `fun` still unfinished.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }

    /// Shared implementation of the two `run_concurrent*` entry points.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        debug_assert!(self.0.concurrency_support());
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());
        // Guard ensuring the user's future is dropped *inside* `tls::set`, so
        // destructors that reach for the store via TLS still work.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }
        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: `value` is never used again after this drop.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }
        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: `dropper` (and hence `value`) is never moved after this
        // point, so pinning its interior is sound.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }

    /// Drives the event loop until `future` completes: alternately polls the
    /// user's future, the host-task futures, and any queued work items.
    ///
    /// With `trap_on_idle` set, returns a deadlock trap when nothing can make
    /// progress; otherwise it stays pending until woken externally.
    async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard that puts the `FuturesUnordered` back into the store on every
        // exit path (including panics/cancellation), since we temporarily
        // take it out to poll it without borrowing the store.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }
        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }
        loop {
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());
            enum PollResult<R> {
                Complete(R),
                ProcessWork(Vec<WorkItem>),
            }
            let result = future::poll_fn(|cx| {
                // All polling happens inside `tls::set` so the polled code
                // can reach the store through task-local state.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }
                // Poll the host-task futures; `Ready(true)` means one
                // completed, `Ready(false)` means none remain.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };
                // Work items produced by the polls above take priority.
                let state = reset.store.0.concurrent_state_mut();
                let ready = state.collect_work_items_to_run();
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
                }
                return match next {
                    // A host task completed: loop around (empty work list)
                    // so the outer loop re-polls everything.
                    Poll::Ready(true) => {
                        Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
                    }
                    Poll::Ready(false) => {
                        // No host tasks left: give the user's future one last
                        // chance before declaring the loop idle.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            if trap_on_idle {
                                Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
                            } else {
                                Poll::Pending
                            }
                        }
                    }
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;
            // Return the futures collection before handling work items (which
            // may themselves push futures).
            drop(reset);
            match result? {
                PollResult::Complete(value) => break Ok(value),
                PollResult::ProcessWork(ready) => {
                    // Guard that disposes of unhandled work items if handling
                    // is cut short (error or cancellation): fibers must be
                    // disposed against the store, and futures dropped inside
                    // `tls::set`.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }
                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }
                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };
                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }

    /// Dispatches a single work item pulled off the queues by `poll_until`.
    async fn handle_work_item(self, item: WorkItem) -> Result<()>
    where
        T: Send,
    {
        log::trace!("handle work item {item:?}");
        match item {
            WorkItem::PushFuture(future) => {
                self.0
                    .concurrent_state_mut()
                    .futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            WorkItem::ResumeFiber(fiber) => {
                self.0.resume_fiber(fiber).await?;
            }
            WorkItem::GuestCall(call) => {
                if call.is_ready(self.0)? {
                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
                } else {
                    // Not ready (reentrance/backpressure): notify the caller
                    // once that the task is `Starting`, then park the call in
                    // the instance's pending queue.
                    let state = self.0.concurrent_state_mut();
                    let task = state.get_mut(call.thread.task)?;
                    if !task.starting_sent {
                        task.starting_sent = true;
                        if let GuestCallKind::StartImplicit(_) = &call.kind {
                            Waitable::Guest(call.thread.task).set_event(
                                state,
                                Some(Event::Subtask {
                                    status: Status::Starting,
                                }),
                            )?;
                        }
                    }
                    let instance = state.get_mut(call.thread.task)?.instance;
                    self.0
                        .instance_state(instance)
                        .concurrent_state()
                        .pending
                        .insert(call.thread, call.kind);
                }
            }
            WorkItem::WorkerFunction(fun) => {
                self.run_on_worker(WorkerItem::Function(fun)).await?;
            }
        }
        Ok(())
    }

    /// Runs `item` on the store's shared worker fiber, creating that fiber on
    /// first use. The fiber loops: run an item, then suspend with `NeedWork`
    /// until it is handed the next item.
    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
    where
        T: Send,
    {
        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
            fiber
        } else {
            fiber::make_fiber(self.0, move |store| {
                loop {
                    match store.concurrent_state_mut().worker_item.take().unwrap() {
                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
                    }
                    store.suspend(SuspendReason::NeedWork)?;
                }
            })?
        };
        // Hand the item to the fiber via the store's single-slot mailbox.
        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
        assert!(worker_item.is_none());
        *worker_item = Some(item);
        self.0.resume_fiber(worker).await
    }

    /// Wraps a host-call closure into a `'static` future that materializes an
    /// [`Accessor`] from the store token when first polled.
    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
    where
        T: 'static,
        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
            + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
    {
        let token = StoreToken::new(self);
        async move {
            let mut accessor = Accessor::new(token);
            closure(&mut accessor).await
        }
    }
}
impl StoreOpaque {
    /// Sets up bookkeeping for a synchronous call into `callee`: creates a
    /// fresh `GuestTask` with an implicit `GuestThread`, records whether the
    /// caller is a guest or the host, registers the thread with the callee
    /// instance, and makes the new thread current.
    pub(crate) fn enter_sync_call(
        &mut self,
        guest_caller: Option<RuntimeInstance>,
        callee_async: bool,
        callee: RuntimeInstance,
    ) -> Result<()> {
        log::trace!("enter sync call {callee:?}");
        let state = self.concurrent_state_mut();
        let thread = state.guest_thread;
        let instance = if let Some(thread) = thread {
            Some(state.get_mut(thread.task)?.instance)
        } else {
            None
        };
        let task = GuestTask::new(
            state,
            // Sync calls never lower params or lift results through these
            // closures, hence the unreachable stubs and reserved type index.
            Box::new(move |_, _| unreachable!()),
            LiftResult {
                lift: Box::new(move |_, _| unreachable!()),
                ty: TypeTupleIndex::reserved_value(),
                memory: None,
                string_encoding: StringEncoding::Utf8,
            },
            if let Some(caller) = guest_caller {
                assert_eq!(caller, instance.unwrap());
                Caller::Guest {
                    thread: thread.unwrap(),
                }
            } else {
                Caller::Host {
                    tx: None,
                    exit_tx: Arc::new(oneshot::channel().0),
                    host_future_present: false,
                    caller: state.guest_thread,
                }
            },
            None,
            callee,
            callee_async,
        )?;
        let guest_task = state.push(task)?;
        let new_thread = GuestThread::new_implicit(guest_task);
        let guest_thread = state.push(new_thread)?;
        Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
            guest_thread,
            self,
            callee.index,
        )?;
        let state = self.concurrent_state_mut();
        state.get_mut(guest_task)?.threads.insert(guest_thread);
        if guest_caller.is_some() {
            // Record the new task as a subtask of the calling guest task.
            let thread = state.guest_thread.unwrap();
            state.get_mut(thread.task)?.subtasks.insert(guest_task);
        }
        self.set_thread(Some(QualifiedThreadId {
            task: guest_task,
            thread: guest_thread,
        }));
        Ok(())
    }

    /// Tears down the bookkeeping created by `enter_sync_call`, restoring the
    /// caller as the current thread and deleting the task if nothing else
    /// still references it.
    pub(crate) fn exit_sync_call(&mut self, guest_caller: bool) -> Result<()> {
        let thread = self.set_thread(None).unwrap();
        let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
        log::trace!("exit sync call {instance:?}");
        Instance::from_wasmtime(self, instance.instance).cleanup_thread(
            self,
            thread,
            instance.index,
        )?;
        let state = self.concurrent_state_mut();
        let task = state.get_mut(thread.task)?;
        // The caller kind must agree with what `enter_sync_call` recorded.
        let caller = match &task.caller {
            &Caller::Guest { thread } => {
                assert!(guest_caller);
                Some(thread)
            }
            &Caller::Host { caller, .. } => {
                assert!(!guest_caller);
                caller
            }
        };
        self.set_thread(caller);
        let state = self.concurrent_state_mut();
        let task = state.get_mut(thread.task)?;
        if task.ready_to_delete() {
            state.delete(thread.task)?.dispose(state, thread.task)?;
        }
        Ok(())
    }

    /// Reports whether `instance` may currently be entered, per the component
    /// model's reentrance rules.
    pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
        if !self.concurrency_support() {
            return !self.trapped();
        }
        let state = self.concurrent_state_mut();
        if let Some(caller) = state.guest_thread {
            // An instance may not re-enter itself, directly or through its
            // caller chain.
            instance != state.get_mut(caller.task).unwrap().instance
                && self.may_enter_from_caller(caller.task, instance)
        } else {
            !self.trapped()
        }
    }

    /// Like `may_enter`, but for re-entering the instance that owns `task`.
    fn may_enter_task(&mut self, task: TableId<GuestTask>) -> bool {
        let instance = self.concurrent_state_mut().get_mut(task).unwrap().instance;
        self.may_enter_from_caller(task, instance)
    }

    /// Walks the caller chain starting at `guest_task`, returning `false` if
    /// `instance` already appears anywhere on that chain (which would be
    /// illegal reentrance) and `true` once the chain reaches a host caller.
    fn may_enter_from_caller(
        &mut self,
        mut guest_task: TableId<GuestTask>,
        instance: RuntimeInstance,
    ) -> bool {
        !self.trapped() && {
            let state = self.concurrent_state_mut();
            let guest_instance = instance.instance;
            loop {
                let next_thread = match &state.get_mut(guest_task).unwrap().caller {
                    // Reached the host without finding `instance`: allowed.
                    Caller::Host { caller: None, .. } => break true,
                    &Caller::Host {
                        caller: Some(caller),
                        ..
                    } => {
                        let instance = state.get_mut(caller.task).unwrap().instance;
                        if instance.instance == guest_instance {
                            break false;
                        } else {
                            caller
                        }
                    }
                    &Caller::Guest { thread } => {
                        if state.get_mut(thread.task).unwrap().instance.instance == guest_instance {
                            break false;
                        } else {
                            thread
                        }
                    }
                };
                guest_task = next_thread.task;
            }
        }
    }

    /// Returns the per-(sub)instance concurrent state for `instance`.
    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
        self.component_instance_mut(instance.instance)
            .instance_state(instance.index)
    }

    /// Replaces the current guest thread, returning the previous one, and
    /// keeps each affected instance's "task may block" flag in sync.
    fn set_thread(&mut self, thread: Option<QualifiedThreadId>) -> Option<QualifiedThreadId> {
        let state = self.concurrent_state_mut();
        let old_thread = state.guest_thread.take();
        if let Some(old_thread) = old_thread {
            // The outgoing thread's instance no longer has a current task.
            let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
            self.component_instance_mut(instance)
                .set_task_may_block(false)
        }
        self.concurrent_state_mut().guest_thread = thread;
        if thread.is_some() {
            self.set_task_may_block();
        }
        old_thread
    }

    /// Updates the current thread's instance with whether that thread's task
    /// is allowed to block.
    fn set_task_may_block(&mut self) {
        let state = self.concurrent_state_mut();
        let guest_thread = state.guest_thread.unwrap();
        let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
        let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
        self.component_instance_mut(instance)
            .set_task_may_block(may_block)
    }

    /// Traps with `CannotBlockSyncTask` if the current task is not allowed to
    /// block; a no-op when concurrency support is disabled.
    pub(crate) fn check_blocking(&mut self) -> Result<()> {
        if !self.concurrency_support() {
            return Ok(());
        }
        let state = self.concurrent_state_mut();
        let task = state.guest_thread.unwrap().task;
        let instance = state.get_mut(task).unwrap().instance.instance;
        let task_may_block = self.component_instance(instance).get_task_may_block();
        if task_may_block {
            Ok(())
        } else {
            Err(Trap::CannotBlockSyncTask.into())
        }
    }

    /// Marks `instance` as non-reenterable while a call into it is active.
    fn enter_instance(&mut self, instance: RuntimeInstance) {
        log::trace!("enter {instance:?}");
        self.instance_state(instance)
            .concurrent_state()
            .do_not_enter = true;
    }

    /// Clears the reentrance flag for `instance` and re-evaluates any calls
    /// that were parked while it was set.
    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
        log::trace!("exit {instance:?}");
        self.instance_state(instance)
            .concurrent_state()
            .do_not_enter = false;
        self.partition_pending(instance)
    }

    /// Re-examines `instance`'s pending calls: ready ones are queued at high
    /// priority; the rest are put back into the pending map.
    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
        for (thread, kind) in
            mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
        {
            let call = GuestCall { thread, kind };
            if call.is_ready(self)? {
                self.concurrent_state_mut()
                    .push_high_priority(WorkItem::GuestCall(call));
            } else {
                self.instance_state(instance)
                    .concurrent_state()
                    .pending
                    .insert(call.thread, call.kind);
            }
        }
        Ok(())
    }

    /// Applies `modify` to `caller_instance`'s backpressure counter; when the
    /// counter transitions to zero, parked calls are re-evaluated.
    ///
    /// # Errors
    ///
    /// Fails with "backpressure counter overflow" if `modify` returns `None`.
    pub(crate) fn backpressure_modify(
        &mut self,
        caller_instance: RuntimeInstance,
        modify: impl FnOnce(u16) -> Option<u16>,
    ) -> Result<()> {
        let state = self.instance_state(caller_instance).concurrent_state();
        let old = state.backpressure;
        let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
        state.backpressure = new;
        if old > 0 && new == 0 {
            self.partition_pending(caller_instance)?;
        }
        Ok(())
    }

    /// Resumes `fiber` and, if it suspends again rather than exiting, files
    /// it according to the `SuspendReason` it recorded: back into the worker
    /// slot, the low-priority queue, the thread's suspended state, or a
    /// waitable-set's waiter map.
    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
        // Save/restore the current thread around the resumption, since the
        // fiber will install its own.
        let old_thread = self.concurrent_state_mut().guest_thread;
        log::trace!("resume_fiber: save current thread {old_thread:?}");
        let fiber = fiber::resolve_or_release(self, fiber).await?;
        self.set_thread(old_thread);
        let state = self.concurrent_state_mut();
        if let Some(ref ot) = old_thread {
            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
        }
        log::trace!("resume_fiber: restore current thread {old_thread:?}");
        if let Some(mut fiber) = fiber {
            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
            match state.suspend_reason.take().unwrap() {
                SuspendReason::NeedWork => {
                    // Cache the idle worker fiber, or dispose of it if one is
                    // already cached.
                    if state.worker.is_none() {
                        state.worker = Some(fiber);
                    } else {
                        fiber.dispose(self);
                    }
                }
                SuspendReason::Yielding { thread, .. } => {
                    state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
                    state.push_low_priority(WorkItem::ResumeFiber(fiber));
                }
                SuspendReason::ExplicitlySuspending { thread, .. } => {
                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
                }
                SuspendReason::Waiting { set, thread, .. } => {
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(thread, WaitMode::Fiber(fiber));
                    assert!(old.is_none());
                }
            };
        } else {
            log::trace!("resume_fiber: fiber has exited");
        }
        Ok(())
    }

    /// Suspends the current fiber for the given `reason`, releasing the store
    /// to the event loop; on resumption, restores the current thread and call
    /// context.
    ///
    /// # Panics
    ///
    /// Asserts that the current task may block (unless the reason opts out
    /// via `skip_may_block_check`) and that no other suspend reason is
    /// already recorded.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };
        let old_guest_thread = if let Some(task) = task {
            // Pop the task's call context so it isn't left on the resource
            // stack while the fiber is parked.
            self.maybe_pop_call_context(task)?;
            self.concurrent_state_mut().guest_thread
        } else {
            None
        };
        assert!(
            matches!(
                reason,
                SuspendReason::ExplicitlySuspending {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Waiting {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Yielding {
                    skip_may_block_check: true,
                    ..
                }
            ) || old_guest_thread
                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
                .unwrap_or(true)
        );
        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);
        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
        // Resumed: reinstate the thread and its call context.
        if let Some(task) = task {
            self.set_thread(old_guest_thread);
            self.maybe_push_call_context(task)?;
        }
        Ok(())
    }

    /// Pushes `guest_task`'s saved `CallContext` onto the store's resource
    /// call stack, unless the task has already returned or been cancelled.
    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
        let task = self.concurrent_state_mut().get_mut(guest_task)?;
        if !task.returned_or_cancelled() {
            log::trace!("push call context for {guest_task:?}");
            let call_context = task.call_context.take().unwrap();
            self.component_resource_state().0.push(call_context);
        }
        Ok(())
    }

    /// Inverse of `maybe_push_call_context`: pops the call context back into
    /// the task for safekeeping while it is not running.
    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
        if !self
            .concurrent_state_mut()
            .get_mut(guest_task)?
            .returned_or_cancelled()
        {
            log::trace!("pop call context for {guest_task:?}");
            let call_context = Some(self.component_resource_state().0.pop().unwrap());
            self.concurrent_state_mut()
                .get_mut(guest_task)?
                .call_context = call_context;
        }
        Ok(())
    }

    /// Temporarily joins `waitable` to the current task's sync-call
    /// waitable-set, suspends until that set fires, then restores the
    /// waitable's original set membership.
    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
        let state = self.concurrent_state_mut();
        let caller = state.guest_thread.unwrap();
        let old_set = waitable.common(state)?.set;
        let set = state.get_mut(caller.task)?.sync_call_set;
        waitable.join(state, Some(set))?;
        self.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            skip_may_block_check: false,
        })?;
        let state = self.concurrent_state_mut();
        waitable.join(state, old_set)
    }
}
impl Instance {
    /// Retrieves the next event to deliver to `guest_task`: first any event
    /// stored directly on the task, then (failing that) the first ready
    /// waitable in `set`, if provided.
    ///
    /// A `Cancelled` event stored on the task is only delivered when
    /// `cancellable` is set; otherwise it is put back for later.
    ///
    /// Returns `Ok(None)` when there is nothing to deliver; otherwise the
    /// event plus, for set-sourced events, the waitable and its guest-visible
    /// handle.
    fn get_event(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId<GuestTask>,
        set: Option<TableId<WaitableSet>>,
        cancellable: bool,
    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
        let state = store.concurrent_state_mut();
        if let Some(event) = state.get_mut(guest_task)?.event.take() {
            log::trace!("deliver event {event:?} to {guest_task:?}");
            if cancellable || !matches!(event, Event::Cancelled) {
                return Ok(Some((event, None)));
            } else {
                // Not deliverable now; restore it for a later, cancellable
                // delivery attempt.
                state.get_mut(guest_task)?.event = Some(event);
            }
        }
        Ok(
            // Pop the first ready waitable from the set (if any), threading
            // the `Result` from the table lookup through the `Option`.
            if let Some((set, waitable)) = set
                .and_then(|set| {
                    state
                        .get_mut(set)
                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
                        .transpose()
                })
                .transpose()?
            {
                let common = waitable.common(state)?;
                let handle = common.handle.unwrap();
                let event = common.event.take().unwrap();
                log::trace!(
                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
                );
                // Let the waitable update any delivery-time bookkeeping.
                waitable.on_delivery(store, self, event);
                Some((event, Some((waitable, handle))))
            } else {
                None
            },
        )
    }
    /// Acts on the code returned by a guest task's callback: `EXIT` cleans up
    /// the thread, `YIELD` reschedules the task, and `WAIT` parks it on a
    /// waitable-set. Returns a follow-up `GuestCall` when the caller should
    /// immediately deliver another event.
    fn handle_callback_code(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
    ) -> Result<Option<GuestCall>> {
        // The raw code packs a discriminant plus an optional waitable-set
        // handle.
        let (code, set) = unpack_callback_code(code);
        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
        let state = store.concurrent_state_mut();
        // Resolve a guest waitable-set handle to its table id.
        let get_set = |store: &mut StoreOpaque, handle| {
            if handle == 0 {
                bail!("invalid waitable-set handle");
            }
            let set = store
                .instance_state(RuntimeInstance {
                    instance: self.id().instance(),
                    index: runtime_instance,
                })
                .handle_table()
                .waitable_set_rep(handle)?;
            Ok(TableId::<WaitableSet>::new(set))
        };
        Ok(match code {
            callback_code::EXIT => {
                log::trace!("implicit thread {guest_thread:?} completed");
                self.cleanup_thread(store, guest_thread, runtime_instance)?;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // A task whose last thread exits without having returned (or
                // been cancelled) produced no result: trap.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                match &task.caller {
                    Caller::Host { .. } => {
                        if task.ready_to_delete() {
                            Waitable::Guest(guest_thread.task)
                                .delete_from(store.concurrent_state_mut())?;
                        }
                    }
                    Caller::Guest { .. } => {
                        // Keep the task alive for the guest caller to observe,
                        // but drop the callback.
                        task.exited = true;
                        task.callback = None;
                    }
                }
                None
            }
            callback_code::YIELD => {
                let task = state.get_mut(guest_thread.task)?;
                // A yield is represented as a pending `None` event unless a
                // cancellation is already queued.
                if let Some(event) = task.event {
                    assert!(matches!(event, Event::None | Event::Cancelled));
                } else {
                    task.event = Some(Event::None);
                }
                let call = GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: None,
                    },
                };
                if state.may_block(guest_thread.task) {
                    // Blocking allowed: requeue at low priority so others run
                    // first.
                    state.push_low_priority(WorkItem::GuestCall(call));
                    None
                } else {
                    // May not block: redeliver immediately.
                    Some(call)
                }
            }
            callback_code::WAIT => {
                state.check_blocking_for(guest_thread.task)?;
                let set = get_set(store, set)?;
                let state = store.concurrent_state_mut();
                if state.get_mut(guest_thread.task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // Something is already deliverable: schedule delivery
                    // rather than parking.
                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance: self,
                            set: Some(set),
                        },
                    }));
                } else {
                    // Park the thread on the set, remembering the set so a
                    // cancellation can wake it.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    assert!(old.is_none());
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(guest_thread, WaitMode::Callback(self));
                    assert!(old.is_none());
                }
                None
            }
            _ => bail!("unsupported callback code: {code}"),
        })
    }
/// Removes a completed guest thread from both the component instance's
/// thread handle table and the store-wide concurrent state, and detaches it
/// from its owning task.
fn cleanup_thread(
    self,
    store: &mut StoreOpaque,
    guest_thread: QualifiedThreadId,
    runtime_instance: RuntimeComponentInstanceIndex,
) -> Result<()> {
    // The per-instance handle under which this thread was registered (set
    // by `add_guest_thread_to_instance_table`).
    let guest_id = store
        .concurrent_state_mut()
        .get_mut(guest_thread.thread)?
        .instance_rep;
    store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: runtime_instance,
        })
        .thread_handle_table()
        .guest_thread_remove(guest_id.unwrap())?;
    // Delete the thread's store-level entry and unlink it from the task
    // that owns it.
    store.concurrent_state_mut().delete(guest_thread.thread)?;
    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
    task.threads.remove(&guest_thread.thread);
    Ok(())
}
/// Queues an invocation of a guest export on `guest_thread` as a
/// high-priority work item, wrapping it with the bookkeeping required for
/// either the stackless (callback-based) or stackful/sync flavor of call.
///
/// # Safety
///
/// `callee` (and `callback`/`post_return`, when provided) must be valid
/// function references of the appropriate canonical-ABI signatures for this
/// store, and `param_count`/`result_count` must match the callee's flat
/// signature.
unsafe fn queue_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    guest_thread: QualifiedThreadId,
    callee: SendSyncPtr<VMFuncRef>,
    param_count: usize,
    result_count: usize,
    async_: bool,
    callback: Option<SendSyncPtr<VMFuncRef>>,
    post_return: Option<SendSyncPtr<VMFuncRef>>,
) -> Result<()> {
    /// Builds the closure which lowers the caller's parameters into flat
    /// storage, invokes the callee, and returns the (possibly partially
    /// uninitialized) flat storage holding the results.
    unsafe fn make_call<T: 'static>(
        store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
    ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
    + Send
    + Sync
    + 'static
    + use<T> {
        let token = StoreToken::new(store);
        move |store: &mut dyn VMStore| {
            let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
            store
                .concurrent_state_mut()
                .get_mut(guest_thread.thread)?
                .state = GuestThreadState::Running;
            // Lower the caller's parameters into `storage` using the
            // deferred lowering closure stashed on the task.
            let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
            let lower = task.lower_params.take().unwrap();
            lower(store, &mut storage[..param_count])?;
            let mut store = token.as_context_mut(store);
            // SAFETY: per `make_call`'s contract `callee` is a valid
            // funcref, and `storage` is large enough for both the params
            // and the results.
            unsafe {
                crate::Func::call_unchecked_raw(
                    &mut store,
                    callee.as_non_null(),
                    NonNull::new(
                        &mut storage[..param_count.max(result_count)]
                            as *mut [MaybeUninit<ValRaw>] as _,
                    )
                    .unwrap(),
                )?;
            }
            Ok(storage)
        }
    }
    let call = unsafe {
        make_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
        )
    };
    let callee_instance = store
        .0
        .concurrent_state_mut()
        .get_mut(guest_thread.task)?
        .instance;
    let fun = if callback.is_some() {
        // Stackless (callback-based) async call: run the export to its
        // first return, then interpret the callback code it produced.
        assert!(async_);
        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            let old_thread = store.set_thread(Some(guest_thread));
            log::trace!(
                "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );
            store.maybe_push_call_context(guest_thread.task)?;
            store.enter_instance(callee_instance);
            let storage = call(store)?;
            store.exit_instance(callee_instance)?;
            store.maybe_pop_call_context(guest_thread.task)?;
            store.set_thread(old_thread);
            let state = store.concurrent_state_mut();
            old_thread
                .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
            log::trace!("stackless call: restored {old_thread:?} as current thread");
            // SAFETY: the callee wrote its single flat result (the packed
            // callback code) to slot 0 during the call above.
            let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
            self.handle_callback_code(store, guest_thread, callee_instance.index, code)
        })
            as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
    } else {
        // Stackful async or sync call: run the export to completion on this
        // thread's fiber.
        let token = StoreToken::new(store.as_context_mut());
        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            let old_thread = store.set_thread(Some(guest_thread));
            log::trace!(
                "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
            );
            let flags = self.id().get(store).instance_flags(callee_instance.index);
            store.maybe_push_call_context(guest_thread.task)?;
            if !async_ {
                store.enter_instance(callee_instance);
            }
            let storage = call(store)?;
            if async_ {
                // An async callee which returned with no other live threads
                // and without calling `task.return`/`task.cancel` traps.
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                if task.threads.len() == 1 && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
            } else {
                // Sync callee: lift the flat results, run `post-return`,
                // and mark the task complete.
                let lift = {
                    store.exit_instance(callee_instance)?;
                    let state = store.concurrent_state_mut();
                    assert!(state.get_mut(guest_thread.task)?.result.is_none());
                    state
                        .get_mut(guest_thread.task)?
                        .lift_result
                        .take()
                        .unwrap()
                };
                // SAFETY: the callee initialized `result_count` flat
                // results in `storage` during the call above.
                let result = (lift.lift)(store, unsafe {
                    mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                        &storage[..result_count],
                    )
                })?;
                let post_return_arg = match result_count {
                    0 => ValRaw::i32(0),
                    1 => unsafe { storage[0].assume_init() },
                    _ => unreachable!(),
                };
                // SAFETY: `post_return`, when present, is a valid funcref
                // per `queue_call`'s contract.
                unsafe {
                    call_post_return(
                        token.as_context_mut(store),
                        post_return.map(|v| v.as_non_null()),
                        post_return_arg,
                        flags,
                    )?;
                }
                self.task_complete(store, guest_thread.task, result, Status::Returned)?;
            }
            self.cleanup_thread(store, guest_thread, callee_instance.index)?;
            store.set_thread(old_thread);
            store.maybe_pop_call_context(guest_thread.task)?;
            let state = store.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;
            match &task.caller {
                Caller::Host { .. } => {
                    if task.ready_to_delete() {
                        Waitable::Guest(guest_thread.task).delete_from(state)?;
                    }
                }
                Caller::Guest { .. } => {
                    task.exited = true;
                }
            }
            Ok(None)
        })
    };
    store
        .0
        .concurrent_state_mut()
        .push_high_priority(WorkItem::GuestCall(GuestCall {
            thread: guest_thread,
            kind: GuestCallKind::StartImplicit(fun),
        }));
    Ok(())
}
/// Sets up (but does not run) a guest→guest call: creates the new guest
/// task and its implicit thread, records the deferred parameter-lowering
/// and result-lifting closures, and makes the new thread current.
///
/// # Safety
///
/// `start` and `return_` must be valid, non-null funcrefs of the canonical
/// ABI signatures implied by `caller_info`, and `memory` (when non-null)
/// must be a valid memory definition belonging to this store.
unsafe fn prepare_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    start: *mut VMFuncRef,
    return_: *mut VMFuncRef,
    caller_instance: RuntimeComponentInstanceIndex,
    callee_instance: RuntimeComponentInstanceIndex,
    task_return_type: TypeTupleIndex,
    callee_async: bool,
    memory: *mut VMMemoryDefinition,
    string_encoding: u8,
    caller_info: CallerInfo,
) -> Result<()> {
    // A sync caller invoking an async callee may end up blocking, so check
    // that blocking is currently allowed.
    if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
        store.0.check_blocking()?;
    }
    // How the callee's results are conveyed: via a return pointer into
    // linear memory ("Heap") or via flat return values ("Stack").
    enum ResultInfo {
        Heap { results: u32 },
        Stack { result_count: u32 },
    }
    let result_info = match &caller_info {
        CallerInfo::Async {
            has_result: true,
            params,
        } => ResultInfo::Heap {
            results: params.last().unwrap().get_u32(),
        },
        CallerInfo::Async {
            has_result: false, ..
        } => ResultInfo::Stack { result_count: 0 },
        // Sync callers spill results to memory beyond MAX_FLAT_RESULTS,
        // passing the retptr as their trailing parameter.
        CallerInfo::Sync {
            result_count,
            params,
        } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
            results: params.last().unwrap().get_u32(),
        },
        CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
            result_count: *result_count,
        },
    };
    let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
    let start = SendSyncPtr::new(NonNull::new(start).unwrap());
    let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let old_thread = state.guest_thread.unwrap();
    assert_eq!(
        state.get_mut(old_thread.task)?.instance,
        RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        }
    );
    // The new task carries two deferred closures: one which lowers the
    // caller's parameters via `start`, and one which lifts the result via
    // `return_`.
    let new_task = GuestTask::new(
        state,
        Box::new(move |store, dst| {
            let mut store = token.as_context_mut(store);
            assert!(dst.len() <= MAX_FLAT_PARAMS);
            let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
            let count = match caller_info {
                CallerInfo::Async { params, has_result } => {
                    // The trailing param, if any, is the retptr -- not a
                    // real parameter to forward.
                    let params = &params[..params.len() - usize::from(has_result)];
                    for (param, src) in params.iter().zip(&mut src) {
                        src.write(*param);
                    }
                    params.len()
                }
                CallerInfo::Sync { params, .. } => {
                    for (param, src) in params.iter().zip(&mut src) {
                        src.write(*param);
                    }
                    params.len()
                }
            };
            // SAFETY: `start` was checked non-null above and is a valid
            // funcref per `prepare_call`'s contract.
            unsafe {
                crate::Func::call_unchecked_raw(
                    &mut store,
                    start.as_non_null(),
                    NonNull::new(
                        &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
                    )
                    .unwrap(),
                )?;
            }
            dst.copy_from_slice(&src[..dst.len()]);
            // Let the caller know the callee has started.
            let state = store.0.concurrent_state_mut();
            Waitable::Guest(state.guest_thread.unwrap().task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Started,
                }),
            )?;
            Ok(())
        }),
        LiftResult {
            lift: Box::new(move |store, src| {
                let mut store = token.as_context_mut(store);
                let mut my_src = src.to_owned();
                // Heap results: append the retptr for the `return_` call.
                if let ResultInfo::Heap { results } = &result_info {
                    my_src.push(ValRaw::u32(*results));
                }
                // SAFETY: `return_` was checked non-null above and is a
                // valid funcref per `prepare_call`'s contract.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        return_.as_non_null(),
                        my_src.as_mut_slice().into(),
                    )?;
                }
                let state = store.0.concurrent_state_mut();
                let thread = state.guest_thread.unwrap();
                // For sync callers, stash the (at most one) flat result so
                // `start_call` can copy it into the caller's storage.
                if sync_caller {
                    state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
                        if let ResultInfo::Stack { result_count } = &result_info {
                            match result_count {
                                0 => None,
                                1 => Some(my_src[0]),
                                _ => unreachable!(),
                            }
                        } else {
                            None
                        },
                    );
                }
                Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
            }),
            ty: task_return_type,
            memory: NonNull::new(memory).map(SendSyncPtr::new),
            string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
        },
        Caller::Guest { thread: old_thread },
        None,
        RuntimeInstance {
            instance: self.id().instance(),
            index: callee_instance,
        },
        callee_async,
    )?;
    let guest_task = state.push(new_task)?;
    // Every guest task starts with a single implicit thread.
    let new_thread = GuestThread::new_implicit(guest_task);
    let guest_thread = state.push(new_thread)?;
    state.get_mut(guest_task)?.threads.insert(guest_thread);
    store
        .0
        .concurrent_state_mut()
        .get_mut(old_thread.task)?
        .subtasks
        .insert(guest_task);
    store.0.set_thread(Some(QualifiedThreadId {
        task: guest_task,
        thread: guest_thread,
    }));
    log::trace!(
        "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
    );
    Ok(())
}
/// Invokes the guest's callback function with the given event and waitable
/// handle, returning the packed callback code the guest produced.
///
/// # Safety
///
/// `function` must be a valid funcref of the canonical-ABI callback
/// signature for this store.
unsafe fn call_callback<T>(
    self,
    mut store: StoreContextMut<T>,
    function: SendSyncPtr<VMFuncRef>,
    event: Event,
    handle: u32,
) -> Result<u32> {
    // The callback takes (event ordinal, waitable handle, event payload)
    // and leaves its return value in slot 0.
    let (ordinal, payload) = event.parts();
    let mut args = [
        ValRaw::u32(ordinal),
        ValRaw::u32(handle),
        ValRaw::u32(payload),
    ];
    // SAFETY: the caller guarantees `function` is a valid funcref of the
    // callback signature.
    unsafe {
        let argptr = args.as_mut_slice().into();
        crate::Func::call_unchecked_raw(&mut store, function.as_non_null(), argptr)?;
    }
    Ok(args[0].get_u32())
}
/// Implements the "start" half of a guest→guest call prepared by
/// `prepare_call`: queues the callee invocation, then suspends until the
/// callee either returns or (for async callers) reaches a blocked state.
///
/// Returns the packed status/waitable word defined by the canonical ABI.
///
/// # Safety
///
/// `callee` must be a valid, non-null funcref; `callback` and `post_return`
/// may be null but must otherwise be valid funcrefs of the appropriate
/// signatures.
unsafe fn start_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    callback: *mut VMFuncRef,
    post_return: *mut VMFuncRef,
    callee: *mut VMFuncRef,
    param_count: u32,
    result_count: u32,
    flags: u32,
    storage: Option<&mut [MaybeUninit<ValRaw>]>,
) -> Result<u32> {
    let token = StoreToken::new(store.as_context_mut());
    // Callers which pass no flat return storage are async.
    let async_caller = storage.is_none();
    let state = store.0.concurrent_state_mut();
    let guest_thread = state.guest_thread.unwrap();
    let callee_async = state.get_mut(guest_thread.task)?.async_function;
    let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
    let param_count = usize::try_from(param_count).unwrap();
    assert!(param_count <= MAX_FLAT_PARAMS);
    let result_count = usize::try_from(result_count).unwrap();
    assert!(result_count <= MAX_FLAT_RESULTS);
    let task = state.get_mut(guest_thread.task)?;
    if !callback.is_null() {
        let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
        task.callback = Some(Box::new(move |store, event, handle| {
            let store = token.as_context_mut(store);
            // SAFETY: `callback` was non-null above and is a valid funcref
            // per this function's contract.
            unsafe { self.call_callback::<T>(store, callback, event, handle) }
        }));
    }
    let Caller::Guest { thread: caller } = &task.caller else {
        unreachable!()
    };
    let caller = *caller;
    let caller_instance = state.get_mut(caller.task)?.instance;
    // SAFETY: requirements forwarded directly from this function's own
    // contract.
    unsafe {
        self.queue_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
            (flags & START_FLAG_ASYNC_CALLEE) != 0,
            NonNull::new(callback).map(SendSyncPtr::new),
            NonNull::new(post_return).map(SendSyncPtr::new),
        )?;
    }
    // Temporarily join the new subtask to the caller's dedicated sync-call
    // waitable set so we can wait for its status events below.
    let state = store.0.concurrent_state_mut();
    let guest_waitable = Waitable::Guest(guest_thread.task);
    let old_set = guest_waitable.common(state)?.set;
    let set = state.get_mut(caller.task)?.sync_call_set;
    guest_waitable.join(state, Some(set))?;
    let (status, waitable) = loop {
        store.0.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            skip_may_block_check: async_caller || !callee_async,
        })?;
        let state = store.0.concurrent_state_mut();
        log::trace!("taking event for {:?}", guest_thread.task);
        let event = guest_waitable.take_event(state)?;
        let Some(Event::Subtask { status }) = event else {
            unreachable!();
        };
        log::trace!("status {status:?} for {:?}", guest_thread.task);
        if status == Status::Returned {
            break (status, None);
        } else if async_caller {
            // Async caller: hand back a subtask handle representing the
            // still-running callee.
            let handle = store
                .0
                .instance_state(caller_instance)
                .handle_table()
                .subtask_insert_guest(guest_thread.task.rep())?;
            store
                .0
                .concurrent_state_mut()
                .get_mut(guest_thread.task)?
                .common
                .handle = Some(handle);
            break (status, Some(handle));
        } else {
            // Sync caller and the callee hasn't returned yet: keep waiting.
        }
    };
    // Restore the subtask's original waitable-set membership and make the
    // caller the current thread again.
    guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
    store.0.set_thread(Some(caller));
    store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
    log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
    if let Some(storage) = storage {
        // Sync caller: copy any flat result into the caller's storage and
        // delete the completed task if nothing else references it.
        let state = store.0.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;
        if let Some(result) = task.sync_result.take() {
            if let Some(result) = result {
                storage[0] = MaybeUninit::new(result);
            }
            if task.exited && task.ready_to_delete() {
                Waitable::Guest(guest_thread.task).delete_from(state)?;
            }
        }
    }
    Ok(status.pack(waitable))
}
/// Runs `future` as a host task on behalf of `caller_instance`, polling it
/// once immediately.
///
/// If the future completes on the first poll, `lower` writes the result for
/// the guest and `None` is returned; otherwise the future is parked in the
/// store's run loop and the new subtask's handle is returned.
pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
    self,
    mut store: StoreContextMut<'_, T>,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
    lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
) -> Result<Option<u32>> {
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let caller = state.guest_thread.unwrap();
    // Wrap the future so that the saved `CallContext` (resource-call
    // bookkeeping) is pushed back around every poll and popped off again
    // whenever the future returns `Pending`.
    let (join_handle, future) = JoinHandle::run(async move {
        let mut future = pin!(future);
        let mut call_context = None;
        future::poll_fn(move |cx| {
            tls::get(|store| {
                if let Some(call_context) = call_context.take() {
                    token
                        .as_context_mut(store)
                        .0
                        .component_resource_state()
                        .0
                        .push(call_context);
                }
            });
            let result = future.as_mut().poll(cx);
            if result.is_pending() {
                tls::get(|store| {
                    call_context = Some(
                        token
                            .as_context_mut(store)
                            .0
                            .component_resource_state()
                            .0
                            .pop()
                            .unwrap(),
                    );
                });
            }
            result
        })
        .await
    });
    let task = state.push(HostTask::new(
        RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        },
        Some(join_handle),
    ))?;
    log::trace!("new host task child of {caller:?}: {task:?}");
    // Poll once eagerly with a no-op waker; progress is otherwise driven by
    // the store's event loop rather than waker notifications here.
    let mut future = Box::pin(future);
    let poll = tls::set(store.0, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });
    Ok(match poll {
        // `JoinHandle::run` yields `None` only after an abort, which cannot
        // have happened yet.
        Poll::Ready(None) => unreachable!(),
        Poll::Ready(Some(result)) => {
            lower(store.as_context_mut(), result?)?;
            log::trace!("delete host task {task:?} (already ready)");
            store.0.concurrent_state_mut().delete(task)?;
            None
        }
        Poll::Pending => {
            // Park the future; when it resolves, queue a worker function
            // which lowers the result and posts a `Returned` event.
            let future =
                Box::pin(async move {
                    let result = match future.await {
                        Some(result) => result?,
                        None => return Ok(()),
                    };
                    tls::get(move |store| {
                        store.concurrent_state_mut().push_high_priority(
                            WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
                                lower(token.as_context_mut(store), result)?;
                                let state = store.concurrent_state_mut();
                                state.get_mut(task)?.join_handle.take();
                                Waitable::Host(task).set_event(
                                    state,
                                    Some(Event::Subtask {
                                        status: Status::Returned,
                                    }),
                                )
                            }))),
                        );
                        Ok(())
                    })
                });
            store.0.concurrent_state_mut().push_future(future);
            // Give the caller a subtask handle it can wait on.
            let handle = store
                .0
                .instance_state(RuntimeInstance {
                    instance: self.id().instance(),
                    index: caller_instance,
                })
                .handle_table()
                .subtask_insert_host(task.rep())?;
            store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
            log::trace!(
                "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
            );
            Some(handle)
        }
    })
}
/// Implements the `task.return` intrinsic: validates the caller-supplied
/// type/options against those recorded for the current task, lifts the
/// result, and marks the task as returned.
pub(crate) fn task_return(
    self,
    store: &mut dyn VMStore,
    ty: TypeTupleIndex,
    options: OptionsIndex,
    storage: &[ValRaw],
) -> Result<()> {
    let state = store.concurrent_state_mut();
    let guest_thread = state.guest_thread.unwrap();
    // Taking `lift_result` doubles as the "already returned or cancelled"
    // check: it's `None` after the first `task.return`/`task.cancel`.
    let lift = state
        .get_mut(guest_thread.task)?
        .lift_result
        .take()
        .ok_or_else(|| {
            format_err!("`task.return` or `task.cancel` called more than once for current task")
        })?;
    assert!(state.get_mut(guest_thread.task)?.result.is_none());
    let CanonicalOptions {
        string_encoding,
        data_model,
        ..
    } = &self.id().get(store).component().env_component().options[options];
    // The declared result type, string encoding, and memory must all match
    // what was recorded when the call was prepared.
    let invalid = ty != lift.ty
        || string_encoding != &lift.string_encoding
        || match data_model {
            CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
                Some(memory) => {
                    let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
                    let actual = self.id().get(store).runtime_memory(memory);
                    expected != actual.as_ptr()
                }
                None => false,
            },
            // GC-based options are always rejected here.
            CanonicalOptionsDataModel::Gc { .. } => true,
        };
    if invalid {
        bail!("invalid `task.return` signature and/or options for current task");
    }
    log::trace!("task.return for {guest_thread:?}");
    let result = (lift.lift)(store, storage)?;
    self.task_complete(store, guest_thread.task, result, Status::Returned)
}
/// Implements the `task.cancel` intrinsic: acknowledges a previously
/// delivered cancellation request for the current task and marks the task
/// as resolved with `ReturnCancelled`.
pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
    let concurrent = store.concurrent_state_mut();
    let guest_thread = concurrent.guest_thread.unwrap();
    let current = concurrent.get_mut(guest_thread.task)?;
    // Cancelling is only legal after the host/caller actually requested it.
    if !current.cancel_sent {
        bail!("`task.cancel` called by task which has not been cancelled")
    }
    // Consuming `lift_result` doubles as the "already resolved" check; it
    // is `None` after the first `task.return`/`task.cancel`.
    match current.lift_result.take() {
        Some(lift) => drop(lift),
        None => {
            bail!("`task.return` or `task.cancel` called more than once for current task")
        }
    }
    assert!(current.result.is_none());
    log::trace!("task.cancel for {guest_thread:?}");
    self.task_complete(
        store,
        guest_thread.task,
        Box::new(DummyResult),
        Status::ReturnCancelled,
    )
}
/// Finishes a guest task: verifies resource-call bookkeeping via
/// `exit_call`, then routes `result` to the task's caller.
fn task_complete(
    self,
    store: &mut StoreOpaque,
    guest_task: TableId<GuestTask>,
    result: Box<dyn Any + Send + Sync>,
    status: Status,
) -> Result<()> {
    // Make sure no borrowed resources remain live from this call before
    // the task is considered complete.
    let (calls, host_table, _, instance) = store.component_resource_state_with_instance(self);
    let mut tables = ResourceTables {
        calls,
        host_table: Some(host_table),
        guest: Some(instance.instance_states()),
    };
    tables.exit_call()?;
    let state = store.concurrent_state_mut();
    let task = state.get_mut(guest_task)?;
    match &mut task.caller {
        // Host callers receive the result over their oneshot channel, if
        // they're still listening.
        Caller::Host { tx, .. } => {
            if let Some(sender) = tx.take() {
                _ = sender.send(result);
            }
        }
        // Guest callers get the result stored on the task plus a subtask
        // status event they can wait on.
        Caller::Guest { .. } => {
            task.result = Some(result);
            Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
        }
    }
    Ok(())
}
/// Implements the `waitable-set.new` intrinsic: allocates a fresh waitable
/// set and returns its handle in the calling instance's handle table.
pub(crate) fn waitable_set_new(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result<u32> {
    // Identify which component instance's handle table receives the handle.
    let runtime_instance = RuntimeInstance {
        instance: self.id().instance(),
        index: caller_instance,
    };
    // Allocate the set store-wide, then map a per-instance handle to it.
    let set = store.concurrent_state_mut().push(WaitableSet::default())?;
    let handle = store
        .instance_state(runtime_instance)
        .handle_table()
        .waitable_set_insert(set.rep())?;
    log::trace!("new waitable set {set:?} (handle {handle})");
    Ok(handle)
}
/// Implements the `waitable-set.drop` intrinsic: removes the handle from
/// the calling instance's table and deletes the set from the store.
pub(crate) fn waitable_set_drop(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    set: u32,
) -> Result<()> {
    let rep = store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .handle_table()
        .waitable_set_remove(set)?;
    log::trace!("drop waitable set {rep} (handle {set})");
    let set = store
        .concurrent_state_mut()
        .delete(TableId::<WaitableSet>::new(rep))?;
    // Dropping a set which still has parked waiters would strand them, so
    // trap instead.
    if !set.waiting.is_empty() {
        bail!("cannot drop waitable set with waiters");
    }
    Ok(())
}
/// Implements the `waitable.join` intrinsic: adds `waitable_handle` to the
/// waitable set identified by `set_handle`, or removes it from any set when
/// `set_handle` is 0.
pub(crate) fn waitable_join(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    waitable_handle: u32,
    set_handle: u32,
) -> Result<()> {
    let mut instance = self.id().get_mut(store);
    let waitable =
        Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
    // Handle 0 means "join no set" (i.e. leave the current one).
    let set = if set_handle == 0 {
        None
    } else {
        let set = instance.instance_states().0[caller_instance]
            .handle_table()
            .waitable_set_rep(set_handle)?;
        Some(TableId::<WaitableSet>::new(set))
    };
    log::trace!(
        "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
    );
    waitable.join(store.concurrent_state_mut(), set)
}
/// Implements the `subtask.drop` intrinsic: releases the caller's handle to
/// a resolved subtask, deleting the underlying task when appropriate.
pub(crate) fn subtask_drop(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    task_id: u32,
) -> Result<()> {
    // Detach the subtask from any waitable set it still belongs to.
    self.waitable_join(store, caller_instance, task_id, 0)?;
    let (rep, is_host) = store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .handle_table()
        .subtask_remove(task_id)?;
    let concurrent_state = store.concurrent_state_mut();
    let (waitable, expected_caller_instance, delete) = if is_host {
        let id = TableId::<HostTask>::new(rep);
        let task = concurrent_state.get_mut(id)?;
        // A live join handle means the host task hasn't resolved yet.
        if task.join_handle.is_some() {
            bail!("cannot drop a subtask which has not yet resolved");
        }
        (Waitable::Host(id), task.caller_instance, true)
    } else {
        let id = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(id)?;
        // A pending `lift_result` means the guest task hasn't resolved yet.
        if task.lift_result.is_some() {
            bail!("cannot drop a subtask which has not yet resolved");
        }
        if let &Caller::Guest { thread } = &task.caller {
            // Only delete the guest task now if it has already exited.
            (
                Waitable::Guest(id),
                concurrent_state.get_mut(thread.task)?.instance,
                concurrent_state.get_mut(id)?.exited,
            )
        } else {
            unreachable!()
        }
    };
    waitable.common(concurrent_state)?.handle = None;
    if waitable.take_event(concurrent_state)?.is_some() {
        bail!("cannot drop a subtask with an undelivered event");
    }
    if delete {
        waitable.delete_from(concurrent_state)?;
    }
    // The handle must be dropped by the same instance that made the call
    // originally.
    assert_eq!(
        expected_caller_instance,
        RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance
        }
    );
    log::trace!("subtask_drop {waitable:?} (handle {task_id})");
    Ok(())
}
/// Implements the `waitable-set.wait` intrinsic: blocks (if needed) until
/// an event is available on the given set, then writes the event's payload
/// pair to guest memory at `payload` and returns the event ordinal.
pub(crate) fn waitable_set_wait(
    self,
    store: &mut StoreOpaque,
    options: OptionsIndex,
    set: u32,
    payload: u32,
) -> Result<u32> {
    // Synchronously-lifted exports must be allowed to block here.
    if !self.options(store, options).async_ {
        store.check_blocking()?;
    }
    let &CanonicalOptions {
        cancellable,
        instance: caller_instance,
        ..
    } = &self.id().get(store).component().env_component().options[options];
    let rep = store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .handle_table()
        .waitable_set_rep(set)?;
    self.waitable_check(
        store,
        cancellable,
        WaitableCheck::Wait,
        WaitableCheckParams {
            set: TableId::new(rep),
            options,
            payload,
        },
    )
}
/// Implements the `waitable-set.poll` intrinsic: the non-blocking variant
/// of `waitable_set_wait`, returning `Event::None`'s ordinal when nothing
/// is ready.
pub(crate) fn waitable_set_poll(
    self,
    store: &mut StoreOpaque,
    options: OptionsIndex,
    set: u32,
    payload: u32,
) -> Result<u32> {
    let &CanonicalOptions {
        cancellable,
        instance: caller_instance,
        ..
    } = &self.id().get(store).component().env_component().options[options];
    let rep = store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .handle_table()
        .waitable_set_rep(set)?;
    self.waitable_check(
        store,
        cancellable,
        WaitableCheck::Poll,
        WaitableCheckParams {
            set: TableId::new(rep),
            options,
            payload,
        },
    )
}
/// Implements the `thread.index` intrinsic: reports the handle under which
/// the current guest thread is registered in its component instance's
/// thread table.
pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
    let state = store.concurrent_state_mut();
    let thread_id = state.guest_thread.unwrap().thread;
    let rep = state.get_mut(thread_id)?.instance_rep;
    Ok(rep.unwrap())
}
/// Implements the `thread.new-indirect` intrinsic: creates a new explicit
/// guest thread which, when resumed, will run the function at
/// `start_func_idx` in the given function table with `context` as its
/// argument.
///
/// Returns the new thread's handle in the calling instance's thread table.
pub(crate) fn thread_new_indirect<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    runtime_instance: RuntimeComponentInstanceIndex,
    _func_ty_idx: TypeFuncIndex,
    start_func_table_idx: RuntimeTableIndex,
    start_func_idx: u32,
    context: i32,
) -> Result<u32> {
    log::trace!("creating new thread");
    // Only `(i32) -> ()` start functions are currently supported.
    let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
    let (instance, registry) = self.id().get_mut_and_registry(store.0);
    let callee = instance
        .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
        .ok_or_else(|| {
            format_err!("the start function index points to an uninitialized function")
        })?;
    if callee.type_index(store.0) != start_func_ty.type_index() {
        bail!(
            "start function does not match expected type (currently only `(i32) -> ()` is supported)"
        );
    }
    let token = StoreToken::new(store.as_context_mut());
    // The deferred body of the new thread; runs when the thread is first
    // resumed (see `resume_suspended_thread`).
    let start_func = Box::new(
        move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
            let old_thread = store.set_thread(Some(guest_thread));
            log::trace!(
                "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );
            store.maybe_push_call_context(guest_thread.task)?;
            let mut store = token.as_context_mut(store);
            let mut params = [ValRaw::i32(context)];
            // SAFETY: `callee` was type-checked against `(i32) -> ()`
            // above.
            unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
            store.0.maybe_pop_call_context(guest_thread.task)?;
            self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
            log::trace!("explicit thread {guest_thread:?} completed");
            let state = store.0.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;
            // If this was the task's last thread and it never produced a
            // result, that's a trap.
            if task.threads.is_empty() && !task.returned_or_cancelled() {
                bail!(Trap::NoAsyncResult);
            }
            store.0.set_thread(old_thread);
            let state = store.0.concurrent_state_mut();
            old_thread
                .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
            if state.get_mut(guest_thread.task)?.ready_to_delete() {
                Waitable::Guest(guest_thread.task).delete_from(state)?;
            }
            log::trace!("thread start: restored {old_thread:?} as current thread");
            Ok(())
        },
    );
    let state = store.0.concurrent_state_mut();
    let current_thread = state.guest_thread.unwrap();
    let parent_task = current_thread.task;
    // New threads are owned by the task of the thread that created them.
    let new_thread = GuestThread::new_explicit(parent_task, start_func);
    let thread_id = state.push(new_thread)?;
    state.get_mut(parent_task)?.threads.insert(thread_id);
    log::trace!("new thread with id {thread_id:?} created");
    self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
}
/// Schedules `thread_idx` — which must be either not-yet-started or
/// suspended — to run, at either high or low priority.
pub(crate) fn resume_suspended_thread(
    self,
    store: &mut StoreOpaque,
    runtime_instance: RuntimeComponentInstanceIndex,
    thread_idx: u32,
    high_priority: bool,
) -> Result<()> {
    let thread_id =
        GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
    let state = store.concurrent_state_mut();
    let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
    let thread = state.get_mut(guest_thread.thread)?;
    // Optimistically mark the thread `Running`; the previous state tells us
    // how to actually get it going.
    match mem::replace(&mut thread.state, GuestThreadState::Running) {
        GuestThreadState::NotStartedExplicit(start_func) => {
            log::trace!("starting thread {guest_thread:?}");
            let guest_call = WorkItem::GuestCall(GuestCall {
                thread: guest_thread,
                kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                    start_func(store, guest_thread)
                })),
            });
            store
                .concurrent_state_mut()
                .push_work_item(guest_call, high_priority);
        }
        GuestThreadState::Suspended(fiber) => {
            log::trace!("resuming thread {thread_id:?} that was suspended");
            store
                .concurrent_state_mut()
                .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
        }
        _ => {
            // NOTE(review): at this point the thread's previous state has
            // already been overwritten with `Running` by the
            // `mem::replace` above, even though we bail — confirm callers
            // treat this error as fatal for the thread.
            bail!("cannot resume thread which is not suspended");
        }
    }
    Ok(())
}
/// Registers `thread_id` in the owning component instance's thread handle
/// table and records the resulting handle back on the thread's store-level
/// entry, returning that handle.
fn add_guest_thread_to_instance_table(
    self,
    thread_id: TableId<GuestThread>,
    store: &mut StoreOpaque,
    runtime_instance: RuntimeComponentInstanceIndex,
) -> Result<u32> {
    // Which component instance's thread table receives the entry.
    let key = RuntimeInstance {
        instance: self.id().instance(),
        index: runtime_instance,
    };
    let handle = store
        .instance_state(key)
        .thread_handle_table()
        .guest_thread_insert(thread_id.rep())?;
    // Remember the handle on the thread itself so it can be reported (and
    // later removed) by handle.
    let thread = store.concurrent_state_mut().get_mut(thread_id)?;
    thread.instance_rep = Some(handle);
    Ok(handle)
}
/// Shared implementation of the yield/suspend-style intrinsics: optionally
/// resumes another thread first, then yields or suspends the current one,
/// reporting whether a cancellation arrived before or during the
/// suspension.
pub(crate) fn suspension_intrinsic(
    self,
    store: &mut StoreOpaque,
    caller: RuntimeComponentInstanceIndex,
    cancellable: bool,
    yielding: bool,
    to_thread: Option<u32>,
) -> Result<WaitResult> {
    if to_thread.is_none() {
        let state = store.concurrent_state_mut();
        if yielding {
            // Yielding when the task can't block anyway is a no-op.
            if !state.may_block(state.guest_thread.unwrap().task) {
                return Ok(WaitResult::Completed);
            }
        } else {
            store.check_blocking()?;
        }
    }
    // A cancellation that arrived before we suspend short-circuits.
    if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
        return Ok(WaitResult::Cancelled);
    }
    // When switching to a specific thread, schedule it ahead of other work.
    if let Some(thread) = to_thread {
        self.resume_suspended_thread(store, caller, thread, true)?;
    }
    let state = store.concurrent_state_mut();
    let guest_thread = state.guest_thread.unwrap();
    let reason = if yielding {
        SuspendReason::Yielding {
            thread: guest_thread,
            skip_may_block_check: to_thread.is_some(),
        }
    } else {
        SuspendReason::ExplicitlySuspending {
            thread: guest_thread,
            skip_may_block_check: to_thread.is_some(),
        }
    };
    store.suspend(reason)?;
    // Report a cancellation that arrived while we were suspended.
    if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
        Ok(WaitResult::Cancelled)
    } else {
        Ok(WaitResult::Completed)
    }
}
/// Shared implementation of `waitable-set.wait` and `waitable-set.poll`:
/// optionally blocks until an event is available, then writes the event's
/// (handle, payload) pair to guest memory and returns the event ordinal.
fn waitable_check(
    self,
    store: &mut StoreOpaque,
    cancellable: bool,
    check: WaitableCheck,
    params: WaitableCheckParams,
) -> Result<u32> {
    let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
    log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
    let state = store.concurrent_state_mut();
    let task = state.get_mut(guest_thread.task)?;
    match &check {
        WaitableCheck::Wait => {
            let set = params.set;
            // Suspend only when there's nothing deliverable: no task-level
            // event (a `Cancelled` event counts only if we're cancellable)
            // and nothing ready in the set.
            if (task.event.is_none()
                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                && state.get_mut(set)?.ready.is_empty()
            {
                if cancellable {
                    // Allow a cancellation to wake us out of this wait.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    assert!(old.is_none());
                }
                store.suspend(SuspendReason::Waiting {
                    set,
                    thread: guest_thread,
                    skip_may_block_check: false,
                })?;
            }
        }
        // Poll never blocks.
        WaitableCheck::Poll => {}
    }
    log::trace!(
        "waitable check for {guest_thread:?}; set {:?}, part two",
        params.set
    );
    let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
    let (ordinal, handle, result) = match &check {
        WaitableCheck::Wait => {
            // After a wait, an event must be available to deliver.
            let (event, waitable) = event.unwrap();
            let handle = waitable.map(|(_, v)| v).unwrap_or(0);
            let (ordinal, result) = event.parts();
            (ordinal, handle, result)
        }
        WaitableCheck::Poll => {
            if let Some((event, waitable)) = event {
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            } else {
                log::trace!(
                    "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                    guest_thread.task,
                    params.set
                );
                let (ordinal, result) = Event::None.parts();
                (ordinal, 0, result)
            }
        }
    };
    // Write the (handle, payload) pair to the guest-provided address as two
    // little-endian u32s.
    let memory = self.options_memory_mut(store, params.options);
    let ptr = func::validate_inbounds_dynamic(
        &CanonicalAbiInfo::POINTER_PAIR,
        memory,
        &ValRaw::u32(params.payload),
    )?;
    memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
    memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
    Ok(ordinal)
}
/// Implements the `subtask.cancel` intrinsic: requests cancellation of a
/// not-yet-resolved subtask, returning the canonical-ABI status word (or
/// `BLOCKED` for async callers when the subtask hasn't resolved yet).
pub(crate) fn subtask_cancel(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    async_: bool,
    task_id: u32,
) -> Result<u32> {
    // A sync cancel may need to block until the subtask acknowledges.
    if !async_ {
        store.check_blocking()?;
    }
    let (rep, is_host) = store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .handle_table()
        .subtask_rep(task_id)?;
    let (waitable, expected_caller_instance) = if is_host {
        let id = TableId::<HostTask>::new(rep);
        (
            Waitable::Host(id),
            store.concurrent_state_mut().get_mut(id)?.caller_instance,
        )
    } else {
        let id = TableId::<GuestTask>::new(rep);
        if let &Caller::Guest { thread } = &store.concurrent_state_mut().get_mut(id)?.caller {
            (
                Waitable::Guest(id),
                store.concurrent_state_mut().get_mut(thread.task)?.instance,
            )
        } else {
            unreachable!()
        }
    };
    // Only the instance which created the subtask may cancel it.
    assert_eq!(
        expected_caller_instance,
        RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance
        }
    );
    log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
    let concurrent_state = store.concurrent_state_mut();
    if let Waitable::Host(host_task) = waitable {
        // Host tasks are cancelled by aborting their join handle (if
        // they're still running).
        if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
            handle.abort();
            return Ok(Status::ReturnCancelled as u32);
        }
    } else {
        let caller = concurrent_state.guest_thread.unwrap();
        let guest_task = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(guest_task)?;
        if !task.already_lowered_parameters() {
            // The callee hasn't started yet; tear it down before it runs.
            task.lower_params = None;
            task.lift_result = None;
            task.exited = true;
            let instance = task.instance;
            assert_eq!(1, task.threads.len());
            let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
            let concurrent_state = store.concurrent_state_mut();
            concurrent_state.delete(thread)?;
            assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
            // Remove the task's pending start from the callee instance's
            // queue; if it wasn't there, the task already resolved.
            let pending = &mut store.instance_state(instance).concurrent_state().pending;
            let pending_count = pending.len();
            pending.retain(|thread, _| thread.task != guest_task);
            if pending.len() == pending_count {
                bail!("`subtask.cancel` called after terminal status delivered");
            }
            return Ok(Status::StartCancelled as u32);
        } else if !task.returned_or_cancelled() {
            // The callee is running: record a `Cancelled` event and wake
            // the first thread parked in a cancellable wait, if any.
            task.cancel_sent = true;
            task.event = Some(Event::Cancelled);
            for thread in task.threads.clone() {
                let thread = QualifiedThreadId {
                    task: guest_task,
                    thread,
                };
                if let Some(set) = concurrent_state
                    .get_mut(thread.thread)
                    .unwrap()
                    .wake_on_cancel
                    .take()
                {
                    let item = match concurrent_state
                        .get_mut(set)?
                        .waiting
                        .remove(&thread)
                        .unwrap()
                    {
                        WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                        WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                            thread,
                            kind: GuestCallKind::DeliverEvent {
                                instance,
                                set: None,
                            },
                        }),
                    };
                    concurrent_state.push_high_priority(item);
                    // Give the woken thread a chance to observe the
                    // cancellation before we proceed.
                    store.suspend(SuspendReason::Yielding {
                        thread: caller,
                        skip_may_block_check: false,
                    })?;
                    break;
                }
            }
            let concurrent_state = store.concurrent_state_mut();
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.returned_or_cancelled() {
                if async_ {
                    // Async caller: report "blocked"; resolution will be
                    // delivered as an event later.
                    return Ok(BLOCKED);
                } else {
                    store.wait_for_event(Waitable::Guest(guest_task))?;
                }
            }
        }
    }
    // By now the subtask must have resolved; translate its terminal event
    // into a status code.
    let event = waitable.take_event(store.concurrent_state_mut())?;
    if let Some(Event::Subtask {
        status: status @ (Status::Returned | Status::ReturnCancelled),
    }) = event
    {
        Ok(status as u32)
    } else {
        bail!("`subtask.cancel` called after terminal status delivered");
    }
}
/// Reads slot `slot` of the current guest thread's context-local storage.
pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
    let state = store.concurrent_state_mut();
    state.context_get(slot)
}
/// Writes `value` into slot `slot` of the current guest thread's
/// context-local storage.
pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
    let state = store.concurrent_state_mut();
    state.context_set(slot, value)
}
}
/// Store-level entry points used by component trampolines for async
/// canonical built-ins.
///
/// Implemented below for `StoreInner<T>`; every method forwards to the
/// corresponding `Instance` operation with a `StoreContextMut` in hand.
pub trait VMComponentAsyncStore {
    /// Prepares (but does not start) a call to an async-lowered or
    /// async-lifted export.
    ///
    /// # Safety
    /// `storage` must point to `storage_len` valid `ValRaw`s, and the
    /// funcref/memory pointers must be valid for this instance.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;
    /// Starts a previously prepared call with a synchronous caller,
    /// writing results into `storage`.
    ///
    /// # Safety
    /// `storage` must point to `storage_len` valid slots; funcrefs must be
    /// valid for this instance.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;
    /// Starts a previously prepared call with an asynchronous caller,
    /// returning a status/handle code.
    ///
    /// # Safety
    /// The funcref pointers must be valid for this instance.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;
    /// `future.write` built-in: writes the value at `address` to a future.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;
    /// `future.read` built-in: reads a future's value into `address`.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;
    /// `future.drop-writable` built-in: drops the writable end `writer`.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;
    /// `stream.write` built-in: writes `count` elements from `address`.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;
    /// `stream.read` built-in: reads up to `count` elements into `address`.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;
    /// `stream.write` specialization for payloads with a "flat" ABI
    /// (fixed size/alignment), avoiding per-element lower/lift.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;
    /// `stream.read` specialization for payloads with a "flat" ABI.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;
    /// `stream.drop-writable` built-in: drops the writable end `writer`.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;
    /// `error-context.debug-message` built-in: writes the debug message of
    /// `err_ctx_handle` to guest memory at `debug_msg_address`.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;
    /// `thread.new-indirect` built-in: spawns a guest thread running the
    /// function at `start_func_idx` in table `start_func_table_idx`.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
/// Forwarding implementation: each trait method wraps `self` in a
/// `StoreContextMut` and delegates to the corresponding `Instance` method.
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: per this trait's contract, `storage` points to
        // `storage_len` valid `ValRaw`s; copy them out before lowering.
        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                callee_async,
                memory,
                string_encoding,
                // The result-count parameter doubles as a discriminator:
                // two sentinel values mark async callers (with/without a
                // result); anything else is a sync caller's result count.
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: `storage` points to `storage_len` valid slots per the
        // trait contract; sync callers have no post-return funcref here.
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    ptr::null_mut(),
                    callee,
                    param_count,
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
                )
                .map(drop)
        }
    }
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32> {
        // SAFETY: funcref pointers are valid per the trait contract.
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                None,
            )
        }
    }
    // `future.write`: a future is a single-element transmit, hence count 1
    // and no flat ABI.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }
    // `future.read`: single-element read, mirroring `future_write`.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }
    // `stream.write` without a flat ABI: elements are lowered individually.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }
    // `stream.read` without a flat ABI.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }
    // Flat-ABI stream write: payload size/alignment allow bulk copies.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }
    // Flat-ABI stream read, mirroring `flat_stream_write`.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
/// A task representing a host-implemented subtask of a guest caller.
struct HostTask {
    // Waitable bookkeeping (pending event, owning set, guest handle).
    common: WaitableCommon,
    // The guest instance which started this task.
    caller_instance: RuntimeInstance,
    // Handle used to abort the task on `subtask.cancel`; `None` once taken.
    join_handle: Option<JoinHandle>,
}
impl HostTask {
    /// Creates a host task for `caller_instance` with default waitable
    /// state and an optional abort handle.
    fn new(caller_instance: RuntimeInstance, join_handle: Option<JoinHandle>) -> Self {
        let common = WaitableCommon::default();
        Self {
            caller_instance,
            join_handle,
            common,
        }
    }
}
// Name used when formatting `TableId<HostTask>` in trace logs.
impl TableDebug for HostTask {
    fn type_name() -> &'static str {
        "HostTask"
    }
}
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
/// Who started a `GuestTask`: the host (via `prepare_call`) or another
/// guest thread.
enum Caller {
    Host {
        // Channel used to deliver the lifted result; `None` once sent or
        // when no result is expected.
        tx: Option<oneshot::Sender<LiftedResult>>,
        // Dropping the last clone signals task exit to the host's
        // `exit_rx` receiver.
        exit_tx: Arc<oneshot::Sender<()>>,
        // Whether a host-side future observes this task's lifetime (see
        // `HostFutureState`).
        host_future_present: bool,
        // The guest thread (if any) which was running when the host call
        // was prepared.
        caller: Option<QualifiedThreadId>,
    },
    Guest {
        // The guest thread which made the call.
        thread: QualifiedThreadId,
    },
}
/// Everything needed to lift a task's result out of the guest when it
/// calls `task.return`.
struct LiftResult {
    // Closure performing the actual lift from flat `ValRaw`s.
    lift: RawLift,
    // Expected type of the values passed to `task.return`.
    ty: TypeTupleIndex,
    // Linear memory used by the lifting, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    string_encoding: StringEncoding,
}
/// A guest thread identified together with its parent task.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    task: TableId<GuestTask>,
    thread: TableId<GuestThread>,
}
impl QualifiedThreadId {
    /// Pairs `thread` with its parent task (looked up in `state`) to form
    /// a fully qualified id.
    fn qualify(
        state: &mut ConcurrentState,
        thread: TableId<GuestThread>,
    ) -> Result<QualifiedThreadId> {
        let task = state.get_mut(thread)?.parent_task;
        Ok(QualifiedThreadId { task, thread })
    }
}
// Formats as `QualifiedThreadId(<task rep>, <thread rep>)` for trace logs.
impl fmt::Debug for QualifiedThreadId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("QualifiedThreadId")
            .field(&self.task.rep())
            .field(&self.thread.rep())
            .finish()
    }
}
/// Lifecycle of a guest thread.
enum GuestThreadState {
    // Created implicitly for a task's main thread; not yet run.
    NotStartedImplicit,
    // Created via `thread.new*`; holds the closure which starts it.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    Running,
    // Suspended on a fiber, waiting to be resumed.
    Suspended(StoreFiber<'static>),
    // Waiting to be scheduled (no fiber held).
    Pending,
    Completed,
}
/// A single thread of execution within a `GuestTask`.
pub struct GuestThread {
    // Two slots of context-local storage (`context.get`/`context.set`).
    context: [u32; 2],
    parent_task: TableId<GuestTask>,
    // When set, the waitable set this thread can be woken from if the
    // parent task is cancelled.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    state: GuestThreadState,
    // Guest-visible thread handle rep, if one has been allocated.
    instance_rep: Option<u32>,
}
impl GuestThread {
fn from_instance(
state: Pin<&mut ComponentInstance>,
caller_instance: RuntimeComponentInstanceIndex,
guest_thread: u32,
) -> Result<TableId<Self>> {
let rep = state.instance_states().0[caller_instance]
.thread_handle_table()
.guest_thread_rep(guest_thread)?;
Ok(TableId::new(rep))
}
fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
Self {
context: [0; 2],
parent_task,
wake_on_cancel: None,
state: GuestThreadState::NotStartedImplicit,
instance_rep: None,
}
}
fn new_explicit(
parent_task: TableId<GuestTask>,
start_func: Box<
dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
>,
) -> Self {
Self {
context: [0; 2],
parent_task,
wake_on_cancel: None,
state: GuestThreadState::NotStartedExplicit(start_func),
instance_rep: None,
}
}
}
// Name used when formatting `TableId<GuestThread>` in trace logs.
impl TableDebug for GuestThread {
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
/// State of a synchronous call's single flat result.
enum SyncResult {
    NotProduced,
    // Result produced by the callee; `None` for zero-result functions.
    Produced(Option<ValRaw>),
    // Result already handed to the caller; taking again is a bug.
    Taken,
}
impl SyncResult {
fn take(&mut self) -> Option<Option<ValRaw>> {
match mem::replace(self, SyncResult::Taken) {
SyncResult::NotProduced => None,
SyncResult::Produced(val) => Some(val),
SyncResult::Taken => {
panic!("attempted to take a synchronous result that was already taken")
}
}
}
}
/// Whether a host-side future is observing a guest task's lifetime; a task
/// with a `Live` host future must not be deleted yet.
#[derive(Debug)]
enum HostFutureState {
    // Guest-called task (or host call without a tracking future).
    NotApplicable,
    Live,
    Dropped,
}
/// A call into a guest export, tracked as a waitable subtask.
pub(crate) struct GuestTask {
    // Waitable bookkeeping (pending event, owning set, guest handle).
    common: WaitableCommon,
    // Closure lowering the caller's params; `None` once the call started.
    lower_params: Option<RawLower>,
    // Closure lifting the result; `None` once returned or cancelled.
    lift_result: Option<LiftResult>,
    // Lifted result, held until delivered to the caller.
    result: Option<LiftedResult>,
    // Callback for async-lifted exports using the callback ABI.
    callback: Option<CallbackFn>,
    caller: Caller,
    call_context: Option<CallContext>,
    // Result slot used when the caller is synchronous.
    sync_result: SyncResult,
    // Whether a `Cancelled` event has been sent to this task.
    cancel_sent: bool,
    starting_sent: bool,
    // Subtasks started by this task; re-parented on disposal.
    subtasks: HashSet<TableId<GuestTask>>,
    // Waitable set used for synchronous waits on this task's subtasks.
    sync_call_set: TableId<WaitableSet>,
    // The instance whose export this task is running.
    instance: RuntimeInstance,
    // Pending event for this task itself (e.g. `Event::Cancelled`).
    event: Option<Event>,
    function_index: Option<ExportIndex>,
    exited: bool,
    // Threads belonging to this task (main thread plus any spawned ones).
    threads: HashSet<TableId<GuestThread>>,
    host_future_state: HostFutureState,
    // Whether the export was declared `async` at the component level.
    async_function: bool,
}
impl GuestTask {
    /// True once the parameter-lowering closure has been consumed, meaning
    /// the callee has started (or begun starting).
    fn already_lowered_parameters(&self) -> bool {
        self.lower_params.is_none()
    }
    /// True once the result-lifting closure has been consumed, which
    /// happens when the task returns or is cancelled.
    fn returned_or_cancelled(&self) -> bool {
        self.lift_result.is_none()
    }
    /// Whether this task may be deleted: all threads gone, no undelivered
    /// sync result, no undelivered terminal subtask event, and no live
    /// host-side future still observing it.
    fn ready_to_delete(&self) -> bool {
        let threads_completed = self.threads.is_empty();
        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
        let pending_completion_event = matches!(
            self.common.event,
            Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled
            })
        );
        let ready = threads_completed
            && !has_sync_result
            && !pending_completion_event
            && !matches!(self.host_future_state, HostFutureState::Live);
        log::trace!(
            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
            threads_completed,
            has_sync_result,
            pending_completion_event,
            self.host_future_state
        );
        ready
    }
    /// Creates a new task from its lower/lift closures and caller info,
    /// allocating its private `sync_call_set` in `state`.
    fn new(
        state: &mut ConcurrentState,
        lower_params: RawLower,
        lift_result: LiftResult,
        caller: Caller,
        callback: Option<CallbackFn>,
        instance: RuntimeInstance,
        async_function: bool,
    ) -> Result<Self> {
        let sync_call_set = state.push(WaitableSet::default())?;
        // Only host-initiated calls can have a host future tracking them.
        let host_future_state = match &caller {
            Caller::Guest { .. } => HostFutureState::NotApplicable,
            Caller::Host {
                host_future_present,
                ..
            } => {
                if *host_future_present {
                    HostFutureState::Live
                } else {
                    HostFutureState::NotApplicable
                }
            }
        };
        Ok(Self {
            common: WaitableCommon::default(),
            lower_params: Some(lower_params),
            lift_result: Some(lift_result),
            result: None,
            callback,
            caller,
            call_context: Some(CallContext::default()),
            sync_result: SyncResult::NotProduced,
            cancel_sent: false,
            starting_sent: false,
            subtasks: HashSet::new(),
            sync_call_set,
            instance,
            event: None,
            function_index: None,
            exited: false,
            threads: HashSet::new(),
            host_future_state,
            async_function,
        })
    }
    /// Tears down this task after deletion: reaps finished waitables in
    /// its sync-call set, deletes the set, re-parents its subtasks onto
    /// its own caller, and deletes any subtask that is now deletable.
    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
        // Delete waitables in the sync-call set which already reached a
        // terminal status (their events would otherwise never be taken).
        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
            if let Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled,
            }) = waitable.common(state)?.event
            {
                waitable.delete_from(state)?;
            }
        }
        assert!(self.threads.is_empty());
        state.delete(self.sync_call_set)?;
        match &self.caller {
            Caller::Guest { thread } => {
                // Hand our subtasks to our guest caller's task.
                let task_mut = state.get_mut(thread.task)?;
                let present = task_mut.subtasks.remove(&me);
                assert!(present);
                for subtask in &self.subtasks {
                    task_mut.subtasks.insert(*subtask);
                }
                for subtask in &self.subtasks {
                    state.get_mut(*subtask)?.caller = Caller::Guest { thread: *thread };
                }
            }
            Caller::Host {
                exit_tx, caller, ..
            } => {
                // Re-parent subtasks to the host caller, sharing its exit
                // notification channel; they carry no result channel.
                for subtask in &self.subtasks {
                    state.get_mut(*subtask)?.caller = Caller::Host {
                        tx: None,
                        exit_tx: exit_tx.clone(),
                        host_future_present: false,
                        caller: *caller,
                    };
                }
            }
        }
        // Any subtask that has already exited and is otherwise done can be
        // deleted now that it has a new parent.
        for subtask in self.subtasks {
            let task = state.get_mut(subtask)?;
            if task.exited && task.ready_to_delete() {
                Waitable::Guest(subtask).delete_from(state)?;
            }
        }
        Ok(())
    }
}
// Name used when formatting `TableId<GuestTask>` in trace logs.
impl TableDebug for GuestTask {
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
/// State shared by all waitable kinds.
#[derive(Default)]
struct WaitableCommon {
    // Pending event, if any, not yet delivered.
    event: Option<Event>,
    // The waitable set this waitable currently belongs to, if any.
    set: Option<TableId<WaitableSet>>,
    // The guest-visible handle for this waitable, if one exists.
    handle: Option<u32>,
}
/// A typed reference to any kind of waitable: a host subtask, a guest
/// subtask, or a stream/future transmit handle.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    Host(TableId<HostTask>),
    Guest(TableId<GuestTask>),
    Transmit(TableId<TransmitHandle>),
}
impl Waitable {
    /// Resolves a guest-visible waitable handle for `caller_instance` to a
    /// typed `Waitable` using the instance's handle table.
    fn from_instance(
        state: Pin<&mut ComponentInstance>,
        caller_instance: RuntimeComponentInstanceIndex,
        waitable: u32,
    ) -> Result<Self> {
        use crate::runtime::vm::component::Waitable;
        let (waitable, kind) = state.instance_states().0[caller_instance]
            .handle_table()
            .waitable_rep(waitable)?;
        Ok(match kind {
            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
        })
    }
    /// Raw table rep, independent of kind.
    fn rep(&self) -> u32 {
        match self {
            Self::Host(id) => id.rep(),
            Self::Guest(id) => id.rep(),
            Self::Transmit(id) => id.rep(),
        }
    }
    /// Moves this waitable to `set` (or out of any set when `None`),
    /// maintaining parent/child links and the sets' `ready` lists.
    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
        log::trace!("waitable {self:?} join set {set:?}",);
        let old = mem::replace(&mut self.common(state)?.set, set);
        if let Some(old) = old {
            // Detach from the previous set.
            match *self {
                Waitable::Host(id) => state.remove_child(id, old),
                Waitable::Guest(id) => state.remove_child(id, old),
                Waitable::Transmit(id) => state.remove_child(id, old),
            }?;
            state.get_mut(old)?.ready.remove(self);
        }
        if let Some(set) = set {
            match *self {
                Waitable::Host(id) => state.add_child(id, set),
                Waitable::Guest(id) => state.add_child(id, set),
                Waitable::Transmit(id) => state.add_child(id, set),
            }?;
            // If we already have a pending event, the new set should see
            // it immediately.
            if self.common(state)?.event.is_some() {
                self.mark_ready(state)?;
            }
        }
        Ok(())
    }
    /// Mutable access to the shared waitable state for any kind.
    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
        Ok(match self {
            Self::Host(id) => &mut state.get_mut(*id)?.common,
            Self::Guest(id) => &mut state.get_mut(*id)?.common,
            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
        })
    }
    /// Stores (or clears) the pending event and notifies the owning set.
    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
        log::trace!("set event for {self:?}: {event:?}");
        self.common(state)?.event = event;
        self.mark_ready(state)
    }
    /// Removes and returns the pending event, also removing this waitable
    /// from its set's `ready` list.
    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
        let common = self.common(state)?;
        let event = common.event.take();
        if let Some(set) = self.common(state)?.set {
            state.get_mut(set)?.ready.remove(self);
        }
        Ok(event)
    }
    /// Inserts this waitable into its set's `ready` list and, if a thread
    /// is waiting on that set, schedules that thread to run.
    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
        if let Some(set) = self.common(state)?.set {
            state.get_mut(set)?.ready.insert(*self);
            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
                // The woken thread no longer needs a cancel wake-up for
                // this set (asserted to be consistent).
                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
                let item = match mode {
                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                        thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance,
                            set: Some(set),
                        },
                    }),
                };
                state.push_high_priority(item);
            }
        }
        Ok(())
    }
    /// Deletes this waitable from the table, running `GuestTask` disposal
    /// when applicable.
    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
        match self {
            Self::Host(task) => {
                log::trace!("delete host task {task:?}");
                state.delete(*task)?;
            }
            Self::Guest(task) => {
                log::trace!("delete guest task {task:?}");
                state.delete(*task)?.dispose(state, *task)?;
            }
            Self::Transmit(task) => {
                state.delete(*task)?;
            }
        }
        Ok(())
    }
}
// Delegates to the inner `TableId`'s `Debug` output, whichever kind this is.
impl fmt::Debug for Waitable {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Host(id) => fmt::Debug::fmt(id, f),
            Self::Guest(id) => fmt::Debug::fmt(id, f),
            Self::Transmit(id) => fmt::Debug::fmt(id, f),
        }
    }
}
/// A set of waitables plus the threads currently waiting on it.
#[derive(Default)]
struct WaitableSet {
    // Members with an undelivered event.
    ready: BTreeSet<Waitable>,
    // Threads blocked on this set, with how each should be woken.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
// Name used when formatting `TableId<WaitableSet>` in trace logs.
impl TableDebug for WaitableSet {
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
/// Closure which lowers a caller's parameters into flat `ValRaw` storage.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
/// Closure which lifts a callee's flat results into a boxed host value.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
/// Type-erased lifted result delivered back to a host caller.
type LiftedResult = Box<dyn Any + Send + Sync>;
/// Placeholder result value for calls that produce no meaningful result.
struct DummyResult;
/// Per-(sub)instance concurrency state.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    // Non-zero while the instance has requested backpressure.
    backpressure: u16,
    // Set while reentrance into the instance is disallowed.
    do_not_enter: bool,
    // Guest calls waiting for the instance to become enterable.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
impl ConcurrentInstanceState {
pub fn pending_is_empty(&self) -> bool {
self.pending.is_empty()
}
}
/// Store-wide state for the component-model async event loop.
pub struct ConcurrentState {
    // The guest thread currently executing, if any.
    guest_thread: Option<QualifiedThreadId>,
    // Host-task futures polled by the event loop; `None` once taken for
    // shutdown (see `take_fibers_and_futures`).
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    // Table holding tasks, threads, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    // Work queues: high-priority items run first (LIFO); low-priority
    // items are drained one at a time in FIFO order.
    high_priority: Vec<WorkItem>,
    low_priority: VecDeque<WorkItem>,
    suspend_reason: Option<SuspendReason>,
    // Dedicated worker fiber plus the item it should run next.
    worker: Option<StoreFiber<'static>>,
    worker_item: Option<WorkerItem>,
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
impl Default for ConcurrentState {
    /// Empty state: no current thread, empty table and queues, and a fresh
    /// (present) `FuturesUnordered` for host-task futures.
    fn default() -> Self {
        Self {
            guest_thread: None,
            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
            table: AlwaysMut::new(ResourceTable::new()),
            high_priority: Vec::new(),
            low_priority: VecDeque::new(),
            suspend_reason: None,
            worker: None,
            worker_item: None,
            global_error_context_ref_counts: BTreeMap::new(),
        }
    }
}
impl ConcurrentState {
    /// Drains every fiber and future owned by this state into the provided
    /// vectors, e.g. so they can be dropped safely at store teardown.
    pub(crate) fn take_fibers_and_futures(
        &mut self,
        fibers: &mut Vec<StoreFiber<'static>>,
        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
    ) {
        // Fibers parked in waitable sets or suspended guest threads.
        for entry in self.table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
                for mode in mem::take(&mut set.waiting).into_values() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fibers.push(fiber);
                    }
                }
            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
                if let GuestThreadState::Suspended(fiber) =
                    mem::replace(&mut thread.state, GuestThreadState::Completed)
                {
                    fibers.push(fiber);
                }
            }
        }
        if let Some(fiber) = self.worker.take() {
            fibers.push(fiber);
        }
        // Fibers and futures still sitting in the work queues.
        let mut handle_item = |item| match item {
            WorkItem::ResumeFiber(fiber) => {
                fibers.push(fiber);
            }
            WorkItem::PushFuture(future) => {
                self.futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            _ => {}
        };
        for item in mem::take(&mut self.high_priority) {
            handle_item(item);
        }
        for item in mem::take(&mut self.low_priority) {
            handle_item(item);
        }
        // Finally hand over the whole future set, leaving `None` behind.
        if let Some(them) = self.futures.get_mut().take() {
            futures.push(them);
        }
    }
    /// Returns the next batch of work: all high-priority items, or else a
    /// single low-priority item (FIFO, given `push_low_priority` uses
    /// `push_front` and this pops from the back).
    fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
        let mut ready = mem::take(&mut self.high_priority);
        if ready.is_empty() {
            if let Some(item) = self.low_priority.pop_back() {
                ready.push(item);
            }
        }
        ready
    }
    /// Inserts `value` into the table, returning its typed id.
    fn push<V: Send + Sync + 'static>(
        &mut self,
        value: V,
    ) -> Result<TableId<V>, ResourceTableError> {
        self.table.get_mut().push(value).map(TableId::from)
    }
    /// Mutable access to the table entry identified by `id`.
    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
        self.table.get_mut().get_mut(&Resource::from(id))
    }
    /// Records `child` as a child of `parent` in the table.
    pub fn add_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .add_child(Resource::from(child), Resource::from(parent))
    }
    /// Removes the parent/child link recorded by `add_child`.
    pub fn remove_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .remove_child(Resource::from(child), Resource::from(parent))
    }
    /// Removes and returns the table entry identified by `id`.
    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
        self.table.get_mut().delete(Resource::from(id))
    }
    /// Queues a host-task future to be added to the event loop's future
    /// set (done via a work item rather than immediately).
    fn push_future(&mut self, future: HostTaskFuture) {
        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
    }
    fn push_high_priority(&mut self, item: WorkItem) {
        log::trace!("push high priority: {item:?}");
        self.high_priority.push(item);
    }
    fn push_low_priority(&mut self, item: WorkItem) {
        log::trace!("push low priority: {item:?}");
        self.low_priority.push_front(item);
    }
    /// Pushes `item` on the queue selected by `high_priority`.
    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
        if high_priority {
            self.push_high_priority(item);
        } else {
            self.push_low_priority(item);
        }
    }
    /// `context.get` for the current guest thread; panics if `slot` is out
    /// of the two-slot range or no thread is current.
    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
        let thread = self.guest_thread.unwrap();
        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
        Ok(val)
    }
    /// `context.set` for the current guest thread; see `context_get`.
    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
        let thread = self.guest_thread.unwrap();
        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
        Ok(())
    }
    /// Consumes a pending `Cancelled` event on the current task, returning
    /// whether one was present.
    fn take_pending_cancellation(&mut self) -> bool {
        let thread = self.guest_thread.unwrap();
        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
            assert!(matches!(event, Event::Cancelled));
            true
        } else {
            false
        }
    }
    /// Traps with `CannotBlockSyncTask` if `task` is not allowed to block.
    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
        if self.may_block(task) {
            Ok(())
        } else {
            Err(Trap::CannotBlockSyncTask.into())
        }
    }
    /// A task may block if its export is async, or once it has already
    /// returned/been cancelled.
    fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
        let task = self.get_mut(task).unwrap();
        task.async_function || task.returned_or_cancelled()
    }
}
/// Identity helper which pins the closure's type to the `RawLower`
/// signature so type inference succeeds at the call site.
fn for_any_lower<
    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
>(
    fun: F,
) -> F {
    fun
}
/// Identity helper which pins the closure's type to the `RawLift`
/// signature so type inference succeeds at the call site.
fn for_any_lift<
    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>(
    fun: F,
) -> F {
    fun
}
/// Wraps `fut` so that polling it from anywhere other than the event loop
/// of the store identified by `id` panics with an explanatory message.
///
/// The check consults the thread-local store slot (`tls`) on every poll.
fn checked<F: Future + Send + 'static>(
    id: StoreId,
    fut: F,
) -> impl Future<Output = F::Output> + Send + 'static {
    async move {
        let mut fut = pin!(fut);
        future::poll_fn(move |cx| {
            let message = "\
`Future`s which depend on asynchronous component tasks, streams, or \
futures to complete may only be polled from the event loop of the \
store to which they belong. Please use \
`StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
";
            tls::try_get(|store| {
                // Only the matching store being current counts; a taken or
                // absent slot means we're outside the right event loop.
                let matched = match store {
                    tls::TryGet::Some(store) => store.id() == id,
                    tls::TryGet::Taken | tls::TryGet::None => false,
                };
                if !matched {
                    panic!("{message}")
                }
            });
            fut.as_mut().poll(cx)
        })
        .await
    }
}
/// Panics if a store is already registered in thread-local storage, i.e.
/// when called from within an existing `run_concurrent` invocation.
fn check_recursive_run() {
    tls::try_get(|store| match store {
        tls::TryGet::None => {}
        _ => panic!("Recursive `StoreContextMut::run_concurrent` calls not supported"),
    });
}
/// Splits a guest callback return code into its low-nibble code and the
/// remaining 28-bit payload.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let kind = code & 0xF;
    let payload = code >> 4;
    (kind, payload)
}
/// Parameters for a `waitable-set.wait`/`poll` style operation.
struct WaitableCheckParams {
    set: TableId<WaitableSet>,
    options: OptionsIndex,
    // Guest address receiving the event payload.
    payload: u32,
}
/// Which waitable-set operation is being performed: blocking wait or
/// non-blocking poll.
enum WaitableCheck {
    Wait,
    Poll,
}
/// A host-initiated guest call produced by `prepare_call`, ready to be
/// queued via `queue_call`.
pub(crate) struct PreparedCall<R> {
    handle: Func,
    // The task/thread allocated for this call.
    thread: QualifiedThreadId,
    param_count: usize,
    // Receives the lifted result when the task returns.
    rx: oneshot::Receiver<LiftedResult>,
    // Resolves (by sender drop) once the task has fully exited.
    exit_rx: oneshot::Receiver<()>,
    // `R` is the host-level result type; not stored directly.
    _phantom: PhantomData<R>,
}
impl<R> PreparedCall<R> {
    /// The id of the task backing this prepared call.
    pub(crate) fn task_id(&self) -> TaskId {
        let task = self.thread.task;
        TaskId { task }
    }
}
/// Opaque handle to a guest task, used by host-side future wrappers.
pub(crate) struct TaskId {
    task: TableId<GuestTask>,
}
impl TaskId {
    /// Called when the host-side future observing this task is dropped.
    ///
    /// If the task never started (parameters not yet lowered) it is
    /// deleted outright; otherwise the host-future state is marked
    /// `Dropped` and the task is deleted only if nothing else keeps it
    /// alive (see `GuestTask::ready_to_delete`).
    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
        if !task.already_lowered_parameters() {
            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
        } else {
            task.host_future_state = HostFutureState::Dropped;
            if task.ready_to_delete() {
                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
            }
        }
        Ok(())
    }
}
/// Prepares (without starting) a host-initiated call to the export behind
/// `handle`, allocating the backing task and its implicit thread.
///
/// `lower_params` and `lift_result` are deferred closures which move the
/// parameters into and the result out of the guest; `host_future_present`
/// records whether a host-side future will track the task's lifetime.
/// Returns a `PreparedCall` carrying the result/exit channels.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    // Gather the canonical ABI options and type info for this export.
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    // Token lets the 'static closures below recover a typed store context.
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let (tx, rx) = oneshot::channel();
    let (exit_tx, exit_rx) = oneshot::channel();
    let caller = state.guest_thread;
    let mut task = GuestTask::new(
        state,
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            exit_tx: Arc::new(exit_tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: hedged — relies on `callback` remaining a valid
                // funcref for this instance; see `call_callback`.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        RuntimeInstance {
            instance: handle.instance().id().instance(),
            index: component_instance,
        },
        async_function,
    )?;
    task.function_index = Some(handle.index());
    let task = state.push(task)?;
    let thread = state.push(GuestThread::new_implicit(task))?;
    state.get_mut(task)?.threads.insert(thread);
    // NOTE(review): the task and thread are pushed into the table before
    // this check, so bailing here leaves those entries behind — confirm
    // they are reclaimed elsewhere (e.g. at store teardown).
    if !store.0.may_enter_task(task) {
        bail!(crate::Trap::CannotEnterComponent);
    }
    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        exit_rx,
        _phantom: PhantomData,
    })
}
/// Queues a prepared call for execution and returns a future resolving to
/// the downcast result plus the task's exit receiver.
///
/// The returned future is wrapped by `checked` so it may only be polled
/// from this store's event loop.
pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
    mut store: StoreContextMut<T>,
    prepared: PreparedCall<R>,
) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
    let PreparedCall {
        handle,
        thread,
        param_count,
        rx,
        exit_rx,
        ..
    } = prepared;
    queue_call0(store.as_context_mut(), handle, thread, param_count)?;
    Ok(checked(
        store.0.id(),
        rx.map(move |result| {
            // The sender always transmits the type lifted for `R`, so the
            // downcast is expected to succeed; a cancelled sender maps to
            // an error.
            result
                .map(|v| (*v.downcast().unwrap(), exit_rx))
                .map_err(crate::Error::from)
        }),
    ))
}
/// Resolves the core funcrefs (callee, optional post-return and callback)
/// for `handle` and enqueues the actual call on the event loop.
fn queue_call0<T: 'static>(
    store: StoreContextMut<T>,
    handle: Func,
    guest_thread: QualifiedThreadId,
    param_count: usize,
) -> Result<()> {
    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
    let is_concurrent = raw_options.async_;
    let callback = raw_options.callback;
    let instance = handle.instance();
    let callee = handle.lifted_core_func(store.0);
    let post_return = handle.post_return_core_func(store.0);
    let callback = callback.map(|i| {
        let instance = instance.id().get(store.0);
        SendSyncPtr::new(instance.runtime_callback(i))
    });
    log::trace!("queueing call {guest_thread:?}");
    // SAFETY: hedged — funcrefs were just looked up from this instance;
    // see `Instance::queue_call` for the contract it requires.
    unsafe {
        instance.queue_call(
            store,
            guest_thread,
            SendSyncPtr::new(callee),
            param_count,
            1,
            is_concurrent,
            callback,
            post_return.map(SendSyncPtr::new),
        )
    }
}