use std::collections::HashMap;
use std::io::Cursor;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use anyhow::Result;
use bytes::Bytes;
use http_body::Body as _;
use http_body_util::BodyExt;
use http_body_util::combinators::UnsyncBoxBody;
use tokio::sync::{RwLock, mpsc};
use tokio_util::sync::PollSender;
use wasmtime::AsContextMut;
use wasmtime::StoreContextMut;
use wasmtime::component::{
Accessor, Destination, HasData, Resource, ResourceTable, Source, StreamProducer, StreamReader,
StreamResult,
};
use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
use wasmtime_wasi_http::p3::bindings::http::types::ErrorCode;
use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};
mod runtime;
use crate::events::content::InboundContent;
use crate::plugins::capabilities::Capability;
use crate::wasm::bindgen::witmproxy::plugin::capabilities::{
CapabilityKind, HostAnnotatorClient, HostAnnotatorClientWithStore, HostCapabilityProvider,
HostCapabilityProviderWithStore, HostContent, HostContentWithStore, HostLocalStorageClient,
HostLocalStorageClientWithStore, HostLogger, HostLoggerWithStore,
};
pub use runtime::Runtime;
pub mod bindgen;
/// Bundle of the optional host capabilities that can be handed to a plugin.
///
/// Every capability starts out absent (`None`); it becomes available only
/// after the matching `with_*` builder method has been called.
#[derive(Default)]
pub struct CapabilityProvider {
    /// Logging capability, if one has been attached.
    logger: Option<Logger>,
    /// Annotation capability, if one has been attached.
    annotator: Option<AnnotatorClient>,
    /// Local key/value storage capability, if one has been attached.
    local_storage: Option<LocalStorageClient>,
}
impl CapabilityProvider {
    /// Creates a provider with no capabilities attached.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the provider with `logger` attached, replacing any previous logger.
    pub fn with_logger(self, logger: Logger) -> Self {
        Self {
            logger: Some(logger),
            ..self
        }
    }

    /// Returns the provider with `annotator` attached, replacing any previous one.
    pub fn with_annotator(self, annotator: AnnotatorClient) -> Self {
        Self {
            annotator: Some(annotator),
            ..self
        }
    }

    /// Returns the provider with `local_storage` attached, replacing any previous one.
    pub fn with_local_storage(self, local_storage: LocalStorageClient) -> Self {
        Self {
            local_storage: Some(local_storage),
            ..self
        }
    }

    /// Returns a clone of the attached logger, or `None` if there is none.
    pub fn logger(&self) -> Option<Logger> {
        self.logger.clone()
    }

    /// Returns a clone of the attached annotator, or `None` if there is none.
    pub fn annotator(&self) -> Option<AnnotatorClient> {
        self.annotator.clone()
    }

    /// Returns a clone of the attached storage client, or `None` if there is none.
    pub fn local_storage(&self) -> Option<LocalStorageClient> {
        self.local_storage.clone()
    }
}
/// Builds a provider from a plugin's capability list.
///
/// Only capabilities whose `granted` flag is set are considered. Each granted
/// `Logger`, `Annotator` or `LocalStorage` entry attaches a freshly constructed
/// client; `HandleEvent` entries contribute nothing to the provider.
impl From<&Vec<Capability>> for CapabilityProvider {
    fn from(capabilities: &Vec<Capability>) -> Self {
        capabilities
            .iter()
            .filter(|capability| capability.granted)
            .fold(
                CapabilityProvider::new(),
                |provider, capability| match &capability.inner.kind {
                    CapabilityKind::Logger => provider.with_logger(Logger::new()),
                    CapabilityKind::Annotator => provider.with_annotator(AnnotatorClient::new()),
                    CapabilityKind::LocalStorage => {
                        provider.with_local_storage(LocalStorageClient::new())
                    }
                    // Event-handling capabilities have no host-side client.
                    CapabilityKind::HandleEvent(_) => provider,
                },
            )
    }
}
/// Handle for the annotation capability. It carries no state.
#[derive(Clone, Default)]
pub struct AnnotatorClient {}

impl AnnotatorClient {
    /// Creates a new annotator handle.
    pub fn new() -> Self {
        Self::default()
    }

    /// Annotates `_content`. The current implementation does nothing.
    pub fn annotate(&self, _content: &InboundContent) {}
}
/// Adapter that owns an HTTP body so it can be exposed as a byte stream
/// through its `StreamProducer` implementation.
pub struct BodyStreamProducer {
    /// The body whose frames are polled by the stream producer.
    body: UnsyncBoxBody<Bytes, ErrorCode>,
}

impl BodyStreamProducer {
    /// Wraps `body` in a producer.
    pub fn new(body: UnsyncBoxBody<Bytes, ErrorCode>) -> Self {
        BodyStreamProducer { body }
    }
}
/// Exposes the data frames of the wrapped HTTP body as a stream of `u8`.
///
/// Only data frames are forwarded. A trailers frame, an unrecognised frame,
/// a body error, and the end of the body all end the stream with
/// `StreamResult::Dropped`; the trailers and the error value are discarded.
impl<D> StreamProducer<D> for BodyStreamProducer
where
D: 'static,
{
type Item = u8;
type Buffer = Cursor<Bytes>;
/// Polls the body for its next frame and hands the bytes to `dst`.
///
/// Returns:
/// - `Completed` after a data frame has been delivered (written directly
///   and/or stored via `set_buffer`), or when the destination reports zero
///   remaining capacity while the body is not at its end;
/// - `Dropped` when the body has ended, yields a non-data frame, or fails;
/// - `Cancelled` when the body is pending and `finish` is set;
/// - `Poll::Pending` when the body is pending and `finish` is not set.
fn poll_produce<'a>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut store: StoreContextMut<'a, D>,
mut dst: Destination<'a, Self::Item, Self::Buffer>,
finish: bool,
) -> Poll<wasmtime::Result<StreamResult>> {
use core::num::NonZeroUsize;
// `cap` is `Some(n)` when the destination reports `n > 0` remaining
// capacity, and `None` when `dst.remaining` gives no figure at all.
let cap = match dst.remaining(&mut store).map(NonZeroUsize::new) {
Some(Some(cap)) => Some(cap),
Some(None) => {
// Zero remaining capacity: nothing can be written, so only report
// whether the body has already reached its end.
if self.body.is_end_stream() {
return Poll::Ready(Ok(StreamResult::Dropped));
} else {
return Poll::Ready(Ok(StreamResult::Completed));
}
}
None => None,
};
match Pin::new(&mut self.body).poll_frame(cx) {
Poll::Ready(Some(Ok(frame))) => {
// Ok(data) for a data frame, Err(Ok(trailers)) for a trailers frame,
// Err(Err(frame)) for anything else.
match frame.into_data().map_err(http_body::Frame::into_trailers) {
Ok(mut data_frame) => {
if let Some(cap) = cap {
let n = data_frame.len();
let cap_usize = cap.into();
if n > cap_usize {
// The frame is larger than the destination: keep the tail (bytes from
// `cap_usize` on) via `set_buffer`, then copy the first `cap_usize`
// bytes directly into the destination.
dst.set_buffer(Cursor::new(data_frame.split_off(cap_usize)));
let mut dst_direct = dst.as_direct(store, cap_usize);
dst_direct.remaining().copy_from_slice(&data_frame);
dst_direct.mark_written(cap_usize);
} else {
// The whole frame fits: copy all `n` bytes directly.
let mut dst_direct = dst.as_direct(store, n);
dst_direct.remaining()[..n].copy_from_slice(&data_frame);
dst_direct.mark_written(n);
}
} else {
// No capacity figure available: hand over the entire frame as the buffer.
dst.set_buffer(Cursor::new(data_frame));
}
Poll::Ready(Ok(StreamResult::Completed))
}
// Trailers are not forwarded; they end the byte stream.
Err(Ok(_trailers)) => {
Poll::Ready(Ok(StreamResult::Dropped))
}
// A frame that is neither data nor trailers also ends the stream.
Err(Err(..)) => {
Poll::Ready(Ok(StreamResult::Dropped))
}
}
}
// NOTE(review): the body error is discarded here, so the consumer sees the
// same result as for a normal end of body — confirm this is intended.
Poll::Ready(Some(Err(_err))) => Poll::Ready(Ok(StreamResult::Dropped)),
Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
Poll::Pending => Poll::Pending,
}
}
}
/// Handle for the logging capability. It carries no state and forwards each
/// message to the `tracing` macro of the matching level.
#[derive(Clone, Default)]
pub struct Logger {}

impl Logger {
    /// Creates a new logger handle.
    pub fn new() -> Self {
        Self::default()
    }

    /// Emits `message` at `info` level.
    pub fn info(&self, message: String) {
        tracing::info!("{}", message);
    }

    /// Emits `message` at `warn` level.
    pub fn warn(&self, message: String) {
        tracing::warn!("{}", message);
    }

    /// Emits `message` at `error` level.
    pub fn error(&self, message: String) {
        tracing::error!("{}", message);
    }

    /// Emits `message` at `debug` level.
    pub fn debug(&self, message: String) {
        tracing::debug!("{}", message);
    }
}
/// In-memory key/value store handed to plugins as the local-storage capability.
///
/// Clones share the same underlying map (it sits behind an `Arc`), while every
/// call to [`LocalStorageClient::new`] starts a separate, empty map.
#[derive(Clone)]
pub struct LocalStorageClient {
    /// Shared map from key to stored bytes.
    store: Arc<RwLock<HashMap<String, Bytes>>>,
}

impl Default for LocalStorageClient {
    fn default() -> Self {
        let store = Arc::new(RwLock::new(HashMap::new()));
        Self { store }
    }
}

impl LocalStorageClient {
    /// Creates a client backed by a new, empty map.
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `value` under `key`, replacing any previous value.
    pub async fn set(&self, key: String, value: Vec<u8>) {
        let mut entries = self.store.write().await;
        entries.insert(key, Bytes::from(value));
    }

    /// Returns the bytes stored under `key`, or `None` if the key is absent.
    pub async fn get(&self, key: &str) -> Option<Bytes> {
        let entries = self.store.read().await;
        entries.get(key).cloned()
    }

    /// Removes `key` from the store; does nothing if the key is absent.
    pub async fn delete(&self, key: &str) {
        let mut entries = self.store.write().await;
        entries.remove(key);
    }
}
/// Builder for [`WitmProxyCtx`]. It currently has no options.
#[derive(Default)]
pub struct WitmProxyCtxBuilder {}

impl WitmProxyCtxBuilder {
    /// Creates a builder.
    pub fn new() -> Self {
        Self::default()
    }

    /// Consumes the builder and produces the context.
    pub fn build(self) -> WitmProxyCtx {
        WitmProxyCtx {}
    }
}

/// Host-side context for the witmproxy plugin interface. It currently holds no data.
pub struct WitmProxyCtx {}

impl WitmProxyCtx {
    /// Returns a builder for this context.
    pub fn builder() -> WitmProxyCtxBuilder {
        WitmProxyCtxBuilder::default()
    }
}
/// Borrowed view pairing the plugin context with the resource table.
pub struct WitmProxyCtxView<'a> {
    /// The plugin context; kept for the lifetime of the view but not read here.
    _ctx: &'a WitmProxyCtx,
    /// Resource table used by the host functions to look up and store resources.
    pub table: &'a mut ResourceTable,
}

impl<'a> WitmProxyCtxView<'a> {
    /// Creates a view over `ctx` and `table`.
    pub fn new(ctx: &'a WitmProxyCtx, table: &'a mut ResourceTable) -> Self {
        WitmProxyCtxView { _ctx: ctx, table }
    }
}
/// Per-store host state: the shared resource table plus the WASI, WASI-HTTP
/// and witmproxy contexts.
pub struct Host {
    /// Resource table handed out by all of the views implemented on `Host`.
    pub table: ResourceTable,
    /// WASI context.
    pub wasi: WasiCtx,
    /// WASI-HTTP context.
    pub http: WasiHttpCtx,
    /// Context for the p3 WASI-HTTP view.
    pub p3_http: P3Ctx,
    /// Context for the witmproxy plugin interface.
    pub witmproxy_ctx: WitmProxyCtx,
}

impl Default for Host {
    /// Builds a host with a fresh resource table and default-built contexts.
    fn default() -> Self {
        let table = ResourceTable::new();
        let wasi = WasiCtxBuilder::new().build();
        let http = WasiHttpCtx::new();
        let p3_http = P3Ctx {};
        let witmproxy_ctx = WitmProxyCtxBuilder::new().build();
        Self {
            table,
            wasi,
            http,
            p3_http,
            witmproxy_ctx,
        }
    }
}
impl HostContentWithStore for WitmProxy {
async fn drop<T>(
accessor: &Accessor<T, Self>,
rep: wasmtime::component::Resource<InboundContent>,
) -> wasmtime::Result<()> {
accessor.with(|mut access| {
let state: &mut WitmProxyCtxView = &mut access.get();
state.table.delete(rep)
})?;
Ok(())
}
async fn content_type<T>(
accessor: &Accessor<T, Self>,
self_: wasmtime::component::Resource<InboundContent>,
) -> wasmtime::Result<String> {
let content_type = accessor.with(|mut access| {
let state: &mut WitmProxyCtxView = &mut access.get();
let content = state.table.get(&self_)?;
Ok::<String, wasmtime::component::ResourceTableError>(content.content_type())
})?;
Ok(content_type)
}
async fn body<T>(
accessor: &wasmtime::component::Accessor<T, Self>,
self_: wasmtime::component::Resource<InboundContent>,
) -> wasmtime::Result<wasmtime::component::StreamReader<u8>> {
let data = accessor.with(|mut access| {
let state: &mut WitmProxyCtxView = &mut access.get();
let content = state.table.get_mut(&self_)?;
Ok::<Option<UnsyncBoxBody<Bytes, ErrorCode>>, wasmtime::component::ResourceTableError>(
content.body().unwrap_or(None),
)
})?;
let body = data.ok_or_else(|| {
wasmtime::Error::msg(
"Content data has already been consumed. Use set_data to refill it.",
)
})?;
let reader = accessor.with(|mut access| {
let store = &mut access.as_context_mut();
let stream_reader = StreamReader::new(store, BodyStreamProducer::new(body));
Ok::<wasmtime::component::StreamReader<u8>, wasmtime::component::ResourceTableError>(
stream_reader,
)
})?;
Ok(reader)
}
async fn set_body<T>(
accessor: &wasmtime::component::Accessor<T, Self>,
self_: wasmtime::component::Resource<InboundContent>,
content: wasmtime::component::StreamReader<u8>,
) -> wasmtime::Result<()> {
use http_body::Frame;
use http_body_util::StreamBody;
let (tx, rx) = mpsc::channel::<Result<Frame<Bytes>, ErrorCode>>(65536);
let body = StreamBody::new(tokio_stream::wrappers::ReceiverStream::new(rx)).boxed_unsync();
accessor.with(|mut access| {
let state: &mut WitmProxyCtxView = &mut access.get();
let content = state.table.get_mut(&self_)?;
content.set_body(body);
Ok::<(), wasmtime::component::ResourceTableError>(())
})?;
struct ChannelStreamConsumer {
tx: PollSender<Result<Frame<Bytes>, ErrorCode>>,
}
impl<D> wasmtime::component::StreamConsumer<D> for ChannelStreamConsumer {
type Item = u8;
fn poll_consume(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
store: StoreContextMut<D>,
source: Source<Self::Item>,
finish: bool,
) -> Poll<wasmtime::Result<StreamResult>> {
match self.tx.poll_reserve(cx) {
Poll::Ready(Ok(())) => {
let mut src = source.as_direct(store);
let buf = src.remaining();
let n = buf.len();
if n > 0 {
let buf = Bytes::copy_from_slice(buf);
match self.tx.send_item(Ok(Frame::data(buf))) {
Ok(()) => {
src.mark_read(n);
Poll::Ready(Ok(StreamResult::Completed))
}
Err(..) => {
Poll::Ready(Ok(StreamResult::Dropped))
}
}
} else {
Poll::Ready(Ok(StreamResult::Completed))
}
}
Poll::Ready(Err(..)) => {
Poll::Ready(Ok(StreamResult::Dropped))
}
Poll::Pending if finish => {
Poll::Ready(Ok(StreamResult::Cancelled))
}
Poll::Pending => Poll::Pending,
}
}
}
let poll_sender = PollSender::new(tx);
accessor.with(|mut access| {
content.pipe(&mut access, ChannelStreamConsumer { tx: poll_sender });
});
Ok(())
}
}
/// Host functions for the local-storage resource.
///
/// Each operation clones the [`LocalStorageClient`] out of the resource table
/// inside `accessor.with` and only then awaits the storage call on the clone.
impl HostLocalStorageClientWithStore for WitmProxy {
    /// Stores `value` under `key`.
    ///
    /// # Errors
    /// Propagates the table error if `self_` cannot be looked up.
    async fn set<T>(
        accessor: &Accessor<T, Self>,
        self_: Resource<LocalStorageClient>,
        key: String,
        value: Vec<u8>,
    ) -> wasmtime::Result<()> {
        let storage = accessor.with(
            |mut access| -> Result<LocalStorageClient, wasmtime::component::ResourceTableError> {
                let view: &mut WitmProxyCtxView = &mut access.get();
                view.table.get(&self_).cloned()
            },
        )?;
        storage.set(key, value).await;
        Ok(())
    }

    /// Returns a copy of the bytes stored under `key`, or `None` if absent.
    ///
    /// # Errors
    /// Propagates the table error if `self_` cannot be looked up.
    async fn get<T>(
        accessor: &Accessor<T, Self>,
        self_: Resource<LocalStorageClient>,
        key: String,
    ) -> wasmtime::Result<Option<Vec<u8>>> {
        let storage = accessor.with(
            |mut access| -> Result<LocalStorageClient, wasmtime::component::ResourceTableError> {
                let view: &mut WitmProxyCtxView = &mut access.get();
                view.table.get(&self_).cloned()
            },
        )?;
        let stored = storage.get(&key).await;
        Ok(stored.map(|bytes| bytes.to_vec()))
    }

    /// Removes `key` from the store.
    ///
    /// # Errors
    /// Propagates the table error if `self_` cannot be looked up.
    async fn delete<T>(
        accessor: &Accessor<T, Self>,
        self_: Resource<LocalStorageClient>,
        key: String,
    ) -> wasmtime::Result<()> {
        let storage = accessor.with(
            |mut access| -> Result<LocalStorageClient, wasmtime::component::ResourceTableError> {
                let view: &mut WitmProxyCtxView = &mut access.get();
                view.table.get(&self_).cloned()
            },
        )?;
        storage.delete(&key).await;
        Ok(())
    }

    /// Deletes the storage resource `rep` from the resource table.
    ///
    /// # Errors
    /// Propagates the table error if `rep` cannot be deleted.
    async fn drop<T>(
        accessor: &Accessor<T, Self>,
        rep: Resource<LocalStorageClient>,
    ) -> wasmtime::Result<()> {
        accessor.with(|mut access| {
            let view: &mut WitmProxyCtxView = &mut access.get();
            view.table.delete(rep)
        })?;
        Ok(())
    }
}
/// Host functions for the annotator resource.
impl HostAnnotatorClientWithStore for WitmProxy {
    /// Looks up the annotator `self_` and the content `content` and calls
    /// `AnnotatorClient::annotate` on them.
    ///
    /// # Errors
    /// Propagates the table error if either handle cannot be looked up.
    /// (Lookup failures used to be discarded, which made an invalid handle
    /// look like a successful call.)
    async fn annotate<T>(
        accessor: &Accessor<T, Self>,
        self_: Resource<AnnotatorClient>,
        content: Resource<InboundContent>,
    ) -> wasmtime::Result<()> {
        accessor.with(|mut access| {
            let state: &mut WitmProxyCtxView = &mut access.get();
            let annotator = state.table.get(&self_)?;
            let content = state.table.get(&content)?;
            annotator.annotate(content);
            Ok::<(), wasmtime::component::ResourceTableError>(())
        })?;
        Ok(())
    }

    /// Deletes the annotator resource `rep` from the resource table.
    ///
    /// # Errors
    /// Propagates the table error if `rep` cannot be deleted.
    async fn drop<T>(
        accessor: &Accessor<T, Self>,
        rep: Resource<AnnotatorClient>,
    ) -> wasmtime::Result<()> {
        accessor.with(|mut access| {
            let state: &mut WitmProxyCtxView = &mut access.get();
            state.table.delete(rep)
        })?;
        Ok(())
    }
}
/// Host functions for the logger resource.
///
/// Every level follows the same pattern: look up the [`Logger`] behind
/// `self_` and forward `message` to the method of the same name. A failed
/// lookup is propagated as an error for all four levels.
impl HostLoggerWithStore for WitmProxy {
    /// Logs `message` at `info` level.
    ///
    /// # Errors
    /// Propagates the table error if `self_` cannot be looked up. (This
    /// failure used to be discarded for `info` only.)
    async fn info<T>(
        accessor: &Accessor<T, Self>,
        self_: Resource<Logger>,
        message: String,
    ) -> wasmtime::Result<()> {
        accessor.with(|mut access| {
            let state: &mut WitmProxyCtxView = &mut access.get();
            let logger = state.table.get(&self_)?;
            logger.info(message);
            Ok::<(), wasmtime::component::ResourceTableError>(())
        })?;
        Ok(())
    }

    /// Logs `message` at `warn` level.
    ///
    /// # Errors
    /// Propagates the table error if `self_` cannot be looked up.
    async fn warn<T>(
        accessor: &Accessor<T, Self>,
        self_: Resource<Logger>,
        message: String,
    ) -> wasmtime::Result<()> {
        accessor.with(|mut access| {
            let state: &mut WitmProxyCtxView = &mut access.get();
            let logger = state.table.get(&self_)?;
            logger.warn(message);
            Ok::<(), wasmtime::component::ResourceTableError>(())
        })?;
        Ok(())
    }

    /// Logs `message` at `error` level.
    ///
    /// # Errors
    /// Propagates the table error if `self_` cannot be looked up.
    async fn error<T>(
        accessor: &Accessor<T, Self>,
        self_: Resource<Logger>,
        message: String,
    ) -> wasmtime::Result<()> {
        accessor.with(|mut access| {
            let state: &mut WitmProxyCtxView = &mut access.get();
            let logger = state.table.get(&self_)?;
            logger.error(message);
            Ok::<(), wasmtime::component::ResourceTableError>(())
        })?;
        Ok(())
    }

    /// Logs `message` at `debug` level.
    ///
    /// # Errors
    /// Propagates the table error if `self_` cannot be looked up.
    async fn debug<T>(
        accessor: &Accessor<T, Self>,
        self_: Resource<Logger>,
        message: String,
    ) -> wasmtime::Result<()> {
        accessor.with(|mut access| {
            let state: &mut WitmProxyCtxView = &mut access.get();
            let logger = state.table.get(&self_)?;
            logger.debug(message);
            Ok::<(), wasmtime::component::ResourceTableError>(())
        })?;
        Ok(())
    }

    /// Deletes the logger resource `rep` from the resource table.
    ///
    /// # Errors
    /// Propagates the table error if `rep` cannot be deleted.
    async fn drop<T>(accessor: &Accessor<T, Self>, rep: Resource<Logger>) -> wasmtime::Result<()> {
        accessor.with(|mut access| {
            let state: &mut WitmProxyCtxView = &mut access.get();
            state.table.delete(rep)
        })?;
        Ok(())
    }
}
impl HostCapabilityProviderWithStore for WitmProxy {
async fn logger<T>(
accessor: &Accessor<T, Self>,
cap: Resource<CapabilityProvider>,
) -> wasmtime::Result<Option<Resource<Logger>>> {
Ok(accessor
.with(|mut access| {
let state: &mut WitmProxyCtxView = &mut access.get();
let provider = state.table.get(&cap)?;
match provider.logger() {
Some(logger) => Ok::<
Option<Resource<Logger>>,
wasmtime::component::ResourceTableError,
>(Some(state.table.push(logger)?)),
None => Ok(None),
}
})
.unwrap_or(None))
}
async fn local_storage<T>(
accessor: &Accessor<T, Self>,
cap: Resource<CapabilityProvider>,
) -> wasmtime::Result<Option<Resource<LocalStorageClient>>> {
Ok(accessor
.with(|mut access| {
let state: &mut WitmProxyCtxView = &mut access.get();
let provider = state.table.get(&cap)?;
match provider.local_storage() {
Some(client) => Ok::<
Option<Resource<LocalStorageClient>>,
wasmtime::component::ResourceTableError,
>(Some(state.table.push(client)?)),
None => Ok(None),
}
})
.unwrap_or(None))
}
async fn annotator<T>(
accessor: &Accessor<T, Self>,
cap: Resource<CapabilityProvider>,
) -> wasmtime::Result<Option<Resource<AnnotatorClient>>> {
Ok(accessor
.with(|mut access| {
let state: &mut WitmProxyCtxView = &mut access.get();
let provider = state.table.get(&cap)?;
match provider.annotator() {
Some(client) => Ok::<
Option<Resource<AnnotatorClient>>,
wasmtime::component::ResourceTableError,
>(Some(state.table.push(client)?)),
None => Ok(None),
}
})
.unwrap_or(None))
}
async fn drop<T>(
accessor: &Accessor<T, Self>,
rep: Resource<CapabilityProvider>,
) -> wasmtime::Result<()> {
accessor.with(|mut access| {
let state: &mut WitmProxyCtxView = &mut access.get();
state.table.delete(rep)
})?;
Ok(())
}
}
// Empty implementations of the generated host traits for the view type.
// None of them defines a method here; the resource operations in this file
// are implemented by the `*WithStore` traits on `WitmProxy`.
// NOTE(review): presumably the generated bindings require these traits on the
// `HasData::Data` type used by `add_to_linker` — confirm against the bindgen output.
impl bindgen::witmproxy::plugin::capabilities::Host for WitmProxyCtxView<'_> {}
impl HostContent for WitmProxyCtxView<'_> {}
impl HostCapabilityProvider for WitmProxyCtxView<'_> {}
impl HostLocalStorageClient for WitmProxyCtxView<'_> {}
impl HostAnnotatorClient for WitmProxyCtxView<'_> {}
impl HostLogger for WitmProxyCtxView<'_> {}
/// Context type handed to the p3 WASI-HTTP view of `Host`. It holds no data.
pub struct P3Ctx {}
// The impl body is empty, so only the trait's provided (default) methods apply.
impl wasmtime_wasi_http::p3::WasiHttpCtx for P3Ctx {}
impl WasiView for Host {
fn ctx(&mut self) -> WasiCtxView<'_> {
WasiCtxView {
ctx: &mut self.wasi,
table: &mut self.table,
}
}
}
impl WasiHttpView for Host {
fn ctx(&mut self) -> &mut WasiHttpCtx {
&mut self.http
}
fn table(&mut self) -> &mut ResourceTable {
&mut self.table
}
}
impl wasmtime_wasi_http::p3::WasiHttpView for Host {
fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
wasmtime_wasi_http::p3::WasiHttpCtxView {
table: &mut self.table,
ctx: &mut self.p3_http,
}
}
}
/// Registers the witmproxy capabilities interface on the linker `l`.
///
/// `f` projects the store data `T` to the [`WitmProxyCtxView`] used by the
/// host functions.
///
/// # Errors
/// Returns the error reported by the generated `add_to_linker` function.
pub fn add_to_linker<T: Send + 'static>(
    l: &mut wasmtime::component::Linker<T>,
    f: fn(&mut T) -> WitmProxyCtxView<'_>,
) -> Result<()> {
    let linked = bindgen::witmproxy::plugin::capabilities::add_to_linker::<_, WitmProxy>(l, f);
    linked?;
    Ok(())
}
/// Marker type on which the `*WithStore` host traits in this file are implemented.
struct WitmProxy;
// Declares `WitmProxyCtxView` as the data type associated with `WitmProxy`.
impl HasData for WitmProxy {
type Data<'a> = WitmProxyCtxView<'a>;
}