async_web_client/http/
mod.rs

1use futures::{future::FusedFuture, ready, AsyncRead, AsyncReadExt, Future};
2use futures_rustls::rustls::ClientConfig;
3use std::fmt::{Debug, Formatter};
4use std::sync::Arc;
5use std::{
6    io,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use self::body::IntoNonUnitRequestBody;
12pub use self::body::IntoRequestBody;
13pub use self::error::HttpError;
14
15mod body;
16mod common;
17mod error;
18mod request_native;
19mod response_native;
20
21type RequestSendInner<'a> = request_native::RequestSend<'a>;
22
23pub trait RequestWithBodyExt<'a>: Sized {
24    type B: IntoNonUnitRequestBody;
25    #[cfg(any(feature = "ring", feature = "aws-lc-rs"))]
26    fn send(self) -> RequestSend<'a> {
27        let client_config = crate::DEFAULT_CLIENT_CONFIG.clone();
28        self.send_with_client_config(client_config)
29    }
30    fn send_with_client_config(self, client_config: Arc<ClientConfig>) -> RequestSend<'a>;
31}
32
33pub trait RequestWithoutBodyExt<'a>: Sized {
34    #[cfg(any(feature = "ring", feature = "aws-lc-rs"))]
35    fn send<B: IntoRequestBody + 'a>(&self, body: B) -> RequestSend<'a> {
36        let client_config = crate::DEFAULT_CLIENT_CONFIG.clone();
37        self.send_with_client_config(body, client_config)
38    }
39    fn send_with_client_config<B: IntoRequestBody + 'a>(&self, body: B, client_config: Arc<ClientConfig>) -> RequestSend<'a>;
40}
41
42pub trait RequestExt {
43    type Body;
44    fn swap_body<T>(self, body: T) -> (http::Request<T>, Self::Body);
45}
46
47impl<B> RequestExt for http::Request<B> {
48    type Body = B;
49    fn swap_body<T>(self, body: T) -> (http::Request<T>, Self::Body) {
50        let (head, old_body) = self.into_parts();
51        (http::Request::from_parts(head, body), old_body)
52    }
53}
54
55impl<'a, T: IntoNonUnitRequestBody + 'a> RequestWithBodyExt<'a> for http::Request<T> {
56    type B = T;
57    fn send_with_client_config(self, client_config: Arc<ClientConfig>) -> RequestSend<'a> {
58        let (this, body) = self.swap_body(());
59        this.send_with_client_config(body, client_config)
60    }
61}
62
63impl<'a> RequestWithoutBodyExt<'a> for http::Request<()> {
64    fn send_with_client_config<B: IntoRequestBody + 'a>(&self, body: B, client_config: Arc<ClientConfig>) -> RequestSend<'a> {
65        let (read, len) = body.into_request_body();
66        let body: (Pin<Box<dyn AsyncRead + Send>>, _) = (Box::pin(read), len);
67        let inner = RequestSendInner::new_with_client_config(self.clone(), body, client_config);
68        RequestSend { inner }
69    }
70}
71
72pub struct RequestSend<'a>
73where
74    Self: Send,
75{
76    inner: RequestSendInner<'a>,
77}
78
79impl Future for RequestSend<'_> {
80    type Output = Result<http::Response<ResponseBody>, HttpError>;
81
82    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83        let response = ready!(self.inner.poll(cx))?;
84        Ok(response.map(|inner| ResponseBody { inner })).into()
85    }
86}
87
88impl FusedFuture for RequestSend<'_> {
89    fn is_terminated(&self) -> bool {
90        self.inner.is_terminated()
91    }
92}
93
94type ResponseBodyInner = response_native::ResponseBodyInner;
95
96pub trait ResponseExt {
97    #[doc(hidden)]
98    fn body_as_async_read(&mut self) -> &mut ResponseBody;
99
100    #[allow(async_fn_in_trait)]
101    async fn body_vec(&mut self, limit: Option<usize>) -> io::Result<Vec<u8>> {
102        let mut buf = Vec::new();
103        match limit {
104            None => {
105                self.body_as_async_read().read_to_end(&mut buf).await?;
106            }
107            Some(l) => {
108                self.body_as_async_read().take(l as u64).read_to_end(&mut buf).await?;
109                if self.body_as_async_read().read(&mut [0u8]).await? > 0 {
110                    return Err(io::ErrorKind::OutOfMemory.into());
111                }
112            }
113        };
114        Ok(buf)
115    }
116
117    #[allow(async_fn_in_trait)]
118    async fn body_string(&mut self, limit: Option<usize>) -> io::Result<String> {
119        let mut buf = String::new();
120        match limit {
121            None => {
122                self.body_as_async_read().read_to_string(&mut buf).await?;
123            }
124            Some(l) => {
125                self.body_as_async_read().take(l as u64).read_to_string(&mut buf).await?;
126                if self.body_as_async_read().read(&mut [0u8]).await? > 0 {
127                    return Err(io::ErrorKind::OutOfMemory.into());
128                }
129            }
130        };
131        Ok(buf)
132    }
133}
134
135impl ResponseExt for http::Response<ResponseBody> {
136    #[doc(hidden)]
137    fn body_as_async_read(&mut self) -> &mut ResponseBody {
138        self.body_mut()
139    }
140}
141
142pub struct ResponseBody {
143    inner: ResponseBodyInner,
144}
145
146#[cfg(feature = "websocket")]
147impl ResponseBody {
148    pub(crate) fn into_inner(self) -> Result<(async_http_codec::BodyDecodeState, crate::Transport), HttpError> {
149        self.inner.into_inner()
150    }
151}
152
153impl AsyncRead for ResponseBody {
154    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
155        Pin::new(&mut self.inner).poll_read(cx, buf)
156    }
157}
158
159impl Debug for ResponseBody {
160    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
161        f.debug_struct("ResponseBody").finish()
162    }
163}