use crate::abi;
use crate::Error;
use fastly_shared::BodyWriteEnd;
use lazy_static::lazy_static;
use std::convert::TryFrom;
use std::io::{BufReader, BufWriter, Read, Write};
use std::sync::Mutex;
/// A thin wrapper around the raw `u32` identifier that the `abi` body
/// hostcalls operate on.
///
/// The type is `#[repr(transparent)]`, so it has the same layout as the
/// `u32` it wraps; this file passes `&mut BodyHandle` straight to hostcalls
/// as an out-parameter.
///
/// `Clone`/`Copy` are deliberately not derived here.
/// NOTE(review): presumably so that a handle value has a single owner —
/// confirm against the crate's design notes.
#[derive(Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct BodyHandle {
// Raw handle value; `fastly_shared::INVALID_BODY_HANDLE` marks "no body".
handle: u32,
}
// Process-wide flag recording whether `BodyHandle::downstream_request` has
// already handed out a handle. It starts as `false` and is flipped to `true`
// by the first successful call; later calls see the flag and return an error.
lazy_static! {
pub(crate) static ref GOT_DOWNSTREAM: Mutex<bool> = Mutex::new(false);
}
impl BodyHandle {
pub const INVALID: Self = Self {
handle: fastly_shared::INVALID_BODY_HANDLE,
};
pub fn is_invalid(&self) -> bool {
self.handle == Self::INVALID.handle
}
pub(crate) unsafe fn handle(&self) -> Self {
Self {
handle: self.handle,
}
}
pub fn downstream_request() -> Result<Self, Error> {
let mut got = GOT_DOWNSTREAM.lock().unwrap();
if *got {
return Err(Error::msg(
"cannot get more than one handle to the downstream request body per execution",
));
}
let mut handle = BodyHandle::INVALID;
let status = unsafe { abi::xqd_req_body_downstream_get(std::ptr::null_mut(), &mut handle) };
if status.is_err() || handle.is_invalid() {
Err(Error::msg("xqd_req_body_downstream_get failed"))
} else {
*got = true;
Ok(handle)
}
}
pub fn new() -> Result<BodyHandle, Error> {
let mut handle = BodyHandle::INVALID;
let status = unsafe { abi::xqd_body_new(&mut handle) };
if status.is_err() || handle.is_invalid() {
Err(Error::msg("xqd_body_new failed"))
} else {
Ok(handle)
}
}
pub fn append(&mut self, other: BodyHandle) -> Result<(), Error> {
let status = unsafe { abi::xqd_body_append(self.handle(), other.handle()) };
if status.is_err() {
Err(Error::msg("xqd_body_append failed"))
} else {
Ok(())
}
}
pub fn into_bytes(self) -> Result<Vec<u8>, Error> {
let mut body = vec![];
let mut bufread = BufReader::new(self);
bufread.read_to_end(&mut body)?;
Ok(body)
}
pub fn into_string(self) -> Result<String, Error> {
let mut body = String::new();
let mut bufread = BufReader::new(self);
bufread.read_to_string(&mut body)?;
Ok(body)
}
}
/// Reads body bytes through the `xqd_body_read` hostcall.
impl Read for BodyHandle {
    /// Fills `buf` with body bytes and returns the count reported by the
    /// hostcall.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error if the hostcall reports an error
    /// status.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let capacity = buf.len();
        let dst = buf.as_mut_ptr();
        let mut bytes_read = 0;

        // SAFETY: `dst`/`capacity` describe the caller's exclusively borrowed
        // buffer. NOTE(review): assumes the hostcall writes at most
        // `capacity` bytes — confirm against the ABI contract.
        let status = unsafe { abi::xqd_body_read(self.handle(), dst, capacity, &mut bytes_read) };
        if status.is_err() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "xqd_body_read failed",
            ));
        }
        Ok(bytes_read)
    }
}
/// Writes body bytes through the `xqd_body_write` hostcall.
impl Write for BodyHandle {
    /// Writes `buf` at the back of the body (`BodyWriteEnd::Back`) and
    /// returns the count reported by the hostcall.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error if the hostcall reports an error
    /// status.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let (src, len) = (buf.as_ptr(), buf.len());
        let mut accepted = 0;

        // SAFETY: `src`/`len` describe the caller's borrowed slice, which
        // stays alive for the duration of the call.
        let status = unsafe {
            abi::xqd_body_write(self.handle(), src, len, BodyWriteEnd::Back, &mut accepted)
        };
        if status.is_err() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "xqd_body_write failed",
            ));
        }
        Ok(accepted)
    }

    /// Does nothing and always succeeds: this type keeps no buffer of its
    /// own.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
impl TryFrom<&str> for BodyHandle {
type Error = Error;
fn try_from(s: &str) -> Result<Self, Error> {
let mut handle = BodyHandle::new()?;
handle.write_all(s.as_bytes())?;
Ok(handle)
}
}
impl TryFrom<String> for BodyHandle {
type Error = Error;
fn try_from(s: String) -> Result<Self, Error> {
BodyHandle::try_from(s.as_str())
}
}
impl TryFrom<&[u8]> for BodyHandle {
type Error = Error;
fn try_from(s: &[u8]) -> Result<Self, Error> {
let mut handle = BodyHandle::new()?;
handle.write_all(s)?;
Ok(handle)
}
}
impl TryFrom<Vec<u8>> for BodyHandle {
type Error = Error;
fn try_from(s: Vec<u8>) -> Result<Self, Error> {
BodyHandle::try_from(s.as_slice())
}
}
/// A raw `u32` body handle that, in this file, exposes only `append` and
/// `Write` — there is no `Read` implementation for it.
///
/// Like `BodyHandle`, it is `#[repr(transparent)]` over the `u32` and does
/// not derive `Clone`/`Copy`.
#[derive(Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct StreamingBodyHandle {
// Raw handle value, copied from a `BodyHandle` in `from_body_handle`.
handle: u32,
}
impl StreamingBodyHandle {
    /// Creates a streaming handle carrying the same raw value as
    /// `body_handle`.
    ///
    /// # Safety
    ///
    /// NOTE(review): this duplicates the raw handle while the original
    /// `BodyHandle` remains usable; presumably the caller must ensure the two
    /// are not used in conflicting ways — confirm the intended contract.
    pub(crate) unsafe fn from_body_handle(body_handle: &BodyHandle) -> Self {
        let handle = body_handle.handle;
        Self { handle }
    }

    /// Produces a temporary `BodyHandle` view of the same raw value so the
    /// `BodyHandle` operations can be reused.
    unsafe fn as_body_handle(&self) -> BodyHandle {
        let handle = self.handle;
        BodyHandle { handle }
    }

    /// Appends `other` to this body by delegating to `BodyHandle::append`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `BodyHandle::append` reports.
    pub fn append(&mut self, other: BodyHandle) -> Result<(), Error> {
        let mut target = unsafe { self.as_body_handle() };
        target.append(other)
    }
}
/// Writes by delegating to the `Write` implementation of `BodyHandle`.
impl Write for StreamingBodyHandle {
    /// Writes `buf` through a temporary `BodyHandle` carrying the same raw
    /// value.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut target = unsafe { self.as_body_handle() };
        target.write(buf)
    }

    /// Does nothing and always succeeds: this type keeps no buffer of its
    /// own.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
/// A buffered body: a `BufReader` and a `BufWriter` that each wrap a
/// `BodyHandle`.
///
/// The `From<BodyHandle>` conversion in this file builds both sides from the
/// same raw handle value, so reads and writes go to one body.
pub struct Body {
// Buffered read side; `Body::handle` and `into_handle` reach the handle
// through this field.
reader: BufReader<BodyHandle>,
// Buffered write side; it is flushed before reads, appends and
// `into_handle`.
writer: BufWriter<BodyHandle>,
}
impl Body {
    /// Creates a new, empty buffered body.
    ///
    /// # Errors
    ///
    /// Fails if `BodyHandle::new` fails.
    pub fn new() -> Result<Self, Error> {
        let handle = BodyHandle::new()?;
        Ok(Self::from(handle))
    }

    /// Mutable access to the handle held by the read side.
    fn handle(&mut self) -> &mut BodyHandle {
        self.reader.get_mut()
    }

    /// Consumes the buffered body and returns the underlying handle.
    ///
    /// Bytes that were pulled into the read buffer but not yet consumed are
    /// first written back to the front of the body (`BodyWriteEnd::Front`),
    /// then the write buffer is flushed.
    ///
    /// # Errors
    ///
    /// Fails if the write-back hostcall reports an error status, if it
    /// accepts fewer bytes than were buffered, or if flushing the writer
    /// fails.
    pub fn into_handle(mut self) -> Result<BodyHandle, Error> {
        let unread = self.reader.buffer();
        let unread_len = unread.len();

        if unread_len != 0 {
            let mut pushed_back = 0;
            let target = unsafe { self.reader.get_ref().handle() };
            // SAFETY: the pointer/length pair comes from the reader's own
            // buffer, which is borrowed for the duration of the call.
            let status = unsafe {
                abi::xqd_body_write(
                    target,
                    unread.as_ptr(),
                    unread_len,
                    BodyWriteEnd::Front,
                    &mut pushed_back,
                )
            };
            if status.is_err() {
                return Err(Error::msg("xqd_body_write_front failed"));
            }
            // A partial write-back would lose buffered bytes, so it is
            // reported as an error.
            if pushed_back != unread_len {
                return Err(Error::msg("xqd_body_write_front didn't fully write"));
            }
        }

        self.writer.flush()?;
        Ok(self.reader.into_inner())
    }

    /// Consumes the body and returns its remaining contents as bytes.
    ///
    /// # Errors
    ///
    /// Propagates errors from `into_handle` and `BodyHandle::into_bytes`.
    pub fn into_bytes(self) -> Result<Vec<u8>, Error> {
        let handle = self.into_handle()?;
        handle.into_bytes()
    }

    /// Consumes the body and returns its remaining contents as a `String`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `into_handle` and `BodyHandle::into_string`.
    pub fn into_string(self) -> Result<String, Error> {
        let handle = self.into_handle()?;
        handle.into_string()
    }

    /// Appends `other` to this body.
    ///
    /// This body's pending writes are flushed first, and `other` is turned
    /// back into a plain handle before the append.
    ///
    /// # Errors
    ///
    /// Propagates errors from the flush, from `other.into_handle()`, and
    /// from `BodyHandle::append`.
    pub fn append(&mut self, other: Body) -> Result<(), Error> {
        self.writer.flush()?;
        let tail = other.into_handle()?;
        self.handle().append(tail)
    }
}
/// Buffered reads; pending writes are flushed before every read.
impl Read for Body {
    /// Flushes the write buffer, then reads through the buffered reader.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let Self { reader, writer } = self;
        writer.flush()?;
        reader.read(buf)
    }

    /// Flushes the write buffer, then performs a vectored read through the
    /// buffered reader.
    fn read_vectored(&mut self, bufs: &mut [std::io::IoSliceMut]) -> std::io::Result<usize> {
        let Self { reader, writer } = self;
        writer.flush()?;
        reader.read_vectored(bufs)
    }
}
/// Exposes the internal read buffer; pending writes are flushed before the
/// buffer is refilled.
impl std::io::BufRead for Body {
    /// Flushes the write buffer, then returns the reader's buffered bytes,
    /// refilling the buffer if it is empty.
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
        Write::flush(&mut self.writer)?;
        std::io::BufRead::fill_buf(&mut self.reader)
    }

    /// Marks `amt` bytes of the read buffer as consumed.
    fn consume(&mut self, amt: usize) {
        std::io::BufRead::consume(&mut self.reader, amt)
    }
}
/// Buffered writes; every method forwards to the internal `BufWriter`.
impl Write for Body {
    /// Writes `buf` into the write buffer.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        Write::write(&mut self.writer, buf)
    }

    /// Vectored write into the write buffer.
    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
        Write::write_vectored(&mut self.writer, bufs)
    }

    /// Flushes the write buffer to the underlying handle.
    fn flush(&mut self) -> std::io::Result<()> {
        Write::flush(&mut self.writer)
    }
}
impl From<BodyHandle> for Body {
fn from(handle: BodyHandle) -> Self {
let handle2 = unsafe { handle.handle() };
Self {
reader: BufReader::new(handle),
writer: BufWriter::new(handle2),
}
}
}
/// Builds a buffered body holding the bytes of a string slice.
impl TryFrom<&str> for Body {
    type Error = Error;

    /// Creates the underlying `BodyHandle` from `s`, then wraps it.
    fn try_from(s: &str) -> Result<Self, Error> {
        let handle = BodyHandle::try_from(s)?;
        Ok(Self::from(handle))
    }
}
/// Builds a buffered body from an owned `String`.
impl TryFrom<String> for Body {
    type Error = Error;

    /// Creates the underlying `BodyHandle` from `s`, then wraps it.
    fn try_from(s: String) -> Result<Self, Error> {
        let handle = BodyHandle::try_from(s)?;
        Ok(Self::from(handle))
    }
}
/// Builds a buffered body holding a copy of a byte slice.
impl TryFrom<&[u8]> for Body {
    type Error = Error;

    /// Creates the underlying `BodyHandle` from `s`, then wraps it.
    fn try_from(s: &[u8]) -> Result<Self, Error> {
        let handle = BodyHandle::try_from(s)?;
        Ok(Self::from(handle))
    }
}
/// Builds a buffered body from an owned byte vector.
impl TryFrom<Vec<u8>> for Body {
    type Error = Error;

    /// Creates the underlying `BodyHandle` from `s`, then wraps it.
    fn try_from(s: Vec<u8>) -> Result<Self, Error> {
        let handle = BodyHandle::try_from(s)?;
        Ok(Self::from(handle))
    }
}
/// A buffered, write-only body: a `BufWriter` around a
/// `StreamingBodyHandle`.
pub struct StreamingBody {
// Buffered write side; it is flushed before an append.
writer: BufWriter<StreamingBodyHandle>,
}
impl StreamingBody {
    /// Mutable access to the handle held by the buffered writer.
    fn handle(&mut self) -> &mut StreamingBodyHandle {
        self.writer.get_mut()
    }

    /// Appends `other` to this streaming body.
    ///
    /// Pending writes are flushed first, and `other` is turned back into a
    /// plain handle before the append.
    ///
    /// # Errors
    ///
    /// Propagates errors from the flush, from `other.into_handle()`, and
    /// from `StreamingBodyHandle::append`.
    pub fn append(&mut self, other: Body) -> Result<(), Error> {
        self.writer.flush()?;
        let tail = other.into_handle()?;
        self.handle().append(tail)
    }
}
impl From<StreamingBodyHandle> for StreamingBody {
fn from(handle: StreamingBodyHandle) -> Self {
Self {
writer: BufWriter::new(handle),
}
}
}
/// Buffered writes; every method forwards to the internal `BufWriter`.
impl Write for StreamingBody {
    /// Writes `buf` into the write buffer.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let Self { writer } = self;
        writer.write(buf)
    }

    /// Vectored write into the write buffer.
    fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
        let Self { writer } = self;
        writer.write_vectored(bufs)
    }

    /// Flushes the write buffer to the underlying streaming handle.
    fn flush(&mut self) -> std::io::Result<()> {
        let Self { writer } = self;
        writer.flush()
    }
}