via/response/
stream_adapter.rs

1use bytes::Bytes;
2use futures_core::Stream;
3use http_body::{Body, Frame, SizeHint};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::body::size_hint;
8use crate::error::Error;
9
/// Adapts a `Stream + Send` so that it can be used as an `impl Body`.
///
/// `Debug` is derived (and therefore available whenever `T: Debug`) so the
/// adapter can be logged or embedded in other types that derive `Debug`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct StreamAdapter<T> {
    /// The `Stream` that we are adapting to an `impl Body`.
    stream: T,
}
16
17impl<T, E> StreamAdapter<T>
18where
19    T: Stream<Item = Result<Frame<Bytes>, E>> + Send + Unpin,
20{
21    pub fn new(stream: T) -> Self {
22        Self { stream }
23    }
24}
25
26impl<T: Unpin> StreamAdapter<T> {
27    fn project(self: Pin<&mut Self>) -> Pin<&mut T> {
28        let this = self.get_mut();
29        let ptr = &mut this.stream;
30
31        Pin::new(ptr)
32    }
33}
34
35impl<T, E> Body for StreamAdapter<T>
36where
37    T: Stream<Item = Result<Frame<Bytes>, E>> + Send + Unpin,
38    Error: From<E>,
39{
40    type Data = Bytes;
41    type Error = Error;
42
43    fn poll_frame(
44        self: Pin<&mut Self>,
45        context: &mut Context<'_>,
46    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
47        self.project()
48            .poll_next(context)
49            .map_err(|error| error.into())
50    }
51
52    fn size_hint(&self) -> SizeHint {
53        let hint = self.stream.size_hint();
54        size_hint::from_stream_for_body(hint)
55    }
56}