use crate::FieldMap;
use crate::get_content_length;
use crate::p2::bindings::http::types::{self, Method, Scheme, StatusCode, Trailers};
use crate::p2::body::{HostFutureTrailers, HostIncomingBody, HostOutgoingBody, StreamContext};
use crate::p2::types::{
HostFutureIncomingResponse, HostIncomingRequest, HostIncomingResponse, HostOutgoingRequest,
HostOutgoingResponse, HostResponseOutparam, remove_forbidden_headers,
};
use crate::p2::{HeaderError, HeaderResult, HttpError, HttpResult, WasiHttpCtxView};
use http::{HeaderName, HeaderValue};
use std::str::FromStr;
use wasmtime::component::Resource;
use wasmtime::{error::Context as _, format_err};
use wasmtime_wasi::p2::{DynInputStream, DynOutputStream, DynPollable};
impl types::Host for WasiHttpCtxView<'_> {
fn convert_error_code(&mut self, err: HttpError) -> wasmtime::Result<types::ErrorCode> {
err.downcast()
}
fn convert_header_error(&mut self, err: HeaderError) -> wasmtime::Result<types::HeaderError> {
err.downcast()
}
fn http_error_code(
&mut self,
err: wasmtime::component::Resource<types::IoError>,
) -> wasmtime::Result<Option<types::ErrorCode>> {
let e = self.table.get(&err)?;
Ok(e.downcast_ref::<types::ErrorCode>().cloned())
}
}
impl types::HostFields for WasiHttpCtxView<'_> {
fn new(&mut self) -> wasmtime::Result<Resource<FieldMap>> {
let limit = self.ctx.field_size_limit;
let id = self
.table
.push(FieldMap::new_mutable(limit))
.context("[new_fields] pushing fields")?;
Ok(id)
}
fn from_list(&mut self, entries: Vec<(String, Vec<u8>)>) -> HeaderResult<Resource<FieldMap>> {
let mut fields = FieldMap::new_mutable(self.ctx.field_size_limit);
for (header, value) in entries {
let header = HeaderName::from_bytes(header.as_bytes())?;
if self.hooks.is_forbidden_header(&header) {
return Err(types::HeaderError::Forbidden.into());
}
let value = HeaderValue::from_bytes(&value)?;
fields.append(header, value)?;
}
Ok(self.table.push(fields)?)
}
fn drop(&mut self, fields: Resource<FieldMap>) -> wasmtime::Result<()> {
self.table
.delete(fields)
.context("[drop_fields] deleting fields")?;
Ok(())
}
fn get(&mut self, fields: Resource<FieldMap>, name: String) -> wasmtime::Result<Vec<Vec<u8>>> {
let fields = self.table.get(&fields)?;
let header = match HeaderName::from_bytes(name.as_bytes()) {
Ok(header) => header,
Err(_) => return Ok(vec![]),
};
if !fields.contains_key(&header) {
return Ok(vec![]);
}
let res = fields
.get_all(&header)
.into_iter()
.map(|val| val.as_bytes().to_owned())
.collect();
Ok(res)
}
fn has(&mut self, fields: Resource<FieldMap>, name: String) -> wasmtime::Result<bool> {
let fields = self.table.get(&fields)?;
match HeaderName::from_bytes(name.as_bytes()) {
Ok(header) => Ok(fields.contains_key(&header)),
Err(_) => Ok(false),
}
}
fn set(
&mut self,
fields: Resource<FieldMap>,
name: String,
byte_values: Vec<Vec<u8>>,
) -> HeaderResult<()> {
let header = HeaderName::from_bytes(name.as_bytes())?;
if self.hooks.is_forbidden_header(&header) {
return Err(types::HeaderError::Forbidden.into());
}
let mut values = Vec::with_capacity(byte_values.len());
for value in byte_values {
values.push(HeaderValue::from_bytes(&value)?);
}
let fields = self.table.get_mut(&fields)?;
fields.set(header, values)?;
Ok(())
}
fn delete(&mut self, fields: Resource<FieldMap>, name: String) -> HeaderResult<()> {
let header = HeaderName::from_bytes(name.as_bytes())?;
if self.hooks.is_forbidden_header(&header) {
return Err(types::HeaderError::Forbidden.into());
}
let fields = self.table.get_mut(&fields)?;
fields.remove_all(header)?;
Ok(())
}
fn append(
&mut self,
fields: Resource<FieldMap>,
name: String,
value: Vec<u8>,
) -> HeaderResult<()> {
let header = HeaderName::from_bytes(name.as_bytes())?;
if self.hooks.is_forbidden_header(&header) {
return Err(types::HeaderError::Forbidden.into());
}
let value = HeaderValue::from_bytes(&value)?;
let fields = self.table.get_mut(&fields)?;
fields.append(header, value)?;
Ok(())
}
fn entries(&mut self, fields: Resource<FieldMap>) -> wasmtime::Result<Vec<(String, Vec<u8>)>> {
Ok(self
.table
.get(&fields)?
.iter()
.map(|(name, value)| (name.as_str().to_owned(), value.as_bytes().to_owned()))
.collect())
}
fn clone(&mut self, fields: Resource<FieldMap>) -> wasmtime::Result<Resource<FieldMap>> {
let mut fields = self.table.get(&fields)?.clone();
fields.set_mutable(self.ctx.field_size_limit);
let id = self.table.push(fields)?;
Ok(id)
}
}
impl types::HostIncomingRequest for WasiHttpCtxView<'_> {
fn method(&mut self, id: Resource<HostIncomingRequest>) -> wasmtime::Result<Method> {
let method = self.table.get(&id)?.method.clone();
Ok(method.into())
}
fn path_with_query(
&mut self,
id: Resource<HostIncomingRequest>,
) -> wasmtime::Result<Option<String>> {
let req = self.table.get(&id)?;
Ok(req
.uri
.path_and_query()
.map(|path_and_query| path_and_query.as_str().to_owned()))
}
fn scheme(&mut self, id: Resource<HostIncomingRequest>) -> wasmtime::Result<Option<Scheme>> {
let req = self.table.get(&id)?;
Ok(Some(req.scheme.clone()))
}
fn authority(&mut self, id: Resource<HostIncomingRequest>) -> wasmtime::Result<Option<String>> {
let req = self.table.get(&id)?;
Ok(Some(req.authority.clone()))
}
fn headers(
&mut self,
id: Resource<HostIncomingRequest>,
) -> wasmtime::Result<Resource<FieldMap>> {
let req = self.table.get(&id)?;
Ok(self.table.push(req.headers.clone())?)
}
fn consume(
&mut self,
id: Resource<HostIncomingRequest>,
) -> wasmtime::Result<Result<Resource<HostIncomingBody>, ()>> {
let req = self.table.get_mut(&id)?;
match req.body.take() {
Some(body) => {
let id = self.table.push(body)?;
Ok(Ok(id))
}
None => Ok(Err(())),
}
}
fn drop(&mut self, id: Resource<HostIncomingRequest>) -> wasmtime::Result<()> {
let _ = self.table.delete(id)?;
Ok(())
}
}
impl types::HostOutgoingRequest for WasiHttpCtxView<'_> {
fn new(
&mut self,
headers: Resource<FieldMap>,
) -> wasmtime::Result<Resource<HostOutgoingRequest>> {
let mut headers = self.table.delete(headers)?;
headers.set_immutable();
self.table
.push(HostOutgoingRequest {
path_with_query: None,
authority: None,
method: types::Method::Get,
headers,
scheme: None,
body: None,
})
.context("[new_outgoing_request] pushing request")
}
fn body(
&mut self,
request: Resource<HostOutgoingRequest>,
) -> wasmtime::Result<Result<Resource<HostOutgoingBody>, ()>> {
let buffer_chunks = self.hooks.outgoing_body_buffer_chunks();
let chunk_size = self.hooks.outgoing_body_chunk_size();
let req = self
.table
.get_mut(&request)
.context("[outgoing_request_write] getting request")?;
if req.body.is_some() {
return Ok(Err(()));
}
let size = match get_content_length(&req.headers) {
Ok(size) => size,
Err(..) => return Ok(Err(())),
};
let (host_body, hyper_body) =
HostOutgoingBody::new(StreamContext::Request, size, buffer_chunks, chunk_size);
req.body = Some(hyper_body);
let outgoing_body = self.table.push(host_body)?;
Ok(Ok(outgoing_body))
}
fn drop(&mut self, request: Resource<HostOutgoingRequest>) -> wasmtime::Result<()> {
let _ = self.table.delete(request)?;
Ok(())
}
fn method(
&mut self,
request: wasmtime::component::Resource<types::OutgoingRequest>,
) -> wasmtime::Result<Method> {
Ok(self.table.get(&request)?.method.clone())
}
fn set_method(
&mut self,
request: wasmtime::component::Resource<types::OutgoingRequest>,
method: Method,
) -> wasmtime::Result<Result<(), ()>> {
let req = self.table.get_mut(&request)?;
if let Method::Other(s) = &method {
if let Err(_) = http::Method::from_str(s) {
return Ok(Err(()));
}
}
req.method = method;
Ok(Ok(()))
}
fn path_with_query(
&mut self,
request: wasmtime::component::Resource<types::OutgoingRequest>,
) -> wasmtime::Result<Option<String>> {
Ok(self.table.get(&request)?.path_with_query.clone())
}
fn set_path_with_query(
&mut self,
request: wasmtime::component::Resource<types::OutgoingRequest>,
path_with_query: Option<String>,
) -> wasmtime::Result<Result<(), ()>> {
let req = self.table.get_mut(&request)?;
if let Some(s) = path_with_query.as_ref() {
if let Err(_) = http::uri::PathAndQuery::from_str(s) {
return Ok(Err(()));
}
}
req.path_with_query = path_with_query;
Ok(Ok(()))
}
fn scheme(
&mut self,
request: wasmtime::component::Resource<types::OutgoingRequest>,
) -> wasmtime::Result<Option<Scheme>> {
Ok(self.table.get(&request)?.scheme.clone())
}
fn set_scheme(
&mut self,
request: wasmtime::component::Resource<types::OutgoingRequest>,
scheme: Option<Scheme>,
) -> wasmtime::Result<Result<(), ()>> {
let req = self.table.get_mut(&request)?;
if let Some(types::Scheme::Other(s)) = scheme.as_ref() {
if let Err(_) = http::uri::Scheme::from_str(s.as_str()) {
return Ok(Err(()));
}
}
req.scheme = scheme;
Ok(Ok(()))
}
fn authority(
&mut self,
request: wasmtime::component::Resource<types::OutgoingRequest>,
) -> wasmtime::Result<Option<String>> {
Ok(self.table.get(&request)?.authority.clone())
}
fn set_authority(
&mut self,
request: wasmtime::component::Resource<types::OutgoingRequest>,
authority: Option<String>,
) -> wasmtime::Result<Result<(), ()>> {
let req = self.table.get_mut(&request)?;
if let Some(s) = authority.as_ref() {
if let Err(_) = http::uri::Authority::from_str(s.as_str()) {
return Ok(Err(()));
}
}
req.authority = authority;
Ok(Ok(()))
}
fn headers(
&mut self,
request: wasmtime::component::Resource<types::OutgoingRequest>,
) -> wasmtime::Result<wasmtime::component::Resource<FieldMap>> {
let req = self.table.get(&request)?;
let id = self.table.push(req.headers.clone())?;
Ok(id)
}
}
impl types::HostResponseOutparam for WasiHttpCtxView<'_> {
fn drop(&mut self, id: Resource<HostResponseOutparam>) -> wasmtime::Result<()> {
let _ = self.table.delete(id)?;
Ok(())
}
fn set(
&mut self,
id: Resource<HostResponseOutparam>,
resp: Result<Resource<HostOutgoingResponse>, types::ErrorCode>,
) -> wasmtime::Result<()> {
let val = match resp {
Ok(resp) => Ok(self.table.delete(resp)?.try_into()?),
Err(e) => Err(e),
};
let resp = self.table.delete(id)?;
let _ = resp.result.send(val);
Ok(())
}
fn send_informational(
&mut self,
_id: Resource<HostResponseOutparam>,
_status: u16,
_headers: Resource<FieldMap>,
) -> HttpResult<()> {
Err(HttpError::trap(format_err!("not implemented")))
}
}
impl types::HostIncomingResponse for WasiHttpCtxView<'_> {
fn drop(&mut self, response: Resource<HostIncomingResponse>) -> wasmtime::Result<()> {
let _ = self
.table
.delete(response)
.context("[drop_incoming_response] deleting response")?;
Ok(())
}
fn status(&mut self, response: Resource<HostIncomingResponse>) -> wasmtime::Result<StatusCode> {
let r = self
.table
.get(&response)
.context("[incoming_response_status] getting response")?;
Ok(r.status)
}
fn headers(
&mut self,
response: Resource<HostIncomingResponse>,
) -> wasmtime::Result<Resource<FieldMap>> {
let resp = self.table.get(&response)?;
let id = self.table.push(resp.headers.clone())?;
Ok(id)
}
fn consume(
&mut self,
response: Resource<HostIncomingResponse>,
) -> wasmtime::Result<Result<Resource<HostIncomingBody>, ()>> {
let r = self
.table
.get_mut(&response)
.context("[incoming_response_consume] getting response")?;
match r.body.take() {
Some(body) => {
let id = self.table.push(body)?;
Ok(Ok(id))
}
None => Ok(Err(())),
}
}
}
impl types::HostFutureTrailers for WasiHttpCtxView<'_> {
fn drop(&mut self, id: Resource<HostFutureTrailers>) -> wasmtime::Result<()> {
let _ = self
.table
.delete(id)
.context("[drop future-trailers] deleting future-trailers")?;
Ok(())
}
fn subscribe(
&mut self,
index: Resource<HostFutureTrailers>,
) -> wasmtime::Result<Resource<DynPollable>> {
wasmtime_wasi::p2::subscribe(self.table, index)
}
fn get(
&mut self,
id: Resource<HostFutureTrailers>,
) -> wasmtime::Result<Option<Result<Result<Option<Resource<Trailers>>, types::ErrorCode>, ()>>>
{
let trailers = self.table.get_mut(&id)?;
match trailers {
HostFutureTrailers::Waiting { .. } => return Ok(None),
HostFutureTrailers::Consumed => return Ok(Some(Err(()))),
HostFutureTrailers::Done(_) => {}
};
let res = match std::mem::replace(trailers, HostFutureTrailers::Consumed) {
HostFutureTrailers::Done(res) => res,
_ => unreachable!(),
};
let mut fields = match res {
Ok(Some(fields)) => fields,
Ok(None) => return Ok(Some(Ok(Ok(None)))),
Err(e) => return Ok(Some(Ok(Err(e)))),
};
remove_forbidden_headers(self.hooks, &mut fields);
let ts = self.table.push(FieldMap::new_immutable(fields))?;
Ok(Some(Ok(Ok(Some(ts)))))
}
}
impl types::HostIncomingBody for WasiHttpCtxView<'_> {
fn stream(
&mut self,
id: Resource<HostIncomingBody>,
) -> wasmtime::Result<Result<Resource<DynInputStream>, ()>> {
let body = self.table.get_mut(&id)?;
if let Some(stream) = body.take_stream() {
let stream: DynInputStream = Box::new(stream);
let stream = self.table.push_child(stream, &id)?;
return Ok(Ok(stream));
}
Ok(Err(()))
}
fn finish(
&mut self,
id: Resource<HostIncomingBody>,
) -> wasmtime::Result<Resource<HostFutureTrailers>> {
let body = self.table.delete(id)?;
let trailers = self.table.push(body.into_future_trailers())?;
Ok(trailers)
}
fn drop(&mut self, id: Resource<HostIncomingBody>) -> wasmtime::Result<()> {
let _ = self.table.delete(id)?;
Ok(())
}
}
impl types::HostOutgoingResponse for WasiHttpCtxView<'_> {
fn new(
&mut self,
headers: Resource<FieldMap>,
) -> wasmtime::Result<Resource<HostOutgoingResponse>> {
let mut fields = self.table.delete(headers)?;
fields.set_immutable();
let id = self.table.push(HostOutgoingResponse {
status: http::StatusCode::OK,
headers: fields,
body: None,
})?;
Ok(id)
}
fn body(
&mut self,
id: Resource<HostOutgoingResponse>,
) -> wasmtime::Result<Result<Resource<HostOutgoingBody>, ()>> {
let buffer_chunks = self.hooks.outgoing_body_buffer_chunks();
let chunk_size = self.hooks.outgoing_body_chunk_size();
let resp = self.table.get_mut(&id)?;
if resp.body.is_some() {
return Ok(Err(()));
}
let size = match get_content_length(&resp.headers) {
Ok(size) => size,
Err(..) => return Ok(Err(())),
};
let (host, body) =
HostOutgoingBody::new(StreamContext::Response, size, buffer_chunks, chunk_size);
resp.body.replace(body);
let id = self.table.push(host)?;
Ok(Ok(id))
}
fn status_code(
&mut self,
id: Resource<HostOutgoingResponse>,
) -> wasmtime::Result<types::StatusCode> {
Ok(self.table.get(&id)?.status.into())
}
fn set_status_code(
&mut self,
id: Resource<HostOutgoingResponse>,
status: types::StatusCode,
) -> wasmtime::Result<Result<(), ()>> {
let resp = self.table.get_mut(&id)?;
match http::StatusCode::from_u16(status) {
Ok(status) => resp.status = status,
Err(_) => return Ok(Err(())),
};
Ok(Ok(()))
}
fn headers(
&mut self,
id: Resource<HostOutgoingResponse>,
) -> wasmtime::Result<Resource<FieldMap>> {
let resp = self.table.get(&id)?;
Ok(self.table.push(resp.headers.clone())?)
}
fn drop(&mut self, id: Resource<HostOutgoingResponse>) -> wasmtime::Result<()> {
let _ = self.table.delete(id)?;
Ok(())
}
}
impl types::HostFutureIncomingResponse for WasiHttpCtxView<'_> {
fn drop(&mut self, id: Resource<HostFutureIncomingResponse>) -> wasmtime::Result<()> {
let _ = self.table.delete(id)?;
Ok(())
}
fn get(
&mut self,
id: Resource<HostFutureIncomingResponse>,
) -> wasmtime::Result<
Option<Result<Result<Resource<HostIncomingResponse>, types::ErrorCode>, ()>>,
> {
let resp = self.table.get_mut(&id)?;
match resp {
HostFutureIncomingResponse::Pending(_) => return Ok(None),
HostFutureIncomingResponse::Consumed => return Ok(Some(Err(()))),
HostFutureIncomingResponse::Ready(_) => {}
}
let resp =
match std::mem::replace(resp, HostFutureIncomingResponse::Consumed).unwrap_ready() {
Err(e) => {
let e = e.downcast::<types::ErrorCode>()?;
return Ok(Some(Ok(Err(e))));
}
Ok(Ok(resp)) => resp,
Ok(Err(e)) => return Ok(Some(Ok(Err(e)))),
};
let (mut parts, body) = resp.resp.into_parts();
remove_forbidden_headers(self.hooks, &mut parts.headers);
let headers = FieldMap::new_immutable(parts.headers);
let resp = self.table.push(HostIncomingResponse {
status: parts.status.as_u16(),
headers,
body: Some({
let mut body = HostIncomingBody::new(body, resp.between_bytes_timeout);
if let Some(worker) = resp.worker {
body.retain_worker(worker);
}
body
}),
})?;
Ok(Some(Ok(Ok(resp))))
}
fn subscribe(
&mut self,
id: Resource<HostFutureIncomingResponse>,
) -> wasmtime::Result<Resource<DynPollable>> {
wasmtime_wasi::p2::subscribe(self.table, id)
}
}
impl types::HostOutgoingBody for WasiHttpCtxView<'_> {
fn write(
&mut self,
id: Resource<HostOutgoingBody>,
) -> wasmtime::Result<Result<Resource<DynOutputStream>, ()>> {
let body = self.table.get_mut(&id)?;
if let Some(stream) = body.take_output_stream() {
let id = self.table.push_child(stream, &id)?;
Ok(Ok(id))
} else {
Ok(Err(()))
}
}
fn finish(
&mut self,
id: Resource<HostOutgoingBody>,
ts: Option<Resource<Trailers>>,
) -> HttpResult<()> {
let body = self.table.delete(id)?;
let ts = if let Some(ts) = ts {
Some(self.table.delete(ts)?)
} else {
None
};
body.finish(ts)?;
Ok(())
}
fn drop(&mut self, id: Resource<HostOutgoingBody>) -> wasmtime::Result<()> {
self.table.delete(id)?.abort();
Ok(())
}
}
impl types::HostRequestOptions for WasiHttpCtxView<'_> {
fn new(&mut self) -> wasmtime::Result<Resource<types::RequestOptions>> {
let id = self.table.push(types::RequestOptions::default())?;
Ok(id)
}
fn connect_timeout(
&mut self,
opts: Resource<types::RequestOptions>,
) -> wasmtime::Result<Option<types::Duration>> {
let nanos = self.table.get(&opts)?.connect_timeout.map(|d| d.as_nanos());
if let Some(nanos) = nanos {
Ok(Some(nanos.try_into()?))
} else {
Ok(None)
}
}
fn set_connect_timeout(
&mut self,
opts: Resource<types::RequestOptions>,
duration: Option<types::Duration>,
) -> wasmtime::Result<Result<(), ()>> {
self.table.get_mut(&opts)?.connect_timeout = duration.map(std::time::Duration::from_nanos);
Ok(Ok(()))
}
fn first_byte_timeout(
&mut self,
opts: Resource<types::RequestOptions>,
) -> wasmtime::Result<Option<types::Duration>> {
let nanos = self
.table
.get(&opts)?
.first_byte_timeout
.map(|d| d.as_nanos());
if let Some(nanos) = nanos {
Ok(Some(nanos.try_into()?))
} else {
Ok(None)
}
}
fn set_first_byte_timeout(
&mut self,
opts: Resource<types::RequestOptions>,
duration: Option<types::Duration>,
) -> wasmtime::Result<Result<(), ()>> {
self.table.get_mut(&opts)?.first_byte_timeout =
duration.map(std::time::Duration::from_nanos);
Ok(Ok(()))
}
fn between_bytes_timeout(
&mut self,
opts: Resource<types::RequestOptions>,
) -> wasmtime::Result<Option<types::Duration>> {
let nanos = self
.table
.get(&opts)?
.between_bytes_timeout
.map(|d| d.as_nanos());
if let Some(nanos) = nanos {
Ok(Some(nanos.try_into()?))
} else {
Ok(None)
}
}
fn set_between_bytes_timeout(
&mut self,
opts: Resource<types::RequestOptions>,
duration: Option<types::Duration>,
) -> wasmtime::Result<Result<(), ()>> {
self.table.get_mut(&opts)?.between_bytes_timeout =
duration.map(std::time::Duration::from_nanos);
Ok(Ok(()))
}
fn drop(&mut self, rep: Resource<types::RequestOptions>) -> wasmtime::Result<()> {
let _ = self.table.delete(rep)?;
Ok(())
}
}