Skip to main content

httpbis/
resp.rs

1use futures::{future, TryFutureExt, TryStreamExt};
2
3use futures::stream;
4use futures::stream::Stream;
5use futures::stream::StreamExt;
6use std::future::Future;
7
8use bytes::Bytes;
9
10use crate::message::SimpleHttpMessage;
11use crate::solicit::header::Headers;
12use crate::solicit_async::*;
13
14use crate::data_or_headers::DataOrHeaders;
15use crate::data_or_headers_with_flag::DataOrHeadersWithFlag;
16use crate::data_or_headers_with_flag::DataOrHeadersWithFlagStream;
17use crate::data_or_trailers::*;
18use crate::error;
19use crate::result;
20use futures::task::Context;
21use std::pin::Pin;
22use std::task::Poll;
23
/// Convenient wrapper around async HTTP response future/stream.
///
/// The wrapped value is a boxed, pinned, `Send` future. When it completes
/// successfully it yields the response [`Headers`] together with an
/// [`HttpStreamAfterHeaders`] carrying everything that follows them;
/// otherwise it yields an error.
///
/// `Response` itself implements `Future`, so it can be awaited directly.
/// The inner future is a public field for callers that need it unwrapped.
pub struct Response(pub HttpFutureSend<(Headers, HttpStreamAfterHeaders)>);
26
27impl Response {
28    // constructors
29
30    pub fn new<F>(future: F) -> Response
31    where
32        F: Future<Output = result::Result<(Headers, HttpStreamAfterHeaders)>> + Send + 'static,
33    {
34        Response(Box::pin(future))
35    }
36
37    pub fn headers_and_stream(headers: Headers, stream: HttpStreamAfterHeaders) -> Response {
38        Response::new(future::ok((headers, stream)))
39    }
40
41    pub fn headers_and_bytes_stream<S>(headers: Headers, content: S) -> Response
42    where
43        S: Stream<Item = result::Result<Bytes>> + Send + 'static,
44    {
45        Response::headers_and_stream(headers, HttpStreamAfterHeaders::bytes(content))
46    }
47
48    /// Create a response with only headers
49    pub fn headers(headers: Headers) -> Response {
50        Response::headers_and_bytes_stream(headers, stream::empty())
51    }
52
53    /// Create a response with headers and response body
54    pub fn headers_and_bytes<B: Into<Bytes>>(header: Headers, content: B) -> Response {
55        Response::headers_and_bytes_stream(header, stream::once(future::ok(content.into())))
56    }
57
58    pub fn message(message: SimpleHttpMessage) -> Response {
59        Response::headers_and_bytes(message.headers, message.body)
60    }
61
62    pub fn found_200_plain_text(body: &str) -> Response {
63        Response::message(SimpleHttpMessage::found_200_plain_text(body))
64    }
65
66    pub fn not_found_404() -> Response {
67        Response::headers(Headers::not_found_404())
68    }
69
70    pub fn redirect_302(location: &str) -> Response {
71        let mut headers = Headers::new_status(302);
72        headers.add("location", location);
73        Response::headers(headers)
74    }
75
76    pub fn from_stream<S>(mut stream: S) -> Response
77    where
78        S: Stream<Item = result::Result<DataOrHeadersWithFlag>> + Unpin + Send + 'static,
79    {
80        Response::new(async move {
81            // Check that first frame is HEADERS
82            let (first, rem) = match stream.try_next().await? {
83                Some(part) => match part.content {
84                    DataOrHeaders::Headers(headers) => {
85                        (headers, HttpStreamAfterHeaders::from_parts(stream))
86                    }
87                    DataOrHeaders::Data(..) => {
88                        return Err(error::Error::InvalidFrame("data before headers".to_owned()))
89                    }
90                },
91                None => {
92                    return Err(error::Error::InvalidFrame(
93                        "empty response, expecting headers".to_owned(),
94                    ))
95                }
96            };
97            Ok((first, rem))
98        })
99    }
100
101    pub fn err(err: error::Error) -> Response {
102        Response::new(future::err(err))
103    }
104
105    // getters
106
107    pub fn into_stream_flag(self) -> HttpFutureStreamSend<DataOrHeadersWithFlag> {
108        Box::pin(
109            self.0
110                .map_ok(|(headers, rem)| {
111                    // NOTE: flag may be wrong for first item
112                    let header = stream::once(future::ok(
113                        DataOrHeadersWithFlag::intermediate_headers(headers),
114                    ));
115                    let rem = rem.into_flag_stream();
116                    header.chain(rem)
117                })
118                .try_flatten_stream(),
119        )
120    }
121
122    pub fn into_stream(self) -> HttpFutureStreamSend<DataOrHeaders> {
123        Box::pin(TryStreamExt::map_ok(self.into_stream_flag(), |c| c.content))
124    }
125
126    pub fn into_part_stream(self) -> DataOrHeadersWithFlagStream {
127        DataOrHeadersWithFlagStream::new(self.into_stream_flag())
128    }
129
130    pub fn collect(self) -> HttpFutureSend<SimpleHttpMessage> {
131        Box::pin(
132            self.into_stream()
133                .try_fold(SimpleHttpMessage::new(), |mut c, p| {
134                    c.add(p);
135                    future::ok::<_, error::Error>(c)
136                }),
137        )
138    }
139}
140
141impl Future for Response {
142    type Output = result::Result<(Headers, HttpStreamAfterHeaders)>;
143
144    fn poll(
145        mut self: Pin<&mut Self>,
146        cx: &mut Context<'_>,
147    ) -> Poll<result::Result<(Headers, HttpStreamAfterHeaders)>> {
148        Pin::new(&mut self.0).poll(cx)
149    }
150}