//! Asynchronous (tokio) stdio plumbing for WASI start functions run with wasmer.
//!
//! [`add_stdio`] installs this crate's [`Stdin`], [`Stdout`] and [`Stderr`] objects on a
//! `WasiStateBuilder`. [`WasiProcess`] wraps a start function in a future and exposes async
//! handles ([`WasiStdin`], [`WasiStdout`], [`WasiStderr`]) backed by per-process pipes.
#![deny(missing_docs)]
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::{io, task};
use wasmer::RuntimeError;
use wasmer_wasi::WasiStateBuilder;
mod pipe;
mod stdio;
pub use stdio::{Stderr, Stdin, Stdout};
use pipe::LockPipe;
/// Registers this crate's stdio objects on a WASI state builder.
///
/// Sets `stdin`, `stdout` and `stderr` (in that order) to boxed [`Stdin`], [`Stdout`] and
/// [`Stderr`] values and returns the builder reference produced by the last call, so the
/// caller can keep chaining.
pub fn add_stdio(state: &mut WasiStateBuilder) -> &mut WasiStateBuilder {
    let with_stdin = state.stdin(Box::new(stdio::Stdin));
    let with_stdout = with_stdin.stdout(Box::new(stdio::Stdout));
    with_stdout.stderr(Box::new(stdio::Stderr))
}
// Task-local pipes, one per standard stream. `WasiProcess::with_function` binds each of
// them with `.scope(..)` to a clone of the pipe held by the matching `WasiStdin` /
// `WasiStdout` / `WasiStderr` handle.
// NOTE(review): presumably read by the `stdio::{Stdin, Stdout, Stderr}` objects installed by
// `add_stdio` — confirm in the `stdio` module.
tokio::task_local! {
// Pipe shared with the `WasiStdin` handle.
static STDIN: LockPipe;
// Pipe shared with the `WasiStdout` handle.
static STDOUT: LockPipe;
// Pipe shared with the `WasiStderr` handle.
static STDERR: LockPipe;
}
/// Write handle for the stdin pipe of a [`WasiProcess`].
///
/// Implements [`AsyncWrite`] by forwarding every operation to the underlying pipe, which
/// `WasiProcess::with_function` also binds to the `STDIN` task-local of the process future.
pub struct WasiStdin {
// Pipe shared (via clone) with the process future's `STDIN` task-local.
inner: LockPipe,
}
/// Every [`AsyncWrite`] operation is delegated to the `AsyncWrite` implementation of a
/// shared reference to the inner pipe.
impl AsyncWrite for WasiStdin {
    /// Forwards the write of `buf` to the inner pipe.
    #[inline]
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // The pipe is driven through `&LockPipe`; pin a local copy of that reference.
        let mut pipe = &self.inner;
        Pin::new(&mut pipe).poll_write(cx, buf)
    }

    /// Forwards the flush request to the inner pipe.
    #[inline]
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut pipe = &self.inner;
        Pin::new(&mut pipe).poll_flush(cx)
    }

    /// Forwards the shutdown request to the inner pipe.
    #[inline]
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut pipe = &self.inner;
        Pin::new(&mut pipe).poll_shutdown(cx)
    }
}
/// Read handle for the stdout pipe of a [`WasiProcess`].
///
/// Implements [`AsyncRead`] by forwarding to the underlying pipe, which
/// `WasiProcess::with_function` also binds to the `STDOUT` task-local of the process future.
pub struct WasiStdout {
// Pipe shared (via clone) with the process future's `STDOUT` task-local.
inner: LockPipe,
}
/// Reads are delegated to the `AsyncRead` implementation of a shared reference to the
/// inner pipe.
impl AsyncRead for WasiStdout {
    /// Forwards the read into `buf` to the inner pipe.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // The pipe is driven through `&LockPipe`; pin a local copy of that reference.
        let mut pipe = &self.inner;
        Pin::new(&mut pipe).poll_read(cx, buf)
    }
}
/// Read handle for the stderr pipe of a [`WasiProcess`].
///
/// Implements [`AsyncRead`] by forwarding to the underlying pipe, which
/// `WasiProcess::with_function` also binds to the `STDERR` task-local of the process future.
pub struct WasiStderr {
// Pipe shared (via clone) with the process future's `STDERR` task-local.
inner: LockPipe,
}
/// Reads are delegated to the `AsyncRead` implementation of a shared reference to the
/// inner pipe.
impl AsyncRead for WasiStderr {
    /// Forwards the read into `buf` to the inner pipe.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // The pipe is driven through `&LockPipe`; pin a local copy of that reference.
        let mut pipe = &self.inner;
        Pin::new(&mut pipe).poll_read(cx, buf)
    }
}
/// A WASI start function wrapped as a [`Future`], together with async handles to its
/// stdio pipes.
///
/// Created by [`WasiProcess::new`] or [`WasiProcess::with_function`]. The start function is
/// only called once the process is polled, either directly (it implements [`Future`]) or
/// through [`WasiProcess::spawn`].
#[must_use = "WasiProcess does nothing without being polled or spawned. Try calling `.spawn()`"]
pub struct WasiProcess {
/// Write handle for the process's stdin pipe. Set to `Some` by the constructors.
pub stdin: Option<WasiStdin>,
/// Read handle for the process's stdout pipe. Set to `Some` by the constructors.
pub stdout: Option<WasiStdout>,
/// Read handle for the process's stderr pipe. Set to `Some` by the constructors.
pub stderr: Option<WasiStderr>,
// Boxed future built in `with_function`: the start-function call wrapped in the
// `STDIN`/`STDOUT`/`STDERR` task-local scopes. Polled by the `Future` impl.
handle: Pin<Box<dyn Future<Output = Result<(), RuntimeError>> + Send + Sync>>,
}
/// Sizes used to create the three stdio pipes of a `WasiProcess`.
///
/// Each value is handed to `LockPipe::new` for the corresponding stream.
/// NOTE(review): presumably a capacity in bytes — confirm against `LockPipe::new`.
#[derive(Debug, Copy, Clone)]
pub struct MaxBufSize {
    /// Size passed to the stdin pipe.
    pub stdin: usize,
    /// Size passed to the stdout pipe.
    pub stdout: usize,
    /// Size passed to the stderr pipe.
    pub stderr: usize,
}

/// Size that [`MaxBufSize::default`] assigns to every stream.
const DEFAULT_BUF_SIZE: usize = 1024;

impl Default for MaxBufSize {
    /// Uses `DEFAULT_BUF_SIZE` (1024) for stdin, stdout and stderr alike.
    fn default() -> Self {
        let size = DEFAULT_BUF_SIZE;
        Self {
            stdin: size,
            stdout: size,
            stderr: size,
        }
    }
}
impl WasiProcess {
    /// Builds a process around the `_start` export of `instance`.
    ///
    /// `buf_size` gives the sizes of the stdin/stdout/stderr pipes.
    ///
    /// # Errors
    ///
    /// Returns the `wasmer::ExportError` from `get_function` if `_start` cannot be
    /// obtained as a function export.
    pub fn new(
        instance: &wasmer::Instance,
        buf_size: MaxBufSize,
    ) -> Result<Self, wasmer::ExportError> {
        instance
            .exports
            .get_function("_start")
            .map(|start| Self::with_function(start.clone(), buf_size))
    }

    /// Builds a process around an arbitrary start function.
    ///
    /// Creates one pipe per stream (sized from `buf_size`) and a future that, when polled,
    /// calls `start_function` with no arguments inside `tokio::task::block_in_place`, with
    /// the `STDIN`, `STDOUT` and `STDERR` task-locals bound to clones of those pipes. The
    /// values returned by the function are discarded; only a `RuntimeError` is reported.
    ///
    /// Nothing is executed here: the function runs only once the returned process is polled
    /// or spawned.
    ///
    /// # Panics
    ///
    /// Constructing the process does not panic, but polling it calls `block_in_place`, which
    /// tokio documents as panicking on a current-thread runtime.
    pub fn with_function(start_function: wasmer::Function, buf_size: MaxBufSize) -> Self {
        let stdin = LockPipe::new(buf_size.stdin);
        let stdout = LockPipe::new(buf_size.stdout);
        let stderr = LockPipe::new(buf_size.stderr);

        // Clones handed to the guest side; the originals stay with the caller-facing handles.
        let guest_stdin = stdin.clone();
        let guest_stdout = stdout.clone();
        let guest_stderr = stderr.clone();

        // The start-function call is blocking, hence `block_in_place`.
        let run_guest = async move {
            task::block_in_place(|| start_function.call(&[]).map(|_returned| ()))
        };

        // Nest the task-local scopes from the innermost (stderr) to the outermost (stdin).
        let with_stderr = STDERR.scope(guest_stderr, run_guest);
        let with_stdout = STDOUT.scope(guest_stdout, with_stderr);
        let handle = STDIN.scope(guest_stdin, with_stdout);

        Self {
            stdin: Some(WasiStdin { inner: stdin }),
            stdout: Some(WasiStdout { inner: stdout }),
            stderr: Some(WasiStderr { inner: stderr }),
            handle: Box::pin(handle),
        }
    }

    /// Hands the process to `tokio::spawn` and returns a handle for awaiting its result.
    ///
    /// This consumes `self`, including the `stdin`/`stdout`/`stderr` fields, so take out
    /// any stream handles you still need before calling it.
    pub fn spawn(self) -> SpawnHandle {
        SpawnHandle {
            inner: tokio::spawn(self),
        }
    }
}
/// Polling a [`WasiProcess`] polls the boxed future that calls the start function.
impl Future for WasiProcess {
    type Output = Result<(), RuntimeError>;

    /// Delegates to the inner boxed future and returns its result unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let process = self.get_mut();
        process.handle.as_mut().poll(cx)
    }
}
/// Handle to a [`WasiProcess`] that was handed to `tokio::spawn` by [`WasiProcess::spawn`].
///
/// Awaiting it yields `Ok(())` when the process finished successfully, or a [`SpawnError`]
/// describing either the process's runtime error or the failure to join the task.
#[derive(Debug)]
pub struct SpawnHandle {
// Join handle of the spawned tokio task running the process future.
inner: tokio::task::JoinHandle<<WasiProcess as Future>::Output>,
}
/// Awaits the spawned task and flattens its nested result into a single [`SpawnError`].
impl Future for SpawnHandle {
    type Output = Result<(), SpawnError>;

    /// Polls the join handle. A join failure becomes [`SpawnError::Join`], a runtime error
    /// returned by the process becomes [`SpawnError::Wasi`].
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.inner).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(join_err)) => Poll::Ready(Err(SpawnError::Join(join_err))),
            Poll::Ready(Ok(Err(runtime_err))) => Poll::Ready(Err(SpawnError::Wasi(runtime_err))),
            Poll::Ready(Ok(Ok(()))) => Poll::Ready(Ok(())),
        }
    }
}
/// Error produced when awaiting a [`SpawnHandle`].
#[derive(Debug)]
pub enum SpawnError {
/// The spawned process itself finished with a runtime error.
Wasi(RuntimeError),
/// The tokio task running the process could not be joined.
Join(tokio::task::JoinError),
}
impl fmt::Display for SpawnError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Wasi(w) => write!(f, "runtime wasi/wasm error: {}", w),
Self::Join(j) => write!(f, "error while joining the tokio task: {}", j),
}
}
}
/// Uses the default [`std::error::Error`] methods, so `source()` returns `None`; the wrapped
/// error's message is already included in the `Display` output.
impl std::error::Error for SpawnError {}