via/response/
stream_adapter.rs1use bytes::Bytes;
2use futures_core::Stream;
3use http_body::{Body, Frame, SizeHint};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::body::size_hint;
8use crate::error::Error;
9
10#[must_use = "streams do nothing unless polled"]
12pub struct StreamAdapter<T> {
13 stream: T,
15}
16
17impl<T, E> StreamAdapter<T>
18where
19 T: Stream<Item = Result<Frame<Bytes>, E>> + Send + Unpin,
20{
21 pub fn new(stream: T) -> Self {
22 Self { stream }
23 }
24}
25
26impl<T: Unpin> StreamAdapter<T> {
27 fn project(self: Pin<&mut Self>) -> Pin<&mut T> {
28 let this = self.get_mut();
29 let ptr = &mut this.stream;
30
31 Pin::new(ptr)
32 }
33}
34
35impl<T, E> Body for StreamAdapter<T>
36where
37 T: Stream<Item = Result<Frame<Bytes>, E>> + Send + Unpin,
38 Error: From<E>,
39{
40 type Data = Bytes;
41 type Error = Error;
42
43 fn poll_frame(
44 self: Pin<&mut Self>,
45 context: &mut Context<'_>,
46 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
47 self.project()
48 .poll_next(context)
49 .map_err(|error| error.into())
50 }
51
52 fn size_hint(&self) -> SizeHint {
53 let hint = self.stream.size_hint();
54 size_hint::from_stream_for_body(hint)
55 }
56}