1use std::error::Error as StdError;
4use std::io;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use bytes::{Buf, Bytes};
9use futures_util::stream::Stream;
10use http_body::Body as HttpBody;
11use pin_project_lite::pin_project;
12
13use crate::async_stream::AsyncStream;
14
15pub struct Body {
18 pub inner: BodyType,
19}
20
21pub enum BodyType {
22 Bytes(Option<Bytes>),
23 AsyncStream(AsyncStream<Bytes, io::Error>),
24 Empty,
25}
26
27impl Body {
28 pub fn empty() -> Body {
30 Body {
31 inner: BodyType::Empty,
32 }
33 }
34}
35
36impl Stream for Body {
37 type Item = io::Result<Bytes>;
38
39 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
40 match self.inner {
41 BodyType::Bytes(ref mut strm) => Poll::Ready(strm.take().map(Ok)),
42 BodyType::AsyncStream(ref mut strm) => {
43 let strm = Pin::new(strm);
44 strm.poll_next(cx)
45 }
46 BodyType::Empty => Poll::Ready(None),
47 }
48 }
49}
50
51impl HttpBody for Body {
52 type Data = Bytes;
53 type Error = io::Error;
54
55 fn poll_frame(
56 self: Pin<&mut Self>,
57 cx: &mut Context<'_>,
58 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
59 self.poll_next(cx).map_ok(http_body::Frame::data)
60 }
61}
62
63impl From<String> for Body {
64 fn from(t: String) -> Body {
65 Body {
66 inner: BodyType::Bytes(Some(Bytes::from(t))),
67 }
68 }
69}
70
71impl From<&str> for Body {
72 fn from(t: &str) -> Body {
73 Body {
74 inner: BodyType::Bytes(Some(Bytes::from(t.to_string()))),
75 }
76 }
77}
78
79impl From<Bytes> for Body {
80 fn from(t: Bytes) -> Body {
81 Body {
82 inner: BodyType::Bytes(Some(t)),
83 }
84 }
85}
86
87impl From<AsyncStream<Bytes, io::Error>> for Body {
88 fn from(s: AsyncStream<Bytes, io::Error>) -> Body {
89 Body {
90 inner: BodyType::AsyncStream(s),
91 }
92 }
93}
94
95pin_project! {
96 pub(crate) struct StreamBody<B> {
100 #[pin]
101 body: B,
102 }
103}
104
105impl<ReqBody, ReqData, ReqError> HttpBody for StreamBody<ReqBody>
106where
107 ReqData: Buf + Send,
108 ReqError: StdError + Send + Sync + 'static,
109 ReqBody: Stream<Item = Result<ReqData, ReqError>>,
110{
111 type Data = ReqData;
112 type Error = ReqError;
113
114 fn poll_frame(
115 self: Pin<&mut Self>,
116 cx: &mut Context<'_>,
117 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
118 let this = self.project();
119 this.body.poll_next(cx).map_ok(http_body::Frame::data)
120 }
121}
122
123impl<ReqBody, ReqData, ReqError> StreamBody<ReqBody>
124where
125 ReqData: Buf + Send,
126 ReqError: StdError + Send + Sync + 'static,
127 ReqBody: Stream<Item = Result<ReqData, ReqError>>,
128{
129 pub fn new(body: ReqBody) -> StreamBody<ReqBody> {
130 StreamBody { body }
131 }
132}