use futures_lite::io::{AsyncRead, BlockOn};
use std::{
borrow::Cow,
fmt,
io::{self, Cursor, Read},
pin::Pin,
str,
task::{Context, Poll},
};
mod sync;
#[allow(unreachable_pub)]
pub use sync::Body;
/// A body of bytes that can be read asynchronously.
///
/// This is a thin wrapper around the private [`Inner`] enum, which records
/// whether the body is empty, backed by an in-memory buffer, or backed by an
/// arbitrary asynchronous reader. The bytes are consumed through this type's
/// `AsyncRead` implementation.
pub struct AsyncBody(Inner);
/// Internal representation of the content held by an [`AsyncBody`].
enum Inner {
/// A body that holds no content at all.
Empty,
/// An in-memory body. The bytes are either borrowed for `'static` or owned,
/// and the cursor tracks the current read position within them.
Buffer(Cursor<Cow<'static, [u8]>>),
/// A body streamed from a boxed asynchronous reader, paired with the
/// length in bytes supplied by the caller, if any.
Reader(Pin<Box<dyn AsyncRead + Send + Sync>>, Option<u64>),
}
impl AsyncBody {
pub const fn empty() -> Self {
Self(Inner::Empty)
}
#[inline]
pub fn from_bytes_static<B>(bytes: B) -> Self
where
B: AsRef<[u8]> + 'static,
{
castaway::match_type!(bytes, {
Cursor<Cow<'static, [u8]>> as bytes => Self(Inner::Buffer(bytes)),
&'static [u8] as bytes => Self::from_static_impl(bytes),
&'static str as bytes => Self::from_static_impl(bytes.as_bytes()),
Vec<u8> as bytes => Self::from(bytes),
String as bytes => Self::from(bytes.into_bytes()),
bytes => Self::from(bytes.as_ref().to_vec()),
})
}
#[inline]
fn from_static_impl(bytes: &'static [u8]) -> Self {
Self(Inner::Buffer(Cursor::new(Cow::Borrowed(bytes))))
}
pub fn from_reader<R>(read: R) -> Self
where
R: AsyncRead + Send + Sync + 'static,
{
Self(Inner::Reader(Box::pin(read), None))
}
pub fn from_reader_sized<R>(read: R, length: u64) -> Self
where
R: AsyncRead + Send + Sync + 'static,
{
Self(Inner::Reader(Box::pin(read), Some(length)))
}
pub fn is_empty(&self) -> bool {
match self.0 {
Inner::Empty => true,
_ => false,
}
}
pub fn len(&self) -> Option<u64> {
match &self.0 {
Inner::Empty => Some(0),
Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64),
Inner::Reader(_, len) => *len,
}
}
pub fn reset(&mut self) -> bool {
match &mut self.0 {
Inner::Empty => true,
Inner::Buffer(cursor) => {
cursor.set_position(0);
true
}
Inner::Reader(_, _) => false,
}
}
pub(crate) fn into_sync(self) -> sync::Body {
match self.0 {
Inner::Empty => sync::Body::empty(),
Inner::Buffer(cursor) => sync::Body::from_bytes_static(cursor.into_inner()),
Inner::Reader(reader, Some(len)) => {
sync::Body::from_reader_sized(BlockOn::new(reader), len)
}
Inner::Reader(reader, None) => sync::Body::from_reader(BlockOn::new(reader)),
}
}
}
impl AsyncRead for AsyncBody {
    /// Reads bytes from the body into `buf`.
    ///
    /// The empty body immediately reports end-of-stream (`Ok(0)`). An
    /// in-memory body is read synchronously from its cursor and is therefore
    /// always ready. A reader-backed body forwards the poll to the wrapped
    /// reader.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let body = self.get_mut();

        match &mut body.0 {
            Inner::Reader(reader, _) => reader.as_mut().poll_read(cx, buf),
            Inner::Buffer(cursor) => Poll::Ready(Read::read(cursor, buf)),
            Inner::Empty => Poll::Ready(Ok(0)),
        }
    }
}
impl Default for AsyncBody {
fn default() -> Self {
Self::empty()
}
}
impl From<()> for AsyncBody {
fn from(_: ()) -> Self {
Self::empty()
}
}
impl From<Vec<u8>> for AsyncBody {
fn from(body: Vec<u8>) -> Self {
Self(Inner::Buffer(Cursor::new(Cow::Owned(body))))
}
}
impl From<&'_ [u8]> for AsyncBody {
    /// Copies the borrowed bytes into a new vector and builds an in-memory
    /// body from it.
    fn from(body: &[u8]) -> Self {
        let copied = body.to_vec();
        Self::from(copied)
    }
}
impl From<String> for AsyncBody {
    /// Builds an in-memory body from the string's bytes, reusing its
    /// allocation.
    fn from(body: String) -> Self {
        let bytes = body.into_bytes();
        Self::from(bytes)
    }
}
impl From<&'_ str> for AsyncBody {
    /// Builds an in-memory body from the bytes of the borrowed string by
    /// delegating to the byte-slice conversion.
    fn from(body: &str) -> Self {
        let bytes: &[u8] = body.as_bytes();
        Self::from(bytes)
    }
}
impl<T: Into<Self>> From<Option<T>> for AsyncBody {
    /// Converts the contained value when there is one; `None` becomes the
    /// empty body.
    fn from(body: Option<T>) -> Self {
        body.map_or_else(Self::empty, Into::into)
    }
}
impl fmt::Debug for AsyncBody {
    /// Formats a short summary of the body without exposing its contents:
    /// `AsyncBody(Empty)` for the empty body, `AsyncBody(<len>)` when the
    /// length is known, and `AsyncBody(?)` when it is not.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.is_empty() {
            return f.write_str("AsyncBody(Empty)");
        }

        match self.len() {
            None => f.write_str("AsyncBody(?)"),
            Some(len) => write!(f, "AsyncBody({})", len),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures_lite::{
        future::{block_on, zip},
        io::AsyncReadExt,
    };

    // Compile-time guarantee that bodies can be moved and shared across
    // threads.
    static_assertions::assert_impl_all!(AsyncBody: Send, Sync);

    /// The explicitly empty body is empty and has a known length of zero.
    #[test]
    fn empty_body() {
        let body = AsyncBody::empty();

        assert!(body.is_empty());
        assert_eq!(body.len(), Some(0));
    }

    /// A buffer with zero bytes has length zero but is not "empty".
    #[test]
    fn zero_length_body() {
        let body = AsyncBody::from(Vec::<u8>::new());

        assert!(!body.is_empty());
        assert_eq!(body.len(), Some(0));
    }

    /// A reader without a declared size reports an unknown length.
    #[test]
    fn reader_with_unknown_length() {
        let body = AsyncBody::from_reader(futures_lite::io::empty());

        assert!(!body.is_empty());
        assert_eq!(body.len(), None);
    }

    /// A reader with a declared size reports exactly that size.
    #[test]
    fn reader_with_known_length() {
        let body = AsyncBody::from_reader_sized(futures_lite::io::empty(), 0);

        assert!(!body.is_empty());
        assert_eq!(body.len(), Some(0));
    }

    /// An in-memory body can be read, reset, and read again in full.
    #[test]
    fn reset_memory_body() {
        block_on(async {
            let mut body = AsyncBody::from("hello world");
            let mut text = String::new();

            let first_pass = body.read_to_string(&mut text).await.unwrap();
            assert_eq!(first_pass, 11);
            assert_eq!(text, "hello world");

            assert!(body.reset());

            // Start the second pass from an empty destination so that the
            // comparison below only sees the bytes read after the reset.
            text.clear();
            let second_pass = body.read_to_string(&mut text).await.unwrap();
            assert_eq!(second_pass, 11);
            assert_eq!(text, "hello world");
        });
    }

    /// Reader-backed bodies refuse to reset.
    #[test]
    fn cannot_reset_reader() {
        let mut body = AsyncBody::from_reader(futures_lite::io::empty());

        assert_eq!(body.reset(), false);
    }

    /// Converting an in-memory synchronous body needs no writer and keeps
    /// the length.
    #[test]
    fn sync_memory_into_async() {
        let (body, writer) = Body::from("hello world").into_async();

        assert!(writer.is_none());
        assert_eq!(body.len(), Some(11));
    }

    /// Converting a reader-backed synchronous body yields a writer; the test
    /// drives the writer and the async reader concurrently and checks the
    /// bytes that arrive.
    #[test]
    fn sync_reader_into_async() {
        block_on(async {
            let (mut body, writer) = Body::from_reader("hello world".as_bytes()).into_async();
            assert!(writer.is_some());

            let produce = async move {
                writer.unwrap().write().await.unwrap();
            };
            let consume = async move {
                let mut text = String::new();
                body.read_to_string(&mut text).await.unwrap();
                assert_eq!(text, "hello world");
            };

            zip(produce, consume).await;
        });
    }
}