use super::AsyncBody;
use futures_lite::{future::yield_now, io::AsyncWriteExt};
use sluice::pipe::{pipe, PipeWriter};
use std::{
borrow::Cow,
fmt,
fs::File,
io::{Cursor, ErrorKind, Read, Result},
};
pub struct Body(Inner);
enum Inner {
Empty,
Buffer(Cursor<Cow<'static, [u8]>>),
Reader(Box<dyn Read + Send + Sync>, Option<u64>),
}
impl Body {
pub const fn empty() -> Self {
Self(Inner::Empty)
}
#[inline]
pub fn from_bytes_static<B>(bytes: B) -> Self
where
B: AsRef<[u8]> + 'static,
{
castaway::match_type!(bytes, {
Cursor<Cow<'static, [u8]>> as bytes => Self(Inner::Buffer(bytes)),
Vec<u8> as bytes => Self::from(bytes),
String as bytes => Self::from(bytes.into_bytes()),
bytes => Self::from(bytes.as_ref().to_vec()),
})
}
pub fn from_reader<R>(reader: R) -> Self
where
R: Read + Send + Sync + 'static,
{
Self(Inner::Reader(Box::new(reader), None))
}
pub fn from_reader_sized<R>(reader: R, length: u64) -> Self
where
R: Read + Send + Sync + 'static,
{
Self(Inner::Reader(Box::new(reader), Some(length)))
}
pub fn is_empty(&self) -> bool {
match self.0 {
Inner::Empty => true,
_ => false,
}
}
pub fn len(&self) -> Option<u64> {
match &self.0 {
Inner::Empty => Some(0),
Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64),
Inner::Reader(_, len) => *len,
}
}
pub fn reset(&mut self) -> bool {
match &mut self.0 {
Inner::Empty => true,
Inner::Buffer(cursor) => {
cursor.set_position(0);
true
}
_ => false,
}
}
pub(crate) fn into_async(self) -> (AsyncBody, Option<Writer>) {
match self.0 {
Inner::Empty => (AsyncBody::empty(), None),
Inner::Buffer(cursor) => (AsyncBody::from_bytes_static(cursor.into_inner()), None),
Inner::Reader(reader, len) => {
let (pipe_reader, writer) = pipe();
(
if let Some(len) = len {
AsyncBody::from_reader_sized(pipe_reader, len)
} else {
AsyncBody::from_reader(pipe_reader)
},
Some(Writer {
reader,
writer,
}),
)
}
}
}
}
impl Read for Body {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
match &mut self.0 {
Inner::Empty => Ok(0),
Inner::Buffer(cursor) => cursor.read(buf),
Inner::Reader(reader, _) => reader.read(buf),
}
}
}
impl Default for Body {
fn default() -> Self {
Self::empty()
}
}
impl From<()> for Body {
fn from(_: ()) -> Self {
Self::empty()
}
}
impl From<Vec<u8>> for Body {
fn from(body: Vec<u8>) -> Self {
Self(Inner::Buffer(Cursor::new(Cow::Owned(body))))
}
}
impl From<&'_ [u8]> for Body {
fn from(body: &[u8]) -> Self {
body.to_vec().into()
}
}
impl From<String> for Body {
fn from(body: String) -> Self {
body.into_bytes().into()
}
}
impl From<&'_ str> for Body {
fn from(body: &str) -> Self {
body.as_bytes().into()
}
}
impl From<File> for Body {
fn from(file: File) -> Self {
if let Ok(metadata) = file.metadata() {
Self::from_reader_sized(file, metadata.len())
} else {
Self::from_reader(file)
}
}
}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.is_empty() {
write!(f, "Body(Empty)")
} else {
match self.len() {
Some(len) => write!(f, "Body({})", len),
None => write!(f, "Body(?)"),
}
}
}
}
pub(crate) struct Writer {
reader: Box<dyn Read + Send + Sync>,
writer: PipeWriter,
}
impl Writer {
const BUF_SIZE: usize = 16384;
pub(crate) async fn write(&mut self) -> Result<()> {
let mut buf = [0; Self::BUF_SIZE];
loop {
let len = match self.reader.read(&mut buf) {
Ok(0) => return Ok(()),
Ok(len) => len,
Err(e) if e.kind() == ErrorKind::Interrupted => {
yield_now().await;
continue;
}
Err(e) => return Err(e),
};
self.writer.write_all(&buf[..len]).await?;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
static_assertions::assert_impl_all!(Body: Send, Sync);
#[test]
fn empty_body() {
let body = Body::empty();
assert!(body.is_empty());
assert_eq!(body.len(), Some(0));
}
#[test]
fn zero_length_body() {
let body = Body::from(vec![]);
assert!(!body.is_empty());
assert_eq!(body.len(), Some(0));
}
#[test]
fn reader_with_unknown_length() {
let body = Body::from_reader(std::io::empty());
assert!(!body.is_empty());
assert_eq!(body.len(), None);
}
#[test]
fn reader_with_known_length() {
let body = Body::from_reader_sized(std::io::empty(), 0);
assert!(!body.is_empty());
assert_eq!(body.len(), Some(0));
}
#[test]
fn reset_memory_body() {
let mut body = Body::from("hello world");
let mut buf = String::new();
assert_eq!(body.read_to_string(&mut buf).unwrap(), 11);
assert_eq!(buf, "hello world");
assert!(body.reset());
assert_eq!(body.read_to_string(&mut buf).unwrap(), 11);
assert_eq!(buf, "hello worldhello world");
}
#[test]
fn cannot_reset_reader() {
let mut body = Body::from_reader(std::io::empty());
assert_eq!(body.reset(), false);
}
}