1use futures::{future, TryFutureExt, TryStreamExt};
2
3use futures::stream;
4use futures::stream::Stream;
5use futures::stream::StreamExt;
6use std::future::Future;
7
8use bytes::Bytes;
9
10use crate::message::SimpleHttpMessage;
11use crate::solicit::header::Headers;
12use crate::solicit_async::*;
13
14use crate::data_or_headers::DataOrHeaders;
15use crate::data_or_headers_with_flag::DataOrHeadersWithFlag;
16use crate::data_or_headers_with_flag::DataOrHeadersWithFlagStream;
17use crate::data_or_trailers::*;
18use crate::error;
19use crate::result;
20use futures::task::Context;
21use std::pin::Pin;
22use std::task::Poll;
23
24pub struct Response(pub HttpFutureSend<(Headers, HttpStreamAfterHeaders)>);
26
27impl Response {
28 pub fn new<F>(future: F) -> Response
31 where
32 F: Future<Output = result::Result<(Headers, HttpStreamAfterHeaders)>> + Send + 'static,
33 {
34 Response(Box::pin(future))
35 }
36
37 pub fn headers_and_stream(headers: Headers, stream: HttpStreamAfterHeaders) -> Response {
38 Response::new(future::ok((headers, stream)))
39 }
40
41 pub fn headers_and_bytes_stream<S>(headers: Headers, content: S) -> Response
42 where
43 S: Stream<Item = result::Result<Bytes>> + Send + 'static,
44 {
45 Response::headers_and_stream(headers, HttpStreamAfterHeaders::bytes(content))
46 }
47
48 pub fn headers(headers: Headers) -> Response {
50 Response::headers_and_bytes_stream(headers, stream::empty())
51 }
52
53 pub fn headers_and_bytes<B: Into<Bytes>>(header: Headers, content: B) -> Response {
55 Response::headers_and_bytes_stream(header, stream::once(future::ok(content.into())))
56 }
57
58 pub fn message(message: SimpleHttpMessage) -> Response {
59 Response::headers_and_bytes(message.headers, message.body)
60 }
61
62 pub fn found_200_plain_text(body: &str) -> Response {
63 Response::message(SimpleHttpMessage::found_200_plain_text(body))
64 }
65
66 pub fn not_found_404() -> Response {
67 Response::headers(Headers::not_found_404())
68 }
69
70 pub fn redirect_302(location: &str) -> Response {
71 let mut headers = Headers::new_status(302);
72 headers.add("location", location);
73 Response::headers(headers)
74 }
75
76 pub fn from_stream<S>(mut stream: S) -> Response
77 where
78 S: Stream<Item = result::Result<DataOrHeadersWithFlag>> + Unpin + Send + 'static,
79 {
80 Response::new(async move {
81 let (first, rem) = match stream.try_next().await? {
83 Some(part) => match part.content {
84 DataOrHeaders::Headers(headers) => {
85 (headers, HttpStreamAfterHeaders::from_parts(stream))
86 }
87 DataOrHeaders::Data(..) => {
88 return Err(error::Error::InvalidFrame("data before headers".to_owned()))
89 }
90 },
91 None => {
92 return Err(error::Error::InvalidFrame(
93 "empty response, expecting headers".to_owned(),
94 ))
95 }
96 };
97 Ok((first, rem))
98 })
99 }
100
101 pub fn err(err: error::Error) -> Response {
102 Response::new(future::err(err))
103 }
104
105 pub fn into_stream_flag(self) -> HttpFutureStreamSend<DataOrHeadersWithFlag> {
108 Box::pin(
109 self.0
110 .map_ok(|(headers, rem)| {
111 let header = stream::once(future::ok(
113 DataOrHeadersWithFlag::intermediate_headers(headers),
114 ));
115 let rem = rem.into_flag_stream();
116 header.chain(rem)
117 })
118 .try_flatten_stream(),
119 )
120 }
121
122 pub fn into_stream(self) -> HttpFutureStreamSend<DataOrHeaders> {
123 Box::pin(TryStreamExt::map_ok(self.into_stream_flag(), |c| c.content))
124 }
125
126 pub fn into_part_stream(self) -> DataOrHeadersWithFlagStream {
127 DataOrHeadersWithFlagStream::new(self.into_stream_flag())
128 }
129
130 pub fn collect(self) -> HttpFutureSend<SimpleHttpMessage> {
131 Box::pin(
132 self.into_stream()
133 .try_fold(SimpleHttpMessage::new(), |mut c, p| {
134 c.add(p);
135 future::ok::<_, error::Error>(c)
136 }),
137 )
138 }
139}
140
141impl Future for Response {
142 type Output = result::Result<(Headers, HttpStreamAfterHeaders)>;
143
144 fn poll(
145 mut self: Pin<&mut Self>,
146 cx: &mut Context<'_>,
147 ) -> Poll<result::Result<(Headers, HttpStreamAfterHeaders)>> {
148 Pin::new(&mut self.0).poll(cx)
149 }
150}