use crate::FieldMap;
use crate::p2::{
WasiHttpCtxView, WasiHttpHooks,
bindings::http::types::{self, ErrorCode, Method, Scheme},
body::{HostIncomingBody, HyperIncomingBody, HyperOutgoingBody},
};
use bytes::Bytes;
use http_body_util::BodyExt;
use hyper::body::Body;
use std::time::Duration;
use wasmtime::component::Resource;
use wasmtime::{Result, bail};
use wasmtime_wasi::p2::Pollable;
use wasmtime_wasi::runtime::AbortOnDropJoinHandle;
/// Strips from `headers` every entry whose name `hooks` reports as forbidden.
///
/// The offending names are first copied into an owned list, because the map
/// cannot be mutated while its keys are still being iterated; the removals
/// happen in a second pass.
pub(crate) fn remove_forbidden_headers(
    hooks: &mut dyn WasiHttpHooks,
    headers: &mut http::HeaderMap,
) {
    let doomed: Vec<_> = headers
        .keys()
        .filter(|&name| hooks.is_forbidden_header(name))
        .cloned()
        .collect();

    for key in &doomed {
        headers.remove(key);
    }
}
/// Settings that accompany an outgoing HTTP request.
///
/// NOTE(review): the per-field descriptions are inferred from the field names;
/// the code that consumes this struct is not part of this section of the file.
pub struct OutgoingRequestConfig {
/// Presumably selects whether the connection is made over TLS.
pub use_tls: bool,
/// Presumably the maximum time allowed for establishing the connection.
pub connect_timeout: Duration,
/// Presumably the maximum time to wait for the first byte of the response.
pub first_byte_timeout: Duration,
/// Presumably the maximum idle time allowed between successive body bytes.
pub between_bytes_timeout: Duration,
}
impl From<http::Method> for types::Method {
    /// Maps an `http::Method` onto the WASI HTTP `Method` variant.
    ///
    /// The nine methods listed below map to their dedicated variants; any
    /// other method is carried over as `Other` holding its textual form.
    fn from(method: http::Method) -> Self {
        match method {
            http::Method::GET => types::Method::Get,
            http::Method::HEAD => types::Method::Head,
            http::Method::POST => types::Method::Post,
            http::Method::PUT => types::Method::Put,
            http::Method::DELETE => types::Method::Delete,
            http::Method::CONNECT => types::Method::Connect,
            http::Method::OPTIONS => types::Method::Options,
            http::Method::TRACE => types::Method::Trace,
            http::Method::PATCH => types::Method::Patch,
            other => types::Method::Other(other.to_string()),
        }
    }
}
impl TryInto<http::Method> for types::Method {
type Error = http::method::InvalidMethod;
fn try_into(self) -> Result<http::Method, Self::Error> {
match self {
Method::Get => Ok(http::Method::GET),
Method::Head => Ok(http::Method::HEAD),
Method::Post => Ok(http::Method::POST),
Method::Put => Ok(http::Method::PUT),
Method::Delete => Ok(http::Method::DELETE),
Method::Connect => Ok(http::Method::CONNECT),
Method::Options => Ok(http::Method::OPTIONS),
Method::Trace => Ok(http::Method::TRACE),
Method::Patch => Ok(http::Method::PATCH),
Method::Other(s) => http::Method::from_bytes(s.as_bytes()),
}
}
}
/// Host-side record of an incoming HTTP request, as assembled by
/// `WasiHttpCtxView::new_incoming_request`.
#[derive(Debug)]
pub struct HostIncomingRequest {
/// HTTP method taken from the request parts.
pub(crate) method: http::method::Method,
/// Request URI taken from the request parts.
pub(crate) uri: http::uri::Uri,
/// Immutable field map built from the request headers after the forbidden
/// ones were removed.
pub(crate) headers: FieldMap,
/// Scheme supplied by the caller of `new_incoming_request`.
pub(crate) scheme: Scheme,
/// Authority taken from the URI when present, otherwise from the `Host` header.
pub(crate) authority: String,
/// Request body; populated with `Some` at construction.
/// NOTE(review): presumably set to `None` once the body is handed out — confirm at the use site.
pub body: Option<HostIncomingBody>,
}
impl WasiHttpCtxView<'_> {
pub fn new_incoming_request<B>(
&mut self,
scheme: Scheme,
req: hyper::Request<B>,
) -> wasmtime::Result<Resource<HostIncomingRequest>>
where
B: Body<Data = Bytes> + Send + 'static,
B::Error: Into<ErrorCode>,
{
let (mut parts, body) = req.into_parts();
let body = body.map_err(Into::into).boxed_unsync();
let body = HostIncomingBody::new(
body,
std::time::Duration::from_millis(600 * 1000),
);
let authority = match parts.uri.authority() {
Some(authority) => authority.to_string(),
None => match parts.headers.get(http::header::HOST) {
Some(host) => host.to_str()?.to_string(),
None => bail!("invalid HTTP request missing authority in URI and host header"),
},
};
remove_forbidden_headers(self.hooks, &mut parts.headers);
let headers = FieldMap::new_immutable(parts.headers);
let req = HostIncomingRequest {
method: parts.method,
uri: parts.uri,
headers,
authority,
scheme,
body: Some(body),
};
Ok(self.table.push(req)?)
}
}
/// Resource holding the sending half of a one-shot channel through which a
/// response, or an `ErrorCode`, is delivered.
pub struct HostResponseOutparam {
/// One-shot sender carrying either the finished `hyper` response or an error code.
pub result:
tokio::sync::oneshot::Sender<Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>>,
}
impl WasiHttpCtxView<'_> {
    /// Wraps the one-shot sender `result` in a `HostResponseOutparam`, stores
    /// it in the resource table, and returns the resulting resource handle.
    ///
    /// # Errors
    ///
    /// Propagates the failure if the resource table rejects the push.
    pub fn new_response_outparam(
        &mut self,
        result: tokio::sync::oneshot::Sender<
            Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>,
        >,
    ) -> wasmtime::Result<Resource<HostResponseOutparam>> {
        let outparam = HostResponseOutparam { result };
        Ok(self.table.push(outparam)?)
    }
}
/// Host-side record of an outgoing HTTP response; it can be turned into a
/// `hyper::Response` through its `TryFrom` implementation.
pub struct HostOutgoingResponse {
/// Status code of the response.
pub status: http::StatusCode,
/// Header fields of the response.
pub headers: FieldMap,
/// Response body; when `None`, the conversion to `hyper::Response` substitutes an empty body.
pub body: Option<HyperOutgoingBody>,
}
impl TryFrom<HostOutgoingResponse> for hyper::Response<HyperOutgoingBody> {
type Error = http::Error;
fn try_from(
resp: HostOutgoingResponse,
) -> Result<hyper::Response<HyperOutgoingBody>, Self::Error> {
use http_body_util::Empty;
let mut builder = hyper::Response::builder().status(resp.status);
*builder.headers_mut().unwrap() = resp.headers.into();
match resp.body {
Some(body) => builder.body(body),
None => builder.body(
Empty::<bytes::Bytes>::new()
.map_err(|_| unreachable!("Infallible error"))
.boxed_unsync(),
),
}
}
}
/// Host-side record of an outgoing HTTP request.
///
/// NOTE(review): how the optional parts are defaulted or validated when the
/// request is sent is not visible in this section of the file.
#[derive(Debug)]
pub struct HostOutgoingRequest {
/// Request method, in the WASI HTTP representation.
pub method: Method,
/// Optional scheme of the request target.
pub scheme: Option<Scheme>,
/// Optional authority of the request target.
pub authority: Option<String>,
/// Optional path of the request target, including any query string (per the field name).
pub path_with_query: Option<String>,
/// Header fields of the request.
pub headers: FieldMap,
/// Optional request body.
pub body: Option<HyperOutgoingBody>,
}
/// Optional per-request timeout overrides.
///
/// The derived `Default` leaves every field as `None`.
/// NOTE(review): what value applies when a field is `None` is decided
/// elsewhere — confirm at the use site.
#[derive(Debug, Default)]
pub struct HostRequestOptions {
/// Presumably the limit for establishing the connection.
pub connect_timeout: Option<std::time::Duration>,
/// Presumably the limit for receiving the first byte of the response.
pub first_byte_timeout: Option<std::time::Duration>,
/// Presumably the limit on idle time between successive body bytes.
pub between_bytes_timeout: Option<std::time::Duration>,
}
/// Host-side record of an incoming HTTP response.
#[derive(Debug)]
pub struct HostIncomingResponse {
/// Numeric status code of the response.
pub status: u16,
/// Header fields of the response.
pub headers: FieldMap,
/// Optional response body.
/// NOTE(review): presumably becomes `None` once the body is handed out — confirm at the use site.
pub body: Option<HostIncomingBody>,
}
/// Join handle for the task that produces an `IncomingResponse`.
///
/// The task's output has two result layers: an outer `wasmtime::Result` and an
/// inner `Result` whose error is a `types::ErrorCode`.
/// NOTE(review): presumably the outer layer is a host-level failure and the
/// inner one an HTTP-level error reported to the guest — confirm.
pub type FutureIncomingResponseHandle =
AbortOnDropJoinHandle<wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>>;
/// A received `hyper` response bundled with the state needed to keep reading it.
#[derive(Debug)]
pub struct IncomingResponse {
/// The response itself, with its incoming body.
pub resp: hyper::Response<HyperIncomingBody>,
/// Optional background task handle kept alongside the response.
/// NOTE(review): presumably the task driving the underlying connection — confirm.
pub worker: Option<AbortOnDropJoinHandle<()>>,
/// Timeout value stored with the response.
/// NOTE(review): presumably applied between body chunks — confirm.
pub between_bytes_timeout: std::time::Duration,
}
/// State of a future that resolves to an incoming response.
#[derive(Debug)]
pub enum HostFutureIncomingResponse {
/// The producing task is still running; awaiting the `Pollable::ready`
/// implementation on this type replaces this variant with `Ready`.
Pending(FutureIncomingResponseHandle),
/// The task's output is available.
Ready(wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>),
/// NOTE(review): presumably marks that the result was already taken; the
/// transition into this state is not visible in this section of the file.
Consumed,
}
impl HostFutureIncomingResponse {
    /// Creates a future in the `Pending` state around a running task handle.
    pub fn pending(handle: FutureIncomingResponseHandle) -> Self {
        Self::Pending(handle)
    }

    /// Creates a future that is already in the `Ready` state with `result`.
    pub fn ready(result: wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>) -> Self {
        Self::Ready(result)
    }

    /// Returns `true` only in the `Ready` state; `Pending` and `Consumed`
    /// both yield `false`.
    pub fn is_ready(&self) -> bool {
        matches!(self, Self::Ready(_))
    }

    /// Consumes the future and returns the stored result.
    ///
    /// # Panics
    ///
    /// Panics when the future is not in the `Ready` state. The message names
    /// the actual state (`Pending` or `Consumed`) so the failure is not
    /// misreported as "pending" when the value was in fact already consumed.
    pub fn unwrap_ready(self) -> wasmtime::Result<Result<IncomingResponse, types::ErrorCode>> {
        match self {
            Self::Ready(res) => res,
            Self::Pending(_) => {
                panic!("unwrap_ready called on a pending HostFutureIncomingResponse")
            }
            Self::Consumed => {
                panic!("unwrap_ready called on a consumed HostFutureIncomingResponse")
            }
        }
    }
}
#[async_trait::async_trait]
impl Pollable for HostFutureIncomingResponse {
    /// Waits for a `Pending` future to finish and stores its output as `Ready`.
    ///
    /// `Ready` and `Consumed` values are left untouched and return immediately.
    /// The state is only overwritten after the await completes, so dropping
    /// this call early leaves the value `Pending`.
    async fn ready(&mut self) {
        match self {
            Self::Pending(handle) => {
                let outcome = handle.await;
                *self = Self::Ready(outcome);
            }
            Self::Ready(_) | Self::Consumed => {}
        }
    }
}