#![allow(missing_docs)]
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
};
use bytes::Bytes;
#[cfg(feature = "streaming")]
use bytes::BytesMut;
use http_body::{Body, Frame};
use pin_project_lite::pin_project;
use crate::error::StreamingError;
#[cfg(feature = "streaming")]
/// Chunk size, in bytes (64 KiB), for file-backed bodies: the read buffer is
/// created with this capacity and resized to this length before every read.
const STREAM_BUFFER_SIZE: usize = 64 * 1024;
#[cfg(feature = "streaming")]
pin_project! {
#[project = StreamingBodyProj]
/// HTTP body that is either a single buffered chunk, a wrapped body of type
/// `B`, or the contents of a file.
///
/// This definition is compiled when the `streaming` feature is enabled and
/// is the only one that has the `File` variant.
pub enum StreamingBody<B> {
/// A single in-memory chunk.
Buffered {
// Taken (left as `None`) the first time the body is polled.
data: Option<Bytes>,
},
/// Frames are forwarded from a wrapped body.
Streaming {
#[pin]
inner: B,
},
/// Contents are read from a `tokio::fs::File` in chunks of at most
/// `STREAM_BUFFER_SIZE` bytes.
File {
#[pin]
reader: tokio::fs::File,
// Scratch buffer each chunk is read into before being split off as a frame.
buffer: BytesMut,
// Set once end-of-file or a read error has been seen; later polls yield nothing.
done: bool,
// Byte length supplied by the caller; feeds the body's `size_hint`.
size: u64,
},
}
}
#[cfg(not(feature = "streaming"))]
pin_project! {
#[project = StreamingBodyProj]
/// HTTP body that is either a single buffered chunk or a wrapped body of
/// type `B`.
///
/// This definition is compiled when the `streaming` feature is disabled, so
/// it has no file-backed variant.
pub enum StreamingBody<B> {
/// A single in-memory chunk.
Buffered {
// Taken (left as `None`) the first time the body is polled.
data: Option<Bytes>,
},
/// Frames are forwarded from a wrapped body.
Streaming {
#[pin]
inner: B,
},
}
}
impl<B> StreamingBody<B> {
#[must_use]
pub fn buffered(data: Bytes) -> Self {
Self::Buffered { data: Some(data) }
}
#[must_use]
pub fn streaming(body: B) -> Self {
Self::Streaming { inner: body }
}
#[cfg(feature = "streaming")]
#[must_use]
pub fn from_file_with_size(file: tokio::fs::File, size: u64) -> Self {
Self::File {
reader: file,
buffer: BytesMut::with_capacity(STREAM_BUFFER_SIZE),
done: false,
size,
}
}
}
/// [`Body`] implementation used when the `streaming` feature is enabled.
///
/// Every variant yields `Bytes` data and reports failures as
/// [`StreamingError`]. `B` does not need to be `Unpin`: the wrapped body is
/// only ever accessed through its pinned projection.
#[cfg(feature = "streaming")]
impl<B> Body for StreamingBody<B>
where
    B: Body,
    B::Error: Into<StreamingError>,
    B::Data: Into<Bytes>,
{
    type Data = Bytes;
    type Error = StreamingError;

    /// Produces the next frame of the body.
    ///
    /// * `Buffered` yields its chunk once (never an empty one), then ends.
    /// * `Streaming` forwards the wrapped body's frames, converting data and
    ///   errors with `Into`.
    /// * `File` reads up to `STREAM_BUFFER_SIZE` bytes per call and ends on
    ///   end-of-file or after reporting the first read error.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        match self.project() {
            StreamingBodyProj::Buffered { data } => {
                // `take` leaves `None` behind so the chunk is emitted at most once;
                // an empty chunk is treated as "no data" and ends the body directly.
                let frame = data
                    .take()
                    .filter(|bytes| !bytes.is_empty())
                    .map(|bytes| Ok(Frame::data(bytes)));
                Poll::Ready(frame)
            }
            StreamingBodyProj::Streaming { inner } => inner.poll_frame(cx).map(|opt| {
                opt.map(|res| {
                    res.map(|frame| frame.map_data(Into::into))
                        .map_err(Into::into)
                })
            }),
            StreamingBodyProj::File {
                reader,
                buffer,
                done,
                size,
            } => {
                use tokio::io::AsyncRead;

                if *done {
                    return Poll::Ready(None);
                }

                // `ReadBuf::new` wants an initialized slice, so expose a full-size
                // zeroed window for the read to fill.
                buffer.resize(STREAM_BUFFER_SIZE, 0);
                let mut read_buf = tokio::io::ReadBuf::new(buffer.as_mut());
                match reader.poll_read(cx, &mut read_buf) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(Err(e)) => {
                        *done = true;
                        buffer.clear();
                        Poll::Ready(Some(Err(StreamingError::new(Box::new(e)))))
                    }
                    Poll::Ready(Ok(())) => {
                        let filled_len = read_buf.filled().len();
                        if filled_len == 0 {
                            // A zero-length read means end-of-file.
                            *done = true;
                            buffer.clear();
                            Poll::Ready(None)
                        } else {
                            // Keep `size` equal to the bytes still to be emitted so
                            // `size_hint` reports the remaining length. `filled_len`
                            // is at most `STREAM_BUFFER_SIZE`, so the cast is lossless.
                            *size = (*size).saturating_sub(filled_len as u64);
                            buffer.truncate(filled_len);
                            Poll::Ready(Some(Ok(Frame::data(buffer.split().freeze()))))
                        }
                    }
                }
            }
        }
    }

    /// Returns `true` when no further frames will be produced.
    ///
    /// A `Buffered` body holding an empty chunk counts as ended, matching
    /// `poll_frame`, which never emits an empty chunk.
    fn is_end_stream(&self) -> bool {
        match self {
            StreamingBody::Buffered { data } => data.as_ref().map_or(true, Bytes::is_empty),
            StreamingBody::Streaming { inner } => inner.is_end_stream(),
            StreamingBody::File { done, .. } => *done,
        }
    }

    /// Returns the bounds on the remaining length of the body.
    ///
    /// `Buffered` and `File` report an exact remaining byte count (zero once
    /// they have finished); `Streaming` defers to the wrapped body.
    fn size_hint(&self) -> http_body::SizeHint {
        match self {
            StreamingBody::Buffered { data } => {
                let remaining = data.as_ref().map_or(0, Bytes::len) as u64;
                http_body::SizeHint::with_exact(remaining)
            }
            StreamingBody::Streaming { inner } => inner.size_hint(),
            StreamingBody::File { done, size, .. } => {
                // After end-of-file or an error nothing more is emitted, whatever
                // length the caller originally declared.
                let remaining = if *done { 0 } else { *size };
                http_body::SizeHint::with_exact(remaining)
            }
        }
    }
}
#[cfg(not(feature = "streaming"))]
impl<B> Body for StreamingBody<B>
where
B: Body + Unpin,
B::Error: Into<StreamingError>,
B::Data: Into<Bytes>,
{
type Data = Bytes;
type Error = StreamingError;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.as_mut().project() {
StreamingBodyProj::Buffered { data } => {
if let Some(bytes) = data.take() {
if bytes.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(Frame::data(bytes))))
}
} else {
Poll::Ready(None)
}
}
StreamingBodyProj::Streaming { inner } => {
inner.poll_frame(cx).map(|opt| {
opt.map(|res| {
res.map(|frame| frame.map_data(Into::into))
.map_err(Into::into)
})
})
}
}
}
fn is_end_stream(&self) -> bool {
match self {
StreamingBody::Buffered { data } => data.is_none(),
StreamingBody::Streaming { inner } => inner.is_end_stream(),
}
}
fn size_hint(&self) -> http_body::SizeHint {
match self {
StreamingBody::Buffered { data } => {
if let Some(bytes) = data {
let len = bytes.len() as u64;
http_body::SizeHint::with_exact(len)
} else {
http_body::SizeHint::with_exact(0)
}
}
StreamingBody::Streaming { inner } => inner.size_hint(),
}
}
}
impl<B> From<Bytes> for StreamingBody<B> {
fn from(bytes: Bytes) -> Self {
Self::buffered(bytes)
}
}
/// Debug output that shows only metadata: buffered bytes and the file handle
/// are never printed, only whether data is present, its length, and the
/// `done`/`size` fields.
#[cfg(feature = "streaming")]
impl<B: fmt::Debug> fmt::Debug for StreamingBody<B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Buffered { data } => {
                let len = data.as_ref().map(Bytes::len);
                let mut out = f.debug_struct("StreamingBody::Buffered");
                out.field("has_data", &len.is_some());
                out.field("len", &len);
                out.finish()
            }
            Self::Streaming { inner } => {
                let mut out = f.debug_struct("StreamingBody::Streaming");
                out.field("inner", inner);
                out.finish()
            }
            Self::File { done, size, .. } => {
                let mut out = f.debug_struct("StreamingBody::File");
                out.field("done", done);
                out.field("size", size);
                // The reader and scratch buffer are deliberately left out.
                out.finish_non_exhaustive()
            }
        }
    }
}
#[cfg(not(feature = "streaming"))]
impl<B: fmt::Debug> fmt::Debug for StreamingBody<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Buffered { data } => f
.debug_struct("StreamingBody::Buffered")
.field("has_data", &data.is_some())
.field("len", &data.as_ref().map(|b| b.len()))
.finish(),
Self::Streaming { inner } => f
.debug_struct("StreamingBody::Streaming")
.field("inner", inner)
.finish(),
}
}
}
#[cfg(feature = "streaming")]
impl<B> StreamingBody<B>
where
    B: Body + Unpin + Send,
    B::Error: Into<StreamingError>,
    B::Data: Into<Bytes>,
{
    /// Converts this body into a stream of its data chunks.
    ///
    /// Frames that carry no data (for example trailers) are skipped rather
    /// than surfaced as empty chunks, so every `Ok` item is the payload of a
    /// data frame.
    ///
    /// # Errors
    ///
    /// A body error is yielded as a boxed `std::io::Error` whose message is
    /// `"Stream error: "` followed by the original error's `Display` output.
    pub fn into_bytes_stream(
        self,
    ) -> impl futures_util::Stream<
        Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync>>,
    > + Send {
        use futures_util::TryStreamExt;

        http_body_util::BodyStream::new(self)
            // Convert the error first so the filtering future below only ever
            // holds the boxed `Send + Sync` error type.
            .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
                Box::new(std::io::Error::other(format!("Stream error: {e}")))
            })
            // `into_data` hands back non-data frames as `Err`; `.ok()` maps those
            // to `None`, which removes them from the stream.
            .try_filter_map(|frame| std::future::ready(Ok(frame.into_data().ok())))
    }
}