use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::io::{AsyncRead, AsyncWrite};
/// Adapter that exposes a `wstd::net::TcpStream` through the `futures::io`
/// `AsyncRead` / `AsyncWrite` traits.
///
/// The working fields exist only when compiling for `wasm32`; on every other
/// target the struct holds just a `PhantomData` placeholder and its I/O
/// methods report an `Unsupported` error.
pub struct WasiTcpStream {
// Shared handle to the underlying stream; cloned into each boxed I/O future.
#[cfg(target_arch = "wasm32")]
inner: Arc<wstd::net::TcpStream>,
// In-flight read operation, if any (see `ReadState`).
#[cfg(target_arch = "wasm32")]
read_state: ReadState,
// In-flight write or flush operation, if any (see `WriteState`).
#[cfg(target_arch = "wasm32")]
write_state: WriteState,
// Placeholder so the struct still has a field on non-wasm32 targets.
#[cfg(not(target_arch = "wasm32"))]
_phantom: std::marker::PhantomData<()>,
}
// SAFETY — NOTE(review), unverified: the boxed futures stored in the state
// enums are `dyn Future` without a `Send` bound, so `Send`/`Sync` are asserted
// by hand here rather than derived by the compiler. Presumably this relies on
// the wasm32 target executing on a single thread; confirm before enabling any
// threaded wasm configuration.
#[cfg(target_arch = "wasm32")]
unsafe impl Send for WasiTcpStream {}
#[cfg(target_arch = "wasm32")]
unsafe impl Sync for WasiTcpStream {}
/// Heap-allocated, pinned, type-erased future yielding `T`.
///
/// Note that the trait object carries no `Send` bound.
#[cfg(target_arch = "wasm32")]
type WasmBoxFut<T> = Pin<Box<dyn std::future::Future<Output = T>>>;
/// Progress of the read side of a `WasiTcpStream`.
#[cfg(target_arch = "wasm32")]
enum ReadState {
// No read is in flight; the next `poll_read` starts one.
Idle,
// A read is in flight. The future resolves to the temporary buffer it read
// into together with the number of valid bytes at its front.
Pending(WasmBoxFut<io::Result<(Vec<u8>, usize)>>),
}
/// Progress of the write side of a `WasiTcpStream`.
///
/// Writes and flushes share this single slot, so at most one of them is in
/// flight at any time.
#[cfg(target_arch = "wasm32")]
enum WriteState {
// Nothing is in flight; the next `poll_write` / `poll_flush` starts an operation.
Idle,
// A write is in flight; the future resolves to the number of bytes written.
Writing(WasmBoxFut<io::Result<usize>>),
// A flush is in flight.
Flushing(WasmBoxFut<io::Result<()>>),
}
impl WasiTcpStream {
    /// Wraps an already-connected `wstd` TCP stream.
    ///
    /// The stream is placed behind an `Arc` so that each boxed I/O future can
    /// hold its own handle; both the read and the write side start out idle.
    /// Only available when compiling for `wasm32`.
    #[cfg(target_arch = "wasm32")]
    pub fn new(stream: wstd::net::TcpStream) -> Self {
        let inner = Arc::new(stream);
        Self {
            inner,
            read_state: ReadState::Idle,
            write_state: WriteState::Idle,
        }
    }
}
/// `futures::io::AsyncRead` implementation backed by the inner `wstd` stream.
///
/// On `wasm32` every read is carried out by a boxed future kept in
/// `read_state`: the future reads into a temporary `Vec` and is polled until
/// it resolves, after which the bytes are copied into the caller's buffer.
/// On all other targets the call fails immediately with an `Unsupported`
/// error.
impl AsyncRead for WasiTcpStream {
    /// Attempts to read into `buf`.
    ///
    /// Returns `Poll::Ready(Ok(n))` with the number of bytes copied into
    /// `buf`, `Poll::Ready(Err(_))` if the underlying read failed, or
    /// `Poll::Pending` while the in-flight read has not resolved yet.
    ///
    /// The size of the temporary buffer is fixed by the length of `buf` on
    /// the poll that *starts* the read. If a later poll supplies a shorter
    /// `buf`, only the part that fits is returned now; the remainder is kept
    /// and handed out by the following `poll_read` calls instead of being
    /// discarded.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        #[cfg(target_arch = "wasm32")]
        {
            let this = self.get_mut();
            loop {
                match &mut this.read_state {
                    ReadState::Idle => {
                        // Start a new read. The future owns its own handle to
                        // the stream and its own scratch buffer so it can
                        // outlive this call.
                        let stream = Arc::clone(&this.inner);
                        let len = buf.len();
                        let fut: WasmBoxFut<_> = Box::pin(async move {
                            use wstd::io::AsyncRead as _;
                            let mut tmp = vec![0u8; len];
                            let mut s = &*stream;
                            let n = s.read(&mut tmp).await?;
                            Ok((tmp, n))
                        });
                        this.read_state = ReadState::Pending(fut);
                        // Loop around to poll the future we just created.
                    }
                    ReadState::Pending(fut) => match fut.as_mut().poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(Ok((tmp, n))) => {
                            let to_copy = n.min(buf.len());
                            buf[..to_copy].copy_from_slice(&tmp[..to_copy]);
                            this.read_state = if to_copy < n {
                                // The caller's buffer is shorter than what was
                                // read. Park the unread tail as an already
                                // resolved future so the next poll returns it
                                // rather than losing those bytes.
                                let rest = tmp[to_copy..n].to_vec();
                                let rest_len = rest.len();
                                let leftover: io::Result<(Vec<u8>, usize)> = Ok((rest, rest_len));
                                let parked: WasmBoxFut<io::Result<(Vec<u8>, usize)>> =
                                    Box::pin(std::future::ready(leftover));
                                ReadState::Pending(parked)
                            } else {
                                ReadState::Idle
                            };
                            return Poll::Ready(Ok(to_copy));
                        }
                        Poll::Ready(Err(e)) => {
                            this.read_state = ReadState::Idle;
                            return Poll::Ready(Err(e));
                        }
                    },
                }
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            // Silence unused-parameter warnings on unsupported targets.
            let _ = (cx, buf);
            Poll::Ready(Err(unsupported()))
        }
    }
}
/// `futures::io::AsyncWrite` implementation backed by the inner `wstd` stream.
///
/// On `wasm32` writes and flushes are carried out by boxed futures kept in
/// the single `write_state` slot, so only one of them is in flight at a time.
/// If one kind of operation is requested while the other kind is still in
/// flight, the in-flight operation is driven to completion first. Its future
/// is polled with the caller's context, so the task is always registered for
/// wake-up before `Poll::Pending` is returned.
impl AsyncWrite for WasiTcpStream {
    /// Attempts to write `buf` to the stream.
    ///
    /// The bytes are copied into the write future on the poll that starts the
    /// write; subsequent polls continue that same write. Returns the number
    /// of bytes written, an error from the underlying stream, or
    /// `Poll::Pending`. On non-`wasm32` targets this always fails with an
    /// `Unsupported` error.
    ///
    /// If a flush is still in flight it is completed first; a flush error is
    /// returned from this call.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        #[cfg(target_arch = "wasm32")]
        {
            let this = self.get_mut();
            loop {
                match &mut this.write_state {
                    WriteState::Idle => {
                        let stream = Arc::clone(&this.inner);
                        // The future must own its data because it outlives this call.
                        let data = buf.to_vec();
                        let fut: WasmBoxFut<_> = Box::pin(async move {
                            use wstd::io::AsyncWrite as _;
                            let mut s = &*stream;
                            s.write(&data).await
                        });
                        this.write_state = WriteState::Writing(fut);
                    }
                    WriteState::Writing(fut) => match fut.as_mut().poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(result) => {
                            this.write_state = WriteState::Idle;
                            return Poll::Ready(result);
                        }
                    },
                    // Poll the in-flight flush instead of returning `Pending`
                    // blindly: only polling registers the waker that lets the
                    // task make progress again.
                    WriteState::Flushing(fut) => match fut.as_mut().poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(Ok(())) => {
                            // Flush finished; loop around and start the write.
                            this.write_state = WriteState::Idle;
                        }
                        Poll::Ready(Err(e)) => {
                            this.write_state = WriteState::Idle;
                            return Poll::Ready(Err(e));
                        }
                    },
                }
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        {
            // Silence unused-parameter warnings on unsupported targets.
            let _ = (cx, buf);
            Poll::Ready(Err(unsupported()))
        }
    }

    /// Attempts to flush the stream.
    ///
    /// If a write is still in flight it is completed first. The byte count of
    /// that write is discarded, because there is no caller left to report it
    /// to, but an error from it is returned from this call. On non-`wasm32`
    /// targets this is a no-op that reports success.
    fn poll_flush(
        self: Pin<&mut Self>,
        #[allow(unused_variables)] cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        #[cfg(target_arch = "wasm32")]
        {
            let this = self.get_mut();
            loop {
                match &mut this.write_state {
                    WriteState::Idle => {
                        let stream = Arc::clone(&this.inner);
                        let fut: WasmBoxFut<_> = Box::pin(async move {
                            use wstd::io::AsyncWrite as _;
                            let mut s = &*stream;
                            s.flush().await
                        });
                        this.write_state = WriteState::Flushing(fut);
                    }
                    // Poll the in-flight write instead of returning `Pending`
                    // blindly, so the waker is registered and the write can
                    // finish before the flush starts.
                    WriteState::Writing(fut) => match fut.as_mut().poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(Ok(_)) => {
                            // Write finished; loop around and start the flush.
                            this.write_state = WriteState::Idle;
                        }
                        Poll::Ready(Err(e)) => {
                            this.write_state = WriteState::Idle;
                            return Poll::Ready(Err(e));
                        }
                    },
                    WriteState::Flushing(fut) => match fut.as_mut().poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(result) => {
                            this.write_state = WriteState::Idle;
                            return Poll::Ready(result);
                        }
                    },
                }
            }
        }
        #[cfg(not(target_arch = "wasm32"))]
        Poll::Ready(Ok(()))
    }

    /// Closes the write side by delegating to `poll_flush`.
    ///
    /// No separate shutdown of the underlying stream is performed here.
    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        self.poll_flush(cx)
    }
}
/// Builds the error returned by the I/O methods on targets other than `wasm32`.
#[cfg(not(target_arch = "wasm32"))]
fn unsupported() -> io::Error {
    const MESSAGE: &str = "WasiTcpStream is only functional on wasm32-wasip2";
    io::Error::new(io::ErrorKind::Unsupported, MESSAGE)
}