puzz_http/body/
stream.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use bytes::Bytes;
5use futures_core::{Stream, TryStream};
6use pin_project_lite::pin_project;
7
8use super::Body;
9
pin_project! {
    /// Wrapper around a stream `S`.
    ///
    /// When `S` is a `TryStream` whose `Ok` items convert into `Bytes`, this
    /// type implements `Body` by forwarding polls to the wrapped stream.
    #[derive(Debug, Default)]
    pub struct StreamBody<S> {
        // The wrapped stream; structurally pinned so it can be polled through
        // a pinned reference to the wrapper.
        #[pin]
        stream: S,
    }
}
17
18impl<S> StreamBody<S> {
19    pub fn new(stream: S) -> Self {
20        Self { stream }
21    }
22}
23
24impl<S> Body for StreamBody<S>
25where
26    S: TryStream,
27    S::Ok: Into<Bytes>,
28{
29    type Error = S::Error;
30
31    fn poll_next(
32        self: Pin<&mut Self>,
33        cx: &mut Context<'_>,
34    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
35        match self.project().stream.try_poll_next(cx) {
36            Poll::Pending => Poll::Pending,
37            Poll::Ready(None) => Poll::Ready(None),
38            Poll::Ready(Some(Ok(data))) => Poll::Ready(Some(Ok(data.into()))),
39            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
40        }
41    }
42}
43
pin_project! {
    /// Wrapper around a body `B`.
    ///
    /// When `B` implements `Body`, this type implements `Stream`, yielding
    /// `Result<Bytes, B::Error>` items by forwarding polls to the wrapped body.
    #[derive(Debug, Default)]
    pub struct BodyStream<B> {
        // The wrapped body; structurally pinned so it can be polled through
        // a pinned reference to the wrapper.
        #[pin]
        body: B,
    }
}
51
52impl<B> BodyStream<B> {
53    pub fn new(body: B) -> Self {
54        Self { body }
55    }
56}
57
58impl<B> Stream for BodyStream<B>
59where
60    B: Body,
61{
62    type Item = Result<Bytes, B::Error>;
63
64    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65        self.project().body.poll_next(cx)
66    }
67}