use super::table::{TableDebug, TableId};
use super::{Event, GlobalErrorContextRefCount, Waitable, WaitableCommon};
use crate::component::concurrent::{ConcurrentState, QualifiedThreadId, WorkItem, tls};
use crate::component::func::{self, LiftContext, LowerContext};
use crate::component::matching::InstanceType;
use crate::component::types;
use crate::component::values::ErrorContextAny;
use crate::component::{
AsAccessor, ComponentInstanceId, ComponentType, FutureAny, Instance, Lift, Lower, StreamAny,
Val, WasmList,
};
use crate::prelude::*;
use crate::store::{StoreOpaque, StoreToken};
use crate::try_mutex::{TryMutex, TryMutexGuard};
use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
use crate::vm::{AlwaysMut, VMStore};
use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
use crate::{
Error, Result, Trap, bail, bail_bug, ensure,
error::{Context as _, format_err},
};
use alloc::sync::Arc;
use buffers::{Extender, SliceBuffer, UntypedWriteBuffer};
use core::any::{Any, TypeId};
use core::fmt;
use core::future;
use core::iter;
use core::marker::PhantomData;
use core::mem::{self, ManuallyDrop, MaybeUninit};
use core::ops::{Deref, DerefMut};
use core::pin::Pin;
use core::task::{Context, Poll, Waker, ready};
use futures::channel::oneshot;
use futures::{FutureExt as _, stream};
use wasmtime_environ::component::{
CanonicalAbiInfo, ComponentTypes, InterfaceType, OptionsIndex, RuntimeComponentInstanceIndex,
TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
TypeFutureTableIndex, TypeStreamTableIndex,
};
pub use buffers::{ReadBuffer, VecBuffer, WriteBuffer};
mod buffers;
/// Selects which of the two transmit flavours an operation refers to.
#[derive(Copy, Clone, Debug)]
pub enum TransmitKind {
/// The transmit is a stream.
Stream,
/// The transmit is a future.
Future,
}
/// Outcome of a read or write operation, convertible to a raw `u32` via
/// [`ReturnCode::encode`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ReturnCode {
/// The operation did not finish; encoded as `0xffff_ffff`.
Blocked,
/// The operation completed; carries an item count.
Completed(ItemCount),
/// The other end was dropped; carries an item count.
Dropped(ItemCount),
/// The operation was cancelled; carries an item count.
Cancelled(ItemCount),
}
impl ReturnCode {
    /// Packs this code into a single `u32`.
    ///
    /// `Blocked` maps to `0xffff_ffff`. Every other variant places its item
    /// count in the bits above a four-bit tag (`0` completed, `1` dropped,
    /// `2` cancelled).
    pub fn encode(&self) -> u32 {
        const BLOCKED: u32 = 0xffff_ffff;
        const COMPLETED: u32 = 0x0;
        const DROPPED: u32 = 0x1;
        const CANCELLED: u32 = 0x2;

        let (items, tag) = match *self {
            ReturnCode::Blocked => return BLOCKED,
            ReturnCode::Completed(items) => (items, COMPLETED),
            ReturnCode::Dropped(items) => (items, DROPPED),
            ReturnCode::Cancelled(items) => (items, CANCELLED),
        };
        (items.as_u32() << 4) | tag
    }

    /// Builds a `Completed` code; futures always report a count of zero,
    /// streams report `count` unchanged.
    fn completed(kind: TransmitKind, count: ItemCount) -> Self {
        let reported = match kind {
            TransmitKind::Future => ItemCount::ZERO,
            TransmitKind::Stream => count,
        };
        Self::Completed(reported)
    }
}
/// A count of items, stored as a `u32`.
///
/// The checked constructors on this type only accept values strictly below
/// `ItemCount::MAX`.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[repr(transparent)]
pub struct ItemCount {
/// The underlying count.
raw: u32,
}
impl ItemCount {
const MAX: u32 = 1 << 28;
const ZERO: ItemCount = ItemCount { raw: 0 };
fn new(count: u32) -> Result<Self, Trap> {
if count < Self::MAX {
Ok(Self { raw: count })
} else {
Err(Trap::StreamOpTooBig)
}
}
fn new_usize(count: usize) -> Result<Self, Trap> {
let count = u32::try_from(count).map_err(|_| Trap::StreamOpTooBig)?;
Self::new(count)
}
fn as_u32(&self) -> u32 {
self.raw
}
fn as_usize(&self) -> usize {
usize::try_from(self.raw).unwrap()
}
fn inc(&mut self, amt: usize) -> Result<(), Trap> {
let amt = u32::try_from(amt).map_err(|_| Trap::StreamOpTooBig)?;
let new_raw = self.raw.checked_add(amt).ok_or(Trap::StreamOpTooBig)?;
if new_raw < Self::MAX {
self.raw = new_raw;
Ok(())
} else {
Err(Trap::StreamOpTooBig)
}
}
fn add(&self, other: ItemCount) -> Result<ItemCount> {
match self.raw.checked_add(other.raw) {
Some(raw) => Ok(ItemCount::new(raw)?),
None => bail_bug!("overflow in `ItemCount::add`"),
}
}
fn sub(&self, other: ItemCount) -> Result<ItemCount> {
match self.raw.checked_sub(other.raw) {
Some(raw) => Ok(ItemCount { raw }),
None => bail_bug!("underflow in `ItemCount::sub`"),
}
}
}
/// Formats the count by delegating to the formatting of the inner `u32`.
impl fmt::Display for ItemCount {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// NOTE(review): which `fmt` trait method this resolves to depends on the
// traits in scope for the file — confirm before restructuring.
self.raw.fmt(f)
}
}
/// Debug-formats the count as the bare inner `u32` (no struct wrapper).
impl fmt::Debug for ItemCount {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// NOTE(review): which `fmt` trait method this resolves to depends on the
// traits in scope for the file — confirm before restructuring.
self.raw.fmt(f)
}
}
/// Allows comparing an `ItemCount` for equality directly against a `u32`.
impl PartialEq<u32> for ItemCount {
    fn eq(&self, other: &u32) -> bool {
        let rhs = *other;
        self.raw == rhs
    }
}
/// Allows ordering an `ItemCount` directly against a `u32`.
impl PartialOrd<u32> for ItemCount {
    fn partial_cmp(&self, other: &u32) -> Option<core::cmp::Ordering> {
        PartialOrd::partial_cmp(&self.raw, other)
    }
}
/// A typed table index for either a stream or a future.
#[derive(Copy, Clone, Debug)]
pub enum TransmitIndex {
/// Index into a stream table type.
Stream(TypeStreamTableIndex),
/// Index into a future table type.
Future(TypeFutureTableIndex),
}
impl TransmitIndex {
    /// Reports whether this index names a stream or a future.
    pub fn kind(&self) -> TransmitKind {
        match *self {
            Self::Stream(_) => TransmitKind::Stream,
            Self::Future(_) => TransmitKind::Future,
        }
    }

    /// Looks up the payload type recorded in `types` for this index, if the
    /// stream or future has one.
    fn payload<'a>(&self, types: &'a ComponentTypes) -> Option<&'a InterfaceType> {
        match *self {
            Self::Stream(table) => {
                let stream = types[table].ty;
                types[stream].payload.as_ref()
            }
            Self::Future(table) => {
                let future = types[table].ty;
                types[future].payload.as_ref()
            }
        }
    }
}
/// Resolves `index` in `handle_table` for the stream or future table named
/// by `ty`, forwarding to `stream_rep` or `future_rep` respectively.
fn get_mut_by_index_from(
    handle_table: &mut HandleTable,
    ty: TransmitIndex,
    index: u32,
) -> Result<(u32, &mut TransmitLocalState)> {
    match ty {
        TransmitIndex::Future(future) => handle_table.future_rep(future, index),
        TransmitIndex::Stream(stream) => handle_table.stream_rep(stream, index),
    }
}
/// Lowers items from `buffer` into guest linear memory starting at `address`.
///
/// At most `count` items are written; the actual number is the smaller of
/// `count` and `buffer.remaining().len()`. On success those items are skipped
/// in `buffer`.
///
/// When `T::MAY_REQUIRE_REALLOC` is set, the store's current thread is
/// switched to `caller_thread` while lowering and switched back once the
/// store succeeds.
///
/// # Errors
///
/// Fails if `address` is not aligned to `T::ALIGN32`, if the destination
/// range (including one whose byte length overflows `usize`) does not lie
/// within memory, or if storing the items fails.
fn lower<T: func::Lower + Send + 'static, B: WriteBuffer<T>, U: 'static>(
    mut store: StoreContextMut<U>,
    instance: Instance,
    caller_thread: QualifiedThreadId,
    options: OptionsIndex,
    ty: TransmitIndex,
    address: usize,
    count: usize,
    buffer: &mut B,
) -> Result<()> {
    let count = buffer.remaining().len().min(count);
    let (lower, old_thread) = if T::MAY_REQUIRE_REALLOC {
        let old_thread = store.0.set_thread(caller_thread)?;
        (
            &mut LowerContext::new(store.as_context_mut(), options, instance),
            Some(old_thread),
        )
    } else {
        (
            &mut LowerContext::new_without_realloc(store.as_context_mut(), options, instance),
            None,
        )
    };
    if address % usize::try_from(T::ALIGN32)? != 0 {
        bail!("read pointer not aligned");
    }
    lower
        .as_slice_mut()
        .get_mut(address..)
        .and_then(|b| {
            // A wrapped product would let a too-short range pass validation,
            // so treat overflow as out of bounds.
            let len = T::SIZE32.checked_mul(count)?;
            b.get_mut(..len)
        })
        .ok_or_else(|| crate::format_err!("read pointer out of bounds of memory"))?;
    // Nothing is stored when the stream/future has no payload type.
    if let Some(ty) = ty.payload(lower.types) {
        T::linear_store_list_to_memory(lower, *ty, address, &buffer.remaining()[..count])?;
    }
    if let Some(old_thread) = old_thread {
        store.0.set_thread(old_thread)?;
    }
    buffer.skip(count);
    Ok(())
}
/// Lifts items from guest linear memory at `address` into `buffer`.
///
/// At most `count` items are read; the actual number is the smaller of
/// `count` and `buffer.remaining_capacity()`.
///
/// When `T::IS_RUST_UNIT_TYPE` is set no memory is read and `buffer` is simply
/// extended with that many values. Otherwise `ty` must be `Some`.
///
/// # Errors
///
/// Fails if `ty` is `None` for a non-unit `T` (reported as an internal bug),
/// if `address` is not aligned to `T::ALIGN32`, if the source range
/// (including one whose byte length overflows `usize`) does not lie within
/// memory, or if lifting the list fails.
fn lift<T: func::Lift + Send + 'static, B: ReadBuffer<T>>(
    lift: &mut LiftContext<'_>,
    ty: Option<InterfaceType>,
    buffer: &mut B,
    address: usize,
    count: usize,
) -> Result<()> {
    let count = count.min(buffer.remaining_capacity());
    if T::IS_RUST_UNIT_TYPE {
        // NOTE(review): relies on `T` being a unit-like type when
        // `IS_RUST_UNIT_TYPE` is set, so no real initialization is needed —
        // confirm against the trait's contract.
        buffer.extend(
            iter::repeat_with(|| unsafe { MaybeUninit::uninit().assume_init() }).take(count),
        )
    } else {
        let ty = match ty {
            Some(ty) => ty,
            None => bail_bug!("type required for non-unit lift"),
        };
        if address % usize::try_from(T::ALIGN32)? != 0 {
            bail!("write pointer not aligned");
        }
        lift.memory()
            .get(address..)
            .and_then(|b| {
                // A wrapped product would let a too-short range pass
                // validation, so treat overflow as out of bounds.
                let len = T::SIZE32.checked_mul(count)?;
                b.get(..len)
            })
            .ok_or_else(|| crate::format_err!("write pointer out of bounds of memory"))?;
        let list = &WasmList::new(address, count, lift, ty)?;
        T::linear_lift_into_from_memory(lift, list, &mut Extender(buffer))?
    }
    Ok(())
}
/// State stored for an error context.
#[derive(Debug, PartialEq, Eq, PartialOrd)]
pub(super) struct ErrorContextState {
/// Debug message associated with the error context.
pub(crate) debug_msg: String,
}
/// A size/alignment pair.
// NOTE(review): presumably describes the flat in-memory layout of a payload
// in bytes — confirm against the users of this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct FlatAbi {
/// Size component of the layout.
pub(super) size: u32,
/// Alignment component of the layout.
pub(super) align: u32,
}
/// Borrowed host-side byte buffer used as a write target, plus a counter of
/// how many bytes have been marked as written.
struct HostBuffer<'a> {
/// The byte storage handed out to direct writers.
dst: &'a mut Vec<u8>,
/// Running total advanced by `DirectDestination::mark_written`.
marked_written: &'a mut usize,
}
impl HostBuffer<'_> {
    /// Produces a shorter-lived `HostBuffer` that borrows the same storage
    /// and counter.
    fn reborrow(&mut self) -> HostBuffer<'_> {
        let HostBuffer {
            dst,
            marked_written,
        } = self;
        HostBuffer {
            dst: &mut **dst,
            marked_written: &mut **marked_written,
        }
    }
}
/// The place a [`StreamProducer`] delivers items of type `T` to, by way of a
/// buffer of type `B`.
pub struct Destination<'a, T, B> {
/// Table id of the transmit state this destination belongs to.
id: TableId<TransmitState>,
/// The producer-owned buffer that items are placed in.
buffer: &'a mut B,
/// Host-side byte buffer, when one is present.
host_buffer: Option<HostBuffer<'a>>,
/// Ties the item type `T` to this value without storing one.
_phantom: PhantomData<fn() -> T>,
}
impl<'a, T, B> Destination<'a, T, B> {
    /// Creates a shorter-lived `Destination` referring to the same buffer
    /// and transmit state.
    pub fn reborrow(&mut self) -> Destination<'_, T, B> {
        let host_buffer = match &mut self.host_buffer {
            Some(host) => Some(host.reborrow()),
            None => None,
        };
        Destination {
            id: self.id,
            buffer: &mut *self.buffer,
            host_buffer,
            _phantom: PhantomData,
        }
    }

    /// Removes and returns the current buffer, leaving `B::default()` in
    /// its place.
    pub fn take_buffer(&mut self) -> B
    where
        B: Default,
    {
        mem::replace(&mut *self.buffer, B::default())
    }

    /// Replaces the current buffer with `buffer`.
    pub fn set_buffer(&mut self, buffer: B) {
        let slot = &mut *self.buffer;
        *slot = buffer;
    }

    /// Returns how many items the guest reader can still accept, or `None`
    /// when the read side is not in the `GuestReady` state.
    ///
    /// # Panics
    ///
    /// Panics if the internal state lookup reports an error.
    pub fn remaining(&self, mut store: impl AsContextMut) -> Option<usize> {
        let result = self.remaining_(store.as_context_mut().0);
        result.unwrap()
    }

    /// Fallible core of [`Destination::remaining`].
    fn remaining_(&self, store: &mut StoreOpaque) -> Result<Option<usize>> {
        let transmit = store.concurrent_state_mut().get_mut(self.id)?;
        let ReadState::GuestReady { count, .. } = &transmit.read else {
            return Ok(None);
        };
        let WriteState::HostReady { guest_offset, .. } = &transmit.write else {
            bail_bug!("expected WriteState::HostReady")
        };
        Ok(Some(count.as_usize() - guest_offset.as_usize()))
    }
}
impl<'a, B> Destination<'a, u8, B> {
pub fn as_direct<D>(
mut self,
store: StoreContextMut<'a, D>,
capacity: usize,
) -> DirectDestination<'a, D> {
if let Some(buffer) = &mut self.host_buffer {
*buffer.marked_written = 0;
buffer.dst.resize(capacity, 0);
}
DirectDestination {
id: self.id,
host_buffer: self.host_buffer,
store,
}
}
}
/// Byte-oriented write access to a destination, produced by
/// `Destination::as_direct`.
pub struct DirectDestination<'a, D: 'static> {
/// Table id of the transmit state being written to.
id: TableId<TransmitState>,
/// Host-side byte buffer; when absent, writes target guest memory.
host_buffer: Option<HostBuffer<'a>>,
/// Store used to reach the transmit state and guest memory.
store: StoreContextMut<'a, D>,
}
#[cfg(feature = "std")]
impl<D: 'static> std::io::Write for DirectDestination<'_, D> {
    /// Copies as many bytes of `buf` as fit into `self.remaining()` and
    /// marks them written; returns the number copied.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let target = self.remaining();
        let len = target.len().min(buf.len());
        let (src, _) = buf.split_at(len);
        target[..len].copy_from_slice(src);
        self.mark_written(len);
        Ok(len)
    }

    /// Nothing is buffered here, so flushing always succeeds.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
impl<D: 'static> DirectDestination<'_, D> {
pub fn remaining(&mut self) -> &mut [u8] {
self.remaining_().unwrap()
}
fn remaining_(&mut self) -> Result<&mut [u8]> {
if let Some(buffer) = self.host_buffer.as_mut() {
return Ok(buffer.dst);
}
let transmit = self
.store
.as_context_mut()
.0
.concurrent_state_mut()
.get_mut(self.id)?;
let &ReadState::GuestReady {
address,
count,
options,
instance,
..
} = &transmit.read
else {
bail_bug!("expected ReadState::GuestReady")
};
let &WriteState::HostReady { guest_offset, .. } = &transmit.write else {
bail_bug!("expected WriteState::HostReady")
};
let memory = instance
.options_memory_mut(self.store.0, options)
.get_mut((address + guest_offset.as_usize())..)
.and_then(|b| b.get_mut(..(count.as_usize() - guest_offset.as_usize())));
match memory {
Some(memory) => Ok(memory),
None => bail_bug!("guest buffer unexpectedly out of bounds"),
}
}
pub fn mark_written(&mut self, count: usize) {
self.mark_written_(count).unwrap()
}
fn mark_written_(&mut self, count: usize) -> Result<()> {
if let Some(buffer) = self.host_buffer.as_mut() {
*buffer.marked_written = buffer.marked_written.checked_add(count).unwrap();
} else {
let transmit = self
.store
.as_context_mut()
.0
.concurrent_state_mut()
.get_mut(self.id)?;
let ReadState::GuestReady {
count: read_count, ..
} = &transmit.read
else {
bail_bug!("expected ReadState::GuestReady")
};
let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
bail_bug!("expected WriteState::HostReady");
};
if guest_offset.as_usize() + count > read_count.as_usize() {
panic!(
"write count ({count}) must be less than or equal to read count ({read_count})"
)
} else {
guest_offset.inc(count)?;
}
}
Ok(())
}
}
/// Result reported by a stream producer or consumer after being polled.
#[derive(Copy, Clone, Debug)]
pub enum StreamResult {
/// The operation completed.
Completed,
/// The operation was cancelled.
Cancelled,
/// The producer or consumer is done and has been (or should be) dropped.
// NOTE(review): exact "dropped" semantics are not visible in this chunk —
// confirm against the code that consumes `StreamResult`.
Dropped,
}
/// A host-side source of items for a stream, driven by polling.
pub trait StreamProducer<D>: Send + 'static {
/// The type of item produced.
type Item;
/// The buffer type through which items are handed to the destination.
type Buffer: WriteBuffer<Self::Item> + Default;
/// Polls the producer to deliver items to `destination`.
///
/// NOTE(review): `finish` presumably asks the producer to wrap up without
/// waiting further — confirm against the caller.
fn poll_produce<'a>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
store: StoreContextMut<'a, D>,
destination: Destination<'a, Self::Item, Self::Buffer>,
finish: bool,
) -> Poll<Result<StreamResult>>;
/// Attempts to convert the producer into a value of the type identified
/// by `_ty`. The default implementation declines and hands `me` back.
fn try_into(me: Pin<Box<Self>>, _ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
Err(me)
}
}
/// An empty iterator produces nothing and immediately reports `Dropped`.
impl<T, D> StreamProducer<D> for iter::Empty<T>
where
    T: Send + Sync + 'static,
{
    type Item = T;
    type Buffer = Option<Self::Item>;

    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        _destination: Destination<'a, Self::Item, Self::Buffer>,
        _finish: bool,
    ) -> Poll<Result<StreamResult>> {
        let outcome = StreamResult::Dropped;
        Poll::Ready(Ok(outcome))
    }
}
/// An empty stream produces nothing and immediately reports `Dropped`.
impl<T, D> StreamProducer<D> for stream::Empty<T>
where
    T: Send + Sync + 'static,
{
    type Item = T;
    type Buffer = Option<Self::Item>;

    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        _destination: Destination<'a, Self::Item, Self::Buffer>,
        _finish: bool,
    ) -> Poll<Result<StreamResult>> {
        let outcome = StreamResult::Dropped;
        Poll::Ready(Ok(outcome))
    }
}
/// A `Vec` hands over all of its items in one go and then reports `Dropped`.
impl<T, D> StreamProducer<D> for Vec<T>
where
    T: Unpin + Send + Sync + 'static,
{
    type Item = T;
    type Buffer = VecBuffer<T>;

    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _finish: bool,
    ) -> Poll<Result<StreamResult>> {
        // Leave an empty vector behind and move the items into the buffer.
        let items = mem::take(self.get_mut());
        dst.set_buffer(items.into());
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
/// A boxed slice hands over all of its items in one go and then reports
/// `Dropped`.
impl<T, D> StreamProducer<D> for Box<[T]>
where
    T: Unpin + Send + Sync + 'static,
{
    type Item = T;
    type Buffer = VecBuffer<T>;

    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _finish: bool,
    ) -> Poll<Result<StreamResult>> {
        // Leave an empty slice behind and move the items into the buffer.
        let items = mem::take(self.get_mut()).into_vec();
        dst.set_buffer(items.into());
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
/// `Bytes` serves as its own buffer: the whole value is handed over at once
/// and `Dropped` is reported.
#[cfg(feature = "component-model-bytes")]
impl<D> StreamProducer<D> for bytes::Bytes {
    type Item = u8;
    type Buffer = Self;

    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _finish: bool,
    ) -> Poll<Result<StreamResult>> {
        let payload = mem::take(self.get_mut());
        dst.set_buffer(payload);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
/// `BytesMut` serves as its own buffer: the whole value is handed over at
/// once and `Dropped` is reported.
#[cfg(feature = "component-model-bytes")]
impl<D> StreamProducer<D> for bytes::BytesMut {
    type Item = u8;
    type Buffer = Self;

    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        mut dst: Destination<'a, Self::Item, Self::Buffer>,
        _finish: bool,
    ) -> Poll<Result<StreamResult>> {
        let payload = mem::take(self.get_mut());
        dst.set_buffer(payload);
        Poll::Ready(Ok(StreamResult::Dropped))
    }
}
/// The place a [`StreamConsumer`] obtains items of type `T` from.
pub struct Source<'a, T> {
/// Table id of the transmit state this source belongs to.
id: TableId<TransmitState>,
/// Host-side buffer of pending items; when absent, items are read from
/// the guest's write buffer.
host_buffer: Option<&'a mut dyn WriteBuffer<T>>,
}
impl<'a, T> Source<'a, T> {
    /// Creates a shorter-lived `Source` referring to the same data.
    pub fn reborrow(&mut self) -> Source<'_, T> {
        let host_buffer = self.host_buffer.as_deref_mut();
        Source {
            id: self.id,
            host_buffer,
        }
    }

    /// Moves as many items as fit from this source into `buffer`.
    ///
    /// With a host buffer, items are moved directly from it. Otherwise they
    /// are lifted from the guest's pending write and the guest offset of the
    /// read state is advanced by the number of items taken.
    pub fn read<B, S: AsContextMut>(&mut self, mut store: S, buffer: &mut B) -> Result<()>
    where
        T: func::Lift + 'static,
        B: ReadBuffer<T>,
    {
        if let Some(input) = &mut self.host_buffer {
            let available = input.remaining().len();
            let count = available.min(buffer.remaining_capacity());
            buffer.move_from(*input, count);
            return Ok(());
        }

        let store = store.as_context_mut();
        let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
        let &ReadState::HostReady {
            guest_offset: consumed,
            ..
        } = &transmit.read
        else {
            bail_bug!("expected ReadState::HostReady");
        };
        let &WriteState::GuestReady {
            ty,
            address,
            count,
            options,
            instance,
            ..
        } = &transmit.write
        else {
            bail_bug!("expected WriteState::GuestReady");
        };

        let cx = &mut LiftContext::new(store.0.store_opaque_mut(), options, instance);
        let payload = ty.payload(cx.types);
        let capacity_before = buffer.remaining_capacity();
        lift::<T, B>(
            cx,
            payload.copied(),
            buffer,
            address + (T::SIZE32 * consumed.as_usize()),
            count.as_usize() - consumed.as_usize(),
        )?;

        // The drop in remaining capacity is the number of items lifted.
        let transmit = store.0.concurrent_state_mut().get_mut(self.id)?;
        let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
            bail_bug!("expected ReadState::HostReady");
        };
        guest_offset.inc(capacity_before - buffer.remaining_capacity())?;
        Ok(())
    }

    /// Returns how many items are still available to read.
    ///
    /// # Panics
    ///
    /// Panics if the internal state is not as expected.
    pub fn remaining(&self, mut store: impl AsContextMut) -> usize
    where
        T: 'static,
    {
        let result = self.remaining_(store.as_context_mut().0);
        result.unwrap()
    }

    /// Fallible core of [`Source::remaining`]: a guest write in progress
    /// takes precedence, then the host buffer.
    fn remaining_(&self, store: &mut StoreOpaque) -> Result<usize>
    where
        T: 'static,
    {
        let transmit = store.concurrent_state_mut().get_mut(self.id)?;
        match (&transmit.write, &self.host_buffer) {
            (WriteState::GuestReady { count, .. }, _) => {
                let ReadState::HostReady { guest_offset, .. } = &transmit.read else {
                    bail_bug!("expected ReadState::HostReady")
                };
                Ok(count.as_usize() - guest_offset.as_usize())
            }
            (_, Some(host_buffer)) => Ok(host_buffer.remaining().len()),
            (_, None) => bail_bug!("expected either WriteState::GuestReady or host buffer"),
        }
    }
}
impl<'a> Source<'a, u8> {
pub fn as_direct<D>(self, store: StoreContextMut<'a, D>) -> DirectSource<'a, D> {
DirectSource {
id: self.id,
host_buffer: self.host_buffer,
store,
}
}
}
/// Byte-oriented read access to a source, produced by `Source::as_direct`.
pub struct DirectSource<'a, D: 'static> {
/// Table id of the transmit state being read from.
id: TableId<TransmitState>,
/// Host-side buffer of pending bytes; when absent, reads come from guest
/// memory.
host_buffer: Option<&'a mut dyn WriteBuffer<u8>>,
/// Store used to reach the transmit state and guest memory.
store: StoreContextMut<'a, D>,
}
#[cfg(feature = "std")]
impl<D: 'static> std::io::Read for DirectSource<'_, D> {
    /// Copies as many bytes as fit from `self.remaining()` into `buf` and
    /// marks them read; returns the number copied.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let available = self.remaining();
        let len = available.len().min(buf.len());
        let (dst, _) = buf.split_at_mut(len);
        dst.copy_from_slice(&available[..len]);
        self.mark_read(len);
        Ok(len)
    }
}
impl<D: 'static> DirectSource<'_, D> {
pub fn remaining(&mut self) -> &[u8] {
self.remaining_().unwrap()
}
fn remaining_(&mut self) -> Result<&[u8]> {
if let Some(buffer) = self.host_buffer.as_deref_mut() {
return Ok(buffer.remaining());
}
let transmit = self
.store
.as_context_mut()
.0
.concurrent_state_mut()
.get_mut(self.id)?;
let &WriteState::GuestReady {
address,
count,
options,
instance,
..
} = &transmit.write
else {
bail_bug!("expected WriteState::GuestReady")
};
let &ReadState::HostReady { guest_offset, .. } = &transmit.read else {
bail_bug!("expected ReadState::HostReady")
};
let memory = instance
.options_memory(self.store.0, options)
.get((address + guest_offset.as_usize())..)
.and_then(|b| b.get(..(count.as_usize() - guest_offset.as_usize())));
match memory {
Some(memory) => Ok(memory),
None => bail_bug!("guest buffer unexpectedly out of bounds"),
}
}
pub fn mark_read(&mut self, count: usize) {
self.mark_read_(count).unwrap()
}
fn mark_read_(&mut self, count: usize) -> Result<()> {
if let Some(buffer) = self.host_buffer.as_deref_mut() {
buffer.skip(count);
return Ok(());
}
let transmit = self
.store
.as_context_mut()
.0
.concurrent_state_mut()
.get_mut(self.id)?;
let WriteState::GuestReady {
count: write_count, ..
} = &transmit.write
else {
bail_bug!("expected WriteState::GuestReady");
};
let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
bail_bug!("expected ReadState::HostReady");
};
if guest_offset.as_usize() + count > write_count.as_usize() {
panic!("read count ({count}) must be less than or equal to write count ({write_count})")
} else {
guest_offset.inc(count)?;
}
Ok(())
}
}
/// A host-side sink for items of a stream, driven by polling.
pub trait StreamConsumer<D>: Send + 'static {
/// The type of item consumed.
type Item;
/// Polls the consumer to take items from `source`.
///
/// NOTE(review): `finish` presumably asks the consumer to wrap up without
/// waiting further — confirm against the caller.
fn poll_consume(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
store: StoreContextMut<D>,
source: Source<'_, Self::Item>,
finish: bool,
) -> Poll<Result<StreamResult>>;
}
/// A host-side source of the single value of a future, driven by polling.
pub trait FutureProducer<D>: Send + 'static {
/// The type of value produced.
type Item;
/// Polls for the value; `Ok(Some(_))` carries it, `Ok(None)` means no value
/// was produced.
///
/// In the blanket implementation for `Future`s in this file, `Ok(None)` is
/// returned when `finish` is set and the future is still pending.
fn poll_produce(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
store: StoreContextMut<D>,
finish: bool,
) -> Poll<Result<Option<Self::Item>>>;
}
/// Any fallible `Future` can act as a producer of its success value.
impl<T, E, D, Fut> FutureProducer<D> for Fut
where
    E: Into<Error>,
    Fut: Future<Output = Result<T, E>> + ?Sized + Send + 'static,
{
    type Item = T;

    /// Polls the wrapped future once. A pending future yields `Ok(None)`
    /// when `finish` is set and `Poll::Pending` otherwise.
    fn poll_produce<'a>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        _store: StoreContextMut<'a, D>,
        finish: bool,
    ) -> Poll<Result<Option<T>>> {
        match (self.poll(cx), finish) {
            (Poll::Ready(Ok(value)), _) => Poll::Ready(Ok(Some(value))),
            (Poll::Ready(Err(err)), _) => Poll::Ready(Err(err.into())),
            (Poll::Pending, true) => Poll::Ready(Ok(None)),
            (Poll::Pending, false) => Poll::Pending,
        }
    }
}
/// A host-side sink for the single value of a future, driven by polling.
pub trait FutureConsumer<D>: Send + 'static {
/// The type of value consumed.
type Item;
/// Polls the consumer to take the value from `source`.
///
/// NOTE(review): `finish` presumably asks the consumer to wrap up without
/// waiting further — confirm against the caller.
fn poll_consume(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
store: StoreContextMut<D>,
source: Source<'_, Self::Item>,
finish: bool,
) -> Poll<Result<()>>;
}
/// Host-side handle to the readable end of a future carrying a `T`.
pub struct FutureReader<T> {
/// Table id of the underlying transmit handle.
id: TableId<TransmitHandle>,
/// Ties the payload type `T` to this handle without storing one.
_phantom: PhantomData<T>,
}
impl<T> FutureReader<T> {
    /// Creates a future whose value is supplied by `producer`.
    ///
    /// # Errors
    ///
    /// Fails if concurrency support is not enabled for the store, or if the
    /// underlying transmit cannot be created.
    pub fn new<S: AsContextMut>(
        mut store: S,
        producer: impl FutureProducer<S::Data, Item = T>,
    ) -> Result<Self>
    where
        T: func::Lower + func::Lift + Send + Sync + 'static,
    {
        /// Adapts a `FutureProducer` to the `StreamProducer` interface.
        struct Producer<P>(P);

        impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
            for Producer<P>
        {
            type Item = P::Item;
            type Buffer = Option<P::Item>;

            fn poll_produce<'a>(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                store: StoreContextMut<D>,
                mut destination: Destination<'a, Self::Item, Self::Buffer>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // NOTE(review): projects the pin onto the sole field of this
                // wrapper; presumably sound because the field is never moved
                // out — confirm.
                let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
                let produced = ready!(producer.poll_produce(cx, store, finish))?;
                let outcome = match produced {
                    Some(value) => {
                        destination.set_buffer(Some(value));
                        StreamResult::Completed
                    }
                    None => StreamResult::Cancelled,
                };
                Poll::Ready(Ok(outcome))
            }
        }

        ensure!(
            store.as_context().0.concurrency_support(),
            "concurrency support is not enabled"
        );
        let id = store
            .as_context_mut()
            .new_transmit(TransmitKind::Future, Producer(producer))?;
        Ok(Self::new_(id))
    }

    /// Wraps an existing transmit handle id.
    pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
        let _phantom = PhantomData;
        Self { id, _phantom }
    }

    /// Returns the underlying transmit handle id.
    pub(super) fn id(&self) -> TableId<TransmitHandle> {
        self.id
    }

    /// Attaches `consumer` as the receiver of this future's value.
    pub fn pipe<S: AsContextMut>(
        self,
        mut store: S,
        consumer: impl FutureConsumer<S::Data, Item = T> + Unpin,
    ) -> Result<()>
    where
        T: func::Lift + 'static,
    {
        /// Adapts a `FutureConsumer` to the `StreamConsumer` interface.
        struct Consumer<C>(C);

        impl<D: 'static, T: func::Lift + 'static, C: FutureConsumer<D, Item = T>> StreamConsumer<D>
            for Consumer<C>
        {
            type Item = T;

            fn poll_consume(
                self: Pin<&mut Self>,
                cx: &mut Context<'_>,
                mut store: StoreContextMut<D>,
                mut source: Source<Self::Item>,
                finish: bool,
            ) -> Poll<Result<StreamResult>> {
                // NOTE(review): projects the pin onto the sole field of this
                // wrapper; presumably sound because the field is never moved
                // out — confirm.
                let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
                ready!(consumer.poll_consume(
                    cx,
                    store.as_context_mut(),
                    source.reborrow(),
                    finish
                ))?;
                // Nothing left in the source means the value was taken.
                let outcome = if source.remaining(store) == 0 {
                    StreamResult::Completed
                } else {
                    StreamResult::Cancelled
                };
                Poll::Ready(Ok(outcome))
            }
        }

        let adapter = Consumer(consumer);
        store
            .as_context_mut()
            .set_consumer(self.id, TransmitKind::Future, adapter)
    }

    /// Lifts a guest future handle `index` of type `ty` into a reader.
    fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
        lift_index_to_future(cx, ty, index).map(Self::new_)
    }

    /// Closes this reader via `future_close`, which also replaces the stored
    /// id with a `u32::MAX` placeholder.
    pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
        let store = store.as_context_mut().0;
        future_close(store, &mut self.id)
    }

    /// Like [`FutureReader::close`], but obtains store access through
    /// `accessor`.
    pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
        accessor.as_accessor().with(|access| self.close(access))
    }

    /// Wraps this reader in a guard that closes it on drop.
    pub fn guard<A>(self, accessor: A) -> GuardedFutureReader<T, A>
    where
        A: AsAccessor,
    {
        GuardedFutureReader::new(accessor, self)
    }

    /// Attempts to convert this reader into a dynamically-typed `FutureAny`.
    pub fn try_into_future_any(self, store: impl AsContextMut) -> Result<FutureAny>
    where
        T: ComponentType + 'static,
    {
        FutureAny::try_from_future_reader(store, self)
    }

    /// Attempts to convert a dynamically-typed `FutureAny` into a reader of
    /// `T`.
    pub fn try_from_future_any(future: FutureAny) -> Result<Self>
    where
        T: ComponentType + 'static,
    {
        future.try_into_future_reader()
    }
}
/// Debug output shows only the handle id, so no bound on `T` is needed.
impl<T> fmt::Debug for FutureReader<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("FutureReader");
        out.field("id", &self.id);
        out.finish()
    }
}
/// Drops the host reader end of the future identified by `id`.
///
/// `id` is overwritten with a `u32::MAX` placeholder before the original id
/// is passed to `host_drop_reader`.
pub(super) fn future_close(
    store: &mut StoreOpaque,
    id: &mut TableId<TransmitHandle>,
) -> Result<()> {
    let placeholder = TableId::new(u32::MAX);
    let original = mem::replace(id, placeholder);
    store.host_drop_reader(original, TransmitKind::Future)
}
/// Lifts the guest future handle `index` into a host transmit handle id.
///
/// `ty` must be `InterfaceType::Future`; any other type is routed to
/// `func::bad_type_info`.
pub(super) fn lift_index_to_future(
    cx: &mut LiftContext<'_>,
    ty: InterfaceType,
    index: u32,
) -> Result<TableId<TransmitHandle>> {
    if let InterfaceType::Future(src) = ty {
        let (state, instance) = cx.concurrent_state_and_instance_mut();
        let table = TransmitIndex::Future(src);
        lift_index_to_transmit(instance, state, table, index)
    } else {
        func::bad_type_info()
    }
}
/// Lowers the host transmit handle `id` into a guest future handle index.
///
/// `ty` must be `InterfaceType::Future`; any other type is routed to
/// `func::bad_type_info`.
pub(super) fn lower_future_to_index<U>(
    id: TableId<TransmitHandle>,
    cx: &mut LowerContext<'_, U>,
    ty: InterfaceType,
) -> Result<u32> {
    if let InterfaceType::Future(dst) = ty {
        let table = TransmitIndex::Future(dst);
        cx.instance_handle()
            .lower_transmit_to_index(cx.store.0, table, id)
    } else {
        func::bad_type_info()
    }
}
unsafe impl<T: ComponentType> ComponentType for FutureReader<T> {
const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
type Lower = <u32 as func::ComponentType>::Lower;
fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
match ty {
InterfaceType::Future(ty) => {
let ty = types.types[*ty].ty;
types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
}
other => bail!("expected `future`, found `{}`", func::desc(other)),
}
}
}
/// Lowering converts the reader to a guest handle index and lowers that
/// index as a `u32`.
unsafe impl<T: ComponentType> func::Lower for FutureReader<T> {
    fn linear_lower_to_flat<U>(
        &self,
        cx: &mut LowerContext<'_, U>,
        ty: InterfaceType,
        dst: &mut MaybeUninit<Self::Lower>,
    ) -> Result<()> {
        let index = lower_future_to_index(self.id, cx, ty)?;
        index.linear_lower_to_flat(cx, InterfaceType::U32, dst)
    }

    fn linear_lower_to_memory<U>(
        &self,
        cx: &mut LowerContext<'_, U>,
        ty: InterfaceType,
        offset: usize,
    ) -> Result<()> {
        let index = lower_future_to_index(self.id, cx, ty)?;
        index.linear_lower_to_memory(cx, InterfaceType::U32, offset)
    }
}
/// Lifting reads a `u32` guest handle index and converts it into a reader.
unsafe impl<T: ComponentType> func::Lift for FutureReader<T> {
    fn linear_lift_from_flat(
        cx: &mut LiftContext<'_>,
        ty: InterfaceType,
        src: &Self::Lower,
    ) -> Result<Self> {
        let handle = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
        Self::lift_from_index(cx, ty, handle)
    }

    fn linear_lift_from_memory(
        cx: &mut LiftContext<'_>,
        ty: InterfaceType,
        bytes: &[u8],
    ) -> Result<Self> {
        let handle = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
        Self::lift_from_index(cx, ty, handle)
    }
}
/// A [`FutureReader`] paired with an accessor so that the reader is closed
/// when the guard is dropped.
pub struct GuardedFutureReader<T, A>
where
A: AsAccessor,
{
/// The guarded reader; `None` once it has been extracted from the guard.
reader: Option<FutureReader<T>>,
/// Accessor used to close the reader on drop.
accessor: A,
}
impl<T, A> GuardedFutureReader<T, A>
where
A: AsAccessor,
{
pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
assert!(
accessor
.as_accessor()
.with(|a| a.as_context().0.concurrency_support())
);
Self {
reader: Some(reader),
accessor,
}
}
pub fn into_future(self) -> FutureReader<T> {
self.into()
}
}
/// Unwraps the guard into its reader without closing it.
impl<T, A> From<GuardedFutureReader<T, A>> for FutureReader<T>
where
    A: AsAccessor,
{
    fn from(mut guard: GuardedFutureReader<T, A>) -> Self {
        // Taking the reader leaves `None` behind, so the guard's `Drop`
        // has nothing to close.
        let reader = guard.reader.take();
        reader.unwrap()
    }
}
/// Closes the reader, if it is still held, when the guard goes away.
impl<T, A> Drop for GuardedFutureReader<T, A>
where
    A: AsAccessor,
{
    fn drop(&mut self) {
        let Some(reader) = self.reader.as_mut() else {
            return;
        };
        // A failure to close is only checked in debug builds.
        let result = reader.close_with(&self.accessor);
        debug_assert!(result.is_ok());
    }
}
/// Host-side handle to the readable end of a stream of `T` items.
pub struct StreamReader<T> {
/// Table id of the underlying transmit handle.
id: TableId<TransmitHandle>,
/// Ties the item type `T` to this handle without storing one.
_phantom: PhantomData<T>,
}
impl<T> StreamReader<T> {
pub fn new<S: AsContextMut>(
mut store: S,
producer: impl StreamProducer<S::Data, Item = T>,
) -> Result<Self>
where
T: func::Lower + func::Lift + Send + Sync + 'static,
{
ensure!(
store.as_context().0.concurrency_support(),
"concurrency support is not enabled",
);
Ok(Self::new_(
store
.as_context_mut()
.new_transmit(TransmitKind::Stream, producer)?,
))
}
pub(super) fn new_(id: TableId<TransmitHandle>) -> Self {
Self {
id,
_phantom: PhantomData,
}
}
pub(super) fn id(&self) -> TableId<TransmitHandle> {
self.id
}
pub fn try_into<V: 'static>(mut self, mut store: impl AsContextMut) -> Result<V, Self> {
let store = store.as_context_mut();
let state = store.0.concurrent_state_mut();
let id = state.get_mut(self.id).unwrap().state;
if let WriteState::HostReady { try_into, .. } = &state.get_mut(id).unwrap().write {
match try_into(TypeId::of::<V>()) {
Some(result) => {
self.close(store).unwrap();
Ok(*result.downcast::<V>().unwrap())
}
None => Err(self),
}
} else {
Err(self)
}
}
pub fn pipe<S: AsContextMut>(
self,
mut store: S,
consumer: impl StreamConsumer<S::Data, Item = T>,
) -> Result<()>
where
T: 'static,
{
store
.as_context_mut()
.set_consumer(self.id, TransmitKind::Stream, consumer)
}
fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
let id = lift_index_to_stream(cx, ty, index)?;
Ok(Self::new_(id))
}
pub fn close(&mut self, mut store: impl AsContextMut) -> Result<()> {
stream_close(store.as_context_mut().0, &mut self.id)
}
pub fn close_with(&mut self, accessor: impl AsAccessor) -> Result<()> {
accessor.as_accessor().with(|access| self.close(access))
}
pub fn guard<A>(self, accessor: A) -> GuardedStreamReader<T, A>
where
A: AsAccessor,
{
GuardedStreamReader::new(accessor, self)
}
pub fn try_into_stream_any(self, store: impl AsContextMut) -> Result<StreamAny>
where
T: ComponentType + 'static,
{
StreamAny::try_from_stream_reader(store, self)
}
pub fn try_from_stream_any(stream: StreamAny) -> Result<Self>
where
T: ComponentType + 'static,
{
stream.try_into_stream_reader()
}
}
/// Debug output shows only the handle id, so no bound on `T` is needed.
impl<T> fmt::Debug for StreamReader<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("StreamReader");
        out.field("id", &self.id);
        out.finish()
    }
}
/// Drops the host reader end of the stream identified by `id`.
///
/// `id` is overwritten with a `u32::MAX` placeholder before the original id
/// is passed to `host_drop_reader`.
pub(super) fn stream_close(
    store: &mut StoreOpaque,
    id: &mut TableId<TransmitHandle>,
) -> Result<()> {
    let placeholder = TableId::new(u32::MAX);
    let original = mem::replace(id, placeholder);
    store.host_drop_reader(original, TransmitKind::Stream)
}
/// Lifts the guest stream handle `index` into a host transmit handle id.
///
/// `ty` must be `InterfaceType::Stream`; any other type is routed to
/// `func::bad_type_info`.
pub(super) fn lift_index_to_stream(
    cx: &mut LiftContext<'_>,
    ty: InterfaceType,
    index: u32,
) -> Result<TableId<TransmitHandle>> {
    if let InterfaceType::Stream(src) = ty {
        let (state, instance) = cx.concurrent_state_and_instance_mut();
        let table = TransmitIndex::Stream(src);
        lift_index_to_transmit(instance, state, table, index)
    } else {
        func::bad_type_info()
    }
}
/// Lowers the host transmit handle `id` into a guest stream handle index.
///
/// `ty` must be `InterfaceType::Stream`; any other type is routed to
/// `func::bad_type_info`.
pub(super) fn lower_stream_to_index<U>(
    id: TableId<TransmitHandle>,
    cx: &mut LowerContext<'_, U>,
    ty: InterfaceType,
) -> Result<u32> {
    if let InterfaceType::Stream(dst) = ty {
        let table = TransmitIndex::Stream(dst);
        cx.instance_handle()
            .lower_transmit_to_index(cx.store.0, table, id)
    } else {
        func::bad_type_info()
    }
}
unsafe impl<T: ComponentType> ComponentType for StreamReader<T> {
const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
type Lower = <u32 as func::ComponentType>::Lower;
fn typecheck(ty: &InterfaceType, types: &InstanceType<'_>) -> Result<()> {
match ty {
InterfaceType::Stream(ty) => {
let ty = types.types[*ty].ty;
types::typecheck_payload::<T>(types.types[ty].payload.as_ref(), types)
}
other => bail!("expected `stream`, found `{}`", func::desc(other)),
}
}
}
/// Lowering converts the reader to a guest handle index and lowers that
/// index as a `u32`.
unsafe impl<T: ComponentType> func::Lower for StreamReader<T> {
    fn linear_lower_to_flat<U>(
        &self,
        cx: &mut LowerContext<'_, U>,
        ty: InterfaceType,
        dst: &mut MaybeUninit<Self::Lower>,
    ) -> Result<()> {
        let index = lower_stream_to_index(self.id, cx, ty)?;
        index.linear_lower_to_flat(cx, InterfaceType::U32, dst)
    }

    fn linear_lower_to_memory<U>(
        &self,
        cx: &mut LowerContext<'_, U>,
        ty: InterfaceType,
        offset: usize,
    ) -> Result<()> {
        let index = lower_stream_to_index(self.id, cx, ty)?;
        index.linear_lower_to_memory(cx, InterfaceType::U32, offset)
    }
}
/// Lifting reads a `u32` guest handle index and converts it into a reader.
unsafe impl<T: ComponentType> func::Lift for StreamReader<T> {
    fn linear_lift_from_flat(
        cx: &mut LiftContext<'_>,
        ty: InterfaceType,
        src: &Self::Lower,
    ) -> Result<Self> {
        let handle = u32::linear_lift_from_flat(cx, InterfaceType::U32, src)?;
        Self::lift_from_index(cx, ty, handle)
    }

    fn linear_lift_from_memory(
        cx: &mut LiftContext<'_>,
        ty: InterfaceType,
        bytes: &[u8],
    ) -> Result<Self> {
        let handle = u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)?;
        Self::lift_from_index(cx, ty, handle)
    }
}
/// A [`StreamReader`] paired with an accessor so that the reader is closed
/// when the guard is dropped.
pub struct GuardedStreamReader<T, A>
where
A: AsAccessor,
{
/// The guarded reader; `None` once it has been extracted from the guard.
reader: Option<StreamReader<T>>,
/// Accessor used to close the reader on drop.
accessor: A,
}
impl<T, A> GuardedStreamReader<T, A>
where
A: AsAccessor,
{
pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
assert!(
accessor
.as_accessor()
.with(|a| a.as_context().0.concurrency_support())
);
Self {
reader: Some(reader),
accessor,
}
}
pub fn into_stream(self) -> StreamReader<T> {
self.into()
}
}
/// Unwraps the guard into its reader without closing it.
impl<T, A> From<GuardedStreamReader<T, A>> for StreamReader<T>
where
    A: AsAccessor,
{
    fn from(mut guard: GuardedStreamReader<T, A>) -> Self {
        // Taking the reader leaves `None` behind, so the guard's `Drop`
        // has nothing to close.
        let reader = guard.reader.take();
        reader.unwrap()
    }
}
/// Closes the reader, if it is still held, when the guard goes away.
impl<T, A> Drop for GuardedStreamReader<T, A>
where
    A: AsAccessor,
{
    fn drop(&mut self) {
        let Some(reader) = self.reader.as_mut() else {
            return;
        };
        // A failure to close is only checked in debug builds.
        let result = reader.close_with(&self.accessor);
        debug_assert!(result.is_ok());
    }
}
/// Host-side representation of an `error-context` value.
pub struct ErrorContext {
/// Representation value identifying the error context.
rep: u32,
}
impl ErrorContext {
pub(crate) fn new(rep: u32) -> Self {
Self { rep }
}
pub fn into_val(self) -> Val {
Val::ErrorContext(ErrorContextAny(self.rep))
}
pub fn from_val(_: impl AsContextMut, value: &Val) -> Result<Self> {
let Val::ErrorContext(ErrorContextAny(rep)) = value else {
bail!("expected `error-context`; got `{}`", value.desc());
};
Ok(Self::new(*rep))
}
fn lift_from_index(cx: &mut LiftContext<'_>, ty: InterfaceType, index: u32) -> Result<Self> {
match ty {
InterfaceType::ErrorContext(src) => {
let rep = cx
.instance_mut()
.table_for_error_context(src)
.error_context_rep(index)?;
Ok(Self { rep })
}
_ => func::bad_type_info(),
}
}
}
/// Inserts the error context `rep` into the guest table selected by `ty` and
/// returns the resulting handle index.
///
/// `ty` must be `InterfaceType::ErrorContext`; any other type is routed to
/// `func::bad_type_info`.
pub(crate) fn lower_error_context_to_index<U>(
    rep: u32,
    cx: &mut LowerContext<'_, U>,
    ty: InterfaceType,
) -> Result<u32> {
    if let InterfaceType::ErrorContext(dst) = ty {
        cx.instance_mut()
            .table_for_error_context(dst)
            .error_context_insert(rep)
    } else {
        func::bad_type_info()
    }
}
unsafe impl func::ComponentType for ErrorContext {
const ABI: CanonicalAbiInfo = CanonicalAbiInfo::SCALAR4;
type Lower = <u32 as func::ComponentType>::Lower;
fn typecheck(ty: &InterfaceType, _types: &InstanceType<'_>) -> Result<()> {
match ty {
InterfaceType::ErrorContext(_) => Ok(()),
other => bail!("expected `error`, found `{}`", func::desc(other)),
}
}
}
// Lowering first registers the `rep` in the destination instance's
// error-context table and then lowers the resulting index as a plain `u32`.
unsafe impl func::Lower for ErrorContext {
    /// Lowers to the flat representation by lowering the table index as `u32`.
    fn linear_lower_to_flat<T>(
        &self,
        cx: &mut LowerContext<'_, T>,
        ty: InterfaceType,
        dst: &mut MaybeUninit<Self::Lower>,
    ) -> Result<()> {
        let index = lower_error_context_to_index(self.rep, cx, ty)?;
        index.linear_lower_to_flat(cx, InterfaceType::U32, dst)
    }

    /// Lowers into linear memory at `offset` by storing the table index as
    /// `u32`.
    fn linear_lower_to_memory<T>(
        &self,
        cx: &mut LowerContext<'_, T>,
        ty: InterfaceType,
        offset: usize,
    ) -> Result<()> {
        let index = lower_error_context_to_index(self.rep, cx, ty)?;
        index.linear_lower_to_memory(cx, InterfaceType::U32, offset)
    }
}
// Lifting reads a `u32` table index and resolves it through the source
// instance's error-context table.
unsafe impl func::Lift for ErrorContext {
    /// Lifts from the flat representation: reads the `u32` index, then
    /// resolves it with `lift_from_index`.
    fn linear_lift_from_flat(
        cx: &mut LiftContext<'_>,
        ty: InterfaceType,
        src: &Self::Lower,
    ) -> Result<Self> {
        u32::linear_lift_from_flat(cx, InterfaceType::U32, src)
            .and_then(|index| Self::lift_from_index(cx, ty, index))
    }

    /// Lifts from linear-memory `bytes`: reads the `u32` index, then resolves
    /// it with `lift_from_index`.
    fn linear_lift_from_memory(
        cx: &mut LiftContext<'_>,
        ty: InterfaceType,
        bytes: &[u8],
    ) -> Result<Self> {
        u32::linear_lift_from_memory(cx, InterfaceType::U32, bytes)
            .and_then(|index| Self::lift_from_index(cx, ty, index))
    }
}
/// Table entry for one end (read or write) of a stream or future.
///
/// Each handle points at the `TransmitState` that holds the state for both
/// ends.
pub(super) struct TransmitHandle {
    /// Waitable bookkeeping for this handle.
    pub(super) common: WaitableCommon,
    // Id of the `TransmitState` this handle belongs to.
    state: TableId<TransmitState>,
}
impl TransmitHandle {
    /// Creates a handle for `state` with default waitable bookkeeping.
    fn new(state: TableId<TransmitState>) -> Self {
        let common = WaitableCommon::default();
        Self { common, state }
    }
}
impl TableDebug for TransmitHandle {
    /// Name reported for this table entry type.
    fn type_name() -> &'static str {
        const NAME: &str = "TransmitHandle";
        NAME
    }
}
/// State for a single stream or future, covering both of its ends.
struct TransmitState {
    // Handle for the write end; its `rep()` is the key used when posting
    // write events.
    write_handle: TableId<TransmitHandle>,
    // Handle for the read end; its `rep()` is the key used when posting read
    // events.
    read_handle: TableId<TransmitHandle>,
    // Current state of the write end.
    write: WriteState,
    // Current state of the read end.
    read: ReadState,
    // Set once a future's value has been transferred; later guest writes are
    // rejected and dropping the writer no longer invokes `on_drop_open`.
    done: bool,
    /// Where this transmit was created (host or a specific guest instance).
    pub(super) origin: TransmitOrigin,
}
/// Records which party created a transmit.
#[derive(Copy, Clone)]
pub(super) enum TransmitOrigin {
    /// Created by the host.
    Host,
    /// Created as a future by the given component instance, with the given
    /// future table type.
    GuestFuture(ComponentInstanceId, TypeFutureTableIndex),
    /// Created as a stream by the given component instance, with the given
    /// stream table type.
    GuestStream(ComponentInstanceId, TypeStreamTableIndex),
}
impl TransmitState {
    /// Creates a fresh state with both ends `Open`, `done` cleared, and both
    /// handle ids set to the `u32::MAX` placeholder.
    fn new(origin: TransmitOrigin) -> Self {
        // NOTE(review): presumably the creator overwrites these with the real
        // handle ids once they are allocated -- confirm against the caller.
        const UNSET: u32 = u32::MAX;
        Self {
            write_handle: TableId::new(UNSET),
            read_handle: TableId::new(UNSET),
            write: WriteState::Open,
            read: ReadState::Open,
            done: false,
            origin,
        }
    }
}
impl TableDebug for TransmitState {
    /// Name reported for this table entry type.
    fn type_name() -> &'static str {
        const NAME: &str = "TransmitState";
        NAME
    }
}
impl TransmitOrigin {
    /// Builds the guest origin for instance `id`, choosing the future or
    /// stream variant according to `index`.
    fn guest(id: ComponentInstanceId, index: TransmitIndex) -> Self {
        match index {
            TransmitIndex::Stream(ty) => Self::GuestStream(id, ty),
            TransmitIndex::Future(ty) => Self::GuestFuture(id, ty),
        }
    }
}
/// Factory for host-side transfer futures: each call returns a new boxed
/// future that resolves to the `StreamResult` of one produce or consume
/// operation.
type PollStream = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>> + Send + Sync,
>;
/// Callback that tries to convert the host producer into a value of the type
/// identified by the given `TypeId`, returning `None` if it cannot.
type TryInto = Box<dyn Fn(TypeId) -> Option<Box<dyn Any>> + Send + Sync>;
/// State of the write end of a stream or future.
enum WriteState {
    /// The write end exists but no write is pending.
    Open,
    /// A guest write is pending, waiting for a reader.
    GuestReady {
        /// Instance that issued the write.
        instance: Instance,
        /// Runtime component instance index of the writer.
        caller: RuntimeComponentInstanceIndex,
        /// Future/stream table type of the write end.
        ty: TransmitIndex,
        /// Flat ABI information for the payload, if any.
        flat_abi: Option<FlatAbi>,
        /// Canonical options for the write.
        options: OptionsIndex,
        /// Address of the guest's source buffer.
        address: usize,
        /// Number of items the guest offered.
        count: ItemCount,
        /// Guest handle index of the write end.
        handle: u32,
    },
    /// The write end is owned by a host producer.
    HostReady {
        /// Starts one produce operation against the host producer.
        produce: PollStream,
        /// Attempts to convert the producer into a value of a requested type.
        try_into: TryInto,
        /// Items delivered to the guest reader so far in the current
        /// operation.
        guest_offset: ItemCount,
        /// Cancellation flag forwarded to the producer's `poll_produce`.
        cancel: bool,
        /// Waker stored while the producer's poll is pending; cleared when it
        /// completes.
        cancel_waker: Option<Waker>,
    },
    /// The write end has been dropped.
    Dropped,
}
impl fmt::Debug for WriteState {
    /// Prints only the variant name; the fields are not formatted.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let variant = match self {
            Self::Open => "Open",
            Self::GuestReady { .. } => "GuestReady",
            Self::HostReady { .. } => "HostReady",
            Self::Dropped => "Dropped",
        };
        f.debug_tuple(variant).finish()
    }
}
/// State of the read end of a stream or future.
enum ReadState {
    /// The read end exists but no read is pending.
    Open,
    /// A guest read is pending, waiting for a writer.
    GuestReady {
        /// Future/stream table type of the read end.
        ty: TransmitIndex,
        /// Runtime component instance index of the reader.
        caller_instance: RuntimeComponentInstanceIndex,
        /// Thread that issued the read; made current while lowering values
        /// into the reader's memory.
        caller_thread: QualifiedThreadId,
        /// Flat ABI information for the payload, if any.
        flat_abi: Option<FlatAbi>,
        /// Instance that issued the read.
        instance: Instance,
        /// Canonical options for the read.
        options: OptionsIndex,
        /// Address of the guest's destination buffer.
        address: usize,
        /// Number of items the guest can accept.
        count: ItemCount,
        /// Guest handle index of the read end.
        handle: u32,
    },
    /// The read end is owned by a host consumer.
    HostReady {
        /// Starts one consume operation against the host consumer.
        consume: PollStream,
        /// Items taken from the guest writer so far in the current operation.
        guest_offset: ItemCount,
        /// Cancellation flag forwarded to the consumer's `poll_consume`.
        cancel: bool,
        /// Waker stored while the consumer's poll is pending; cleared when it
        /// completes.
        cancel_waker: Option<Waker>,
    },
    /// Both ends are owned by the host: a host producer feeds a host consumer.
    HostToHost {
        /// Feeds the given buffer to the host consumer and resolves to the
        /// consumer's result.
        accept: Box<
            dyn for<'a> Fn(
                    &'a mut UntypedWriteBuffer<'a>,
                )
                    -> Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'a>>
                + Send
                + Sync,
        >,
        /// Scratch storage lent to the producer as its host buffer.
        buffer: Vec<u8>,
        /// Amount the producer marked as written into `buffer`; reset to 0
        /// after it has been handed to the consumer.
        // NOTE(review): unit (bytes vs. items) is not visible here -- confirm.
        limit: usize,
    },
    /// The read end has been dropped.
    Dropped,
}
impl fmt::Debug for ReadState {
    /// Prints only the variant name; the fields are not formatted.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let variant = match self {
            Self::Open => "Open",
            Self::GuestReady { .. } => "GuestReady",
            Self::HostReady { .. } => "HostReady",
            Self::HostToHost { .. } => "HostToHost",
            Self::Dropped => "Dropped",
        };
        f.debug_tuple(variant).finish()
    }
}
/// Translates a host-side `StreamResult` plus the number of transferred items
/// into the `ReturnCode` reported to the guest.
///
/// `kind` only matters for the `Completed` case, which is delegated to
/// `ReturnCode::completed`. This function currently never returns `Err`.
fn return_code(kind: TransmitKind, state: StreamResult, count: ItemCount) -> Result<ReturnCode> {
    let code = match state {
        StreamResult::Completed => ReturnCode::completed(kind, count),
        StreamResult::Cancelled => ReturnCode::Cancelled(count),
        StreamResult::Dropped => ReturnCode::Dropped(count),
    };
    Ok(code)
}
impl StoreOpaque {
/// Queues a background future that waits for `future` (a pending host consume
/// operation) and then reports the outcome to the guest writer of transmit
/// `id`.
///
/// Once `future` resolves, the queued future:
/// - takes the `ReadState::HostReady` state and turns the result together
///   with the accumulated `guest_offset` into a `ReturnCode`;
/// - marks the read end `Dropped` if the consumer dropped, otherwise re-arms
///   `HostReady` with a zero offset and cleared cancellation state;
/// - takes the `WriteState::GuestReady` state (leaving `Open`) and delivers
///   the code with `send_write_result`.
///
/// An error from `future`, or finding either end in an unexpected state, is
/// returned from the queued future.
fn pipe_from_guest(
    &mut self,
    kind: TransmitKind,
    id: TableId<TransmitState>,
    future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
) {
    let future = async move {
        let stream_state = future.await?;
        tls::get(|store| {
            let state = store.concurrent_state_mut();
            let transmit = state.get_mut(id)?;
            // Temporarily leave `Open` behind; the real state is written back
            // below.
            let ReadState::HostReady {
                consume,
                guest_offset,
                ..
            } = mem::replace(&mut transmit.read, ReadState::Open)
            else {
                bail_bug!("expected ReadState::HostReady")
            };
            let code = return_code(kind, stream_state, guest_offset)?;
            transmit.read = match stream_state {
                StreamResult::Dropped => ReadState::Dropped,
                StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady {
                    consume,
                    guest_offset: ItemCount::ZERO,
                    cancel: false,
                    cancel_waker: None,
                },
            };
            // The guest write is finished either way; its end returns to
            // `Open`.
            let WriteState::GuestReady { ty, handle, .. } =
                mem::replace(&mut transmit.write, WriteState::Open)
            else {
                bail_bug!("expected WriteState::GuestReady")
            };
            state.send_write_result(ty, id, handle, code)?;
            Ok(())
        })
    };
    self.concurrent_state_mut().push_future(future.boxed());
}
/// Queues a background future that waits for `future` (a pending host produce
/// operation) and then reports the outcome to the guest reader of transmit
/// `id`.
///
/// Once `future` resolves, the queued future:
/// - takes the `WriteState::HostReady` state and turns the result together
///   with the accumulated `guest_offset` into a `ReturnCode`;
/// - marks the write end `Dropped` if the producer dropped, otherwise re-arms
///   `HostReady` with a zero offset and cleared cancellation state;
/// - takes the `ReadState::GuestReady` state (leaving `Open`) and delivers the
///   code with `send_read_result`.
///
/// An error from `future`, or finding either end in an unexpected state, is
/// returned from the queued future.
fn pipe_to_guest(
    &mut self,
    kind: TransmitKind,
    id: TableId<TransmitState>,
    future: Pin<Box<dyn Future<Output = Result<StreamResult>> + Send + 'static>>,
) {
    let future = async move {
        let stream_state = future.await?;
        tls::get(|store| {
            let state = store.concurrent_state_mut();
            let transmit = state.get_mut(id)?;
            // Temporarily leave `Open` behind; the real state is written back
            // below.
            let WriteState::HostReady {
                produce,
                try_into,
                guest_offset,
                ..
            } = mem::replace(&mut transmit.write, WriteState::Open)
            else {
                bail_bug!("expected WriteState::HostReady")
            };
            let code = return_code(kind, stream_state, guest_offset)?;
            transmit.write = match stream_state {
                StreamResult::Dropped => WriteState::Dropped,
                StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady {
                    produce,
                    try_into,
                    guest_offset: ItemCount::ZERO,
                    cancel: false,
                    cancel_waker: None,
                },
            };
            // The guest read is finished either way; its end returns to
            // `Open`.
            let ReadState::GuestReady { ty, handle, .. } =
                mem::replace(&mut transmit.read, ReadState::Open)
            else {
                bail_bug!("expected ReadState::GuestReady")
            };
            state.send_read_result(ty, id, handle, code)?;
            Ok(())
        })
    };
    self.concurrent_state_mut().push_future(future.boxed());
}
fn host_drop_reader(&mut self, id: TableId<TransmitHandle>, kind: TransmitKind) -> Result<()> {
let state = self.concurrent_state_mut();
Waitable::Transmit(id).join(state, None)?;
let transmit_id = state.get_mut(id)?.state;
let transmit = state
.get_mut(transmit_id)
.with_context(|| format!("error closing reader {transmit_id:?}"))?;
log::trace!(
"host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}",
transmit.read,
transmit.write
);
transmit.read = ReadState::Dropped;
let new_state = if let WriteState::Dropped = &transmit.write {
WriteState::Dropped
} else {
WriteState::Open
};
let write_handle = transmit.write_handle;
match mem::replace(&mut transmit.write, new_state) {
WriteState::GuestReady { ty, handle, .. } => {
state.update_event(
write_handle.rep(),
match ty {
TransmitIndex::Future(ty) => Event::FutureWrite {
code: ReturnCode::Dropped(ItemCount::ZERO),
pending: Some((ty, handle)),
},
TransmitIndex::Stream(ty) => Event::StreamWrite {
code: ReturnCode::Dropped(ItemCount::ZERO),
pending: Some((ty, handle)),
},
},
)?;
}
WriteState::Open => {
state.update_event(
write_handle.rep(),
match kind {
TransmitKind::Future => Event::FutureWrite {
code: ReturnCode::Dropped(ItemCount::ZERO),
pending: None,
},
TransmitKind::Stream => Event::StreamWrite {
code: ReturnCode::Dropped(ItemCount::ZERO),
pending: None,
},
},
)?;
}
WriteState::Dropped | WriteState::HostReady { .. } => {
log::trace!("host_drop_reader delete {transmit_id:?}");
state.delete_transmit(transmit_id)?;
}
}
Ok(())
}
/// Drops the write end identified by `id`.
///
/// `on_drop_open` is `Some` for futures and `None` for streams. For a future
/// whose write end is still `Open`, whose value has not been transferred
/// (`done` is false) and whose reader has not been dropped, the callback is
/// invoked and its error (if any) is returned. Otherwise an `Open` write end
/// becomes `Dropped`.
///
/// The read end is then handled according to its previous state:
/// - `GuestReady`: the pending guest read is completed with a `Dropped(0)`
///   event carrying the pending `(ty, handle)` pair;
/// - `Open`: a `Dropped(0)` event with no pending pair is posted (a future
///   event when `on_drop_open` is `Some`, a stream event otherwise);
/// - `Dropped`, `HostReady` or `HostToHost`: the whole transmit is deleted.
///
/// # Errors
///
/// Fails if the write end is guest-owned (`GuestReady`) or already `Dropped`,
/// if `on_drop_open` returns an error, or if a table lookup fails.
fn host_drop_writer(
    &mut self,
    id: TableId<TransmitHandle>,
    on_drop_open: Option<fn() -> Result<()>>,
) -> Result<()> {
    let state = self.concurrent_state_mut();
    Waitable::Transmit(id).join(state, None)?;
    let transmit_id = state.get_mut(id)?.state;
    let transmit = state
        .get_mut(transmit_id)
        .with_context(|| format!("error closing writer {transmit_id:?}"))?;
    // Arguments are passed in the order the message labels them: write state
    // first, then read state.
    log::trace!(
        "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}",
        transmit.write,
        transmit.read
    );
    match &mut transmit.write {
        WriteState::GuestReady { .. } => {
            bail_bug!("can't call `host_drop_writer` on a guest-owned writer");
        }
        WriteState::HostReady { .. } => {}
        v @ WriteState::Open => {
            // Only a future that has neither delivered its value nor lost its
            // reader reaches the callback.
            if let (Some(on_drop_open), false) = (
                on_drop_open,
                transmit.done || matches!(transmit.read, ReadState::Dropped),
            ) {
                on_drop_open()?;
            } else {
                *v = WriteState::Dropped;
            }
        }
        WriteState::Dropped => bail_bug!("write state is already dropped"),
    }
    let transmit = self.concurrent_state_mut().get_mut(transmit_id)?;
    // A dropped reader stays dropped; every other state collapses to `Open`.
    let new_state = if let ReadState::Dropped = &transmit.read {
        ReadState::Dropped
    } else {
        ReadState::Open
    };
    let read_handle = transmit.read_handle;
    match mem::replace(&mut transmit.read, new_state) {
        ReadState::GuestReady { ty, handle, .. } => {
            self.concurrent_state_mut().update_event(
                read_handle.rep(),
                match ty {
                    TransmitIndex::Future(ty) => Event::FutureRead {
                        code: ReturnCode::Dropped(ItemCount::ZERO),
                        pending: Some((ty, handle)),
                    },
                    TransmitIndex::Stream(ty) => Event::StreamRead {
                        code: ReturnCode::Dropped(ItemCount::ZERO),
                        pending: Some((ty, handle)),
                    },
                },
            )?;
        }
        ReadState::Open => {
            self.concurrent_state_mut().update_event(
                read_handle.rep(),
                // The presence of `on_drop_open` is what distinguishes a
                // future from a stream here.
                match on_drop_open {
                    Some(_) => Event::FutureRead {
                        code: ReturnCode::Dropped(ItemCount::ZERO),
                        pending: None,
                    },
                    None => Event::StreamRead {
                        code: ReturnCode::Dropped(ItemCount::ZERO),
                        pending: None,
                    },
                },
            )?;
        }
        ReadState::Dropped | ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
            // No guest reader is left to notify.
            log::trace!("host_drop_writer delete {transmit_id:?}");
            self.concurrent_state_mut().delete_transmit(transmit_id)?;
        }
    }
    Ok(())
}
/// Looks up the origin of the transmit that handle `id` belongs to.
///
/// # Errors
///
/// Fails if either the handle or its transmit state cannot be found.
pub(super) fn transmit_origin(
    &mut self,
    id: TableId<TransmitHandle>,
) -> Result<TransmitOrigin> {
    let state = self.concurrent_state_mut();
    let handle = state.get_mut(id)?;
    let state_id = handle.state;
    let transmit = state.get_mut(state_id)?;
    Ok(transmit.origin)
}
}
impl<T> StoreContextMut<'_, T> {
/// Creates a host-origin stream or future backed by `producer` and returns
/// the handle of its read end.
///
/// The producer and a default `P::Buffer` are stored together in a shared
/// `LockedState`. The write end is set to `WriteState::HostReady` with two
/// closures:
/// - `produce` starts one produce operation: it polls the producer when the
///   buffer is empty, validates the result against what was actually
///   produced, and forwards any buffered items to the reader with `write`;
/// - `try_into` attempts to convert the producer into a value of a requested
///   type through `P::try_into`.
fn new_transmit<P: StreamProducer<T>>(
    mut self,
    kind: TransmitKind,
    producer: P,
) -> Result<TableId<TransmitHandle>>
where
    P::Item: func::Lower,
{
    let token = StoreToken::new(self.as_context_mut());
    let state = self.0.concurrent_state_mut();
    let (_, read) = state.new_transmit(TransmitOrigin::Host)?;
    let producer = Arc::new(LockedState::new((Box::pin(producer), P::Buffer::default())));
    let id = state.get_mut(read)?.state;
    // `bool` is `Copy`, so the `move` closure and each `async move` block
    // below work on their own copy of this flag.
    let mut dropped = false;
    let produce = Box::new({
        let producer = producer.clone();
        move || {
            let producer = producer.clone();
            async move {
                let mut state = producer.take()?;
                let (mine, buffer) = &mut *state;
                // Only poll the producer when nothing is left over in the
                // buffer from an earlier operation.
                let (result, cancelled) = if buffer.remaining().is_empty() {
                    future::poll_fn(|cx| {
                        tls::get(|store| {
                            let transmit = store.concurrent_state_mut().get_mut(id)?;
                            let &WriteState::HostReady { cancel, .. } = &transmit.write else {
                                bail_bug!("expected WriteState::HostReady")
                            };
                            let mut host_written = 0;
                            // For a host reader, lend its scratch buffer to
                            // the producer for the duration of the poll.
                            let mut host_buffer =
                                if let ReadState::HostToHost { buffer, .. } = &mut transmit.read {
                                    Some(mem::take(buffer))
                                } else {
                                    None
                                };
                            let poll = mine.as_mut().poll_produce(
                                cx,
                                token.as_context_mut(store),
                                Destination {
                                    id,
                                    buffer,
                                    host_buffer: host_buffer.as_mut().map(|b| {
                                        HostBuffer {
                                            dst: b,
                                            marked_written: &mut host_written,
                                        }
                                    }),
                                    _phantom: PhantomData,
                                },
                                cancel,
                            );
                            let transmit = store.concurrent_state_mut().get_mut(id)?;
                            // Return the scratch buffer and record how much
                            // the producer marked as written.
                            let host_offset = if let (
                                Some(host_buffer),
                                ReadState::HostToHost { buffer, limit, .. },
                            ) = (host_buffer, &mut transmit.read)
                            {
                                *limit = host_written;
                                *buffer = host_buffer;
                                *limit
                            } else {
                                0
                            };
                            {
                                let WriteState::HostReady {
                                    guest_offset,
                                    cancel,
                                    cancel_waker,
                                    ..
                                } = &mut transmit.write
                                else {
                                    bail_bug!("expected WriteState::HostReady")
                                };
                                if poll.is_pending() {
                                    // A pending poll must not have produced
                                    // anything through any of the three paths.
                                    if !buffer.remaining().is_empty()
                                        || *guest_offset > 0
                                        || host_offset > 0
                                    {
                                        bail!(
                                            "StreamProducer::poll_produce returned Poll::Pending \
                                            after producing at least one item"
                                        )
                                    }
                                    *cancel_waker = Some(cx.waker().clone());
                                } else {
                                    *cancel_waker = None;
                                    *cancel = false;
                                }
                            }
                            // Report the cancel flag that was passed to this
                            // poll alongside the result.
                            Ok(poll.map(|v| v.map(|result| (result, cancel))))
                        })?
                    })
                    .await?
                } else {
                    (StreamResult::Completed, false)
                };
                let (guest_offset, host_offset, count) = tls::get(|store| {
                    let transmit = store.concurrent_state_mut().get_mut(id)?;
                    let (count, host_offset) = match &transmit.read {
                        &ReadState::GuestReady { count, .. } => (count.as_u32(), 0),
                        &ReadState::HostToHost { limit, .. } => (1, limit),
                        _ => bail_bug!("invalid read state"),
                    };
                    let guest_offset = match &transmit.write {
                        &WriteState::HostReady { guest_offset, .. } => guest_offset,
                        _ => bail_bug!("invalid write state"),
                    };
                    Ok((guest_offset, host_offset, count))
                })?;
                // Validate the producer's result against what it produced.
                match result {
                    StreamResult::Completed => {
                        if count > 1
                            && buffer.remaining().is_empty()
                            && guest_offset == 0
                            && host_offset == 0
                        {
                            bail!(
                                "StreamProducer::poll_produce returned StreamResult::Completed \
                                without producing any items"
                            );
                        }
                    }
                    StreamResult::Cancelled => {
                        if !cancelled {
                            bail!(
                                "StreamProducer::poll_produce returned StreamResult::Cancelled \
                                without being given a `finish` parameter value of true"
                            );
                        }
                    }
                    StreamResult::Dropped => {
                        dropped = true;
                    }
                }
                let write_buffer = !buffer.remaining().is_empty() || host_offset > 0;
                // Release the producer/buffer pair before `write` takes it.
                drop(state);
                if write_buffer {
                    write(token, id, producer.clone(), kind).await?;
                }
                // A dropped producer is only reported as `Dropped` once its
                // buffer has been fully drained.
                Ok(if dropped {
                    if producer.with(|p| p.1.remaining().is_empty())? {
                        StreamResult::Dropped
                    } else {
                        StreamResult::Completed
                    }
                } else {
                    result
                })
            }
            .boxed()
        }
    });
    let try_into = Box::new(move |ty| {
        let (mine, buffer) = producer.try_lock().ok()?.take()?;
        match P::try_into(mine, ty) {
            Ok(value) => Some(value),
            Err(mine) => {
                // Conversion was refused: put the producer back.
                *producer.try_lock().ok()? = Some((mine, buffer));
                None
            }
        }
    });
    state.get_mut(id)?.write = WriteState::HostReady {
        produce,
        try_into,
        guest_offset: ItemCount::ZERO,
        cancel: false,
        cancel_waker: None,
    };
    Ok(read)
}
/// Attaches the host `consumer` to the read end identified by `id`.
///
/// The consumer is stored in a shared `LockedState` and wrapped in a closure
/// that polls it once, validates its result, and marks a future as `done`
/// when it completes. How the closure is installed depends on the current
/// write state:
/// - `Open`: the read end becomes `HostReady` and waits for a writer;
/// - `GuestReady`: a consume operation is started immediately and piped back
///   to the waiting guest writer with `pipe_from_guest`;
/// - `HostReady`: the read end becomes `HostToHost` and a background loop
///   repeatedly invokes the host producer until the reader is dropped, the
///   producer reports `Dropped`, or (for a future) one operation finishes;
///   the transmit is deleted when the loop ends;
/// - `Dropped`: the reader is dropped at once with `host_drop_reader`.
fn set_consumer<C: StreamConsumer<T>>(
    mut self,
    id: TableId<TransmitHandle>,
    kind: TransmitKind,
    consumer: C,
) -> Result<()> {
    let token = StoreToken::new(self.as_context_mut());
    let state = self.0.concurrent_state_mut();
    let id = state.get_mut(id)?.state;
    let transmit = state.get_mut(id)?;
    let consumer = Arc::new(LockedState::new(Box::pin(consumer)));
    // One consume operation. `host_buffer` is `Some` only on the host-to-host
    // path, where the data comes from a host producer instead of a guest.
    let consume_with_buffer = {
        let consumer = consumer.clone();
        async move |mut host_buffer: Option<&mut dyn WriteBuffer<C::Item>>| {
            let mut mine = consumer.take()?;
            let host_buffer_remaining_before =
                host_buffer.as_deref_mut().map(|v| v.remaining().len());
            let (result, cancelled) = future::poll_fn(|cx| {
                tls::get(|store| {
                    let cancel = match &store.concurrent_state_mut().get_mut(id)?.read {
                        &ReadState::HostReady { cancel, .. } => cancel,
                        ReadState::Open => false,
                        _ => bail_bug!("unexpected read state"),
                    };
                    let poll = mine.as_mut().poll_consume(
                        cx,
                        token.as_context_mut(store),
                        Source {
                            id,
                            host_buffer: host_buffer.as_deref_mut(),
                        },
                        cancel,
                    );
                    if let ReadState::HostReady {
                        cancel_waker,
                        cancel,
                        ..
                    } = &mut store.concurrent_state_mut().get_mut(id)?.read
                    {
                        if poll.is_pending() {
                            *cancel_waker = Some(cx.waker().clone());
                        } else {
                            *cancel_waker = None;
                            *cancel = false;
                        }
                    }
                    // Report the cancel flag that was passed to this poll
                    // alongside the result.
                    Ok(poll.map(|v| v.map(|result| (result, cancel))))
                })?
            })
            .await?;
            let (guest_offset, count) = tls::get(|store| {
                let transmit = store.concurrent_state_mut().get_mut(id)?;
                Ok((
                    match &transmit.read {
                        &ReadState::HostReady { guest_offset, .. } => guest_offset,
                        ReadState::Open => ItemCount::ZERO,
                        _ => bail_bug!("invalid read state"),
                    },
                    match &transmit.write {
                        WriteState::GuestReady { count, .. } => count.as_usize(),
                        WriteState::HostReady { .. } => match host_buffer_remaining_before {
                            Some(n) => n,
                            None => bail_bug!("host_buffer_remaining_before should be set"),
                        },
                        _ => bail_bug!("invalid write state"),
                    },
                ))
            })?;
            // Validate the consumer's result against what it consumed.
            match result {
                StreamResult::Completed => {
                    if count > 0
                        && guest_offset == 0
                        && host_buffer_remaining_before
                            .zip(host_buffer.map(|v| v.remaining().len()))
                            .map(|(before, after)| before == after)
                            .unwrap_or(false)
                    {
                        bail!(
                            "StreamConsumer::poll_consume returned StreamResult::Completed \
                            without consuming any items"
                        );
                    }
                    // A future carries a single value; once consumed it is
                    // done.
                    if let TransmitKind::Future = kind {
                        tls::get(|store| {
                            store.concurrent_state_mut().get_mut(id)?.done = true;
                            crate::error::Ok(())
                        })?;
                    }
                }
                StreamResult::Cancelled => {
                    if !cancelled {
                        bail!(
                            "StreamConsumer::poll_consume returned StreamResult::Cancelled \
                            without being given a `finish` parameter value of true"
                        );
                    }
                }
                StreamResult::Dropped => {}
            }
            Ok(result)
        }
    };
    // `PollStream` form of the closure above, used when the writer is a guest
    // (no host buffer).
    let consume = {
        let consume = consume_with_buffer.clone();
        Box::new(move || {
            let consume = consume.clone();
            async move { consume(None).await }.boxed()
        })
    };
    match &transmit.write {
        WriteState::Open => {
            transmit.read = ReadState::HostReady {
                consume,
                guest_offset: ItemCount::ZERO,
                cancel: false,
                cancel_waker: None,
            };
        }
        &WriteState::GuestReady { .. } => {
            // A guest write is already waiting: start consuming right away.
            let future = consume();
            transmit.read = ReadState::HostReady {
                consume,
                guest_offset: ItemCount::ZERO,
                cancel: false,
                cancel_waker: None,
            };
            self.0.pipe_from_guest(kind, id, future);
        }
        WriteState::HostReady { .. } => {
            // Take ownership of the real `produce` closure, leaving behind a
            // stand-in whose `produce` reports a bug if it is ever called.
            let WriteState::HostReady { produce, .. } = mem::replace(
                &mut transmit.write,
                WriteState::HostReady {
                    produce: Box::new(|| {
                        Box::pin(async { bail_bug!("unexpected invocation of `produce`") })
                    }),
                    try_into: Box::new(|_| None),
                    guest_offset: ItemCount::ZERO,
                    cancel: false,
                    cancel_waker: None,
                },
            ) else {
                bail_bug!("expected WriteState::HostReady")
            };
            transmit.read = ReadState::HostToHost {
                accept: Box::new(move |input| {
                    let consume = consume_with_buffer.clone();
                    async move { consume(Some(input.get_mut::<C::Item>())).await }.boxed()
                }),
                buffer: Vec::new(),
                limit: 0,
            };
            // Drive the host producer until either side is finished.
            let future = async move {
                loop {
                    if tls::get(|store| {
                        crate::error::Ok(matches!(
                            store.concurrent_state_mut().get_mut(id)?.read,
                            ReadState::Dropped
                        ))
                    })? {
                        break Ok(());
                    }
                    match produce().await? {
                        StreamResult::Completed | StreamResult::Cancelled => {}
                        StreamResult::Dropped => break Ok(()),
                    }
                    // A future transfers at most one value.
                    if let TransmitKind::Future = kind {
                        break Ok(());
                    }
                }
            }
            .map(move |result| {
                // Delete the transmit whether the loop succeeded or failed.
                tls::get(|store| store.concurrent_state_mut().delete_transmit(id))?;
                result
            });
            state.push_future(Box::pin(future));
        }
        WriteState::Dropped => {
            let reader = transmit.read_handle;
            self.0.host_drop_reader(reader, kind)?;
        }
    }
    Ok(())
}
}
/// Delivers items buffered by a host producer to the reader of transmit `id`.
///
/// `pair` holds the producer and its buffer; only the buffer (`.1`) is used
/// here. The read state is taken out of the transmit (leaving `Open`) and
/// handled by kind:
/// - `GuestReady`: the buffer is lowered into the guest's memory at the
///   position given by the write end's `guest_offset`, which is then advanced
///   by the number of items lowered. If `T::MAY_REQUIRE_REALLOC`, the
///   lowering is run as a high-priority worker item instead of inline. A
///   future is marked `done` first.
/// - `HostToHost`: the bytes the producer marked as written in the scratch
///   buffer (up to `limit`) and then the producer's own buffer are fed to the
///   host consumer through `accept` until drained or the consumer reports
///   `Dropped`.
///
/// Any other read state is reported as a bug.
async fn write<D: 'static, P: Send + 'static, T: func::Lower + 'static, B: WriteBuffer<T>>(
    token: StoreToken<D>,
    id: TableId<TransmitState>,
    pair: Arc<LockedState<(P, B)>>,
    kind: TransmitKind,
) -> Result<()> {
    let (read, guest_offset) = tls::get(|store| {
        let transmit = store.concurrent_state_mut().get_mut(id)?;
        let guest_offset = if let &WriteState::HostReady { guest_offset, .. } = &transmit.write {
            Some(guest_offset)
        } else {
            None
        };
        crate::error::Ok((
            mem::replace(&mut transmit.read, ReadState::Open),
            guest_offset,
        ))
    })?;
    match read {
        ReadState::GuestReady {
            ty,
            flat_abi,
            options,
            address,
            count,
            handle,
            instance,
            caller_instance,
            caller_thread,
        } => {
            let guest_offset = match guest_offset {
                Some(i) => i,
                None => bail_bug!("guest_offset should be present if ready"),
            };
            if let TransmitKind::Future = kind {
                tls::get(|store| {
                    store.concurrent_state_mut().get_mut(id)?.done = true;
                    crate::error::Ok(())
                })?;
            }
            // Remembered so the number of items lowered can be computed
            // afterwards.
            let old_remaining = pair.with(|p| p.1.remaining().len())?;
            let accept = {
                let pair = pair.clone();
                move |mut store: StoreContextMut<D>| {
                    let mut state = pair.take()?;
                    // Lower into the part of the guest buffer that has not
                    // been filled yet.
                    lower::<T, B, D>(
                        store.as_context_mut(),
                        instance,
                        caller_thread,
                        options,
                        ty,
                        address + (T::SIZE32 * guest_offset.as_usize()),
                        count.as_usize() - guest_offset.as_usize(),
                        &mut state.1,
                    )?;
                    crate::error::Ok(())
                }
            };
            if guest_offset < count {
                if T::MAY_REQUIRE_REALLOC {
                    // Run the lowering as a worker item and wait for its
                    // result over a oneshot channel.
                    let (tx, rx) = oneshot::channel();
                    tls::get(move |store| {
                        store
                            .concurrent_state_mut()
                            .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                                move |store| {
                                    _ = tx.send(accept(token.as_context_mut(store))?);
                                    Ok(())
                                },
                            ))))
                    });
                    match rx.await {
                        Ok(r) => r,
                        Err(oneshot::Canceled) => bail_bug!("work cancelled"),
                    }
                } else {
                    tls::get(|store| accept(token.as_context_mut(store)))?
                }
            }
            tls::get(|store| {
                // Number of items the lowering removed from the buffer.
                let count = old_remaining - pair.with(|p| p.1.remaining().len())?;
                let transmit = store.concurrent_state_mut().get_mut(id)?;
                let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
                    bail_bug!("expected WriteState::HostReady")
                };
                guest_offset.inc(count)?;
                // Restore the guest read state that was taken out above.
                // NOTE(review): `count` here is the shadowed number of items
                // just lowered, not the guest's original count -- confirm
                // this is intended.
                transmit.read = ReadState::GuestReady {
                    ty,
                    flat_abi,
                    options,
                    address,
                    count: ItemCount::new_usize(count)?,
                    handle,
                    instance,
                    caller_instance,
                    caller_thread,
                };
                crate::error::Ok(())
            })?;
            Ok(())
        }
        ReadState::HostToHost {
            accept,
            mut buffer,
            limit,
        } => {
            let mut state = StreamResult::Completed;
            let mut position = 0;
            // First hand over what the producer wrote into the scratch
            // buffer.
            while !matches!(state, StreamResult::Dropped) && position < limit {
                let mut slice_buffer = SliceBuffer::new(buffer, position, limit);
                state = accept(&mut UntypedWriteBuffer::new(&mut slice_buffer)).await?;
                (buffer, position, _) = slice_buffer.into_parts();
            }
            // Then hand over whatever is left in the producer's own buffer.
            {
                let mut pair = pair.take()?;
                let (_, buffer) = &mut *pair;
                while !(matches!(state, StreamResult::Dropped) || buffer.remaining().is_empty()) {
                    state = accept(&mut UntypedWriteBuffer::new(buffer)).await?;
                }
            }
            tls::get(|store| {
                store.concurrent_state_mut().get_mut(id)?.read = match state {
                    StreamResult::Dropped => ReadState::Dropped,
                    StreamResult::Completed | StreamResult::Cancelled => ReadState::HostToHost {
                        accept,
                        buffer,
                        limit: 0,
                    },
                };
                crate::error::Ok(())
            })?;
            Ok(())
        }
        _ => bail_bug!("unexpected read state"),
    }
}
impl Instance {
/// Starts a host consume operation for a guest write on `transmit_id`.
///
/// The read end is set to `HostReady` with the given `consume`,
/// `guest_offset` and `cancel`, and the freshly created consume future is
/// polled once with a no-op waker:
/// - if it is ready, the result and the accumulated `guest_offset` (which is
///   reset to zero) are turned into a `ReturnCode` and the write end is set
///   to `Open`;
/// - if it is pending, the future is handed to `pipe_from_guest` and
///   `ReturnCode::Blocked` is returned.
fn consume(
    self,
    store: &mut dyn VMStore,
    kind: TransmitKind,
    transmit_id: TableId<TransmitState>,
    consume: PollStream,
    guest_offset: ItemCount,
    cancel: bool,
) -> Result<ReturnCode> {
    let mut future = consume();
    store.concurrent_state_mut().get_mut(transmit_id)?.read = ReadState::HostReady {
        consume,
        guest_offset,
        cancel,
        cancel_waker: None,
    };
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });
    let state = match poll {
        Poll::Pending => {
            // Finish asynchronously; the guest is notified later.
            store.pipe_from_guest(kind, transmit_id, future);
            return Ok(ReturnCode::Blocked);
        }
        Poll::Ready(state) => state,
    };
    let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
    let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else {
        bail_bug!("expected ReadState::HostReady")
    };
    let stream_result = state?;
    let transferred = mem::replace(guest_offset, ItemCount::ZERO);
    let code = return_code(kind, stream_result, transferred)?;
    transmit.write = WriteState::Open;
    Ok(code)
}
/// Starts a host produce operation for a guest read on `transmit_id`.
///
/// The write end is set to `HostReady` with the given `produce`, `try_into`,
/// `guest_offset` and `cancel`, and the freshly created produce future is
/// polled once with a no-op waker:
/// - if it is ready, the result and the accumulated `guest_offset` (which is
///   reset to zero) are turned into a `ReturnCode` and the read end is set to
///   `Open`;
/// - if it is pending, the future is handed to `pipe_to_guest` and
///   `ReturnCode::Blocked` is returned.
fn produce(
    self,
    store: &mut dyn VMStore,
    kind: TransmitKind,
    transmit_id: TableId<TransmitState>,
    produce: PollStream,
    try_into: TryInto,
    guest_offset: ItemCount,
    cancel: bool,
) -> Result<ReturnCode> {
    let mut future = produce();
    store.concurrent_state_mut().get_mut(transmit_id)?.write = WriteState::HostReady {
        produce,
        try_into,
        guest_offset,
        cancel,
        cancel_waker: None,
    };
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });
    let state = match poll {
        Poll::Pending => {
            // Finish asynchronously; the guest is notified later.
            store.pipe_to_guest(kind, transmit_id, future);
            return Ok(ReturnCode::Blocked);
        }
        Poll::Ready(state) => state,
    };
    let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
    let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else {
        bail_bug!("expected WriteState::HostReady")
    };
    let stream_result = state?;
    let transferred = mem::replace(guest_offset, ItemCount::ZERO);
    let code = return_code(kind, stream_result, transferred)?;
    transmit.read = ReadState::Open;
    Ok(code)
}
/// Handles a guest dropping the writable end `writer` of a stream or future.
///
/// The handle is removed from the instance's transmit table and the write end
/// is then dropped with `host_drop_writer`. For futures, a callback is
/// supplied that fails with an error; `host_drop_writer` decides whether to
/// invoke it.
pub(super) fn guest_drop_writable(
    self,
    store: &mut StoreOpaque,
    ty: TransmitIndex,
    writer: u32,
) -> Result<()> {
    /// Error callback handed to `host_drop_writer` for futures.
    fn reject_unwritten_future() -> Result<()> {
        Err(format_err!(
            "cannot drop future write end without first writing a value"
        ))
    }

    let table = self.id().get_mut(store).table_for_transmit(ty);
    let transmit_rep = match ty {
        TransmitIndex::Stream(ty) => table.stream_remove_writable(ty, writer)?,
        TransmitIndex::Future(ty) => table.future_remove_writable(ty, writer)?,
    };
    let id = TableId::<TransmitHandle>::new(transmit_rep);
    log::trace!("guest_drop_writable: drop writer {id:?}");
    let on_drop_open = match ty {
        TransmitIndex::Stream(_) => None,
        TransmitIndex::Future(_) => Some(reject_unwritten_future as fn() -> Result<()>),
    };
    store.host_drop_writer(id, on_drop_open)
}
/// Copies `count` items from a guest writer's buffer to a guest reader's
/// buffer.
///
/// Both buffers are first validated: when the byte length is non-zero, the
/// address must be aligned to the payload's 32-bit alignment and the range
/// must lie within the respective memory. A transfer within one component
/// instance is rejected unless `allow_intra_component_read_write` permits the
/// payload type.
///
/// - Futures: exactly one item is loaded as a `Val` from the writer and
///   stored into the reader (nothing is copied when there is no payload).
/// - Streams with `flat_abi`: the bytes are copied directly, using
///   `copy_within` when both sides use the same memory.
/// - Streams without `flat_abi`: every item is loaded as a `Val` and stored
///   into the reader one at a time.
///
/// Stores into the reader are made with `read_caller_thread` set as the
/// current thread; the previous thread is restored afterwards. `rep` is only
/// used for trace logging.
fn copy<T: 'static>(
    store: StoreContextMut<T>,
    flat_abi: Option<FlatAbi>,
    write_instance: Instance,
    write_caller_instance: RuntimeComponentInstanceIndex,
    write_ty: TransmitIndex,
    write_options: OptionsIndex,
    write_address: usize,
    read_instance: Instance,
    read_caller_instance: RuntimeComponentInstanceIndex,
    read_caller_thread: QualifiedThreadId,
    read_ty: TransmitIndex,
    read_options: OptionsIndex,
    read_address: usize,
    count: ItemCount,
    rep: u32,
) -> Result<()> {
    let (write_component, store) = write_instance.component_and_store_mut(store.0);
    let (read_component, mut store) = read_instance.component_and_store_mut(store);
    let write_types = write_component.types();
    let read_types = read_component.types();
    let count = count.as_usize();
    // Validate the writer's buffer.
    let write_payload_ty = write_ty.payload(write_types);
    let write_abi = match write_payload_ty {
        Some(ty) => write_types.canonical_abi(ty),
        None => &CanonicalAbiInfo::ZERO,
    };
    let write_length_in_bytes = match flat_abi {
        Some(abi) => usize::try_from(abi.size)? * count,
        None => usize::try_from(write_abi.size32)? * count,
    };
    if write_length_in_bytes > 0 {
        if write_address % usize::try_from(write_abi.align32)? != 0 {
            bail!("write pointer not aligned");
        }
        write_instance
            .options_memory(store, write_options)
            .get(write_address..)
            .and_then(|b| b.get(..write_length_in_bytes))
            .ok_or_else(|| crate::format_err!("write pointer out of bounds"))?;
    }
    // Validate the reader's buffer.
    let read_payload_ty = read_ty.payload(read_types);
    let read_abi = match read_payload_ty {
        Some(ty) => read_types.canonical_abi(ty),
        None => &CanonicalAbiInfo::ZERO,
    };
    let read_length_in_bytes = match flat_abi {
        Some(abi) => usize::try_from(abi.size)? * count,
        None => usize::try_from(read_abi.size32)? * count,
    };
    if read_length_in_bytes > 0 {
        if read_address % usize::try_from(read_abi.align32)? != 0 {
            bail!("read pointer not aligned");
        }
        read_instance
            .options_memory(store, read_options)
            .get(read_address..)
            .and_then(|b| b.get(..read_length_in_bytes))
            .ok_or_else(|| crate::format_err!("read pointer out of bounds"))?;
    }
    if write_caller_instance == read_caller_instance
        && !allow_intra_component_read_write(write_payload_ty)
    {
        bail!(
            "cannot read from and write to intra-component future/stream with non-numeric payload"
        )
    }
    match (write_ty, read_ty) {
        (TransmitIndex::Future(_), TransmitIndex::Future(_)) => {
            if count != 1 {
                bail_bug!("futures can only send 1 item");
            }
            // Lift the single value out of the writer's memory, if the future
            // has a payload at all.
            let val = write_payload_ty
                .map(|ty| {
                    let lift = &mut LiftContext::new(store, write_options, write_instance);
                    let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
                    Val::load(lift, *ty, bytes)
                })
                .transpose()?;
            if let Some(val) = val {
                // Lower on behalf of the reading thread.
                let old_thread = store.set_thread(read_caller_thread)?;
                let lower =
                    &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
                let ptr = func::validate_inbounds_dynamic(
                    read_abi,
                    lower.as_slice_mut(),
                    &ValRaw::u32(read_address.try_into()?),
                )?;
                let ty = match read_payload_ty {
                    Some(ty) => ty,
                    None => bail_bug!("expected read payload type to be present"),
                };
                val.store(lower, *ty, ptr)?;
                store.set_thread(old_thread)?;
            }
        }
        (TransmitIndex::Stream(_), TransmitIndex::Stream(_)) => {
            if write_length_in_bytes == 0 {
                return Ok(());
            }
            let write_payload_ty = match write_payload_ty {
                Some(ty) => ty,
                None => bail_bug!("expected write payload type to be present"),
            };
            let read_payload_ty = match read_payload_ty {
                Some(ty) => ty,
                None => bail_bug!("expected read payload type to be present"),
            };
            if flat_abi.is_some() {
                // Flat payloads are copied as raw bytes.
                let store_opaque = store.store_opaque_mut();
                assert_eq!(read_length_in_bytes, write_length_in_bytes);
                if read_instance
                    .options_memory(store_opaque, read_options)
                    .as_ptr()
                    == write_instance
                        .options_memory(store_opaque, write_options)
                        .as_ptr()
                {
                    // Same memory on both sides: `copy_within` tolerates
                    // overlapping ranges.
                    let memory = read_instance.options_memory_mut(store_opaque, read_options);
                    memory.copy_within(
                        write_address..write_address + write_length_in_bytes,
                        read_address,
                    );
                } else {
                    let src = write_instance.options_memory(store_opaque, write_options)
                        [write_address..][..write_length_in_bytes]
                        .as_ptr();
                    let dst = read_instance.options_memory_mut(store_opaque, read_options)
                        [read_address..][..read_length_in_bytes]
                        .as_mut_ptr();
                    // SAFETY: both ranges were bounds-checked by the slice
                    // indexing above and have equal length (asserted
                    // earlier). This branch is only taken when the two
                    // memories have different base pointers.
                    // NOTE(review): presumably distinct memories never
                    // overlap -- confirm.
                    unsafe {
                        src.copy_to_nonoverlapping(dst, write_length_in_bytes);
                    }
                }
            } else {
                // Non-flat payloads go through a full lift and lower of every
                // item.
                let store_opaque = store.store_opaque_mut();
                let lift = &mut LiftContext::new(store_opaque, write_options, write_instance);
                let bytes = &lift.memory()[write_address..][..write_length_in_bytes];
                lift.consume_fuel_array(count, size_of::<Val>())?;
                let values = (0..count)
                    .map(|index| {
                        let size = usize::try_from(write_abi.size32)?;
                        Val::load(lift, *write_payload_ty, &bytes[(index * size)..][..size])
                    })
                    .collect::<Result<Vec<_>>>()?;
                let id = TableId::<TransmitHandle>::new(rep);
                log::trace!("copy values {values:?} for {id:?}");
                // Lower on behalf of the reading thread.
                let old_thread = store.set_thread(read_caller_thread)?;
                let lower =
                    &mut LowerContext::new(store.as_context_mut(), read_options, read_instance);
                let mut ptr = read_address;
                for value in values {
                    value.store(lower, *read_payload_ty, ptr)?;
                    ptr += usize::try_from(read_abi.size32)?;
                }
                store.set_thread(old_thread)?;
            }
        }
        _ => bail_bug!("mismatched transmit types in copy"),
    }
    Ok(())
}
/// Checks that `count` items of the payload type of `ty`, starting at
/// `address`, fit inside the memory selected by `options`.
///
/// The check is skipped (and succeeds) when `count` is zero or the payload
/// has zero size, including when there is no payload.
///
/// # Errors
///
/// Fails if the range is out of bounds or its byte length overflows `usize`.
fn check_bounds(
    self,
    store: &StoreOpaque,
    options: OptionsIndex,
    ty: TransmitIndex,
    address: usize,
    count: usize,
) -> Result<()> {
    let types = self.id().get(store).component().types();
    let payload = match ty {
        TransmitIndex::Future(ty) => types[types[ty].ty].payload,
        TransmitIndex::Stream(ty) => types[types[ty].ty].payload,
    };
    let item_size = payload
        .map(|ty| types.canonical_abi(&ty).size32)
        .unwrap_or(0);
    let size = usize::try_from(item_size)?;
    if count == 0 || size == 0 {
        return Ok(());
    }
    let memory = self.options_memory(store, options);
    let in_bounds = size
        .checked_mul(count)
        .and_then(|len| memory.get(address..)?.get(..len))
        .is_some();
    if in_bounds {
        Ok(())
    } else {
        Err(crate::format_err!("read pointer out of bounds of memory"))
    }
}
/// Performs a guest-initiated write of `count` items, located at `address`
/// in the caller's memory, through the writable end `handle` of the stream
/// or future identified by `ty`.
///
/// Depending on the current state of the read side, the write either
/// rendezvouses with a pending guest read (copying items directly), is
/// handed to a pending host consumer, is recorded as
/// `WriteState::GuestReady` to be picked up later, or reports that the
/// reader was dropped.
///
/// Returns the resulting `ReturnCode`. `ReturnCode::Blocked` is only
/// returned to callers whose options are `async_`; otherwise the function
/// waits via `wait_for_write` for a final code.
///
/// # Errors
///
/// Fails if `count` or `address` cannot be converted, if `check_blocking`
/// or `check_bounds` fails, if the handle is not in the
/// `TransmitLocalState::Write` state (`Trap::ConcurrentFutureStreamOp`), if
/// the handle or the shared transmit state is already marked done, or if an
/// internal invariant is violated (`bail_bug!`).
pub(super) fn guest_write<T: 'static>(
self,
mut store: StoreContextMut<T>,
caller: RuntimeComponentInstanceIndex,
ty: TransmitIndex,
options: OptionsIndex,
flat_abi: Option<FlatAbi>,
handle: u32,
address: u32,
count: u32,
) -> Result<ReturnCode> {
let count = ItemCount::new(count)?;
// Non-async callers may end up waiting below, so they must pass
// `check_blocking` first.
if !self.options(store.0, options).async_ {
store.0.check_blocking()?;
}
let address = usize::try_from(address)?;
self.check_bounds(store.0, options, ty, address, count.as_usize())?;
let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
// Any local state other than `Write` (i.e. `Read` or `Busy`) traps.
let TransmitLocalState::Write { done } = *state else {
bail!(Trap::ConcurrentFutureStreamOp);
};
if done {
bail!("cannot write after being notified that the readable end dropped");
}
// Mark the handle busy for the duration of the operation; it is restored
// to `Write` at the end unless the result is still `Blocked`.
*state = TransmitLocalState::Busy;
let transmit_handle = TableId::<TransmitHandle>::new(rep);
let concurrent_state = store.0.concurrent_state_mut();
let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
let transmit = concurrent_state.get_mut(transmit_id)?;
log::trace!(
"guest_write {count} to {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
transmit.read
);
if transmit.done {
bail!("cannot write to future after previous write succeeded or readable end dropped");
}
// The read state is moved out of `transmit` by the `mem::replace` below;
// this is the placeholder left behind (`Dropped` stays `Dropped`,
// everything else becomes `Open`).
let new_state = if let ReadState::Dropped = &transmit.read {
ReadState::Dropped
} else {
ReadState::Open
};
// Records this write as pending so a later read can rendezvous with it.
// Requires the write side to currently be `WriteState::Open`.
let set_guest_ready = |me: &mut ConcurrentState| {
let transmit = me.get_mut(transmit_id)?;
if !matches!(&transmit.write, WriteState::Open) {
bail_bug!("expected `WriteState::Open`; got `{:?}`", transmit.write);
}
transmit.write = WriteState::GuestReady {
instance: self,
caller,
ty,
flat_abi,
options,
address,
count,
handle,
};
Ok::<_, crate::Error>(())
};
let mut result = match mem::replace(&mut transmit.read, new_state) {
// A guest read is already pending: copy directly between the two guests.
ReadState::GuestReady {
ty: read_ty,
flat_abi: read_flat_abi,
options: read_options,
address: read_address,
count: read_count,
handle: read_handle,
instance: read_instance,
caller_instance: read_caller_instance,
caller_thread: read_caller_thread,
} => {
if flat_abi != read_flat_abi {
bail_bug!("expected flat ABI calculations to be the same");
}
if let TransmitIndex::Future(_) = ty {
transmit.done = true;
}
// The write completes when it is zero-length or the reader offered a
// non-empty buffer; the read completes only when the write is
// non-empty. All three flags use the *requested* write count.
let write_complete = count == 0 || read_count > 0;
let read_complete = count > 0;
let read_buffer_remaining = count < read_count;
let read_handle_rep = transmit.read_handle.rep();
// From here on `count` is the number of items actually transferred.
let count = count.min(read_count);
Instance::copy(
store.as_context_mut(),
flat_abi,
self,
caller,
ty,
options,
address,
read_instance,
read_caller_instance,
read_caller_thread,
read_ty,
read_options,
read_address,
count,
rep,
)?;
let instance = read_instance.id().get(store.0);
let types = instance.component().types();
// Size of one payload item (0 when the type has no payload); used to
// advance the reader's address below.
let item_size = match read_ty.payload(types) {
Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
None => 0,
};
let concurrent_state = store.0.concurrent_state_mut();
if read_complete {
// If a `StreamRead` completion is already queued for the reader, take
// it and fold its count into the total before posting the new result.
let total = if let Some(Event::StreamRead {
code: ReturnCode::Completed(old_total),
..
}) = concurrent_state.take_event(read_handle_rep)?
{
count.add(old_total)?
} else {
count
};
let code = ReturnCode::completed(ty.kind(), total);
concurrent_state.send_read_result(read_ty, transmit_id, read_handle, code)?;
}
// Re-install the pending read, advanced past the items just copied.
// NOTE(review): `count` here is the shadowed `min(count, read_count)`,
// so the second clause holds whenever `read_count == 0`, including when
// the requested write count was non-zero — confirm that is intended.
if read_buffer_remaining || (count == 0 && read_count == 0) {
let transmit = concurrent_state.get_mut(transmit_id)?;
transmit.read = ReadState::GuestReady {
ty: read_ty,
flat_abi: read_flat_abi,
options: read_options,
address: read_address + (count.as_usize() * item_size),
count: read_count.sub(count)?,
handle: read_handle,
instance: read_instance,
caller_instance: read_caller_instance,
caller_thread: read_caller_thread,
};
}
if write_complete {
ReturnCode::completed(ty.kind(), count)
} else {
set_guest_ready(concurrent_state)?;
ReturnCode::Blocked
}
}
// A host consumer is pending: park the write as `GuestReady` and let
// `consume` produce the return code.
ReadState::HostReady {
consume,
guest_offset,
cancel,
cancel_waker,
} => {
if cancel_waker.is_some() {
bail_bug!("expected cancel_waker to be none");
}
if cancel {
bail_bug!("expected cancel to be false");
}
if guest_offset != 0 {
bail_bug!("expected guest_offset to be 0");
}
if let TransmitIndex::Future(_) = ty {
transmit.done = true;
}
set_guest_ready(concurrent_state)?;
self.consume(
store.0,
ty.kind(),
transmit_id,
consume,
ItemCount::ZERO,
false,
)?
}
ReadState::HostToHost { .. } => bail_bug!("unexpected HostToHost"),
// Nobody is reading yet: park the write and report `Blocked`.
ReadState::Open => {
set_guest_ready(concurrent_state)?;
ReturnCode::Blocked
}
// The reader is gone: nothing is transferred.
ReadState::Dropped => {
if let TransmitIndex::Future(_) = ty {
transmit.done = true;
}
ReturnCode::Dropped(ItemCount::ZERO)
}
};
// Synchronous callers never observe `Blocked`; wait for the write event.
if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
result = self.wait_for_write(store.0, transmit_handle)?;
}
// The operation finished: leave the `Busy` state, remembering whether the
// reader was reported as dropped.
if result != ReturnCode::Blocked {
*self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
TransmitLocalState::Write {
done: matches!(result, ReturnCode::Dropped(_)),
};
}
log::trace!(
"guest_write result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
);
Ok(result)
}
/// Performs a guest-initiated read of up to `count` items into `address`
/// in the caller's memory, through the readable end `handle` of the stream
/// or future identified by `ty`.
///
/// Depending on the current state of the write side, the read either
/// rendezvouses with a pending guest write (copying items directly), is
/// handed to a pending host producer, is recorded as
/// `ReadState::GuestReady` to be picked up later, or reports that the
/// writer was dropped.
///
/// Returns the resulting `ReturnCode`. `ReturnCode::Blocked` is only
/// returned to callers whose options are `async_`; otherwise the function
/// waits via `wait_for_read` for a final code.
///
/// # Errors
///
/// Fails if `count` or `address` cannot be converted, if `check_blocking`
/// or `check_bounds` fails, if the handle is not in the
/// `TransmitLocalState::Read` state (`Trap::ConcurrentFutureStreamOp`), if
/// the handle or the shared transmit state is already marked done, or if an
/// internal invariant is violated (`bail_bug!`).
pub(super) fn guest_read<T: 'static>(
self,
mut store: StoreContextMut<T>,
caller_instance: RuntimeComponentInstanceIndex,
ty: TransmitIndex,
options: OptionsIndex,
flat_abi: Option<FlatAbi>,
handle: u32,
address: u32,
count: u32,
) -> Result<ReturnCode> {
let count = ItemCount::new(count)?;
// Non-async callers may end up waiting below, so they must pass
// `check_blocking` first.
if !self.options(store.0, options).async_ {
store.0.check_blocking()?;
}
let address = usize::try_from(address)?;
self.check_bounds(store.0, options, ty, address, count.as_usize())?;
let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?;
// Any local state other than `Read` (i.e. `Write` or `Busy`) traps.
let TransmitLocalState::Read { done } = *state else {
bail!(Trap::ConcurrentFutureStreamOp);
};
if done {
bail!("cannot read after being notified that the writable end dropped");
}
// Mark the handle busy for the duration of the operation; it is restored
// to `Read` at the end unless the result is still `Blocked`.
*state = TransmitLocalState::Busy;
let transmit_handle = TableId::<TransmitHandle>::new(rep);
let concurrent_state = store.0.concurrent_state_mut();
let caller_thread = concurrent_state.current_guest_thread()?;
let transmit_id = concurrent_state.get_mut(transmit_handle)?.state;
let transmit = concurrent_state.get_mut(transmit_id)?;
log::trace!(
"guest_read {count} from {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}",
transmit.write
);
if transmit.done {
bail!("cannot read from future after previous read succeeded");
}
// The write state is moved out of `transmit` by the `mem::replace` below;
// this is the placeholder left behind (`Dropped` stays `Dropped`,
// everything else becomes `Open`).
let new_state = if let WriteState::Dropped = &transmit.write {
WriteState::Dropped
} else {
WriteState::Open
};
// Records this read as pending so a later write can rendezvous with it.
// Requires the read side to currently be `ReadState::Open`.
let set_guest_ready = |me: &mut ConcurrentState| {
let transmit = me.get_mut(transmit_id)?;
if !matches!(&transmit.read, ReadState::Open) {
bail_bug!("expected `ReadState::Open`; got `{:?}`", transmit.read);
}
transmit.read = ReadState::GuestReady {
ty,
flat_abi,
options,
address,
count,
handle,
instance: self,
caller_instance,
caller_thread,
};
Ok::<_, crate::Error>(())
};
let mut result = match mem::replace(&mut transmit.write, new_state) {
// A guest write is already pending: copy directly between the two guests.
WriteState::GuestReady {
instance: write_instance,
ty: write_ty,
flat_abi: write_flat_abi,
options: write_options,
address: write_address,
count: write_count,
handle: write_handle,
caller: write_caller,
} => {
if flat_abi != write_flat_abi {
bail_bug!("expected flat ABI calculations to be the same");
}
if let TransmitIndex::Future(_) = ty {
transmit.done = true;
}
let write_handle_rep = transmit.write_handle.rep();
// The write completes when it is zero-length or this read offered a
// non-empty buffer; the read completes only when the write is
// non-empty. All three flags use the *requested* read count.
let write_complete = write_count == 0 || count > 0;
let read_complete = write_count > 0;
let write_buffer_remaining = count < write_count;
// From here on `count` is the number of items actually transferred.
let count = count.min(write_count);
Instance::copy(
store.as_context_mut(),
flat_abi,
write_instance,
write_caller,
write_ty,
write_options,
write_address,
self,
caller_instance,
caller_thread,
ty,
options,
address,
count,
rep,
)?;
let instance = write_instance.id().get(store.0);
let types = instance.component().types();
// Size of one payload item (0 when the type has no payload); used to
// advance the writer's address below.
let item_size = match write_ty.payload(types) {
Some(ty) => usize::try_from(types.canonical_abi(ty).size32)?,
None => 0,
};
let concurrent_state = store.0.concurrent_state_mut();
if write_complete {
// If a `StreamWrite` completion is already queued for the writer, take
// it and fold its count into the total before posting the new result.
let total = if let Some(Event::StreamWrite {
code: ReturnCode::Completed(old_total),
..
}) = concurrent_state.take_event(write_handle_rep)?
{
count.add(old_total)?
} else {
count
};
let code = ReturnCode::completed(ty.kind(), total);
concurrent_state.send_write_result(
write_ty,
transmit_id,
write_handle,
code,
)?;
}
// The writer still has items left: re-install the pending write, advanced
// past the items just copied.
if write_buffer_remaining {
let transmit = concurrent_state.get_mut(transmit_id)?;
transmit.write = WriteState::GuestReady {
instance: write_instance,
caller: write_caller,
ty: write_ty,
flat_abi: write_flat_abi,
options: write_options,
address: write_address + (count.as_usize() * item_size),
count: write_count.sub(count)?,
handle: write_handle,
};
}
if read_complete {
ReturnCode::completed(ty.kind(), count)
} else {
set_guest_ready(concurrent_state)?;
ReturnCode::Blocked
}
}
// A host producer is pending: park the read as `GuestReady` and let
// `produce` generate the return code.
WriteState::HostReady {
produce,
try_into,
guest_offset,
cancel,
cancel_waker,
} => {
if cancel_waker.is_some() {
bail_bug!("expected cancel_waker to be none");
}
if cancel {
bail_bug!("expected cancel to be false");
}
if guest_offset != 0 {
bail_bug!("expected guest_offset to be 0");
}
set_guest_ready(concurrent_state)?;
let code = self.produce(
store.0,
ty.kind(),
transmit_id,
produce,
try_into,
ItemCount::ZERO,
false,
)?;
// A future is marked done only once the host actually completed the read.
if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) {
store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true;
}
code
}
// Nobody is writing yet: park the read and report `Blocked`.
WriteState::Open => {
set_guest_ready(concurrent_state)?;
ReturnCode::Blocked
}
// The writer is gone: nothing is transferred.
WriteState::Dropped => ReturnCode::Dropped(ItemCount::ZERO),
};
// Synchronous callers never observe `Blocked`; wait for the read event.
if result == ReturnCode::Blocked && !self.options(store.0, options).async_ {
result = self.wait_for_read(store.0, transmit_handle)?;
}
// The operation finished: leave the `Busy` state. Only a stream handle is
// marked done when the writer was reported as dropped.
if result != ReturnCode::Blocked {
*self.id().get_mut(store.0).get_mut_by_index(ty, handle)?.1 =
TransmitLocalState::Read {
done: matches!(
(result, ty),
(ReturnCode::Dropped(_), TransmitIndex::Stream(_))
),
};
}
log::trace!(
"guest_read result for {transmit_handle:?} (handle {handle}; state {transmit_id:?}): {result:?}",
);
Ok(result)
}
/// Waits (via `store.wait_for_event`) on the transmit `handle`, then takes
/// the event queued for it and returns the code that event carries.
///
/// The event must be a stream or future write event; it is passed to
/// `Waitable::on_delivery` before its code is returned. Anything else,
/// including no event at all, is reported through `bail_bug!`.
fn wait_for_write(
    self,
    store: &mut StoreOpaque,
    handle: TableId<TransmitHandle>,
) -> Result<ReturnCode> {
    let waitable = Waitable::Transmit(handle);
    store.wait_for_event(waitable)?;
    let delivered = waitable.take_event(store.concurrent_state_mut())?;
    match delivered {
        Some(event @ (Event::StreamWrite { code, .. } | Event::FutureWrite { code, .. })) => {
            waitable.on_delivery(store, self, event)?;
            Ok(code)
        }
        _ => bail_bug!("expected either a stream or future write event"),
    }
}
/// Cancels the pending write on the transmit identified by `transmit_id`.
///
/// Three situations are distinguished:
///
/// * An event is already queued for the write handle: it is taken and
///   delivered, and its code is returned. A `Completed(n)` code from a
///   `StreamWrite` event is reported as `Cancelled(n)`; `Dropped` and other
///   `Completed` codes are returned unchanged.
/// * The read side is `ReadState::HostReady`: its `cancel` flag is set and
///   any stored `cancel_waker` is woken. With `async_` the result is
///   `Blocked`; otherwise the function waits via `wait_for_write`.
/// * Otherwise nothing was transferred and `Cancelled(ZERO)` is returned.
///
/// Unless the result is `Blocked`, a write side still in
/// `WriteState::GuestReady` is reset to `WriteState::Open`.
///
/// # Errors
///
/// Fails if a table lookup fails, if event delivery fails, or if an
/// unexpected event/state combination is encountered (`bail_bug!`).
fn cancel_write(
    self,
    store: &mut StoreOpaque,
    transmit_id: TableId<TransmitState>,
    async_: bool,
) -> Result<ReturnCode> {
    let state = store.concurrent_state_mut();
    let transmit = state.get_mut(transmit_id)?;
    // The arguments must follow the order of the labels in the message:
    // write state first, then read state.
    log::trace!(
        "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}",
        transmit.write,
        transmit.read
    );
    let waitable = Waitable::Transmit(transmit.write_handle);
    let code = if let Some(event) = waitable.take_event(state)? {
        // The write already produced an event; consume it instead of
        // cancelling anything.
        let (Event::FutureWrite { code, .. } | Event::StreamWrite { code, .. }) = event else {
            bail_bug!("expected either a stream or future write event")
        };
        waitable.on_delivery(store, self, event)?;
        match (code, event) {
            (ReturnCode::Completed(count), Event::StreamWrite { .. }) => {
                ReturnCode::Cancelled(count)
            }
            (ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
            _ => bail_bug!("unexpected code/event combo"),
        }
    } else if let ReadState::HostReady {
        cancel,
        cancel_waker,
        ..
    } = &mut state.get_mut(transmit_id)?.read
    {
        // Ask the host reader to stop and wake it if it registered a waker.
        *cancel = true;
        if let Some(waker) = cancel_waker.take() {
            waker.wake();
        }
        if async_ {
            ReturnCode::Blocked
        } else {
            let handle = store
                .concurrent_state_mut()
                .get_mut(transmit_id)?
                .write_handle;
            self.wait_for_write(store, handle)?
        }
    } else {
        ReturnCode::Cancelled(ItemCount::ZERO)
    };
    // Once the cancellation has resolved, a still-parked guest write is
    // withdrawn.
    if !matches!(code, ReturnCode::Blocked) {
        let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
        match &transmit.write {
            WriteState::GuestReady { .. } => {
                transmit.write = WriteState::Open;
            }
            WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
            WriteState::Open | WriteState::Dropped => {}
        }
    }
    log::trace!("cancelled write {transmit_id:?}: {code:?}");
    Ok(code)
}
/// Waits (via `store.wait_for_event`) on the transmit `handle`, then takes
/// the event queued for it and returns the code that event carries.
///
/// The event must be a stream or future read event; it is passed to
/// `Waitable::on_delivery` before its code is returned. Anything else,
/// including no event at all, is reported through `bail_bug!`.
fn wait_for_read(
    self,
    store: &mut StoreOpaque,
    handle: TableId<TransmitHandle>,
) -> Result<ReturnCode> {
    let waitable = Waitable::Transmit(handle);
    store.wait_for_event(waitable)?;
    let delivered = waitable.take_event(store.concurrent_state_mut())?;
    match delivered {
        Some(event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. })) => {
            waitable.on_delivery(store, self, event)?;
            Ok(code)
        }
        _ => bail_bug!("expected either a stream or future read event"),
    }
}
/// Cancels the pending read on the transmit identified by `transmit_id`.
///
/// Three situations are distinguished:
///
/// * An event is already queued for the read handle: it is taken and
///   delivered, and its code is returned. A `Completed(n)` code from a
///   `StreamRead` event is reported as `Cancelled(n)`; `Dropped` and other
///   `Completed` codes are returned unchanged.
/// * The write side is `WriteState::HostReady`: its `cancel` flag is set and
///   any stored `cancel_waker` is woken. With `async_` the result is
///   `Blocked`; otherwise the function waits via `wait_for_read`.
/// * Otherwise nothing was transferred and `Cancelled(ZERO)` is returned.
///
/// Unless the result is `Blocked`, a read side still in
/// `ReadState::GuestReady` is reset to `ReadState::Open`.
///
/// # Errors
///
/// Fails if a table lookup fails, if event delivery fails, or if an
/// unexpected event/state combination is encountered (`bail_bug!`).
fn cancel_read(
self,
store: &mut StoreOpaque,
transmit_id: TableId<TransmitState>,
async_: bool,
) -> Result<ReturnCode> {
let state = store.concurrent_state_mut();
let transmit = state.get_mut(transmit_id)?;
log::trace!(
"host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}",
transmit.read,
transmit.write
);
let waitable = Waitable::Transmit(transmit.read_handle);
let code = if let Some(event) = waitable.take_event(state)? {
// The read already produced an event; consume it instead of cancelling
// anything.
let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else {
bail_bug!("expected either a stream or future read event")
};
waitable.on_delivery(store, self, event)?;
match (code, event) {
(ReturnCode::Completed(count), Event::StreamRead { .. }) => {
ReturnCode::Cancelled(count)
}
(ReturnCode::Dropped(_) | ReturnCode::Completed(_), _) => code,
_ => bail_bug!("unexpected code/event combo"),
}
} else if let WriteState::HostReady {
cancel,
cancel_waker,
..
} = &mut state.get_mut(transmit_id)?.write
{
// Ask the host writer to stop and wake it if it registered a waker.
*cancel = true;
if let Some(waker) = cancel_waker.take() {
waker.wake();
}
if async_ {
ReturnCode::Blocked
} else {
let handle = store
.concurrent_state_mut()
.get_mut(transmit_id)?
.read_handle;
self.wait_for_read(store, handle)?
}
} else {
ReturnCode::Cancelled(ItemCount::ZERO)
};
// Once the cancellation has resolved, a still-parked guest read is
// withdrawn.
if !matches!(code, ReturnCode::Blocked) {
let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
match &transmit.read {
ReadState::GuestReady { .. } => {
transmit.read = ReadState::Open;
}
ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
bail_bug!("support host read cancellation")
}
ReadState::Open | ReadState::Dropped => {}
}
}
log::trace!("cancelled read {transmit_id:?}: {code:?}");
Ok(code)
}
/// Guest entry point for cancelling a write on the handle `writer` of the
/// stream or future type `ty`.
///
/// The handle must currently be `TransmitLocalState::Busy`. A handle in the
/// `Write` state has no write in flight, and a handle in the `Read` state is
/// the wrong end; both are errors. When the cancellation resolves (any code
/// other than `Blocked`) a handle that is still `Busy` is returned to
/// `Write { done: false }`.
fn guest_cancel_write(
    self,
    store: &mut StoreOpaque,
    ty: TransmitIndex,
    async_: bool,
    writer: u32,
) -> Result<ReturnCode> {
    if !async_ {
        store.check_blocking()?;
    }

    let (rep, local_state) =
        get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
    let id = TableId::<TransmitHandle>::new(rep);
    log::trace!("guest cancel write {id:?} (handle {writer})");
    match local_state {
        TransmitLocalState::Busy => {}
        TransmitLocalState::Write { .. } => {
            bail!("stream or future write cancelled when no write is pending")
        }
        TransmitLocalState::Read { .. } => {
            bail!("passed read end to `{{stream|future}}.cancel-write`")
        }
    }

    let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
    let code = self.cancel_write(store, transmit_id, async_)?;
    if code != ReturnCode::Blocked {
        // Look the handle up again: `cancel_write` may have changed its
        // local state while delivering an event.
        let table = self.id().get_mut(store).table_for_transmit(ty);
        let (_, local_state) = get_mut_by_index_from(table, ty, writer)?;
        if *local_state == TransmitLocalState::Busy {
            *local_state = TransmitLocalState::Write { done: false };
        }
    }
    Ok(code)
}
/// Guest entry point for cancelling a read on the handle `reader` of the
/// stream or future type `ty`.
///
/// The handle must currently be `TransmitLocalState::Busy`. A handle in the
/// `Read` state has no read in flight, and a handle in the `Write` state is
/// the wrong end; both are errors. When the cancellation resolves (any code
/// other than `Blocked`) a handle that is still `Busy` is returned to
/// `Read { done: false }`.
fn guest_cancel_read(
    self,
    store: &mut StoreOpaque,
    ty: TransmitIndex,
    async_: bool,
    reader: u32,
) -> Result<ReturnCode> {
    if !async_ {
        store.check_blocking()?;
    }

    let (rep, local_state) =
        get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
    let id = TableId::<TransmitHandle>::new(rep);
    log::trace!("guest cancel read {id:?} (handle {reader})");
    match local_state {
        TransmitLocalState::Busy => {}
        TransmitLocalState::Read { .. } => {
            bail!("stream or future read cancelled when no read is pending")
        }
        TransmitLocalState::Write { .. } => {
            bail!("passed write end to `{{stream|future}}.cancel-read`")
        }
    }

    let transmit_id = store.concurrent_state_mut().get_mut(id)?.state;
    let code = self.cancel_read(store, transmit_id, async_)?;
    if code != ReturnCode::Blocked {
        // Look the handle up again: `cancel_read` may have changed its
        // local state while delivering an event.
        let table = self.id().get_mut(store).table_for_transmit(ty);
        let (_, local_state) = get_mut_by_index_from(table, ty, reader)?;
        if *local_state == TransmitLocalState::Busy {
            *local_state = TransmitLocalState::Read { done: false };
        }
    }
    Ok(code)
}
/// Removes the readable end `reader` from the guest's handle table and then
/// drops the reader through `store.host_drop_reader`.
///
/// The "done" flag returned by the table removal is ignored.
fn guest_drop_readable(
    self,
    store: &mut StoreOpaque,
    ty: TransmitIndex,
    reader: u32,
) -> Result<()> {
    let table = self.id().get_mut(store).table_for_transmit(ty);
    let (rep, kind) = match ty {
        TransmitIndex::Stream(ty) => (
            table.stream_remove_readable(ty, reader)?.0,
            TransmitKind::Stream,
        ),
        TransmitIndex::Future(ty) => (
            table.future_remove_readable(ty, reader)?.0,
            TransmitKind::Future,
        ),
    };
    let id = TableId::<TransmitHandle>::new(rep);
    log::trace!("guest_drop_readable: drop reader {id:?}");
    store.host_drop_reader(id, kind)
}
/// Creates a new error context whose debug message is the string lifted
/// from the guest at (`debug_msg_address`, `debug_msg_len`).
///
/// The message is stored in the concurrent state, a global reference count
/// of 1 is registered under the new table id, and the id is inserted into
/// the error-context handle table for `ty`. Returns the resulting local
/// handle index.
pub(crate) fn error_context_new(
    self,
    store: &mut StoreOpaque,
    ty: TypeComponentLocalErrorContextTableIndex,
    options: OptionsIndex,
    debug_msg_address: u32,
    debug_msg_len: u32,
) -> Result<u32> {
    // A string is lifted from its flat (pointer, length) representation.
    let flat = [ValRaw::u32(debug_msg_address), ValRaw::u32(debug_msg_len)];
    let mut lift_cx = LiftContext::new(store, options, self);
    let debug_msg = String::linear_lift_from_flat(&mut lift_cx, InterfaceType::String, &flat)?;

    let concurrent = store.concurrent_state_mut();
    let table_id = concurrent.push(ErrorContextState { debug_msg })?;
    let ref_count_key = TypeComponentGlobalErrorContextTableIndex::from_u32(table_id.rep());
    let _ = concurrent
        .global_error_context_ref_counts
        .insert(ref_count_key, GlobalErrorContextRefCount(1));

    let instance = self.id().get_mut(store);
    let local_idx = instance
        .table_for_error_context(ty)
        .error_context_insert(table_id.rep())?;
    Ok(local_idx)
}
/// Lowers the debug message of the error context `err_ctx_handle` into the
/// guest's memory as a string at `debug_msg_address`.
///
/// # Errors
///
/// Fails if the handle cannot be resolved, if fewer than 8 bytes of memory
/// are available starting at `debug_msg_address`, or if lowering the string
/// fails.
pub(super) fn error_context_debug_message<T>(
    self,
    store: StoreContextMut<T>,
    ty: TypeComponentLocalErrorContextTableIndex,
    options: OptionsIndex,
    err_ctx_handle: u32,
    debug_msg_address: u32,
) -> Result<()> {
    let rep = self
        .id()
        .get_mut(store.0)
        .table_for_error_context(ty)
        .error_context_rep(err_ctx_handle)?;

    // Copy the message out so the store can be handed to the lowering
    // context afterwards.
    let concurrent = store.0.concurrent_state_mut();
    let ErrorContextState { debug_msg } =
        concurrent.get_mut(TableId::<ErrorContextState>::new(rep))?;
    let debug_msg = debug_msg.clone();

    let lower_cx = &mut LowerContext::new(store, options, self);
    let offset = usize::try_from(debug_msg_address)?;
    // Require the 8 bytes starting at `offset` to lie inside memory before
    // lowering into them.
    lower_cx
        .as_slice_mut()
        .get(offset..)
        .and_then(|tail| tail.get(..8))
        .ok_or_else(|| crate::format_err!("invalid debug message pointer: out of bounds"))?;
    debug_msg
        .as_str()
        .linear_lower_to_memory(lower_cx, InterfaceType::String, offset)?;
    Ok(())
}
/// Cancels a pending read on the future handle `reader` and returns the
/// encoded `ReturnCode`.
pub(crate) fn future_cancel_read(
    self,
    store: &mut StoreOpaque,
    ty: TypeFutureTableIndex,
    async_: bool,
    reader: u32,
) -> Result<u32> {
    let code = self.guest_cancel_read(store, TransmitIndex::Future(ty), async_, reader)?;
    Ok(code.encode())
}
/// Cancels a pending write on the future handle `writer` and returns the
/// encoded `ReturnCode`.
pub(crate) fn future_cancel_write(
    self,
    store: &mut StoreOpaque,
    ty: TypeFutureTableIndex,
    async_: bool,
    writer: u32,
) -> Result<u32> {
    let code = self.guest_cancel_write(store, TransmitIndex::Future(ty), async_, writer)?;
    Ok(code.encode())
}
/// Cancels a pending read on the stream handle `reader` and returns the
/// encoded `ReturnCode`.
pub(crate) fn stream_cancel_read(
    self,
    store: &mut StoreOpaque,
    ty: TypeStreamTableIndex,
    async_: bool,
    reader: u32,
) -> Result<u32> {
    let code = self.guest_cancel_read(store, TransmitIndex::Stream(ty), async_, reader)?;
    Ok(code.encode())
}
/// Cancels a pending write on the stream handle `writer` and returns the
/// encoded `ReturnCode`.
pub(crate) fn stream_cancel_write(
    self,
    store: &mut StoreOpaque,
    ty: TypeStreamTableIndex,
    async_: bool,
    writer: u32,
) -> Result<u32> {
    let code = self.guest_cancel_write(store, TransmitIndex::Stream(ty), async_, writer)?;
    Ok(code.encode())
}
/// Drops the readable end `reader` of a future of type `ty`.
pub(crate) fn future_drop_readable(
    self,
    store: &mut StoreOpaque,
    ty: TypeFutureTableIndex,
    reader: u32,
) -> Result<()> {
    let index = TransmitIndex::Future(ty);
    self.guest_drop_readable(store, index, reader)
}
/// Drops the readable end `reader` of a stream of type `ty`.
pub(crate) fn stream_drop_readable(
    self,
    store: &mut StoreOpaque,
    ty: TypeStreamTableIndex,
    reader: u32,
) -> Result<()> {
    let index = TransmitIndex::Stream(ty);
    self.guest_drop_readable(store, index, reader)
}
/// Creates a new stream or future owned by this guest instance and returns
/// the handle indices of both ends.
///
/// A fresh transmit is allocated in the concurrent state, both ends are
/// inserted into the handle table for `ty` (readable end first), and each
/// transmit handle records the table index it was given.
fn guest_new(self, store: &mut StoreOpaque, ty: TransmitIndex) -> Result<ResourcePair> {
    let origin = TransmitOrigin::guest(self.id().instance(), ty);
    let (write, read) = store.concurrent_state_mut().new_transmit(origin)?;

    let table = self.id().get_mut(store).table_for_transmit(ty);
    let (read_handle, write_handle) = match ty {
        TransmitIndex::Future(ty) => {
            let read_idx = table.future_insert_read(ty, read.rep())?;
            let write_idx = table.future_insert_write(ty, write.rep())?;
            (read_idx, write_idx)
        }
        TransmitIndex::Stream(ty) => {
            let read_idx = table.stream_insert_read(ty, read.rep())?;
            let write_idx = table.stream_insert_write(ty, write.rep())?;
            (read_idx, write_idx)
        }
    };

    let concurrent = store.concurrent_state_mut();
    concurrent.get_mut(read)?.common.handle = Some(read_handle);
    concurrent.get_mut(write)?.common.handle = Some(write_handle);
    Ok(ResourcePair {
        write: write_handle,
        read: read_handle,
    })
}
pub(crate) fn error_context_drop(
self,
store: &mut StoreOpaque,
ty: TypeComponentLocalErrorContextTableIndex,
error_context: u32,
) -> Result<()> {
let instance = self.id().get_mut(store);
let local_handle_table = instance.table_for_error_context(ty);
let rep = local_handle_table.error_context_drop(error_context)?;
let global_ref_count_idx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
let state = store.concurrent_state_mut();
let Some(GlobalErrorContextRefCount(global_ref_count)) = state
.global_error_context_ref_counts
.get_mut(&global_ref_count_idx)
else {
bail_bug!("retrieve concurrent state for error context during drop")
};
if *global_ref_count < 1 {
bail_bug!("ref count unexpectedly zero");
}
*global_ref_count -= 1;
if *global_ref_count == 0 {
state
.global_error_context_ref_counts
.remove(&global_ref_count_idx);
state
.delete(TableId::<ErrorContextState>::new(rep))
.context("deleting component-global error context data")?;
}
Ok(())
}
/// Moves the readable end at `src_idx` out of the handle table for `src`
/// and into the handle table for `dst`, returning its new index.
fn guest_transfer(
    self,
    store: &mut StoreOpaque,
    src_idx: u32,
    src: TransmitIndex,
    dst: TransmitIndex,
) -> Result<u32> {
    let transmit = self.lift_index_to_transmit(store, src, src_idx)?;
    let dst_idx = self.lower_transmit_to_index(store, dst, transmit);
    dst_idx
}
/// Method form of the free function `lift_index_to_transmit`: obtains the
/// component instance and concurrent state from `store` and delegates.
fn lift_index_to_transmit(
    self,
    store: &mut StoreOpaque,
    ty: TransmitIndex,
    src_idx: u32,
) -> Result<TableId<TransmitHandle>> {
    let (opaque, _, _, component_instance) = store.lift_context_parts(self);
    let concurrent = opaque.concurrent_state_mut();
    lift_index_to_transmit(component_instance, concurrent, ty, src_idx)
}
/// Method form of the free function `lower_transmit_to_index`: obtains the
/// component instance and concurrent state from `store` and delegates.
fn lower_transmit_to_index(
    self,
    store: &mut StoreOpaque,
    ty: TransmitIndex,
    id: TableId<TransmitHandle>,
) -> Result<u32> {
    let (opaque, _, _, component_instance) = store.lift_context_parts(self);
    let concurrent = opaque.concurrent_state_mut();
    lower_transmit_to_index(component_instance, concurrent, ty, id)
}
/// Creates a new future of type `ty` and returns the handles of both ends.
pub(crate) fn future_new(
    self,
    store: &mut StoreOpaque,
    ty: TypeFutureTableIndex,
) -> Result<ResourcePair> {
    let index = TransmitIndex::Future(ty);
    self.guest_new(store, index)
}
/// Creates a new stream of type `ty` and returns the handles of both ends.
pub(crate) fn stream_new(
    self,
    store: &mut StoreOpaque,
    ty: TypeStreamTableIndex,
) -> Result<ResourcePair> {
    let index = TransmitIndex::Stream(ty);
    self.guest_new(store, index)
}
/// Transfers the readable future end at `src_idx` from the table for `src`
/// to the table for `dst`, returning its new index.
pub(crate) fn future_transfer(
    self,
    store: &mut StoreOpaque,
    src_idx: u32,
    src: TypeFutureTableIndex,
    dst: TypeFutureTableIndex,
) -> Result<u32> {
    let from = TransmitIndex::Future(src);
    let to = TransmitIndex::Future(dst);
    self.guest_transfer(store, src_idx, from, to)
}
/// Transfers the readable stream end at `src_idx` from the table for `src`
/// to the table for `dst`, returning its new index.
pub(crate) fn stream_transfer(
    self,
    store: &mut StoreOpaque,
    src_idx: u32,
    src: TypeStreamTableIndex,
    dst: TypeStreamTableIndex,
) -> Result<u32> {
    let from = TransmitIndex::Stream(src);
    let to = TransmitIndex::Stream(dst);
    self.guest_transfer(store, src_idx, from, to)
}
/// Makes the error context behind `src_idx` (in the table for `src`) also
/// available in the table for `dst`, returning the new index.
///
/// The source handle is left in place; the global reference count of the
/// error context is incremented, failing with
/// `Trap::ReferenceCountOverflow` if it would overflow.
pub(crate) fn error_context_transfer(
    self,
    store: &mut StoreOpaque,
    src_idx: u32,
    src: TypeComponentLocalErrorContextTableIndex,
    dst: TypeComponentLocalErrorContextTableIndex,
) -> Result<u32> {
    let mut instance = self.id().get_mut(store);
    let rep = instance
        .as_mut()
        .table_for_error_context(src)
        .error_context_rep(src_idx)?;
    let dst_idx = instance
        .table_for_error_context(dst)
        .error_context_insert(rep)?;

    let key = TypeComponentGlobalErrorContextTableIndex::from_u32(rep);
    let ref_count = store
        .concurrent_state_mut()
        .global_error_context_ref_counts
        .get_mut(&key)
        .context("global ref count present for existing (sub)component error context")?;
    let bumped = ref_count
        .0
        .checked_add(1)
        .ok_or_else(|| format_err!(Trap::ReferenceCountOverflow))?;
    ref_count.0 = bumped;
    Ok(dst_idx)
}
}
/// Removes the readable end at `src_idx` from the handle table for `ty` and
/// returns the id of its transmit handle.
///
/// # Errors
///
/// Fails if the handle cannot be removed, if it was already marked done, if
/// it is currently a member of a waitable set, or if the shared transmit
/// state is marked done.
fn lift_index_to_transmit(
    instance: Pin<&mut ComponentInstance>,
    concurrent_state: &mut ConcurrentState,
    ty: TransmitIndex,
    src_idx: u32,
) -> Result<TableId<TransmitHandle>> {
    let handle_table = instance.table_for_transmit(ty);
    let (removed, desc) = match ty {
        TransmitIndex::Future(idx) => (
            handle_table.future_remove_readable(idx, src_idx)?,
            "future",
        ),
        TransmitIndex::Stream(idx) => (
            handle_table.stream_remove_readable(idx, src_idx)?,
            "stream",
        ),
    };
    let (rep, is_done) = removed;
    if is_done {
        bail!("cannot lift {desc} after being notified that the writable end dropped");
    }

    let id = TableId::<TransmitHandle>::new(rep);
    let handle = concurrent_state.get_mut(id)?;
    if handle.common.set.is_some() {
        bail!("cannot lift {desc} while it's in a waitable set");
    }
    // The end no longer has an index in any guest table.
    handle.common.handle = None;
    let transmit_id = handle.state;
    if concurrent_state.get_mut(transmit_id)?.done {
        bail!("cannot lift {desc} after previous read succeeded");
    }
    Ok(id)
}
/// Inserts the readable end `id` into the handle table for `ty`, records
/// the new index on the transmit handle, and returns that index.
///
/// In debug builds this also asserts that `id` is the read handle of its
/// transmit state.
fn lower_transmit_to_index(
    instance: Pin<&mut ComponentInstance>,
    concurrent_state: &mut ConcurrentState,
    ty: TransmitIndex,
    id: TableId<TransmitHandle>,
) -> Result<u32> {
    let transmit_id = concurrent_state.get_mut(id)?.state;
    debug_assert_eq!(concurrent_state.get_mut(transmit_id)?.read_handle, id);

    let table = instance.table_for_transmit(ty);
    let inserted = match ty {
        TransmitIndex::Future(idx) => table.future_insert_read(idx, id.rep()),
        TransmitIndex::Stream(idx) => table.stream_insert_read(idx, id.rep()),
    };
    let handle = inserted?;
    concurrent_state.get_mut(id)?.common.handle = Some(handle);
    Ok(handle)
}
impl ComponentInstance {
    /// Returns the handle table of the runtime instance recorded for the
    /// stream or future type `ty`.
    fn table_for_transmit(self: Pin<&mut Self>, ty: TransmitIndex) -> &mut HandleTable {
        let (instance_states, component_types) = self.instance_states();
        let owner = match ty {
            TransmitIndex::Future(future_ty) => component_types[future_ty].instance,
            TransmitIndex::Stream(stream_ty) => component_types[stream_ty].instance,
        };
        instance_states[owner].handle_table()
    }

    /// Returns the handle table of the runtime instance recorded for the
    /// error-context type `ty`.
    fn table_for_error_context(
        self: Pin<&mut Self>,
        ty: TypeComponentLocalErrorContextTableIndex,
    ) -> &mut HandleTable {
        let (instance_states, component_types) = self.instance_states();
        let owner = component_types[ty].instance;
        instance_states[owner].handle_table()
    }

    /// Looks up the handle `index` in the table for `ty`, returning its rep
    /// together with a mutable reference to its local state.
    fn get_mut_by_index(
        self: Pin<&mut Self>,
        ty: TransmitIndex,
        index: u32,
    ) -> Result<(u32, &mut TransmitLocalState)> {
        let table = self.table_for_transmit(ty);
        get_mut_by_index_from(table, ty, index)
    }
}
impl ConcurrentState {
fn send_write_result(
&mut self,
ty: TransmitIndex,
id: TableId<TransmitState>,
handle: u32,
code: ReturnCode,
) -> Result<()> {
let write_handle = self.get_mut(id)?.write_handle.rep();
self.set_event(
write_handle,
match ty {
TransmitIndex::Future(ty) => Event::FutureWrite {
code,
pending: Some((ty, handle)),
},
TransmitIndex::Stream(ty) => Event::StreamWrite {
code,
pending: Some((ty, handle)),
},
},
)
}
fn send_read_result(
&mut self,
ty: TransmitIndex,
id: TableId<TransmitState>,
handle: u32,
code: ReturnCode,
) -> Result<()> {
let read_handle = self.get_mut(id)?.read_handle.rep();
self.set_event(
read_handle,
match ty {
TransmitIndex::Future(ty) => Event::FutureRead {
code,
pending: Some((ty, handle)),
},
TransmitIndex::Stream(ty) => Event::StreamRead {
code,
pending: Some((ty, handle)),
},
},
)
}
fn take_event(&mut self, waitable: u32) -> Result<Option<Event>> {
Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).take_event(self)
}
fn set_event(&mut self, waitable: u32, event: Event) -> Result<()> {
Waitable::Transmit(TableId::<TransmitHandle>::new(waitable)).set_event(self, Some(event))
}
fn update_event(&mut self, waitable: u32, event: Event) -> Result<()> {
let waitable = Waitable::Transmit(TableId::<TransmitHandle>::new(waitable));
fn update_code(old: ReturnCode, new: ReturnCode) -> Result<ReturnCode> {
let (ReturnCode::Completed(count)
| ReturnCode::Dropped(count)
| ReturnCode::Cancelled(count)) = old
else {
bail_bug!("unexpected old return code")
};
Ok(match new {
ReturnCode::Dropped(ItemCount::ZERO) => ReturnCode::Dropped(count),
ReturnCode::Cancelled(ItemCount::ZERO) => ReturnCode::Cancelled(count),
_ => bail_bug!("unexpected new return code"),
})
}
let event = match (waitable.take_event(self)?, event) {
(None, _) => event,
(Some(old @ Event::FutureWrite { .. }), Event::FutureWrite { .. }) => old,
(Some(old @ Event::FutureRead { .. }), Event::FutureRead { .. }) => old,
(
Some(Event::StreamWrite {
code: old_code,
pending: old_pending,
}),
Event::StreamWrite { code, pending },
) => Event::StreamWrite {
code: update_code(old_code, code)?,
pending: old_pending.or(pending),
},
(
Some(Event::StreamRead {
code: old_code,
pending: old_pending,
}),
Event::StreamRead { code, pending },
) => Event::StreamRead {
code: update_code(old_code, code)?,
pending: old_pending.or(pending),
},
_ => bail_bug!("unexpected event combination"),
};
waitable.set_event(self, Some(event))
}
fn new_transmit(
&mut self,
origin: TransmitOrigin,
) -> Result<(TableId<TransmitHandle>, TableId<TransmitHandle>)> {
let state_id = self.push(TransmitState::new(origin))?;
let write = self.push(TransmitHandle::new(state_id))?;
let read = self.push(TransmitHandle::new(state_id))?;
let state = self.get_mut(state_id)?;
state.write_handle = write;
state.read_handle = read;
log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",);
Ok((write, read))
}
fn delete_transmit(&mut self, state_id: TableId<TransmitState>) -> Result<()> {
let state = self.delete(state_id)?;
self.delete(state.write_handle)?;
self.delete(state.read_handle)?;
log::trace!(
"delete transmit: state {state_id:?}; write {:?}; read {:?}",
state.write_handle,
state.read_handle,
);
Ok(())
}
}
/// The pair of guest handle-table indices produced when a new stream or
/// future is created.
pub(crate) struct ResourcePair {
/// Handle-table index of the writable end.
pub(crate) write: u32,
/// Handle-table index of the readable end.
pub(crate) read: u32,
}
impl Waitable {
/// Updates handle and transmit state to reflect that `event` has been
/// delivered to the guest.
///
/// Only future/stream read/write events whose `pending` field is
/// `Some((ty, handle))` have any effect; every other event returns `Ok(())`
/// immediately. For those events the guest handle is looked up in
/// `instance`'s handle table, checked to refer to this waitable and to be
/// `Busy`, and moved back to `Read`/`Write` with `done` set when the code is
/// `Dropped`. For stream events the corresponding side of the shared
/// transmit state is additionally reset to `Open`.
///
/// # Errors
///
/// Fails if the handle lookup fails, or via `bail_bug!` when the rep does
/// not match or the handle is not `Busy`.
pub(super) fn on_delivery(
&self,
store: &mut StoreOpaque,
instance: Instance,
event: Event,
) -> Result<()> {
let instance = instance.id().get_mut(store);
// Resolve the guest handle named by the event's `pending` field.
let (rep, state, code) = match event {
Event::FutureRead {
pending: Some((ty, handle)),
code,
}
| Event::FutureWrite {
pending: Some((ty, handle)),
code,
} => {
let runtime_instance = instance.component().types()[ty].instance;
let (rep, state) = instance.instance_states().0[runtime_instance]
.handle_table()
.future_rep(ty, handle)?;
(rep, state, code)
}
Event::StreamRead {
pending: Some((ty, handle)),
code,
}
| Event::StreamWrite {
pending: Some((ty, handle)),
code,
} => {
let runtime_instance = instance.component().types()[ty].instance;
let (rep, state) = instance.instance_states().0[runtime_instance]
.handle_table()
.stream_rep(ty, handle)?;
(rep, state, code)
}
// No pending handle, or not a transmit event: nothing to update.
_ => return Ok(()),
};
if rep != self.rep() {
bail_bug!("unexpected rep mismatch");
}
if *state != TransmitLocalState::Busy {
bail_bug!("expected state to be busy");
}
// The operation is over: the handle leaves `Busy`, remembering whether
// the other end was reported as dropped.
let done = matches!(code, ReturnCode::Dropped(_));
*state = match event {
Event::FutureRead { .. } | Event::StreamRead { .. } => {
TransmitLocalState::Read { done }
}
Event::FutureWrite { .. } | Event::StreamWrite { .. } => {
TransmitLocalState::Write { done }
}
_ => bail_bug!("unexpected event for stream"),
};
let transmit_handle = TableId::<TransmitHandle>::new(rep);
let state = store.concurrent_state_mut();
let transmit_id = state.get_mut(transmit_handle)?.state;
let transmit = state.get_mut(transmit_id)?;
// Only stream events reset the shared transmit state; future events
// leave it untouched.
match event {
Event::StreamRead { .. } => {
transmit.read = ReadState::Open;
}
Event::StreamWrite { .. } => transmit.write = WriteState::Open,
_ => {}
}
Ok(())
}
}
/// Returns `true` when `ty` is absent or is one of the fixed-width integer
/// or floating-point interface types (`S8`..`U64`, `Float32`, `Float64`).
fn allow_intra_component_read_write(ty: Option<&InterfaceType>) -> bool {
    match ty {
        None => true,
        Some(payload) => matches!(
            payload,
            InterfaceType::S8
                | InterfaceType::U8
                | InterfaceType::S16
                | InterfaceType::U16
                | InterfaceType::S32
                | InterfaceType::U32
                | InterfaceType::S64
                | InterfaceType::U64
                | InterfaceType::Float32
                | InterfaceType::Float64
        ),
    }
}
/// A value of type `T` kept behind a `TryMutex`.
///
/// The slot holds `Some(value)` normally and `None` while the value has been
/// taken out through `LockedState::take`.
struct LockedState<T> {
// `None` while a `LockedStateGuard` currently holds the value.
inner: TryMutex<Option<T>>,
}
impl<T> LockedState<T> {
    /// Wraps `value` in a new, populated `LockedState`.
    fn new(value: T) -> Self {
        let inner = TryMutex::new(Some(value));
        Self { inner }
    }

    /// Acquires the lock without blocking; failing to acquire it is reported
    /// through `bail_bug!`.
    fn try_lock(&self) -> Result<TryMutexGuard<'_, Option<T>>> {
        let Some(guard) = self.inner.try_lock() else {
            bail_bug!("should not have contention on state lock")
        };
        Ok(guard)
    }

    /// Moves the value out of the slot and returns it wrapped in a guard
    /// that puts it back when dropped. The lock itself is released before
    /// this function returns.
    fn take(&self) -> Result<LockedStateGuard<'_, T>> {
        let taken = self.try_lock()?.take();
        let Some(value) = taken else {
            bail_bug!("lock value unexpectedly missing")
        };
        Ok(LockedStateGuard {
            value: ManuallyDrop::new(value),
            state: self,
        })
    }

    /// Runs `f` on the stored value while holding the lock and returns its
    /// result.
    fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> Result<R> {
        let mut guard = self.try_lock()?;
        let Some(state) = &mut *guard else {
            bail_bug!("lock value unexpectedly missing")
        };
        Ok(f(state))
    }
}
/// Holds a value that was taken out of a `LockedState`; dropping the guard
/// attempts to store the value back into that `LockedState`.
struct LockedStateGuard<'a, T> {
// Wrapped in `ManuallyDrop` so `Drop` can move the value out of `self`.
value: ManuallyDrop<T>,
// The `LockedState` the value is returned to on drop.
state: &'a LockedState<T>,
}
impl<T> Deref for LockedStateGuard<'_, T> {
    type Target = T;

    /// Borrows the value held by the guard.
    fn deref(&self) -> &Self::Target {
        &*self.value
    }
}
impl<T> DerefMut for LockedStateGuard<'_, T> {
    /// Mutably borrows the value held by the guard.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.value
    }
}
impl<T> Drop for LockedStateGuard<'_, T> {
    /// Returns the held value to the originating `LockedState`. If the lock
    /// cannot be acquired the value is simply dropped.
    fn drop(&mut self) {
        // SAFETY: `self.value` is not accessed again after this point; the
        // guard is being dropped and nothing below reads the field.
        let value = unsafe { ManuallyDrop::take(&mut self.value) };
        let Ok(mut slot) = self.state.try_lock() else {
            return;
        };
        *slot = Some(value);
    }
}
// Unit tests for the `FutureProducer` implementations exercised below
// (plain futures and `oneshot` receivers).
#[cfg(test)]
mod tests {
use super::*;
use crate::{Engine, Store};
use core::future::pending;
use core::pin::pin;
use std::sync::LazyLock;
// Engine shared by every store created in `poll_future_producer`.
static ENGINE: LazyLock<Engine> = LazyLock::new(Engine::default);
// Polls `rx` once through `FutureProducer::poll_produce`, using a no-op
// waker and a fresh store; `finish` is forwarded unchanged.
fn poll_future_producer<T>(rx: Pin<&mut T>, finish: bool) -> Poll<Result<Option<T::Item>>>
where
T: FutureProducer<()>,
{
rx.poll_produce(
&mut Context::from_waker(Waker::noop()),
Store::new(&ENGINE, ()).as_context_mut(),
finish,
)
}
#[test]
fn future_producer() {
// An immediately-ready future yields its value whether or not `finish`
// is set.
let mut fut = pin!(async { crate::error::Ok(()) });
assert!(matches!(
poll_future_producer(fut.as_mut(), false),
Poll::Ready(Ok(Some(()))),
));
let mut fut = pin!(async { crate::error::Ok(()) });
assert!(matches!(
poll_future_producer(fut.as_mut(), true),
Poll::Ready(Ok(Some(()))),
));
// A future that never resolves is `Pending` without `finish` and
// `Ready(Ok(None))` with it.
let mut fut = pin!(pending::<Result<()>>());
assert!(matches!(
poll_future_producer(fut.as_mut(), false),
Poll::Pending,
));
assert!(matches!(
poll_future_producer(fut.as_mut(), true),
Poll::Ready(Ok(None)),
));
// A oneshot receiver with nothing sent yet behaves like the pending
// future above, and still yields the value once it is sent afterwards.
let (tx, rx) = oneshot::channel();
let mut rx = pin!(rx);
assert!(matches!(
poll_future_producer(rx.as_mut(), false),
Poll::Pending,
));
assert!(matches!(
poll_future_producer(rx.as_mut(), true),
Poll::Ready(Ok(None)),
));
tx.send(()).unwrap();
assert!(matches!(
poll_future_producer(rx.as_mut(), true),
Poll::Ready(Ok(Some(()))),
));
// A value sent before the first poll is produced immediately.
let (tx, rx) = oneshot::channel();
let mut rx = pin!(rx);
tx.send(()).unwrap();
assert!(matches!(
poll_future_producer(rx.as_mut(), false),
Poll::Ready(Ok(Some(()))),
));
// A dropped sender produces an error, with or without `finish`.
let (tx, rx) = oneshot::channel::<()>();
let mut rx = pin!(rx);
drop(tx);
assert!(matches!(
poll_future_producer(rx.as_mut(), false),
Poll::Ready(Err(..)),
));
let (tx, rx) = oneshot::channel::<()>();
let mut rx = pin!(rx);
drop(tx);
assert!(matches!(
poll_future_producer(rx.as_mut(), true),
Poll::Ready(Err(..)),
));
}
}