1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use bytes::Bytes;
5use futures_core::{Stream, TryStream};
6use pin_project_lite::pin_project;
7
8use super::Body;
9
10pin_project! {
11 #[derive(Debug, Default)]
12 pub struct StreamBody<S> {
13 #[pin]
14 stream: S,
15 }
16}
17
18impl<S> StreamBody<S> {
19 pub fn new(stream: S) -> Self {
20 Self { stream }
21 }
22}
23
24impl<S> Body for StreamBody<S>
25where
26 S: TryStream,
27 S::Ok: Into<Bytes>,
28{
29 type Error = S::Error;
30
31 fn poll_next(
32 self: Pin<&mut Self>,
33 cx: &mut Context<'_>,
34 ) -> Poll<Option<Result<Bytes, Self::Error>>> {
35 match self.project().stream.try_poll_next(cx) {
36 Poll::Pending => Poll::Pending,
37 Poll::Ready(None) => Poll::Ready(None),
38 Poll::Ready(Some(Ok(data))) => Poll::Ready(Some(Ok(data.into()))),
39 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
40 }
41 }
42}
43
44pin_project! {
45 #[derive(Debug, Default)]
46 pub struct BodyStream<B> {
47 #[pin]
48 body: B,
49 }
50}
51
52impl<B> BodyStream<B> {
53 pub fn new(body: B) -> Self {
54 Self { body }
55 }
56}
57
58impl<B> Stream for BodyStream<B>
59where
60 B: Body,
61{
62 type Item = Result<Bytes, B::Error>;
63
64 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65 self.project().body.poll_next(cx)
66 }
67}