async_web_client/http/
mod.rs1use self::body::IntoNonUnitRequestBody;
2pub use self::body::IntoRequestBody;
3pub use self::error::HttpError;
4use futures::{future::FusedFuture, ready, AsyncRead, AsyncReadExt, Future};
5use futures_rustls::rustls::ClientConfig;
6use serde::de::DeserializeOwned;
7use std::fmt::{Debug, Formatter};
8use std::io::ErrorKind::InvalidData;
9use std::sync::Arc;
10use std::{
11 io,
12 pin::Pin,
13 task::{Context, Poll},
14};
15mod body;
16mod common;
17mod error;
18mod request_native;
19mod response_native;
20
21type RequestSendInner<'a> = request_native::RequestSend<'a>;
22
23pub trait RequestWithBodyExt<'a>: Sized {
24 type B: IntoNonUnitRequestBody;
25 #[cfg(any(feature = "ring", feature = "aws-lc-rs"))]
26 fn send(self) -> RequestSend<'a> {
27 let client_config = crate::DEFAULT_CLIENT_CONFIG.clone();
28 self.send_with_client_config(client_config)
29 }
30 fn send_with_client_config(self, client_config: Arc<ClientConfig>) -> RequestSend<'a>;
31}
32
33pub trait RequestWithoutBodyExt<'a>: Sized {
34 #[cfg(any(feature = "ring", feature = "aws-lc-rs"))]
35 fn send<B: IntoRequestBody + 'a>(&self, body: B) -> RequestSend<'a> {
36 let client_config = crate::DEFAULT_CLIENT_CONFIG.clone();
37 self.send_with_client_config(body, client_config)
38 }
39 fn send_with_client_config<B: IntoRequestBody + 'a>(&self, body: B, client_config: Arc<ClientConfig>) -> RequestSend<'a>;
40}
41
42pub trait RequestExt {
43 type Body;
44 fn swap_body<T>(self, body: T) -> (http::Request<T>, Self::Body);
45}
46
47impl<B> RequestExt for http::Request<B> {
48 type Body = B;
49 fn swap_body<T>(self, body: T) -> (http::Request<T>, Self::Body) {
50 let (head, old_body) = self.into_parts();
51 (http::Request::from_parts(head, body), old_body)
52 }
53}
54
55impl<'a, T: IntoNonUnitRequestBody + 'a> RequestWithBodyExt<'a> for http::Request<T> {
56 type B = T;
57 fn send_with_client_config(self, client_config: Arc<ClientConfig>) -> RequestSend<'a> {
58 let (this, body) = self.swap_body(());
59 this.send_with_client_config(body, client_config)
60 }
61}
62
63impl<'a> RequestWithoutBodyExt<'a> for http::Request<()> {
64 fn send_with_client_config<B: IntoRequestBody + 'a>(&self, body: B, client_config: Arc<ClientConfig>) -> RequestSend<'a> {
65 let (read, len) = body.into_request_body();
66 let body: (Pin<Box<dyn AsyncRead + Send>>, _) = (Box::pin(read), len);
67 let inner = RequestSendInner::new_with_client_config(self.clone(), body, client_config);
68 RequestSend { inner }
69 }
70}
71
72pub struct RequestSend<'a>
73where
74 Self: Send,
75{
76 inner: RequestSendInner<'a>,
77}
78
79impl Future for RequestSend<'_> {
80 type Output = Result<http::Response<ResponseBody>, HttpError>;
81
82 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83 let response = ready!(self.inner.poll(cx))?;
84 Ok(response.map(|inner| ResponseBody { inner })).into()
85 }
86}
87
88impl FusedFuture for RequestSend<'_> {
89 fn is_terminated(&self) -> bool {
90 self.inner.is_terminated()
91 }
92}
93
94type ResponseBodyInner = response_native::ResponseBodyInner;
95
96pub struct ResponseBody {
97 inner: ResponseBodyInner,
98}
99
#[cfg(feature = "websocket")]
impl ResponseBody {
    /// Consumes the body and returns the backend's decode state and transport,
    /// or the error the backend reports.
    pub(crate) fn into_inner(self) -> Result<(async_http_codec::BodyDecodeState, crate::Transport), HttpError> {
        let Self { inner } = self;
        inner.into_inner()
    }
}
106impl ResponseBody {
107 pub async fn bytes(&mut self, limit: Option<usize>) -> Result<Vec<u8>, io::Error> {
108 let mut result = Vec::new();
109 match limit {
110 None => {
111 self.read_to_end(&mut result).await?;
112 }
113 Some(l) => {
114 self.take(l as u64).read_to_end(&mut result).await?;
115 if self.read(&mut [0u8]).await? > 0 {
116 return Err(io::ErrorKind::OutOfMemory.into());
117 }
118 }
119 };
120 Ok(result)
121 }
122
123 pub async fn string(&mut self, limit: Option<usize>) -> Result<String, io::Error> {
124 let mut result = String::new();
125 match limit {
126 None => {
127 self.read_to_string(&mut result).await?;
128 }
129 Some(l) => {
130 self.take(l as u64).read_to_string(&mut result).await?;
131 if self.read(&mut [0u8]).await? > 0 {
132 return Err(io::ErrorKind::OutOfMemory.into());
133 }
134 }
135 };
136 Ok(result)
137 }
138 #[cfg(feature = "json")]
139 pub async fn json<T: DeserializeOwned>(&mut self, limit: Option<usize>) -> Result<T, io::Error> {
140 let json_string = self.string(limit).await?;
141 let result = serde_json::from_str(&json_string).map_err(|error| HttpError::IoError(io::Error::new(InvalidData, error).into()))?;
142 Ok(result)
143 }
144}
145
146impl AsyncRead for ResponseBody {
147 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
148 Pin::new(&mut self.inner).poll_read(cx, buf)
149 }
150}
151
152impl Debug for ResponseBody {
153 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154 f.debug_struct("ResponseBody").finish()
155 }
156}