use crate::common::sys::lifecycle::Error;
use bytes::Bytes;
use http_body::{Body, Frame, SizeHint};
use http_body_util::{Full, combinators::BoxBody};
use hyper::body::Incoming;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
/// Unified HTTP body type covering the different body sources used by this module.
///
/// Each variant wraps a different underlying body; the [`Body`] implementation
/// for this enum dispatches on the active variant.
#[derive(Debug, Default)]
pub enum VaneBody {
    /// Streaming body received through hyper ([`Incoming`]).
    Hyper(Incoming),
    /// Type-erased body tagged as H3 (presumably HTTP/3 traffic — confirm with callers).
    H3(BoxBody<Bytes, Error>),
    /// Any other type-erased body that yields [`Bytes`] frames.
    Generic(BoxBody<Bytes, Error>),
    /// Body whose payload is already held in memory as a [`Full`] value.
    Buffered(Full<Bytes>),
    /// Body with no content; this is the [`Default`] variant.
    #[default]
    Empty,
}
impl Body for VaneBody {
type Data = Bytes;
type Error = Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match &mut *self {
Self::Hyper(body) => match Pin::new(body).poll_frame(cx) {
Poll::Ready(Some(Ok(frame))) => {
let frame = frame.map_data(|d| d);
Poll::Ready(Some(Ok(frame)))
}
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Some(Err(Error::System(format!("Hyper Body Error: {e}")))))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
},
Self::H3(body) | Self::Generic(body) => Pin::new(body).poll_frame(cx),
Self::Buffered(body) => match Pin::new(body).poll_frame(cx) {
Poll::Ready(Some(Ok(frame))) => Poll::Ready(Some(Ok(frame))),
Poll::Ready(Some(Err(e))) => match e {}, Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
},
Self::Empty => Poll::Ready(None),
}
}
fn is_end_stream(&self) -> bool {
match self {
Self::Hyper(b) => b.is_end_stream(),
Self::H3(b) | Self::Generic(b) => b.is_end_stream(),
Self::Buffered(b) => b.is_end_stream(),
Self::Empty => true,
}
}
fn size_hint(&self) -> SizeHint {
match self {
Self::Hyper(b) => b.size_hint(),
Self::H3(b) | Self::Generic(b) => b.size_hint(),
Self::Buffered(b) => b.size_hint(),
Self::Empty => SizeHint::with_exact(0),
}
}
}
/// Adapter that exposes a Tokio mpsc receiver of byte chunks as an HTTP [`Body`].
pub struct H3BodyAdapter {
    /// Receiving half of the channel; each message is either a chunk of body
    /// data or an error.
    rx: mpsc::Receiver<Result<Bytes, Error>>,
}
impl H3BodyAdapter {
    /// Creates an adapter that reads its body chunks from `rx`.
    ///
    /// The adapter takes ownership of the receiver and keeps it for its whole lifetime.
    #[must_use]
    pub fn new(rx: mpsc::Receiver<Result<Bytes, Error>>) -> Self {
        Self { rx }
    }
}
/// [`Body`] implementation that turns channel messages into body frames.
impl Body for H3BodyAdapter {
    type Data = Bytes;
    type Error = Error;

    /// Polls the channel for the next message and converts it into a frame.
    ///
    /// * A received `Ok(bytes)` becomes a data frame carrying those bytes.
    /// * A received `Err(e)` is passed through unchanged.
    /// * When the receiver yields `None`, the body reports end-of-stream.
    /// * When no message is ready, the poll stays pending.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        // `map_ok` rewrites only the `Ready(Some(Ok(_)))` case; every other
        // outcome passes through untouched.
        self.get_mut().rx.poll_recv(cx).map_ok(Frame::data)
    }
}