generic_async_http_client/hyper/
mod.rs1use std::{
2 convert::{Infallible, TryFrom},
3 str::FromStr,
4};
5
6use serde::Serialize;
7
8pub use hyper::{
9 body::Incoming,
10 header::{HeaderName, HeaderValue},
11};
12use hyper::{
13 body::{Body as BodyTrait, Bytes, Frame, SizeHint},
14 header::{InvalidHeaderName, InvalidHeaderValue, CONTENT_TYPE},
15 http::{
16 method::{InvalidMethod, Method},
17 request::Builder,
18 uri::{Builder as UriBuilder, InvalidUri, PathAndQuery, Uri},
19 Error as HTTPError,
20 },
21 Error as HyperError, Request, Response,
22};
23use std::mem::take;
24
25mod connector;
26pub(crate) use connector::HyperClient;
27
28pub(crate) fn get_client() -> HyperClient {
29 HyperClient::default()
30}
31
32#[derive(Debug)]
33pub struct Req {
34 req: Builder,
35 body: Body,
36 pub(crate) client: Option<HyperClient>,
37}
38pub struct Resp {
39 resp: Response<Incoming>,
40}
41
42impl From<crate::Response> for Response<Incoming> {
43 fn from(val: crate::Response) -> Self {
44 val.0.resp
45 }
46}
47
48impl<M, U> TryFrom<(M, U)> for crate::Request
49where
50 Method: TryFrom<M>,
51 <Method as TryFrom<M>>::Error: Into<HTTPError>,
52 Uri: TryFrom<U>,
53 <Uri as TryFrom<U>>::Error: Into<HTTPError>,
54{
55 type Error = Infallible;
56
57 fn try_from(value: (M, U)) -> Result<Self, Self::Error> {
58 let req = Builder::new().method(value.0).uri(value.1);
59
60 Ok(crate::Request(Req {
61 req,
62 body: Body::empty(),
63 client: None,
64 }))
65 }
66}
67
68impl Req {
69 pub fn get(uri: &str) -> Req {
70 Self::init(Method::GET, uri)
71 }
72 pub fn post(uri: &str) -> Req {
73 Self::init(Method::POST, uri)
74 }
75 pub fn put(uri: &str) -> Req {
76 Self::init(Method::PUT, uri)
77 }
78 pub fn delete(uri: &str) -> Req {
79 Self::init(Method::DELETE, uri)
80 }
81 pub fn head(uri: &str) -> Req {
82 Self::init(Method::HEAD, uri)
83 }
84 pub fn options(uri: &str) -> Req {
85 Self::init(Method::OPTIONS, uri)
86 }
87 pub fn new(meth: &str, uri: &str) -> Result<Req, Error> {
88 Ok(Self::init(Method::from_str(meth)?, uri))
89 }
90 fn init(method: Method, uri: &str) -> Req {
91 let req = Builder::new().method(method).uri(uri);
92
93 Req {
94 req,
95 body: Body::empty(),
96 client: None,
97 }
98 }
99 pub async fn send_request(mut self) -> Result<Resp, Error> {
100 let req = self.req.body(self.body)?;
101
102 let resp = if let Some(mut client) = self.client.take() {
103 client.request(req).await?
104 } else {
105 get_client().request(req).await?
106 };
107 Ok(Resp { resp })
108 }
109 pub fn json<T: Serialize + ?Sized>(&mut self, json: &T) -> Result<(), Error> {
110 let bytes = serde_json::to_string(&json)?;
111 self.set_header(CONTENT_TYPE, HeaderValue::from_static("application/json"))?;
112 self.body = bytes.into();
113 Ok(())
114 }
115 pub fn form<T: Serialize + ?Sized>(&mut self, data: &T) -> Result<(), Error> {
116 let query = serde_urlencoded::to_string(data)?;
117 self.set_header(
118 CONTENT_TYPE,
119 HeaderValue::from_static("application/x-www-form-urlencoded"),
120 )?;
121 self.body = query.into();
122 Ok(())
123 }
124 #[inline]
125 pub fn query<T: Serialize + ?Sized>(&mut self, query: &T) -> Result<(), Error> {
126 self._query(serde_qs::to_string(&query)?)
128 }
129 fn _query(&mut self, query: String) -> Result<(), Error> {
130 let old = self.req.uri_ref().expect("no uri");
131
132 let mut p_and_p = String::with_capacity(old.path().len() + query.len() + 1);
133 p_and_p.push_str(old.path());
134 p_and_p.push('?');
135 p_and_p.push_str(&query);
136
137 let path_and_query = PathAndQuery::from_str(&p_and_p)?;
138
139 let new = UriBuilder::new()
140 .scheme(old.scheme_str().unwrap())
141 .authority(old.authority().unwrap().as_str())
142 .path_and_query(path_and_query)
143 .build()?;
144
145 self.req = take(&mut self.req).uri(new);
146 Ok(())
147 }
148 pub fn body<B: Into<Body>>(&mut self, body: B) -> Result<(), Error> {
149 self.body = body.into();
150 Ok(())
151 }
152 pub fn set_header(&mut self, name: HeaderName, value: HeaderValue) -> Result<(), Error> {
153 self.req.headers_mut().map(|hm| hm.insert(name, value));
154 Ok(())
155 }
156 pub fn add_header(&mut self, name: HeaderName, value: HeaderValue) -> Result<(), Error> {
157 self.req = take(&mut self.req).header(name, value);
158 Ok(())
159 }
160}
161use hyper::body::Buf;
162use serde::de::DeserializeOwned;
163impl Resp {
164 pub fn status(&self) -> u16 {
165 self.resp.status().as_u16()
166 }
167 pub fn status_str(&self) -> &'static str {
168 self.resp.status().canonical_reason().unwrap_or("")
169 }
170 pub async fn json<D: DeserializeOwned>(&mut self) -> Result<D, Error> {
171 let reader = aggregate(self.resp.body_mut()).await?.reader();
172 Ok(serde_json::from_reader(reader)?)
173 }
174 pub async fn bytes(&mut self) -> Result<Vec<u8>, Error> {
175 let mut b = aggregate(self.resp.body_mut()).await?;
176 let capacity = b.remaining();
177 let mut v = Vec::with_capacity(capacity);
178 let ptr = v.spare_capacity_mut().as_mut_ptr();
179 let mut off = 0;
180 while off < capacity {
181 let cnt;
182 unsafe {
183 let src = b.chunk();
184 cnt = src.len();
185 std::ptr::copy_nonoverlapping(src.as_ptr(), ptr.add(off).cast(), cnt);
186 off += cnt;
187 }
188 b.advance(cnt);
189 }
190 unsafe {
191 v.set_len(capacity);
192 }
193 Ok(v)
194 }
195 pub async fn string(&mut self) -> Result<String, Error> {
196 let b = self.bytes().await?;
197 Ok(String::from_utf8_lossy(&b).to_string())
198 }
199 pub fn get_header(&self, name: HeaderName) -> Option<&HeaderValue> {
200 self.resp.headers().get(name)
201 }
202 pub fn header_iter(&self) -> impl Iterator<Item = (&HeaderName, &HeaderValue)> {
203 self.resp.headers().into_iter()
204 }
205}
206
207struct FracturedBuf(std::collections::VecDeque<Bytes>);
208impl Buf for FracturedBuf {
209 fn remaining(&self) -> usize {
210 self.0.iter().map(|buf| buf.remaining()).sum()
211 }
212 fn chunk(&self) -> &[u8] {
213 self.0.front().map(Buf::chunk).unwrap_or_default()
214 }
215 fn advance(&mut self, mut cnt: usize) {
216 let bufs = &mut self.0;
217 while cnt > 0 {
218 if let Some(front) = bufs.front_mut() {
219 let rem = front.remaining();
220 if rem > cnt {
221 front.advance(cnt);
222 return;
223 } else {
224 front.advance(rem);
225 cnt -= rem;
226 }
227 } else {
228 return;
230 }
231 bufs.pop_front();
232 }
233 }
234}
235struct Framed<'a>(&'a mut Incoming);
236
237impl<'a> futures::Future for Framed<'a> {
238 type Output = Option<Result<hyper::body::Frame<Bytes>, hyper::Error>>;
239
240 fn poll(
241 mut self: std::pin::Pin<&mut Self>,
242 ctx: &mut std::task::Context<'_>,
243 ) -> std::task::Poll<Self::Output> {
244 std::pin::Pin::new(&mut self.0).poll_frame(ctx)
245 }
246}
247async fn aggregate(body: &mut Incoming) -> Result<FracturedBuf, Error> {
248 let mut v = std::collections::VecDeque::new();
249 while let Some(f) = Framed(body).await {
250 if let Ok(d) = f?.into_data() {
251 v.push_back(d);
252 }
253 }
254 Ok(FracturedBuf(v))
255}
256
257#[derive(Debug)]
258pub enum Error {
259 Scheme,
260 Http(HTTPError),
261 InvalidQueryString(serde_qs::Error),
262 InvalidMethod(InvalidMethod),
263 Hyper(HyperError),
264 Json(serde_json::Error),
265 InvalidHeaderValue(InvalidHeaderValue),
266 InvalidHeaderName(InvalidHeaderName),
267 InvalidUri(InvalidUri),
268 Urlencoded(serde_urlencoded::ser::Error),
269 Io(std::io::Error),
270}
271impl std::error::Error for Error {}
272use std::fmt;
273impl fmt::Display for Error {
274 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
275 match self {
276 Error::Scheme => write!(f, "Scheme"),
277 Error::Http(i) => write!(f, "{}", i),
278 Error::InvalidQueryString(i) => write!(f, "{}", i),
279 Error::InvalidMethod(i) => write!(f, "{}", i),
280 Error::Hyper(i) => write!(f, "{}", i),
281 Error::Json(i) => write!(f, "{}", i),
282 Error::InvalidHeaderValue(i) => write!(f, "{}", i),
283 Error::InvalidHeaderName(i) => write!(f, "{}", i),
284 Error::InvalidUri(i) => write!(f, "{}", i),
285 Error::Urlencoded(i) => write!(f, "{}", i),
286 Error::Io(i) => write!(f, "{}", i),
287 }
288 }
289}
290
291impl From<std::io::Error> for Error {
292 fn from(e: std::io::Error) -> Self {
293 Self::Io(e)
294 }
295}
296impl From<serde_urlencoded::ser::Error> for Error {
297 fn from(e: serde_urlencoded::ser::Error) -> Self {
298 Self::Urlencoded(e)
299 }
300}
301impl From<InvalidUri> for Error {
302 fn from(e: InvalidUri) -> Self {
303 Self::InvalidUri(e)
304 }
305}
306impl From<InvalidHeaderName> for Error {
307 fn from(e: InvalidHeaderName) -> Self {
308 Self::InvalidHeaderName(e)
309 }
310}
311
312impl From<InvalidHeaderValue> for Error {
313 fn from(e: InvalidHeaderValue) -> Self {
314 Self::InvalidHeaderValue(e)
315 }
316}
317impl From<serde_json::Error> for Error {
318 fn from(e: serde_json::Error) -> Self {
319 Self::Json(e)
320 }
321}
322impl From<HyperError> for Error {
323 fn from(e: HyperError) -> Self {
324 Self::Hyper(e)
325 }
326}
327impl From<InvalidMethod> for Error {
328 fn from(e: InvalidMethod) -> Self {
329 Self::InvalidMethod(e)
330 }
331}
332impl From<HTTPError> for Error {
333 fn from(e: HTTPError) -> Self {
334 Self::Http(e)
335 }
336}
337impl From<serde_qs::Error> for Error {
338 fn from(e: serde_qs::Error) -> Self {
339 Self::InvalidQueryString(e)
340 }
341}
342impl From<std::convert::Infallible> for Error {
343 fn from(_e: std::convert::Infallible) -> Self {
344 unreachable!();
345 }
346}
347
/// Request payload held fully in memory.
#[derive(Debug)]
pub struct Body(Vec<u8>);

impl Body {
    /// A body with no bytes.
    fn empty() -> Self {
        Body(Vec::new())
    }
}
355impl From<String> for Body {
356 #[inline]
357 fn from(t: String) -> Self {
358 Body(t.into_bytes())
359 }
360}
361impl From<Vec<u8>> for Body {
362 #[inline]
363 fn from(t: Vec<u8>) -> Self {
364 Body(t)
365 }
366}
367impl From<&'static [u8]> for Body {
368 #[inline]
369 fn from(t: &'static [u8]) -> Self {
370 Body(t.to_vec())
371 }
372}
373impl From<&'static str> for Body {
374 #[inline]
375 fn from(t: &'static str) -> Self {
376 Body(t.as_bytes().to_vec())
377 }
378}
379impl hyper::body::Body for Body {
380 type Data = Bytes;
381 type Error = Infallible;
382
383 fn poll_frame(
384 mut self: std::pin::Pin<&mut Self>,
385 _cx: &mut std::task::Context<'_>,
386 ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
387 if self.0.is_empty() {
388 std::task::Poll::Ready(None)
389 } else {
390 let v: Vec<u8> = std::mem::take(self.0.as_mut());
391 std::task::Poll::Ready(Some(Ok(Frame::data(v.into()))))
392 }
393 }
394 fn size_hint(&self) -> SizeHint {
395 SizeHint::with_exact(self.0.len() as u64)
396 }
397}