use {
crate::component::{
bindings::fastly::compute::{http_body, types},
compute::headers,
},
crate::{
body::Body,
error::Error,
linking::{ComponentCtx, SessionView},
},
http::header::{HeaderName, HeaderValue},
wasmtime::component::Resource,
};
pub const MAX_HEADER_NAME_LEN: usize = (1 << 16) - 1;
impl http_body::Host for ComponentCtx {
fn new(&mut self) -> Result<Resource<http_body::Body>, types::Error> {
Ok(self.session_mut().insert_body(Body::empty()).into())
}
async fn write(
&mut self,
h: Resource<http_body::Body>,
buf: Vec<u8>,
) -> Result<u32, types::Error> {
let h = h.into();
let buf = buf.as_slice();
if self.session().is_streaming_body(h) {
let body = self.session_mut().streaming_body_mut(h)?;
body.send_chunk(buf).await?;
} else {
let body = self.session_mut().body_mut(h)?;
body.push_back(buf);
}
Ok(buf
.len()
.try_into()
.expect("the buffer length must fit into a u32"))
}
async fn write_front(
&mut self,
h: Resource<http_body::Body>,
buf: Vec<u8>,
) -> Result<(), types::Error> {
let h = h.into();
let buf = buf.as_slice();
if self.session().is_streaming_body(h) {
return Err(Error::Unsupported {
msg: "can only write to the end of a streaming body",
}
.into());
}
let body = self.session_mut().body_mut(h)?;
body.push_front(buf);
Ok(())
}
async fn append(
&mut self,
dest: Resource<http_body::Body>,
src: Resource<http_body::Body>,
) -> Result<(), types::Error> {
let src = self.session_mut().take_body(src.into())?;
let dest = dest.into();
if self.session().is_streaming_body(dest) {
let dest = self.session_mut().streaming_body_mut(dest)?;
for chunk in src {
dest.send_chunk(chunk).await?;
}
} else {
let dest = self.session_mut().body_mut(dest)?;
dest.append(src);
}
Ok(())
}
async fn read(
&mut self,
h: Resource<http_body::Body>,
chunk_size: u32,
) -> Result<Vec<u8>, types::Error> {
let h = h.into();
let body = self.session_mut().body_mut(h)?;
let mut buffer = vec![0; chunk_size as usize];
let len = body.read(&mut buffer).await?;
buffer.truncate(len);
Ok(buffer)
}
fn close(&mut self, h: Resource<http_body::Body>) -> Result<(), types::Error> {
let h = h.into();
if self.session().is_streaming_body(h) {
self.session_mut().take_streaming_body(h)?.finish()?;
Ok(())
} else {
Ok(self.session_mut().drop_body(h)?)
}
}
fn get_known_length(&mut self, h: Resource<http_body::Body>) -> Option<u64> {
let h = h.into();
if self.session().is_streaming_body(h) {
None
} else {
self.session_mut().body_mut(h).unwrap().len()
}
}
fn append_trailer(
&mut self,
h: Resource<http_body::Body>,
name: String,
value: Vec<u8>,
) -> Result<(), types::Error> {
let h = h.into();
if self.session().is_streaming_body(h) {
let body = self.session_mut().streaming_body_mut(h)?;
let name = HeaderName::from_bytes(name.as_bytes())?;
let value = HeaderValue::from_bytes(value.as_slice())?;
body.append_trailer(name, value);
Ok(())
} else {
let trailers = &mut self.session_mut().body_mut(h)?.trailers;
if name.len() > MAX_HEADER_NAME_LEN {
return Err(Error::InvalidArgument.into());
}
let name = HeaderName::from_bytes(name.as_bytes())?;
let value = HeaderValue::from_bytes(value.as_slice())?;
trailers.append(name, value);
Ok(())
}
}
fn get_trailer_names(
&mut self,
h: Resource<http_body::Body>,
max_len: u64,
cursor: u32,
) -> Result<(String, Option<u32>), http_body::TrailerError> {
let h = h.into();
if self.session().is_streaming_body(h) {
return Err(Error::InvalidArgument.into());
}
let body = self.session_mut().body_mut(h)?;
if !body.trailers_ready {
return Err(http_body::TrailerError::NotAvailableYet);
}
let trailers = &body.trailers;
let (buf, next) = headers::get_names(trailers.keys(), max_len, cursor)?;
Ok((buf, next))
}
fn get_trailer_value(
&mut self,
h: Resource<http_body::Body>,
name: String,
max_len: u64,
) -> Result<Option<Vec<u8>>, http_body::TrailerError> {
let h = h.into();
if self.session().is_streaming_body(h) {
return Err(Error::InvalidArgument.into());
}
let body = self.session_mut().body_mut(h)?;
if !body.trailers_ready {
return Err(http_body::TrailerError::NotAvailableYet);
}
let trailers = &mut body.trailers;
if name.len() > MAX_HEADER_NAME_LEN {
return Err(Error::InvalidArgument.into());
}
let value = {
let name = HeaderName::from_bytes(name.as_bytes())?;
if let Some(value) = trailers.get(&name) {
value
} else {
return Ok(None);
}
};
if value.len() > max_len as usize {
return Err(Error::BufferLengthError {
buf: "value",
len: "value_max_len",
}
.into());
}
Ok(Some(value.as_bytes().to_owned()))
}
fn get_trailer_values(
&mut self,
h: Resource<http_body::Body>,
name: String,
max_len: u64,
cursor: u32,
) -> Result<(Vec<u8>, Option<u32>), http_body::TrailerError> {
let h = h.into();
if self.session().is_streaming_body(h) {
return Err(Error::InvalidArgument.into());
}
let body = self.session_mut().body_mut(h).unwrap();
if !body.trailers_ready {
return Err(http_body::TrailerError::NotAvailableYet);
}
let trailers = &mut body.trailers;
let (buf, next) = headers::get_values(trailers, &name, max_len, cursor)?;
Ok((buf, next))
}
}
impl<T: Into<types::Error>> From<T> for http_body::TrailerError {
fn from(err: T) -> Self {
Self::Error(err.into())
}
}