use std::time::Instant;
use wasmtime::Result as WResult;
use wasmtime::component::Resource;
use wasmtime_wasi::p2::bindings::sync::io::poll as wasi_poll;
use crate::engine::wasm::bindings::astrid::io::poll::{self as astrid_poll, ErrorCode, Pollable};
use crate::engine::wasm::host_state::HostState;
/// Upper bound on the number of pollables accepted by a single `poll` call;
/// longer lists are rejected with `ErrorCode::TooLarge` before any polling happens.
const MAX_POLL_LIST: usize = 256;
/// Wraps an error from the delegated WASI call in the guest-facing
/// [`ErrorCode::Unknown`], carrying the error's `Display` text.
fn map_inner_err(err: wasmtime::Error) -> ErrorCode {
    let message = err.to_string();
    ErrorCode::Unknown(message)
}
/// Host side of `astrid:io/poll`: validates the request, delegates to the WASI
/// poll implementation on the resource table, and emits one audit event per
/// delegated (or cancelled) call.
impl astrid_poll::Host for HostState {
    /// Polls `pollables` and returns the list produced by the wrapped WASI `poll`.
    ///
    /// # Errors
    ///
    /// * `ErrorCode::InvalidInput` if `pollables` is empty.
    /// * `ErrorCode::TooLarge` if more than `MAX_POLL_LIST` entries are supplied.
    /// * `ErrorCode::Cancelled` if the cancel token is already cancelled on entry.
    /// * `ErrorCode::Unknown` carrying the text of any error from the WASI call.
    ///
    /// The two size checks return early, before any audit event is emitted.
    fn poll(&mut self, pollables: Vec<Resource<Pollable>>) -> Result<Vec<u32>, ErrorCode> {
        if pollables.is_empty() {
            return Err(ErrorCode::InvalidInput);
        }
        if pollables.len() > MAX_POLL_LIST {
            return Err(ErrorCode::TooLarge);
        }
        let principal = self.effective_principal();
        let count = pollables.len();
        let started = Instant::now();
        // Read the flag in place; cloning the token just to query it is wasted work.
        let result = if self.cancel_token.is_cancelled() {
            Err(ErrorCode::Cancelled)
        } else {
            wasi_poll::Host::poll(&mut self.resource_table, pollables).map_err(map_inner_err)
        };
        let elapsed_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
        // The mutable borrow of the resource table has ended, so the id can be
        // borrowed for logging instead of being copied into a fresh `String`.
        let capsule_id = self.capsule_id.as_str();
        match &result {
            Ok(ready) => tracing::debug!(
                target: "astrid.audit.io",
                %capsule_id,
                %principal,
                fn = "astrid:io/poll.poll",
                pollable_count = count,
                ready_count = ready.len(),
                elapsed_ms,
                "audit",
            ),
            Err(e) => tracing::debug!(
                target: "astrid.audit.io",
                %capsule_id,
                %principal,
                fn = "astrid:io/poll.poll",
                pollable_count = count,
                error = ?e,
                elapsed_ms,
                "audit",
            ),
        }
        result
    }
}
/// Host side of the `astrid:io/poll` `pollable` resource, delegating each
/// method to the WASI implementation on the resource table.
impl astrid_poll::HostPollable for HostState {
    /// Reports whether the pollable is ready.
    ///
    /// The signature cannot carry an error, so a failure of the underlying call
    /// is still reported as `false`, but it is now logged instead of vanishing.
    fn ready(&mut self, self_: Resource<Pollable>) -> bool {
        match wasi_poll::HostPollable::ready(&mut self.resource_table, self_) {
            Ok(is_ready) => is_ready,
            Err(err) => {
                tracing::debug!(
                    capsule_id = %self.capsule_id.as_str(),
                    error = ?err,
                    "pollable.ready failed; reporting not ready",
                );
                false
            }
        }
    }

    /// Blocks on the pollable via the WASI implementation and audits the call.
    ///
    /// # Errors
    ///
    /// * `ErrorCode::Cancelled` if the cancel token is already cancelled on entry.
    /// * `ErrorCode::Unknown` carrying the text of any error from the WASI call.
    fn block(&mut self, self_: Resource<Pollable>) -> Result<(), ErrorCode> {
        let principal = self.effective_principal();
        let started = Instant::now();
        // Read the flag in place; cloning the token just to query it is wasted work.
        let result = if self.cancel_token.is_cancelled() {
            Err(ErrorCode::Cancelled)
        } else {
            wasi_poll::HostPollable::block(&mut self.resource_table, self_).map_err(map_inner_err)
        };
        let elapsed_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
        // Borrow the id only once the resource table is no longer mutably
        // borrowed, avoiding a per-call `String` allocation.
        let capsule_id = self.capsule_id.as_str();
        match &result {
            Ok(()) => tracing::debug!(
                target: "astrid.audit.io",
                %capsule_id,
                %principal,
                fn = "astrid:io/poll/pollable.block",
                elapsed_ms,
                "audit",
            ),
            Err(e) => tracing::debug!(
                target: "astrid.audit.io",
                %capsule_id,
                %principal,
                fn = "astrid:io/poll/pollable.block",
                error = ?e,
                elapsed_ms,
                "audit",
            ),
        }
        result
    }

    /// Releases the pollable by forwarding to the WASI implementation.
    fn drop(&mut self, rep: Resource<Pollable>) -> WResult<()> {
        wasi_poll::HostPollable::drop(&mut self.resource_table, rep)
    }
}
mod astrid_error_impl {
use super::*;
use crate::engine::wasm::bindings::astrid::io::error::{self as astrid_error, Error};
use wasmtime_wasi::p2::IoError;
impl astrid_error::Host for HostState {}
impl astrid_error::HostError for HostState {
fn to_debug_string(&mut self, self_: Resource<Error>) -> String {
let rep = self_.rep();
self.resource_table
.get::<IoError>(&Resource::new_borrow(rep))
.map(|e| format!("{e:?}"))
.unwrap_or_else(|_| "error resource not found".to_string())
}
fn drop(&mut self, rep: Resource<Error>) -> WResult<()> {
let _ = self
.resource_table
.delete::<IoError>(Resource::new_own(rep.rep()));
Ok(())
}
}
}
mod astrid_streams_impl {
use super::*;
use crate::engine::wasm::bindings::astrid::io::poll::Pollable;
use crate::engine::wasm::bindings::astrid::io::streams::{
self as astrid_streams, HostInputStream, HostOutputStream, InputStream, OutputStream,
};
use wasmtime::component::Resource;
use wasmtime_wasi::p2::StreamError as RtStreamError;
use wasmtime_wasi::p2::bindings::sync::io::streams as wasi_streams;
/// Shorthand for the result type of the stream host methods in this module,
/// whose error is the runtime-side `StreamError` (`RtStreamError`).
type StreamResult<T> = Result<T, RtStreamError>;
/// Gate used at the top of stream operations: yields `Ok(())` while the
/// state's cancel token is live and `RtStreamError::Closed` once it is cancelled.
fn cancel_guard(state: &HostState) -> Result<(), RtStreamError> {
    if !state.cancel_token.is_cancelled() {
        return Ok(());
    }
    Err(RtStreamError::Closed)
}
/// Emits one debug-level event on the `astrid.audit.io` target describing a
/// stream operation.
///
/// Every event carries the capsule id, the effective principal, the operation
/// name (`fn`), `bytes` and `elapsed_ms`; when `result` is an `Err`, its
/// `Debug` rendering is attached as `error`.
fn audit_io<T, E>(
    state: &HostState,
    op: &'static str,
    bytes: u64,
    elapsed_ms: u64,
    result: &Result<T, E>,
) where
    E: std::fmt::Debug,
{
    let capsule = state.capsule_id.as_str();
    let who = state.effective_principal();
    if let Err(failure) = result {
        tracing::debug!(
            target: "astrid.audit.io",
            capsule_id = %capsule,
            principal = %who,
            fn = op,
            bytes,
            elapsed_ms,
            error = ?failure,
            "audit",
        );
    } else {
        tracing::debug!(
            target: "astrid.audit.io",
            capsule_id = %capsule,
            principal = %who,
            fn = op,
            bytes,
            elapsed_ms,
            "audit",
        );
    }
}
/// Host side of `astrid:io/streams`: translates runtime stream errors into the
/// astrid-flavoured `StreamError` variant.
impl astrid_streams::Host for HostState {
    /// Runs the WASI conversion on the resource table first, then maps each
    /// resulting variant onto its astrid counterpart. For
    /// `LastOperationFailed` the error handle is re-issued as an owned
    /// resource with the same representation.
    fn convert_stream_error(
        &mut self,
        err: RtStreamError,
    ) -> WResult<astrid_streams::StreamError> {
        use wasmtime_wasi::p2::bindings::sync::io::streams::Host as WasiHost;

        let converted = match WasiHost::convert_stream_error(&mut self.resource_table, err)? {
            wasi_streams::StreamError::LastOperationFailed(inner) => {
                let handle = Resource::new_own(inner.rep());
                astrid_streams::StreamError::LastOperationFailed(handle)
            }
            wasi_streams::StreamError::Closed => astrid_streams::StreamError::Closed,
        };
        Ok(converted)
    }
}
impl HostInputStream for HostState {
fn read(&mut self, self_: Resource<InputStream>, len: u64) -> StreamResult<Vec<u8>> {
cancel_guard(self)?;
let started = Instant::now();
let stream = Resource::new_borrow(self_.rep());
let result = wasi_streams::HostInputStream::read(&mut self.resource_table, stream, len);
let bytes = result.as_ref().map(|v| v.len() as u64).unwrap_or(0);
let elapsed_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
audit_io(
self,
"astrid:io/streams/input-stream.read",
bytes,
elapsed_ms,
&result,
);
result
}
fn blocking_read(
&mut self,
self_: Resource<InputStream>,
len: u64,
) -> StreamResult<Vec<u8>> {
cancel_guard(self)?;
let started = Instant::now();
let stream = Resource::new_borrow(self_.rep());
let result =
wasi_streams::HostInputStream::blocking_read(&mut self.resource_table, stream, len);
let bytes = result.as_ref().map(|v| v.len() as u64).unwrap_or(0);
let elapsed_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
audit_io(
self,
"astrid:io/streams/input-stream.blocking-read",
bytes,
elapsed_ms,
&result,
);
result
}
fn skip(&mut self, self_: Resource<InputStream>, len: u64) -> StreamResult<u64> {
cancel_guard(self)?;
let stream = Resource::new_borrow(self_.rep());
wasi_streams::HostInputStream::skip(&mut self.resource_table, stream, len)
}
fn blocking_skip(&mut self, self_: Resource<InputStream>, len: u64) -> StreamResult<u64> {
cancel_guard(self)?;
let stream = Resource::new_borrow(self_.rep());
wasi_streams::HostInputStream::blocking_skip(&mut self.resource_table, stream, len)
}
fn subscribe(&mut self, self_: Resource<InputStream>) -> WResult<Resource<Pollable>> {
let stream = Resource::new_borrow(self_.rep());
let p = wasi_streams::HostInputStream::subscribe(&mut self.resource_table, stream)?;
Ok(Resource::new_own(p.rep()))
}
fn drop(&mut self, rep: Resource<InputStream>) -> WResult<()> {
wasi_streams::HostInputStream::drop(
&mut self.resource_table,
Resource::new_own(rep.rep()),
)
}
}
/// Output-stream host functions. Each forwards to the WASI implementation on
/// the resource table, re-borrowing or re-owning the handle by its
/// representation. The cancel-guarded ones fail with `Closed` once the cancel
/// token is cancelled. Only `write`, `blocking_write_and_flush`, `splice` and
/// `blocking_splice` are audited.
impl HostOutputStream for HostState {
    /// Forwards `check_write`.
    fn check_write(&mut self, self_: Resource<OutputStream>) -> StreamResult<u64> {
        cancel_guard(self)?;
        wasi_streams::HostOutputStream::check_write(
            &mut self.resource_table,
            Resource::new_borrow(self_.rep()),
        )
    }

    /// Forwards `write` and audits the length of `contents`, whether or not
    /// the call succeeds.
    fn write(&mut self, self_: Resource<OutputStream>, contents: Vec<u8>) -> StreamResult<()> {
        cancel_guard(self)?;
        let timer = Instant::now();
        // Captured up front because `contents` is moved into the call below.
        let bytes = contents.len() as u64;
        let outcome = wasi_streams::HostOutputStream::write(
            &mut self.resource_table,
            Resource::new_borrow(self_.rep()),
            contents,
        );
        let elapsed_ms = u64::try_from(timer.elapsed().as_millis()).unwrap_or(u64::MAX);
        audit_io(
            self,
            "astrid:io/streams/output-stream.write",
            bytes,
            elapsed_ms,
            &outcome,
        );
        outcome
    }

    /// Forwards `blocking_write_and_flush` and audits the length of `contents`,
    /// whether or not the call succeeds.
    fn blocking_write_and_flush(
        &mut self,
        self_: Resource<OutputStream>,
        contents: Vec<u8>,
    ) -> StreamResult<()> {
        cancel_guard(self)?;
        let timer = Instant::now();
        // Captured up front because `contents` is moved into the call below.
        let bytes = contents.len() as u64;
        let outcome = wasi_streams::HostOutputStream::blocking_write_and_flush(
            &mut self.resource_table,
            Resource::new_borrow(self_.rep()),
            contents,
        );
        let elapsed_ms = u64::try_from(timer.elapsed().as_millis()).unwrap_or(u64::MAX);
        audit_io(
            self,
            "astrid:io/streams/output-stream.blocking-write-and-flush",
            bytes,
            elapsed_ms,
            &outcome,
        );
        outcome
    }

    /// Forwards `flush`.
    fn flush(&mut self, self_: Resource<OutputStream>) -> StreamResult<()> {
        cancel_guard(self)?;
        wasi_streams::HostOutputStream::flush(
            &mut self.resource_table,
            Resource::new_borrow(self_.rep()),
        )
    }

    /// Forwards `blocking_flush`.
    fn blocking_flush(&mut self, self_: Resource<OutputStream>) -> StreamResult<()> {
        cancel_guard(self)?;
        wasi_streams::HostOutputStream::blocking_flush(
            &mut self.resource_table,
            Resource::new_borrow(self_.rep()),
        )
    }

    /// Forwards `subscribe` and returns the pollable as an owned handle with
    /// the same representation. No cancel check is performed here.
    fn subscribe(&mut self, self_: Resource<OutputStream>) -> WResult<Resource<Pollable>> {
        let pollable = wasi_streams::HostOutputStream::subscribe(
            &mut self.resource_table,
            Resource::new_borrow(self_.rep()),
        )?;
        Ok(Resource::new_own(pollable.rep()))
    }

    /// Forwards `write_zeroes`.
    fn write_zeroes(&mut self, self_: Resource<OutputStream>, len: u64) -> StreamResult<()> {
        cancel_guard(self)?;
        wasi_streams::HostOutputStream::write_zeroes(
            &mut self.resource_table,
            Resource::new_borrow(self_.rep()),
            len,
        )
    }

    /// Forwards `blocking_write_zeroes_and_flush`.
    fn blocking_write_zeroes_and_flush(
        &mut self,
        self_: Resource<OutputStream>,
        len: u64,
    ) -> StreamResult<()> {
        cancel_guard(self)?;
        wasi_streams::HostOutputStream::blocking_write_zeroes_and_flush(
            &mut self.resource_table,
            Resource::new_borrow(self_.rep()),
            len,
        )
    }

    /// Forwards `splice` from `src` into this stream and audits the byte count
    /// the call returns (0 on error).
    fn splice(
        &mut self,
        self_: Resource<OutputStream>,
        src: Resource<InputStream>,
        len: u64,
    ) -> StreamResult<u64> {
        cancel_guard(self)?;
        let timer = Instant::now();
        let outcome = wasi_streams::HostOutputStream::splice(
            &mut self.resource_table,
            Resource::new_borrow(self_.rep()),
            Resource::new_borrow(src.rep()),
            len,
        );
        let bytes = match &outcome {
            Ok(moved) => *moved,
            Err(_) => 0,
        };
        let elapsed_ms = u64::try_from(timer.elapsed().as_millis()).unwrap_or(u64::MAX);
        audit_io(
            self,
            "astrid:io/streams/output-stream.splice",
            bytes,
            elapsed_ms,
            &outcome,
        );
        outcome
    }

    /// Forwards `blocking_splice` from `src` into this stream and audits the
    /// byte count the call returns (0 on error).
    fn blocking_splice(
        &mut self,
        self_: Resource<OutputStream>,
        src: Resource<InputStream>,
        len: u64,
    ) -> StreamResult<u64> {
        cancel_guard(self)?;
        let timer = Instant::now();
        let outcome = wasi_streams::HostOutputStream::blocking_splice(
            &mut self.resource_table,
            Resource::new_borrow(self_.rep()),
            Resource::new_borrow(src.rep()),
            len,
        );
        let bytes = match &outcome {
            Ok(moved) => *moved,
            Err(_) => 0,
        };
        let elapsed_ms = u64::try_from(timer.elapsed().as_millis()).unwrap_or(u64::MAX);
        audit_io(
            self,
            "astrid:io/streams/output-stream.blocking-splice",
            bytes,
            elapsed_ms,
            &outcome,
        );
        outcome
    }

    /// Releases the stream by forwarding an owned handle to the WASI `drop`.
    fn drop(&mut self, rep: Resource<OutputStream>) -> WResult<()> {
        let owned = Resource::new_own(rep.rep());
        wasi_streams::HostOutputStream::drop(&mut self.resource_table, owned)
    }
}
}