use crate::{
abi::{self, FastlyStatus, MultiValueHostcallError},
error::{BufferSizeError, HandleError, HandleKind},
experimental::{BodyHandleError, BodyHandleExt},
};
use bytes::BytesMut;
use fastly_shared::BodyWriteEnd;
use http::header::{HeaderMap, HeaderName, HeaderValue};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
io::{BufReader, Read, Write},
mem::ManuallyDrop,
};
/// A handle to an HTTP body, identified by a raw `u32` that is passed to the
/// `fastly_http_body` hostcalls used throughout this file.
///
/// The type is `#[repr(transparent)]` over that `u32`. Dropping a handle whose
/// raw value is not `fastly_shared::INVALID_BODY_HANDLE` closes it via the
/// `Drop` implementation in this file.
#[derive(Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct BodyHandle {
/// Raw handle value handed to the host ABI.
pub(crate) handle: u32,
}
/// Records whether the client body handle has already been requested.
///
/// `BodyHandle::set_got_client` flips this to `true` and panics if it was
/// already set.
pub(crate) static GOT_CLIENT_BODY: AtomicBool = AtomicBool::new(false);
impl BodyHandle {
/// The sentinel handle whose raw value is `fastly_shared::INVALID_BODY_HANDLE`.
///
/// In this file it serves as the initial value of the out-parameter passed to
/// hostcalls that produce a handle, and the `Drop` implementation skips
/// closing handles that are equal to it.
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub const INVALID: Self = Self {
handle: fastly_shared::INVALID_BODY_HANDLE,
};
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub const fn is_valid(&self) -> bool {
!self.is_invalid()
}
/// Returns `true` when this handle's raw value equals
/// `fastly_shared::INVALID_BODY_HANDLE`.
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub const fn is_invalid(&self) -> bool {
self.handle == fastly_shared::INVALID_BODY_HANDLE
}
/// Wraps a raw `u32` in a `BodyHandle` without performing any validation.
///
/// # Safety
///
/// No check is made that `handle` refers to a live body. The returned value
/// is closed when dropped unless it equals `BodyHandle::INVALID`, so the
/// caller is presumably expected not to alias a handle that is owned
/// elsewhere — NOTE(review): confirm the exact contract against the ABI docs.
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub unsafe fn from_u32(handle: u32) -> Self {
Self { handle }
}
/// Returns the raw `u32` value of this handle without giving up ownership.
///
/// # Safety
///
/// The body performs no unsafe operation itself. NOTE(review): the `unsafe`
/// marker presumably signals that the raw value can be misused with the host
/// ABI (for example, closed behind this handle's back) — confirm.
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub unsafe fn as_u32(&self) -> u32 {
self.handle
}
/// Returns a mutable reference to the raw handle value.
///
/// Within this file it is used as the out-parameter for hostcalls that
/// produce a handle (`new` and `from_client`).
///
/// # Safety
///
/// Overwriting the value changes which raw handle the `Drop` implementation
/// will close. NOTE(review): the precise caller obligations are not visible
/// here — confirm against the ABI docs.
#[cfg_attr(
not(target_env = "p1"),
deprecated(
since = "0.11.6",
note = "This code will need to be updated for wasip2."
)
)]
pub unsafe fn as_u32_mut(&mut self) -> &mut u32 {
&mut self.handle
}
/// Consumes the handle and returns its raw `u32` value without closing it.
///
/// The handle is moved into a `ManuallyDrop`, so the `Drop` implementation
/// (which would close the body) never runs for it.
#[cfg_attr(
    not(target_env = "p1"),
    deprecated(
        since = "0.11.6",
        note = "This code will need to be updated for wasip2."
    )
)]
pub fn into_u32(self) -> u32 {
    let undropped = ManuallyDrop::new(self);
    undropped.handle
}
/// Marks the client body as taken for this execution.
///
/// # Panics
///
/// Panics if the client body had already been marked as taken.
pub(crate) fn set_got_client() {
    // `swap` returns the previous value, so the test and the update happen in
    // a single atomic step.
    let already_taken = GOT_CLIENT_BODY.swap(true, Ordering::SeqCst);
    if already_taken {
        panic!("cannot get more than one handle to the client body per execution");
    }
}
/// Obtains the handle for the client body via
/// `fastly_http_req::body_downstream_get`.
///
/// # Panics
///
/// Panics if called more than once per execution (see `set_got_client`), if
/// the hostcall reports a failure, or if the handle is still invalid after
/// the hostcall returns.
pub fn from_client() -> Self {
    Self::set_got_client();

    let mut body = BodyHandle::INVALID;
    // The first argument is passed as a null pointer; only the body
    // out-parameter is supplied.
    let status = unsafe {
        abi::fastly_http_req::body_downstream_get(std::ptr::null_mut(), body.as_u32_mut())
    };

    if status.result().is_ok() && body.is_valid() {
        body
    } else {
        panic!("fastly_http_req::body_downstream_get failed")
    }
}
/// Creates a new body through `fastly_http_body::new` and returns its handle.
///
/// # Panics
///
/// Panics if the hostcall reports a failure or if the handle is still
/// invalid after the hostcall returns.
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
    let mut body = BodyHandle::INVALID;
    let status = unsafe { abi::fastly_http_body::new(body.as_u32_mut()) };

    if status.result().is_ok() && body.is_valid() {
        body
    } else {
        panic!("fastly_http_body::new failed")
    }
}
/// Appends the body `other` to this body, consuming `other`'s handle.
///
#[doc = include_str!("../../../docs/snippets/body-append-constant-time.md")]
///
/// # Panics
///
/// Panics if the `fastly_http_body::append` hostcall reports a failure.
pub fn append(&mut self, other: BodyHandle) {
    let dest = unsafe { self.as_u32() };
    // `into_u32` gives up ownership without closing `other`.
    let src = other.into_u32();
    let status = unsafe { abi::fastly_http_body::append(dest, src) };
    status.result().expect("fastly_http_body::append failed")
}
/// Reads this body to its end and returns the bytes, consuming the handle.
///
#[doc = include_str!("../../../docs/snippets/buffers-body-handle.md")]
///
/// # Panics
///
/// Panics if reading from the body fails.
pub fn into_bytes(self) -> Vec<u8> {
    let mut reader = BufReader::new(self);
    let mut contents = Vec::new();
    reader
        .read_to_end(&mut contents)
        .expect("fastly_http_body::read failed");
    contents
}
/// Reads this body to its end and returns it as a `String`, consuming the
/// handle.
///
#[doc = include_str!("../../../docs/snippets/buffers-body-handle.md")]
#[doc = include_str!("../../../docs/snippets/panics-body-utf8.md")]
///
/// # Panics
///
/// Panics if `read_to_string` returns an error, which covers both a failed
/// read and a body that is not valid UTF-8.
pub fn into_string(self) -> String {
    let mut reader = BufReader::new(self);
    let mut text = String::new();
    reader
        .read_to_string(&mut text)
        .expect("fastly_http_body::read failed");
    text
}
/// Writes `bytes` with `BodyWriteEnd::Front` and returns the number of bytes
/// written, which always equals `bytes.len()` on return.
///
/// # Panics
///
/// Panics if the hostcall status is not OK, or if fewer than `bytes.len()`
/// bytes were written.
pub(crate) fn write_front(&mut self, bytes: &[u8]) -> usize {
    let (src, len) = (bytes.as_ptr(), bytes.len());
    let mut written = 0;
    let status = unsafe {
        abi::fastly_http_body::write(self.as_u32(), src, len, BodyWriteEnd::Front, &mut written)
    };

    if !status.is_ok() {
        panic!("fastly_http_body::write_front failed");
    }
    // A partial write is treated as a hard failure rather than retried.
    if written != len {
        panic!("fastly_http_body::write_front didn't fully write");
    }
    written
}
/// Closes the body explicitly, consuming the handle.
///
/// The raw value is taken with `into_u32`, so the `Drop` implementation does
/// not attempt a second close.
///
/// # Errors
///
/// Any status other than `FastlyStatus::OK` is reported as
/// `HandleError::ClosedHandle(HandleKind::Body)`.
pub fn close(self) -> Result<(), HandleError> {
    let raw = self.into_u32();
    let status = unsafe { abi::fastly_http_body::close(raw) };
    if status == FastlyStatus::OK {
        Ok(())
    } else {
        Err(HandleError::ClosedHandle(HandleKind::Body))
    }
}
/// Appends every name/value pair in `trailers` to this body's trailers.
///
/// NOTE(review): despite the name, nothing here clears trailers that are
/// already present on the body; each entry is only appended. Confirm whether
/// callers rely on the body starting without trailers.
///
/// # Panics
///
/// Panics if any individual `append_trailer` call fails.
pub(crate) fn trailers_replace(&mut self, trailers: &HeaderMap) {
    trailers
        .iter()
        .for_each(|(name, value)| self.append_trailer(name, value));
}
/// Appends a single trailer to this body via
/// `fastly_http_body::trailer_append`.
///
/// # Panics
///
/// Panics if the hostcall reports a failure.
pub(crate) fn append_trailer(&mut self, name: &HeaderName, value: &HeaderValue) {
    // Shadow both arguments with their byte views; the annotation selects the
    // `AsRef<[u8]>` implementation.
    let name: &[u8] = name.as_ref();
    let value: &[u8] = value.as_ref();

    let status = unsafe {
        abi::fastly_http_body::trailer_append(
            self.as_u32(),
            name.as_ptr(),
            name.len(),
            value.as_ptr(),
            value.len(),
        )
    };
    status
        .result()
        .expect("fastly_http_body::trailer_append returned error");
}
/// Builds an iterator over the trailer names of this body, backed by the
/// `fastly_http_body::trailer_names_get` hostcall.
///
/// `initial_buf_size` is the starting buffer size handed to
/// `MultiValueHostcall`; it is clamped to `max_buf_size` when a maximum is
/// given.
///
/// # Errors
///
/// Returns `BodyHandleError::TrailersNotReady` when the first result from the
/// hostcall is `FastlyStatus::AGAIN`. Individual items are
/// `Err(BufferSizeError)` when the hostcall wrapper reports that the buffer
/// was too small.
///
/// # Panics
///
/// The returned iterator panics when the hostcall fails with any other
/// status, when a returned name cannot be converted into a `HeaderName`, or
/// when a buffer-size error is reported although `max_buf_size` is `None`.
pub(crate) fn get_trailer_names_impl(
    &self,
    mut initial_buf_size: usize,
    max_buf_size: Option<usize>,
) -> Result<Box<impl Iterator<Item = Result<HeaderName, BufferSizeError>> + '_>, BodyHandleError>
{
    use MultiValueHostcallError::{BufferTooSmall, ClosureError};

    if let Some(max) = max_buf_size {
        initial_buf_size = initial_buf_size.min(max);
    }

    let mut iter = abi::MultiValueHostcall::new(
        b'\0',
        initial_buf_size,
        max_buf_size,
        move |buf, buf_size, cursor, ending_cursor, nwritten| unsafe {
            abi::fastly_http_body::trailer_names_get(
                self.as_u32(),
                buf,
                buf_size,
                cursor,
                ending_cursor,
                nwritten,
            )
        },
    )
    .peekable();

    // Only the first result is inspected for `AGAIN`; that case is reported
    // as an error up front instead of panicking during iteration.
    if matches!(iter.peek(), Some(Err(ClosureError(e))) if *e == FastlyStatus::AGAIN) {
        return Err(BodyHandleError::TrailersNotReady);
    }

    Ok(Box::new(iter.map(move |res| match res {
        Ok(name_bytes) => Ok(HeaderName::try_from(Vec::from(name_bytes)).unwrap()),
        Err(BufferTooSmall { needed_buf_size }) => Err(BufferSizeError::header_name(
            max_buf_size.expect("maximum buffer size must exist if a buffer size error occurs"),
            needed_buf_size,
        )),
        Err(ClosureError(e)) => {
            // The message names the hostcall that was actually invoked above.
            panic!("fastly_http_body::trailer_names_get returned error: {e:?}")
        }
    })))
}
/// Builds an iterator over the values of the trailer `name` on this body,
/// backed by the `fastly_http_body::trailer_values_get` hostcall.
///
/// `initial_buf_size` is the starting buffer size handed to
/// `MultiValueHostcall`; it is clamped to `max_buf_size` when a maximum is
/// given.
///
/// # Errors
///
/// Returns `BodyHandleError::TrailersNotReady` when the first result from the
/// hostcall is `FastlyStatus::AGAIN`. Individual items are
/// `Err(BufferSizeError)` when the hostcall wrapper reports that the buffer
/// was too small.
///
/// # Panics
///
/// The returned iterator panics when the hostcall fails with any other
/// status, or when a buffer-size error is reported although `max_buf_size`
/// is `None`.
pub(crate) fn get_trailer_values_impl<'a>(
    &'a self,
    name: &'a HeaderName,
    mut initial_buf_size: usize,
    max_buf_size: Option<usize>,
) -> Result<Box<impl Iterator<Item = Result<HeaderValue, BufferSizeError>> + 'a>, BodyHandleError>
{
    use MultiValueHostcallError::{BufferTooSmall, ClosureError};

    if let Some(max) = max_buf_size {
        initial_buf_size = initial_buf_size.min(max);
    }

    let mut iter = abi::MultiValueHostcall::new(
        b'\0',
        initial_buf_size,
        max_buf_size,
        move |buf, buf_size, cursor, ending_cursor, nwritten| unsafe {
            let name: &[u8] = name.as_ref();
            abi::fastly_http_body::trailer_values_get(
                self.as_u32(),
                name.as_ptr(),
                name.len(),
                buf,
                buf_size,
                cursor,
                ending_cursor,
                nwritten,
            )
        },
    )
    .peekable();

    // Only the first result is inspected for `AGAIN`; that case is reported
    // as an error up front instead of panicking during iteration.
    if matches!(iter.peek(), Some(Err(ClosureError(e))) if *e == FastlyStatus::AGAIN) {
        return Err(BodyHandleError::TrailersNotReady);
    }

    Ok(Box::new(iter.map(move |res| match res {
        Ok(value_bytes) => {
            // NOTE(review): the bytes are not validated as a header value;
            // this presumably relies on the host returning well-formed
            // values — confirm.
            let header_value = unsafe { HeaderValue::from_maybe_shared_unchecked(value_bytes) };
            Ok(header_value)
        }
        Err(BufferTooSmall { needed_buf_size }) => Err(BufferSizeError::header_value(
            max_buf_size.expect("maximum buffer size must exist if a buffer size error occurs"),
            needed_buf_size,
        )),
        Err(ClosureError(e)) => {
            // The message names the hostcall that was actually invoked above.
            panic!("fastly_http_body::trailer_values_get returned error: {e:?}")
        }
    })))
}
/// Asks the host for this body's length through
/// `fastly_http_body::known_length`.
///
/// Returns `None` when the hostcall answers `FastlyStatus::NONE`, and
/// `Some(length)` when it answers `FastlyStatus::OK`.
///
/// # Panics
///
/// Panics on any status other than `NONE` or `OK`.
pub(crate) fn known_length(&self) -> Option<u64> {
    let mut length: u64 = 0;
    let status =
        unsafe { fastly_sys::fastly_http_body::known_length(self.as_u32(), &mut length) };

    if status == FastlyStatus::NONE {
        return None;
    }
    assert_eq!(status, FastlyStatus::OK);
    Some(length)
}
}
impl Read for BodyHandle {
    /// Reads up to `buf.len()` bytes from the body through
    /// `fastly_http_body::read`.
    ///
    /// # Errors
    ///
    /// `FastlyStatus::HTTPINCOMPLETE` is mapped to `ErrorKind::UnexpectedEof`.
    /// Every other non-OK status becomes an `Error::other` whose message
    /// carries the status.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        use std::io::{Error, ErrorKind};

        let mut bytes_read = 0;
        let status = unsafe {
            abi::fastly_http_body::read(
                self.as_u32(),
                buf.as_mut_ptr(),
                buf.len(),
                &mut bytes_read,
            )
        };

        if status == FastlyStatus::OK {
            return Ok(bytes_read);
        }
        let err = if status == FastlyStatus::HTTPINCOMPLETE {
            Error::new(ErrorKind::UnexpectedEof, "incomplete HTTP body")
        } else {
            Error::other(format!("fastly_http_body::read failed with '{status:?}'"))
        };
        Err(err)
    }
}
impl Write for BodyHandle {
    /// Writes `buf` with `BodyWriteEnd::Back` and returns the number of bytes
    /// the host reports as written.
    ///
    /// # Errors
    ///
    /// `FastlyStatus::BADF` is mapped to `ErrorKind::InvalidInput`. Every
    /// other non-OK status becomes an `Error::other`. In both cases the
    /// message is the debug form of the status.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        use std::io::{Error, ErrorKind};

        let mut written = 0;
        let status = unsafe {
            abi::fastly_http_body::write(
                self.as_u32(),
                buf.as_ptr(),
                buf.len(),
                BodyWriteEnd::Back,
                &mut written,
            )
        };

        if status == FastlyStatus::OK {
            Ok(written)
        } else if status == FastlyStatus::BADF {
            Err(Error::new(ErrorKind::InvalidInput, format!("{status:?}")))
        } else {
            Err(Error::other(format!("{status:?}")))
        }
    }

    /// Does nothing and always succeeds; no flush hostcall is made here.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
impl Drop for BodyHandle {
    /// Closes the body unless the handle is the invalid sentinel.
    ///
    /// # Panics
    ///
    /// Panics if the `fastly_http_body::close` hostcall reports a failure.
    fn drop(&mut self) {
        // The sentinel value does not correspond to a body, so there is
        // nothing to close.
        if self.is_invalid() {
            return;
        }
        let status = unsafe { abi::fastly_http_body::close(self.as_u32()) };
        status.result().expect("fastly_http_body::close failed");
    }
}
impl From<&str> for BodyHandle {
fn from(s: &str) -> Self {
let mut handle = Self::new();
handle
.write_all(s.as_bytes())
.expect("BodyHandle::from() write failed");
handle
}
}
impl From<String> for BodyHandle {
fn from(s: String) -> Self {
Self::from(s.as_str())
}
}
impl From<&[u8]> for BodyHandle {
fn from(s: &[u8]) -> Self {
let mut handle = Self::new();
handle
.write_all(s)
.expect("BodyHandle::from() write failed");
handle
}
}
impl From<Vec<u8>> for BodyHandle {
fn from(s: Vec<u8>) -> Self {
Self::from(s.as_slice())
}
}
impl BodyHandleExt for BodyHandle {
    /// Returns an iterator over this body's trailer names, using `buf_size`
    /// as both the initial and the maximum buffer size.
    ///
    /// # Errors
    ///
    /// Propagates `BodyHandleError` from `get_trailer_names_impl`.
    fn get_trailer_names<'a>(
        &'a self,
        buf_size: usize,
    ) -> Result<Box<dyn Iterator<Item = Result<HeaderName, BufferSizeError>> + 'a>, BodyHandleError>
    {
        // `Ok(..?)` re-wraps the value so the boxed `impl Iterator` can be
        // coerced to the boxed trait object in the signature.
        Ok(self.get_trailer_names_impl(buf_size, Some(buf_size))?)
    }

    /// Fetches a single value of the trailer `name` into a buffer created
    /// with capacity `max_len`.
    ///
    /// # Errors
    ///
    /// - Outer `Err(BodyHandleError::TrailersNotReady)` when the hostcall
    ///   answers `FastlyStatus::AGAIN`.
    /// - Inner `Err(BufferSizeError)` when the hostcall answers
    ///   `FastlyStatus::BUFLEN`.
    /// - `Ok(Ok(None))` when the hostcall answers `FastlyStatus::INVAL`.
    ///
    /// # Panics
    ///
    /// Panics on any other hostcall failure, or if the host claims to have
    /// written more bytes than the buffer can hold.
    fn get_trailer_value(
        &self,
        name: &HeaderName,
        max_len: usize,
    ) -> Result<Result<Option<HeaderValue>, BufferSizeError>, BodyHandleError> {
        let name: &[u8] = name.as_ref();
        let mut buf = BytesMut::with_capacity(max_len);
        let mut nwritten = 0;
        let status = unsafe {
            abi::fastly_http_body::trailer_value_get(
                self.as_u32(),
                name.as_ptr(),
                name.len(),
                buf.as_mut_ptr(),
                buf.capacity(),
                &mut nwritten,
            )
        };
        match status.result() {
            Ok(()) => {
                assert!(nwritten <= buf.capacity(), "hostcall wrote too many bytes");
                // SAFETY: `nwritten` was just checked against the capacity.
                // NOTE(review): this also assumes the host initialized the
                // first `nwritten` bytes — confirm against the ABI contract.
                unsafe {
                    buf.set_len(nwritten);
                }
                let buf = buf.freeze();
                // NOTE(review): the bytes are not validated as a header
                // value; presumably the host guarantees that — confirm.
                let value = unsafe { HeaderValue::from_maybe_shared_unchecked(buf) };
                Ok(Ok(Some(value)))
            }
            Err(FastlyStatus::AGAIN) => Err(BodyHandleError::TrailersNotReady),
            Err(FastlyStatus::INVAL) => Ok(Ok(None)),
            Err(FastlyStatus::BUFLEN) => Ok(Err(BufferSizeError::header_value(max_len, nwritten))),
            // The message names the hostcall actually invoked above and
            // reports the status that caused the failure.
            Err(other) => {
                panic!("fastly_http_body::trailer_value_get returned error: {other:?}")
            }
        }
    }

    /// Returns an iterator over the values of the trailer `name`, using
    /// `max_len` as both the initial and the maximum buffer size.
    ///
    /// # Errors
    ///
    /// Propagates `BodyHandleError` from `get_trailer_values_impl`.
    fn get_trailer_values<'a>(
        &'a self,
        name: &'a HeaderName,
        max_len: usize,
    ) -> Result<Box<dyn Iterator<Item = Result<HeaderValue, BufferSizeError>> + 'a>, BodyHandleError>
    {
        // `Ok(..?)` re-wraps the value so the boxed `impl Iterator` can be
        // coerced to the boxed trait object in the signature.
        Ok(self.get_trailer_values_impl(name, max_len, Some(max_len))?)
    }
}