1use std::error::Error as StdError;
4use std::io;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use bytes::{Buf, Bytes};
9use futures::stream::Stream;
10use http::header::HeaderMap;
11use http_body::Body as HttpBody;
12
13use crate::async_stream::AsyncStream;
14
15pub struct Body {
18 pub(crate) inner: BodyType,
19}
20
21pub(crate) enum BodyType {
22 Bytes(Option<Bytes>),
23 AsyncStream(AsyncStream<Bytes, io::Error>),
24 Empty,
25}
26
27impl Body {
28 pub fn empty() -> Body {
30 Body {
31 inner: BodyType::Empty,
32 }
33 }
34}
35
36impl Stream for Body {
37 type Item = io::Result<Bytes>;
38
39 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
40 match self.inner {
41 BodyType::Bytes(ref mut strm) => Poll::Ready(strm.take().map(|b| Ok(b))),
42 BodyType::AsyncStream(ref mut strm) => {
43 let strm = Pin::new(strm);
44 strm.poll_next(cx)
45 },
46 BodyType::Empty => Poll::Ready(None),
47 }
48 }
49}
50
51impl HttpBody for Body {
52 type Data = Bytes;
53 type Error = io::Error;
54
55 fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Self::Data, Self::Error>>> {
56 self.poll_next(cx)
57 }
58
59 fn poll_trailers(
60 self: Pin<&mut Self>,
61 _cx: &mut Context,
62 ) -> Poll<Result<Option<HeaderMap>, Self::Error>>
63 {
64 Poll::Ready(Ok(None))
65 }
66}
67
68impl From<String> for Body {
69 fn from(t: String) -> Body {
70 Body {
71 inner: BodyType::Bytes(Some(Bytes::from(t))),
72 }
73 }
74}
75
76impl From<&str> for Body {
77 fn from(t: &str) -> Body {
78 Body {
79 inner: BodyType::Bytes(Some(Bytes::from(t.to_string()))),
80 }
81 }
82}
83
84impl From<Bytes> for Body {
85 fn from(t: Bytes) -> Body {
86 Body {
87 inner: BodyType::Bytes(Some(t)),
88 }
89 }
90}
91
92impl From<AsyncStream<Bytes, io::Error>> for Body {
93 fn from(s: AsyncStream<Bytes, io::Error>) -> Body {
94 Body {
95 inner: BodyType::AsyncStream(s),
96 }
97 }
98}
99
100use pin_project::pin_project;
101
102#[pin_project]
106pub(crate) struct StreamBody<B> {
107 #[pin]
108 body: B,
109}
110
111impl<ReqBody, ReqData, ReqError> HttpBody for StreamBody<ReqBody>
112where
113 ReqData: Buf + Send,
114 ReqError: StdError + Send + Sync + 'static,
115 ReqBody: Stream<Item = Result<ReqData, ReqError>>,
116{
117 type Data = ReqData;
118 type Error = ReqError;
119
120 fn poll_data(
121 self: Pin<&mut Self>,
122 cx: &mut Context<'_>,
123 ) -> Poll<Option<Result<Self::Data, Self::Error>>>
124 {
125 let this = self.project();
126 this.body.poll_next(cx)
127 }
128
129 fn poll_trailers(
130 self: Pin<&mut Self>,
131 _cx: &mut Context,
132 ) -> Poll<Result<Option<HeaderMap>, Self::Error>>
133 {
134 Poll::Ready(Ok(None))
135 }
136}
137
138impl<ReqBody, ReqData, ReqError> StreamBody<ReqBody>
139where
140 ReqData: Buf + Send,
141 ReqError: StdError + Send + Sync + 'static,
142 ReqBody: Stream<Item = Result<ReqData, ReqError>>,
143{
144 pub fn new(body: ReqBody) -> StreamBody<ReqBody> {
145 StreamBody { body }
146 }
147}