use std::{
future::Future,
io::Cursor,
pin::Pin,
task::{ready, Context, Poll},
};
use h2::SendStream;
use http_body::Body;
use pin_project_lite::pin_project;
use crate::{h2_error_to_io, h2_reason_to_io};
pin_project! {
/// Future that drains an HTTP body and forwards its frames to an h2 `SendStream`.
///
/// Polling it sends data frames via `send_data` and a trailers frame via
/// `send_trailers`; it resolves once the end of the body has been signalled
/// on the stream or an error occurs.
pub(crate) struct PipeToSendStream<S>
where
S: Body,
{
// Send half of the HTTP/2 stream that body frames are written to.
body_tx: SendStream<SendBuf<S::Data>>,
// Set after `poll` has called `reserve_capacity(1)` and cleared once the
// stream reports non-zero capacity; presumably avoids re-issuing the
// reservation on every wake-up while capacity is still pending.
capacity_reserving: bool,
// The body being drained. Pinned so `poll` can call `poll_frame` on it
// through `as_mut()`.
#[pin]
stream: S,
}
}
impl<S> PipeToSendStream<S>
where
    S: Body,
{
    /// Builds a pipe that will forward the frames of `stream` into `body_tx`.
    ///
    /// Construction performs no work on the send stream: the capacity
    /// reservation flag starts out cleared, so the first reservation happens
    /// when the future is first polled.
    #[inline]
    pub fn new(body_tx: SendStream<SendBuf<S::Data>>, stream: S) -> Self {
        // Nothing has been requested from the send stream yet.
        let capacity_reserving = false;
        Self {
            body_tx,
            capacity_reserving,
            stream,
        }
    }
}
impl<S> Future for PipeToSendStream<S>
where
    S: Body,
    S::Error: std::error::Error,
{
    type Output = Result<(), std::io::Error>;

    /// Forwards body frames to the send stream until the body is finished.
    ///
    /// Each loop iteration:
    /// 1. makes sure a one-byte capacity reservation is outstanding and waits
    ///    until the stream reports non-zero capacity,
    /// 2. checks whether the stream has been reset,
    /// 3. polls the body for the next frame and sends it.
    ///
    /// Resolves with `Ok(())` after a data frame flagged end-of-stream, after
    /// trailers, or after an empty end-of-stream frame when the body yields
    /// `None`. Frames that are neither data nor trailers are skipped.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` when the send stream reports an error, when a
    /// reset is observed, when the capacity stream ends unexpectedly, or when
    /// the body itself yields an error (converted through its `Display` text).
    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut me = self.project();

        loop {
            // The size of the next chunk is not known yet, so only a single
            // byte is requested; the flag keeps the request from being
            // repeated while it is still pending.
            if !*me.capacity_reserving {
                me.body_tx.reserve_capacity(1);
                *me.capacity_reserving = true;
            }

            if me.body_tx.capacity() == 0 {
                // Keep polling until a non-zero amount is granted; a grant of
                // zero simply means "ask again".
                loop {
                    let granted = match ready!(me.body_tx.poll_capacity(cx)) {
                        Some(Ok(amount)) => amount,
                        Some(Err(err)) => return Poll::Ready(Err(h2_error_to_io(err))),
                        None => {
                            return Poll::Ready(Err(std::io::Error::other(
                                "send stream capacity unexpectedly closed",
                            )));
                        }
                    };
                    if granted != 0 {
                        break;
                    }
                }
            }
            *me.capacity_reserving = false;

            // A ready result here ends the pipe with an error either way:
            // `Ok` carries the reset reason, `Err` an h2 error.
            if let Poll::Ready(outcome) = me.body_tx.poll_reset(cx) {
                let failure = match outcome {
                    Ok(reason) => h2_reason_to_io(reason),
                    Err(err) => h2_error_to_io(err),
                };
                return Poll::Ready(Err(failure));
            }

            let frame = match ready!(me.stream.as_mut().poll_frame(cx)) {
                Some(Ok(frame)) => frame,
                Some(Err(err)) => {
                    return Poll::Ready(Err(std::io::Error::other(err.to_string())));
                }
                None => {
                    // The body ended without an end-of-stream data frame or
                    // trailers, so close the stream with an empty DATA frame.
                    let sent = me.body_tx.send_data(SendBuf::None, true);
                    return Poll::Ready(sent.map_err(h2_error_to_io));
                }
            };

            match frame.into_data() {
                Ok(chunk) => {
                    let end_of_stream = me.stream.is_end_stream();
                    me.body_tx
                        .send_data(SendBuf::Buf(chunk), end_of_stream)
                        .map_err(h2_error_to_io)?;
                    if end_of_stream {
                        return Poll::Ready(Ok(()));
                    }
                }
                Err(not_data) => {
                    if let Ok(trailers) = not_data.into_trailers() {
                        // No more DATA will follow; drop the reservation.
                        me.body_tx.reserve_capacity(0);
                        me.body_tx
                            .send_trailers(trailers)
                            .map_err(h2_error_to_io)?;
                        return Poll::Ready(Ok(()));
                    }
                    // Neither data nor trailers: ignore and poll again.
                }
            }
        }
    }
}
/// Payload buffer type used for DATA frames written to the h2 `SendStream`.
#[repr(usize)]
pub(super) enum SendBuf<B> {
/// A data chunk taken from a body frame.
Buf(B),
/// An owned byte slice consumed through a cursor position.
/// Not constructed by the code visible in this part of the file.
Cursor(Cursor<Box<[u8]>>),
/// An empty payload; `PipeToSendStream` sends it with the end-of-stream
/// flag when the body yields no further frames.
None,
}
/// `Buf` implementation that forwards every call to the wrapped buffer.
///
/// The `None` variant behaves as a permanently empty buffer.
impl<B: bytes::Buf> bytes::Buf for SendBuf<B> {
    /// Returns the number of unread bytes (always `0` for `None`).
    #[inline]
    fn remaining(&self) -> usize {
        match self {
            Self::Buf(b) => b.remaining(),
            Self::Cursor(c) => c.remaining(),
            Self::None => 0,
        }
    }

    /// Returns the next contiguous run of unread bytes (empty for `None`).
    #[inline]
    fn chunk(&self) -> &[u8] {
        match self {
            Self::Buf(b) => b.chunk(),
            // Delegate to the cursor's own `Buf` impl, like the other methods
            // do, instead of slicing by hand with `position() as usize`: that
            // cast silently truncates the `u64` position on 32-bit targets.
            Self::Cursor(c) => c.chunk(),
            Self::None => &[],
        }
    }

    /// Marks `cnt` bytes as consumed; a no-op for `None`.
    #[inline]
    fn advance(&mut self, cnt: usize) {
        match self {
            Self::Buf(b) => b.advance(cnt),
            Self::Cursor(c) => c.advance(cnt),
            Self::None => {}
        }
    }

    /// Fills `dst` with slices of unread bytes and returns how many entries
    /// were written (`0` for `None`).
    #[inline]
    fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
        match self {
            Self::Buf(b) => b.chunks_vectored(dst),
            Self::Cursor(c) => c.chunks_vectored(dst),
            Self::None => 0,
        }
    }
}