use futures_util::future::Shared;
use futures_util::FutureExt;
use http_body::SizeHint;
use std::future::Future;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use crate::clonable_frame::ClonableFrame;
pub(crate) type IsEndStream = bool;
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub(crate) struct InnerFuture<B> {
inner: Option<B>,
}
impl<B> InnerFuture<B>
where
B: http_body::Body + Unpin,
B::Data: Clone,
B::Error: Clone,
{
pub(crate) fn new(body: B) -> Self {
InnerFuture { inner: Some(body) }
}
}
impl<B> Future for InnerFuture<B>
where
B: http_body::Body + Unpin,
B::Data: Clone,
B::Error: Clone,
{
type Output = Option<(
Result<ClonableFrame<B::Data>, B::Error>,
Shared<Self>,
IsEndStream,
SizeHint,
)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let body = match self.inner.as_mut() {
Some(b) => Pin::new(b),
None => return Poll::Ready(None),
};
let item = ready!(body.poll_frame(cx));
let body = self.inner.take().unwrap();
match item {
Some(item) => {
let is_end_stream = body.is_end_stream();
let size_hint = body.size_hint();
let next_shared_future = InnerFuture::new(body).shared();
Poll::Ready(Some((
item.map(ClonableFrame::new),
next_shared_future,
is_end_stream,
size_hint,
)))
}
None => Poll::Ready(None),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use futures_util::stream;
use http_body::Frame;
use http_body_util::StreamBody;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
#[tokio::test]
async fn test_inner_future_direct_poll() {
let chunks = vec!["test"];
let stream = stream::iter(
chunks
.into_iter()
.map(|s| Ok::<_, std::convert::Infallible>(Frame::data(Bytes::from(s)))),
);
let body = StreamBody::new(stream);
let mut inner_future = InnerFuture::new(body);
let waker = futures_util::task::noop_waker();
let mut cx = Context::from_waker(&waker);
let result = Pin::new(&mut inner_future).poll(&mut cx);
assert!(matches!(result, Poll::Ready(Some(_))));
let result = Pin::new(&mut inner_future).poll(&mut cx);
assert!(matches!(result, Poll::Ready(None)));
}
}