use bytes::Bytes;
use futures_channel::mpsc;
use futures_util::{SinkExt, Stream};
use hyper::body::{Body, Frame, SizeHint};
use std::ops::ControlFlow;
use std::{
error::Error as StdError,
mem,
pin::Pin,
task::{Context, Poll},
};
/// HTTP body type that implements hyper's [`Body`] trait.
///
/// It wraps a private `Inner` state, which is either a payload that is already
/// fully in memory or the receiving half of a channel fed by a `ChunkSender`.
pub struct RequestBody(Inner);
/// Backing storage of a `RequestBody`.
enum Inner {
/// The complete payload held in memory. `poll_frame` takes the bytes out,
/// leaving this empty, which is then reported as end of stream.
Full(Bytes),
/// Receiving half of the channel on which a `ChunkSender` delivers
/// `Message`s one at a time.
Chunked(mpsc::Receiver<Message>),
}
/// Item sent from a `ChunkSender` to a chunked `RequestBody`.
enum Message {
/// A piece of body data; surfaced by `poll_frame` as a data frame.
Chunk(Bytes),
/// Request to fail the body; surfaced by `poll_frame` as the error
/// `"aborted"`.
Abort,
}
impl RequestBody {
pub(crate) fn full(content: String) -> Self {
Self(Inner::Full(Bytes::from(content)))
}
pub(crate) fn chunked() -> (ChunkSender, Self) {
let (tx, rx) = mpsc::channel(0); let sender = ChunkSender(tx);
(sender, Self(Inner::Chunked(rx)))
}
}
impl Body for RequestBody {
type Data = Bytes;
type Error = Box<dyn StdError + Send + Sync>;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match &mut self.get_mut().0 {
Inner::Full(bytes) if bytes.is_empty() => Poll::Ready(None),
Inner::Full(bytes) => Poll::Ready(Some(Ok(Frame::data(mem::take(bytes))))),
Inner::Chunked(rx) => match Pin::new(rx).poll_next(cx) {
Poll::Ready(Some(Message::Chunk(bytes))) => {
Poll::Ready(Some(Ok(Frame::data(bytes))))
}
Poll::Ready(Some(Message::Abort)) => Poll::Ready(Some(Err("aborted".into()))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
},
}
}
fn is_end_stream(&self) -> bool {
match &self.0 {
Inner::Full(bytes) => bytes.is_empty(),
Inner::Chunked(_) => false, }
}
fn size_hint(&self) -> SizeHint {
match &self.0 {
Inner::Full(bytes) => SizeHint::with_exact(bytes.len() as u64),
Inner::Chunked(_) => SizeHint::default(), }
}
}
/// Sending half of a chunked `RequestBody`, created by `RequestBody::chunked`.
///
/// Wraps the mpsc sender over which `Message`s travel to the body.
pub(crate) struct ChunkSender(mpsc::Sender<Message>);
impl ChunkSender {
    /// Sends one chunk, waiting until the channel accepts it.
    ///
    /// Returns `true` if the chunk was handed to the channel and `false` if
    /// the send failed.
    #[allow(dead_code)]
    pub(crate) async fn send(&mut self, chunk: Bytes) -> bool {
        let outcome = self.0.send(Message::Chunk(chunk)).await;
        outcome.is_ok()
    }

    /// Polls whether the channel can accept another message.
    ///
    /// Resolves to `true` when the underlying sender reports readiness and to
    /// `false` when it reports an error; stays pending otherwise.
    #[inline(always)]
    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<bool> {
        match self.0.poll_ready(cx) {
            Poll::Ready(status) => Poll::Ready(status.is_ok()),
            Poll::Pending => Poll::Pending,
        }
    }

    /// Attempts to queue a chunk without waiting.
    ///
    /// * `ControlFlow::Break(Ok(()))`: the chunk was queued.
    /// * `ControlFlow::Continue(bytes)`: the channel is full; the chunk is
    ///   handed back unchanged so the caller can retry.
    /// * `ControlFlow::Break(Err("channel closed"))`: the send failed for any
    ///   other reason.
    #[inline(always)]
    pub(crate) fn try_send(
        &mut self,
        chunk: Bytes,
    ) -> ControlFlow<Result<(), &'static str>, Bytes> {
        match self.0.try_send(Message::Chunk(chunk)) {
            Ok(()) => ControlFlow::Break(Ok(())),
            Err(failure) if failure.is_full() => match failure.into_inner() {
                Message::Chunk(rejected) => ControlFlow::Continue(rejected),
                // The rejected message is the `Chunk` built just above.
                Message::Abort => unreachable!(),
            },
            Err(_) => ControlFlow::Break(Err("channel closed")),
        }
    }

    /// Signals the body to fail by queueing `Message::Abort`, best effort.
    ///
    /// The message goes through a clone of the sender and any send error is
    /// ignored.
    pub(crate) fn abort(&self) {
        // NOTE(review): presumably cloned so the abort can be queued even when
        // this sender's own slot is occupied (futures mpsc gives every sender
        // one guaranteed slot); confirm against the futures-channel docs.
        let mut spare_sender = self.0.clone();
        let _ = spare_sender.try_send(Message::Abort);
    }
}