use super::{AsyncPollable, AsyncRead, AsyncWrite};
use std::cell::RefCell;
use std::io::Result;
use wasi::io::streams::{InputStream, OutputStream, StreamError};
/// Asynchronous wrapper around a WASI [`InputStream`].
///
/// Readiness is awaited through an [`AsyncPollable`] that is created from the
/// stream the first time it is needed and then kept for later waits.
#[derive(Debug)]
pub struct AsyncInputStream {
// Cached pollable for `stream`; `None` until the first readiness wait.
// NOTE(review): declared before `stream`, so it is dropped first; presumably
// intentional because the pollable is obtained from the stream. Confirm
// before reordering fields.
subscription: RefCell<Option<AsyncPollable>>,
// The wrapped WASI input stream.
stream: InputStream,
}
impl AsyncInputStream {
    /// Wraps `stream` without subscribing to it yet; the pollable is created
    /// on the first readiness wait.
    pub fn new(stream: InputStream) -> Self {
        let subscription = RefCell::new(None);
        Self {
            subscription,
            stream,
        }
    }

    /// Waits on the stream's pollable, creating and caching it on first use.
    async fn ready(&self) {
        // Release the shared borrow before storing a new pollable.
        let needs_subscription = self.subscription.borrow().is_none();
        if needs_subscription {
            let pollable = AsyncPollable::new(self.stream.subscribe());
            self.subscription.replace(Some(pollable));
        }

        // The shared borrow stays alive for the duration of the wait.
        let guard = self.subscription.borrow();
        let pollable = guard.as_ref().expect("populated refcell");
        pollable.wait_for().await;
    }
}
impl AsyncRead for AsyncInputStream {
    /// Reads up to `buf.len()` bytes from the wrapped stream into the start of `buf`.
    ///
    /// Returns the number of bytes copied. `Ok(0)` is returned only when `buf`
    /// is empty or the stream reports `StreamError::Closed`, so callers can
    /// treat a zero-length result on a non-empty buffer as end-of-stream.
    ///
    /// # Errors
    ///
    /// `StreamError::LastOperationFailed` is converted into a
    /// [`std::io::Error`] carrying the error's debug string.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        // Nothing can be read into an empty buffer; returning early also keeps
        // the retry loop below from spinning on zero-length reads.
        if buf.is_empty() {
            return Ok(0);
        }

        let read = loop {
            self.ready().await;
            match self.stream.read(buf.len() as u64) {
                // An empty result from the WASI `read` does not mean the
                // stream ended (that is reported as `Closed`); readiness does
                // not guarantee data, so wait and try again.
                Ok(r) if r.is_empty() => continue,
                Ok(r) => break r,
                // For Rust readers, zero bytes signals end-of-stream.
                Err(StreamError::Closed) => return Ok(0),
                Err(StreamError::LastOperationFailed(err)) => {
                    return Err(std::io::Error::other(err.to_debug_string()))
                }
            }
        };

        // The stream hands back an owned byte vector; copy it into the caller's buffer.
        let len = read.len();
        buf[..len].copy_from_slice(&read);
        Ok(len)
    }
}
/// Asynchronous wrapper around a WASI [`OutputStream`].
///
/// Readiness is awaited through an [`AsyncPollable`] that is created from the
/// stream the first time it is needed and then kept for later waits.
#[derive(Debug)]
pub struct AsyncOutputStream {
// Cached pollable for `stream`; `None` until the first readiness wait.
// NOTE(review): declared before `stream`, so it is dropped first; presumably
// intentional because the pollable is obtained from the stream. Confirm
// before reordering fields.
subscription: RefCell<Option<AsyncPollable>>,
// The wrapped WASI output stream.
stream: OutputStream,
}
impl AsyncOutputStream {
    /// Wraps `stream` without subscribing to it yet; the pollable is created
    /// on the first readiness wait.
    pub fn new(stream: OutputStream) -> Self {
        let subscription = RefCell::new(None);
        Self {
            subscription,
            stream,
        }
    }

    /// Waits on the stream's pollable, creating and caching it on first use.
    async fn ready(&self) {
        // Release the shared borrow before storing a new pollable.
        let needs_subscription = self.subscription.borrow().is_none();
        if needs_subscription {
            let pollable = AsyncPollable::new(self.stream.subscribe());
            self.subscription.replace(Some(pollable));
        }

        // The shared borrow stays alive for the duration of the wait.
        let guard = self.subscription.borrow();
        let pollable = guard.as_ref().expect("populated refcell");
        pollable.wait_for().await;
    }
}
impl AsyncWrite for AsyncOutputStream {
    /// Writes a prefix of `buf` to the wrapped stream.
    ///
    /// Waits until `check_write` reports a non-zero budget, then writes at
    /// most that many bytes and returns how many were written. `Ok(0)` is
    /// returned when the stream reports `StreamError::Closed`.
    ///
    /// # Errors
    ///
    /// `StreamError::LastOperationFailed` is converted into a
    /// [`std::io::Error`] carrying the error's debug string.
    async fn write(&mut self, buf: &[u8]) -> Result<usize> {
        loop {
            match self.stream.check_write() {
                // No write budget available yet: wait, then ask again.
                Ok(0) => {
                    self.ready().await;
                    continue;
                }
                Ok(some) => {
                    // Clamp the permitted byte count to what `usize` can hold
                    // and to the amount of data actually supplied.
                    let writable = some.try_into().unwrap_or(usize::MAX).min(buf.len());
                    match self.stream.write(&buf[0..writable]) {
                        Ok(()) => return Ok(writable),
                        Err(StreamError::Closed) => return Ok(0),
                        Err(StreamError::LastOperationFailed(err)) => {
                            return Err(std::io::Error::other(err.to_debug_string()))
                        }
                    }
                }
                Err(StreamError::Closed) => return Ok(0),
                Err(StreamError::LastOperationFailed(err)) => {
                    return Err(std::io::Error::other(err.to_debug_string()))
                }
            }
        }
    }

    /// Requests a flush of the wrapped stream and waits for it to finish.
    ///
    /// A stream that reports `StreamError::Closed` is treated as having
    /// nothing left to flush and yields `Ok(())`.
    ///
    /// # Errors
    ///
    /// `StreamError::LastOperationFailed`, whether reported by the flush
    /// request itself or by the status check once the wait completes, is
    /// converted into a [`std::io::Error`] carrying the error's debug string.
    async fn flush(&mut self) -> Result<()> {
        match self.stream.flush() {
            Ok(()) => {
                self.ready().await;
                // The flush request only starts the flush; a failure while it
                // runs is reported by the next `check_write`, so query it here
                // rather than silently reporting success.
                match self.stream.check_write() {
                    Ok(_) | Err(StreamError::Closed) => Ok(()),
                    Err(StreamError::LastOperationFailed(err)) => {
                        Err(std::io::Error::other(err.to_debug_string()))
                    }
                }
            }
            Err(StreamError::Closed) => Ok(()),
            Err(StreamError::LastOperationFailed(err)) => {
                Err(std::io::Error::other(err.to_debug_string()))
            }
        }
    }
}