1use std::error::Error as StdError;
4use std::io;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use bytes::{Buf, Bytes};
9use futures_util::stream::Stream;
10use http_body::Body as HttpBody;
11
12use crate::async_stream::AsyncStream;
13
14pub struct Body {
17 pub(crate) inner: BodyType,
18}
19
20pub(crate) enum BodyType {
21 Bytes(Option<Bytes>),
22 AsyncStream(AsyncStream<Bytes, io::Error>),
23 Empty,
24}
25
26impl Body {
27 pub fn empty() -> Body {
29 Body {
30 inner: BodyType::Empty,
31 }
32 }
33}
34
35impl Stream for Body {
36 type Item = io::Result<Bytes>;
37
38 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
39 match self.inner {
40 BodyType::Bytes(ref mut strm) => Poll::Ready(strm.take().map(Ok)),
41 BodyType::AsyncStream(ref mut strm) => {
42 let strm = Pin::new(strm);
43 strm.poll_next(cx)
44 }
45 BodyType::Empty => Poll::Ready(None),
46 }
47 }
48}
49
50impl HttpBody for Body {
51 type Data = Bytes;
52 type Error = io::Error;
53
54 fn poll_frame(
55 self: Pin<&mut Self>,
56 cx: &mut Context<'_>,
57 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
58 self.poll_next(cx).map_ok(http_body::Frame::data)
59 }
60}
61
62impl From<String> for Body {
63 fn from(t: String) -> Body {
64 Body {
65 inner: BodyType::Bytes(Some(Bytes::from(t))),
66 }
67 }
68}
69
70impl From<&str> for Body {
71 fn from(t: &str) -> Body {
72 Body {
73 inner: BodyType::Bytes(Some(Bytes::from(t.to_string()))),
74 }
75 }
76}
77
78impl From<Bytes> for Body {
79 fn from(t: Bytes) -> Body {
80 Body {
81 inner: BodyType::Bytes(Some(t)),
82 }
83 }
84}
85
86impl From<AsyncStream<Bytes, io::Error>> for Body {
87 fn from(s: AsyncStream<Bytes, io::Error>) -> Body {
88 Body {
89 inner: BodyType::AsyncStream(s),
90 }
91 }
92}
93
94use pin_project::pin_project;
95
96#[pin_project]
100pub(crate) struct StreamBody<B> {
101 #[pin]
102 body: B,
103}
104
105impl<ReqBody, ReqData, ReqError> HttpBody for StreamBody<ReqBody>
106where
107 ReqData: Buf + Send,
108 ReqError: StdError + Send + Sync + 'static,
109 ReqBody: Stream<Item = Result<ReqData, ReqError>>,
110{
111 type Data = ReqData;
112 type Error = ReqError;
113
114 fn poll_frame(
115 self: Pin<&mut Self>,
116 cx: &mut Context<'_>,
117 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
118 let this = self.project();
119 this.body.poll_next(cx).map_ok(http_body::Frame::data)
120 }
121}
122
123impl<ReqBody, ReqData, ReqError> StreamBody<ReqBody>
124where
125 ReqData: Buf + Send,
126 ReqError: StdError + Send + Sync + 'static,
127 ReqBody: Stream<Item = Result<ReqData, ReqError>>,
128{
129 pub fn new(body: ReqBody) -> StreamBody<ReqBody> {
130 StreamBody { body }
131 }
132}