Skip to main content

async_web_client/http/
mod.rs

1use self::body::IntoNonUnitRequestBody;
2pub use self::body::IntoRequestBody;
3pub use self::error::HttpError;
4use futures::{future::FusedFuture, ready, AsyncRead, AsyncReadExt, Future};
5use futures_rustls::rustls::ClientConfig;
6use serde::de::DeserializeOwned;
7use std::fmt::{Debug, Formatter};
8use std::io::ErrorKind::InvalidData;
9use std::sync::Arc;
10use std::{
11    io,
12    pin::Pin,
13    task::{Context, Poll},
14};
15mod body;
16mod common;
17mod error;
18mod request_native;
19mod response_native;
20
21type RequestSendInner<'a> = request_native::RequestSend<'a>;
22
23pub trait RequestWithBodyExt<'a>: Sized {
24    type B: IntoNonUnitRequestBody;
25    #[cfg(any(feature = "ring", feature = "aws-lc-rs"))]
26    fn send(self) -> RequestSend<'a> {
27        let client_config = crate::DEFAULT_CLIENT_CONFIG.clone();
28        self.send_with_client_config(client_config)
29    }
30    fn send_with_client_config(self, client_config: Arc<ClientConfig>) -> RequestSend<'a>;
31}
32
33pub trait RequestWithoutBodyExt<'a>: Sized {
34    #[cfg(any(feature = "ring", feature = "aws-lc-rs"))]
35    fn send<B: IntoRequestBody + 'a>(&self, body: B) -> RequestSend<'a> {
36        let client_config = crate::DEFAULT_CLIENT_CONFIG.clone();
37        self.send_with_client_config(body, client_config)
38    }
39    fn send_with_client_config<B: IntoRequestBody + 'a>(&self, body: B, client_config: Arc<ClientConfig>) -> RequestSend<'a>;
40}
41
42pub trait RequestExt {
43    type Body;
44    fn swap_body<T>(self, body: T) -> (http::Request<T>, Self::Body);
45}
46
47impl<B> RequestExt for http::Request<B> {
48    type Body = B;
49    fn swap_body<T>(self, body: T) -> (http::Request<T>, Self::Body) {
50        let (head, old_body) = self.into_parts();
51        (http::Request::from_parts(head, body), old_body)
52    }
53}
54
55impl<'a, T: IntoNonUnitRequestBody + 'a> RequestWithBodyExt<'a> for http::Request<T> {
56    type B = T;
57    fn send_with_client_config(self, client_config: Arc<ClientConfig>) -> RequestSend<'a> {
58        let (this, body) = self.swap_body(());
59        this.send_with_client_config(body, client_config)
60    }
61}
62
63impl<'a> RequestWithoutBodyExt<'a> for http::Request<()> {
64    fn send_with_client_config<B: IntoRequestBody + 'a>(&self, body: B, client_config: Arc<ClientConfig>) -> RequestSend<'a> {
65        let (read, len) = body.into_request_body();
66        let body: (Pin<Box<dyn AsyncRead + Send>>, _) = (Box::pin(read), len);
67        let inner = RequestSendInner::new_with_client_config(self.clone(), body, client_config);
68        RequestSend { inner }
69    }
70}
71
72pub struct RequestSend<'a>
73where
74    Self: Send,
75{
76    inner: RequestSendInner<'a>,
77}
78
79impl Future for RequestSend<'_> {
80    type Output = Result<http::Response<ResponseBody>, HttpError>;
81
82    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83        let response = ready!(self.inner.poll(cx))?;
84        Ok(response.map(|inner| ResponseBody { inner })).into()
85    }
86}
87
88impl FusedFuture for RequestSend<'_> {
89    fn is_terminated(&self) -> bool {
90        self.inner.is_terminated()
91    }
92}
93
94type ResponseBodyInner = response_native::ResponseBodyInner;
95
96pub struct ResponseBody {
97    inner: ResponseBodyInner,
98}
99
/// Crate-internal access for the `websocket` feature.
#[cfg(feature = "websocket")]
impl ResponseBody {
    /// Consumes the body and forwards to the inner value's `into_inner`, which yields the
    /// body decode state and the transport, or an [`HttpError`].
    pub(crate) fn into_inner(self) -> Result<(async_http_codec::BodyDecodeState, crate::Transport), HttpError> {
        let Self { inner } = self;
        inner.into_inner()
    }
}
106impl ResponseBody {
107    pub async fn bytes(&mut self, limit: Option<usize>) -> Result<Vec<u8>, io::Error> {
108        let mut result = Vec::new();
109        match limit {
110            None => {
111                self.read_to_end(&mut result).await?;
112            }
113            Some(l) => {
114                self.take(l as u64).read_to_end(&mut result).await?;
115                if self.read(&mut [0u8]).await? > 0 {
116                    return Err(io::ErrorKind::OutOfMemory.into());
117                }
118            }
119        };
120        Ok(result)
121    }
122
123    pub async fn string(&mut self, limit: Option<usize>) -> Result<String, io::Error> {
124        let mut result = String::new();
125        match limit {
126            None => {
127                self.read_to_string(&mut result).await?;
128            }
129            Some(l) => {
130                self.take(l as u64).read_to_string(&mut result).await?;
131                if self.read(&mut [0u8]).await? > 0 {
132                    return Err(io::ErrorKind::OutOfMemory.into());
133                }
134            }
135        };
136        Ok(result)
137    }
138    #[cfg(feature = "json")]
139    pub async fn json<T: DeserializeOwned>(&mut self, limit: Option<usize>) -> Result<T, io::Error> {
140        let json_string = self.string(limit).await?;
141        let result = serde_json::from_str(&json_string).map_err(|error| HttpError::IoError(io::Error::new(InvalidData, error).into()))?;
142        Ok(result)
143    }
144}
145
146impl AsyncRead for ResponseBody {
147    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
148        Pin::new(&mut self.inner).poll_read(cx, buf)
149    }
150}
151
152impl Debug for ResponseBody {
153    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154        f.debug_struct("ResponseBody").finish()
155    }
156}