generic_async_http_client/hyper/
mod.rs

1use std::{
2    convert::{Infallible, TryFrom},
3    str::FromStr,
4};
5
6use serde::Serialize;
7
8pub use hyper::{
9    body::Incoming,
10    header::{HeaderName, HeaderValue},
11};
12use hyper::{
13    body::{Body as BodyTrait, Bytes, Frame, SizeHint},
14    header::{InvalidHeaderName, InvalidHeaderValue, CONTENT_TYPE},
15    http::{
16        method::{InvalidMethod, Method},
17        request::Builder,
18        uri::{Builder as UriBuilder, InvalidUri, PathAndQuery, Uri},
19        Error as HTTPError,
20    },
21    Error as HyperError, Request, Response,
22};
23use std::mem::take;
24
25mod connector;
26pub(crate) use connector::HyperClient;
27
28pub(crate) fn get_client() -> HyperClient {
29    HyperClient::default()
30}
31
/// An outgoing HTTP request that is still being assembled.
#[derive(Debug)]
pub struct Req {
    /// `http` request builder holding the method, URI and headers set so far.
    req: Builder,
    /// Payload that is attached to the builder when the request is sent.
    body: Body,
    /// Client used to send the request; when `None`, `send_request` uses `get_client()`.
    pub(crate) client: Option<HyperClient>,
}
/// A received HTTP response, wrapping hyper's `Response<Incoming>`.
pub struct Resp {
    /// The underlying hyper response; the accessors on `Resp` read status, headers and body
    /// from it.
    resp: Response<Incoming>,
}
41
42impl From<crate::Response> for Response<Incoming> {
43    fn from(val: crate::Response) -> Self {
44        val.0.resp
45    }
46}
47
48impl<M, U> TryFrom<(M, U)> for crate::Request
49where
50    Method: TryFrom<M>,
51    <Method as TryFrom<M>>::Error: Into<HTTPError>,
52    Uri: TryFrom<U>,
53    <Uri as TryFrom<U>>::Error: Into<HTTPError>,
54{
55    type Error = Infallible;
56
57    fn try_from(value: (M, U)) -> Result<Self, Self::Error> {
58        let req = Builder::new().method(value.0).uri(value.1);
59
60        Ok(crate::Request(Req {
61            req,
62            body: Body::empty(),
63            client: None,
64        }))
65    }
66}
67
68impl Req {
69    pub fn get(uri: &str) -> Req {
70        Self::init(Method::GET, uri)
71    }
72    pub fn post(uri: &str) -> Req {
73        Self::init(Method::POST, uri)
74    }
75    pub fn put(uri: &str) -> Req {
76        Self::init(Method::PUT, uri)
77    }
78    pub fn delete(uri: &str) -> Req {
79        Self::init(Method::DELETE, uri)
80    }
81    pub fn head(uri: &str) -> Req {
82        Self::init(Method::HEAD, uri)
83    }
84    pub fn options(uri: &str) -> Req {
85        Self::init(Method::OPTIONS, uri)
86    }
87    pub fn new(meth: &str, uri: &str) -> Result<Req, Error> {
88        Ok(Self::init(Method::from_str(meth)?, uri))
89    }
90    fn init(method: Method, uri: &str) -> Req {
91        let req = Builder::new().method(method).uri(uri);
92
93        Req {
94            req,
95            body: Body::empty(),
96            client: None,
97        }
98    }
99    pub async fn send_request(mut self) -> Result<Resp, Error> {
100        let req = self.req.body(self.body)?;
101
102        let resp = if let Some(mut client) = self.client.take() {
103            client.request(req).await?
104        } else {
105            get_client().request(req).await?
106        };
107        Ok(Resp { resp })
108    }
109    pub fn json<T: Serialize + ?Sized>(&mut self, json: &T) -> Result<(), Error> {
110        let bytes = serde_json::to_string(&json)?;
111        self.set_header(CONTENT_TYPE, HeaderValue::from_static("application/json"))?;
112        self.body = bytes.into();
113        Ok(())
114    }
115    pub fn form<T: Serialize + ?Sized>(&mut self, data: &T) -> Result<(), Error> {
116        let query = serde_urlencoded::to_string(data)?;
117        self.set_header(
118            CONTENT_TYPE,
119            HeaderValue::from_static("application/x-www-form-urlencoded"),
120        )?;
121        self.body = query.into();
122        Ok(())
123    }
124    #[inline]
125    pub fn query<T: Serialize + ?Sized>(&mut self, query: &T) -> Result<(), Error> {
126        // codegen trampoline: https://github.com/rust-lang/rust/issues/77960
127        self._query(serde_qs::to_string(&query)?)
128    }
129    fn _query(&mut self, query: String) -> Result<(), Error> {
130        let old = self.req.uri_ref().expect("no uri");
131
132        let mut p_and_p = String::with_capacity(old.path().len() + query.len() + 1);
133        p_and_p.push_str(old.path());
134        p_and_p.push('?');
135        p_and_p.push_str(&query);
136
137        let path_and_query = PathAndQuery::from_str(&p_and_p)?;
138
139        let new = UriBuilder::new()
140            .scheme(old.scheme_str().unwrap())
141            .authority(old.authority().unwrap().as_str())
142            .path_and_query(path_and_query)
143            .build()?;
144
145        self.req = take(&mut self.req).uri(new);
146        Ok(())
147    }
148    pub fn body<B: Into<Body>>(&mut self, body: B) -> Result<(), Error> {
149        self.body = body.into();
150        Ok(())
151    }
152    pub fn set_header(&mut self, name: HeaderName, value: HeaderValue) -> Result<(), Error> {
153        self.req.headers_mut().map(|hm| hm.insert(name, value));
154        Ok(())
155    }
156    pub fn add_header(&mut self, name: HeaderName, value: HeaderValue) -> Result<(), Error> {
157        self.req = take(&mut self.req).header(name, value);
158        Ok(())
159    }
160}
161use hyper::body::Buf;
162use serde::de::DeserializeOwned;
163impl Resp {
164    pub fn status(&self) -> u16 {
165        self.resp.status().as_u16()
166    }
167    pub fn status_str(&self) -> &'static str {
168        self.resp.status().canonical_reason().unwrap_or("")
169    }
170    pub async fn json<D: DeserializeOwned>(&mut self) -> Result<D, Error> {
171        let reader = aggregate(self.resp.body_mut()).await?.reader();
172        Ok(serde_json::from_reader(reader)?)
173    }
174    pub async fn bytes(&mut self) -> Result<Vec<u8>, Error> {
175        let mut b = aggregate(self.resp.body_mut()).await?;
176        let capacity = b.remaining();
177        let mut v = Vec::with_capacity(capacity);
178        let ptr = v.spare_capacity_mut().as_mut_ptr();
179        let mut off = 0;
180        while off < capacity {
181            let cnt;
182            unsafe {
183                let src = b.chunk();
184                cnt = src.len();
185                std::ptr::copy_nonoverlapping(src.as_ptr(), ptr.add(off).cast(), cnt);
186                off += cnt;
187            }
188            b.advance(cnt);
189        }
190        unsafe {
191            v.set_len(capacity);
192        }
193        Ok(v)
194    }
195    pub async fn string(&mut self) -> Result<String, Error> {
196        let b = self.bytes().await?;
197        Ok(String::from_utf8_lossy(&b).to_string())
198    }
199    pub fn get_header(&self, name: HeaderName) -> Option<&HeaderValue> {
200        self.resp.headers().get(name)
201    }
202    pub fn header_iter(&self) -> impl Iterator<Item = (&HeaderName, &HeaderValue)> {
203        self.resp.headers().into_iter()
204    }
205}
206
207struct FracturedBuf(std::collections::VecDeque<Bytes>);
208impl Buf for FracturedBuf {
209    fn remaining(&self) -> usize {
210        self.0.iter().map(|buf| buf.remaining()).sum()
211    }
212    fn chunk(&self) -> &[u8] {
213        self.0.front().map(Buf::chunk).unwrap_or_default()
214    }
215    fn advance(&mut self, mut cnt: usize) {
216        let bufs = &mut self.0;
217        while cnt > 0 {
218            if let Some(front) = bufs.front_mut() {
219                let rem = front.remaining();
220                if rem > cnt {
221                    front.advance(cnt);
222                    return;
223                } else {
224                    front.advance(rem);
225                    cnt -= rem;
226                }
227            } else {
228                //no data -> panic?
229                return;
230            }
231            bufs.pop_front();
232        }
233    }
234}
235struct Framed<'a>(&'a mut Incoming);
236
237impl<'a> futures::Future for Framed<'a> {
238    type Output = Option<Result<hyper::body::Frame<Bytes>, hyper::Error>>;
239
240    fn poll(
241        mut self: std::pin::Pin<&mut Self>,
242        ctx: &mut std::task::Context<'_>,
243    ) -> std::task::Poll<Self::Output> {
244        std::pin::Pin::new(&mut self.0).poll_frame(ctx)
245    }
246}
247async fn aggregate(body: &mut Incoming) -> Result<FracturedBuf, Error> {
248    let mut v = std::collections::VecDeque::new();
249    while let Some(f) = Framed(body).await {
250        if let Ok(d) = f?.into_data() {
251            v.push_back(d);
252        }
253    }
254    Ok(FracturedBuf(v))
255}
256
/// Errors raised while building a request, sending it, or reading the response.
///
/// Every variant except `Scheme` wraps the error of the library that failed; matching `From`
/// impls exist so `?` can be used on those library calls.
#[derive(Debug)]
pub enum Error {
    /// Not constructed anywhere in this file.
    /// NOTE(review): presumably reported by the connector for a missing or unsupported URI
    /// scheme; confirm against `connector`.
    Scheme,
    /// Error of the `http` crate, e.g. from building the request or a URI.
    Http(HTTPError),
    /// The value passed to `Req::query` could not be serialized by `serde_qs`.
    InvalidQueryString(serde_qs::Error),
    /// The method name passed to `Req::new` is not a valid HTTP method.
    InvalidMethod(InvalidMethod),
    /// Error reported by hyper, e.g. while reading response body frames.
    Hyper(HyperError),
    /// JSON serialization or deserialization failed.
    Json(serde_json::Error),
    /// A header value was rejected (not produced in this file).
    InvalidHeaderValue(InvalidHeaderValue),
    /// A header name was rejected (not produced in this file).
    InvalidHeaderName(InvalidHeaderName),
    /// A URI, or its path-and-query part, could not be parsed.
    InvalidUri(InvalidUri),
    /// The value passed to `Req::form` could not be URL-encoded.
    Urlencoded(serde_urlencoded::ser::Error),
    /// An I/O error (not produced in this file).
    Io(std::io::Error),
}
// `source()` is not overridden, so the default implementation is used.
impl std::error::Error for Error {}
272use std::fmt;
273impl fmt::Display for Error {
274    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
275        match self {
276            Error::Scheme => write!(f, "Scheme"),
277            Error::Http(i) => write!(f, "{}", i),
278            Error::InvalidQueryString(i) => write!(f, "{}", i),
279            Error::InvalidMethod(i) => write!(f, "{}", i),
280            Error::Hyper(i) => write!(f, "{}", i),
281            Error::Json(i) => write!(f, "{}", i),
282            Error::InvalidHeaderValue(i) => write!(f, "{}", i),
283            Error::InvalidHeaderName(i) => write!(f, "{}", i),
284            Error::InvalidUri(i) => write!(f, "{}", i),
285            Error::Urlencoded(i) => write!(f, "{}", i),
286            Error::Io(i) => write!(f, "{}", i),
287        }
288    }
289}
290
291impl From<std::io::Error> for Error {
292    fn from(e: std::io::Error) -> Self {
293        Self::Io(e)
294    }
295}
296impl From<serde_urlencoded::ser::Error> for Error {
297    fn from(e: serde_urlencoded::ser::Error) -> Self {
298        Self::Urlencoded(e)
299    }
300}
301impl From<InvalidUri> for Error {
302    fn from(e: InvalidUri) -> Self {
303        Self::InvalidUri(e)
304    }
305}
306impl From<InvalidHeaderName> for Error {
307    fn from(e: InvalidHeaderName) -> Self {
308        Self::InvalidHeaderName(e)
309    }
310}
311
312impl From<InvalidHeaderValue> for Error {
313    fn from(e: InvalidHeaderValue) -> Self {
314        Self::InvalidHeaderValue(e)
315    }
316}
317impl From<serde_json::Error> for Error {
318    fn from(e: serde_json::Error) -> Self {
319        Self::Json(e)
320    }
321}
322impl From<HyperError> for Error {
323    fn from(e: HyperError) -> Self {
324        Self::Hyper(e)
325    }
326}
327impl From<InvalidMethod> for Error {
328    fn from(e: InvalidMethod) -> Self {
329        Self::InvalidMethod(e)
330    }
331}
332impl From<HTTPError> for Error {
333    fn from(e: HTTPError) -> Self {
334        Self::Http(e)
335    }
336}
337impl From<serde_qs::Error> for Error {
338    fn from(e: serde_qs::Error) -> Self {
339        Self::InvalidQueryString(e)
340    }
341}
342impl From<std::convert::Infallible> for Error {
343    fn from(_e: std::convert::Infallible) -> Self {
344        unreachable!();
345    }
346}
347
/// Request body held completely in memory.
#[derive(Debug)]
pub struct Body(Vec<u8>);
impl Body {
    /// A body without any payload.
    fn empty() -> Self {
        Self(Vec::new())
    }
}
/// Takes over the string's bytes without copying.
impl From<String> for Body {
    #[inline]
    fn from(t: String) -> Self {
        Body(t.into_bytes())
    }
}
/// Takes over the vector without copying.
impl From<Vec<u8>> for Body {
    #[inline]
    fn from(t: Vec<u8>) -> Self {
        Body(t)
    }
}
/// Copies the slice. Because the data is copied, any lifetime is accepted, not only `'static`.
impl From<&[u8]> for Body {
    #[inline]
    fn from(t: &[u8]) -> Self {
        Body(t.to_vec())
    }
}
/// Copies the string's bytes. Because the data is copied, any lifetime is accepted, not only
/// `'static`.
impl From<&str> for Body {
    #[inline]
    fn from(t: &str) -> Self {
        Body(t.as_bytes().to_vec())
    }
}
379impl hyper::body::Body for Body {
380    type Data = Bytes;
381    type Error = Infallible;
382
383    fn poll_frame(
384        mut self: std::pin::Pin<&mut Self>,
385        _cx: &mut std::task::Context<'_>,
386    ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
387        if self.0.is_empty() {
388            std::task::Poll::Ready(None)
389        } else {
390            let v: Vec<u8> = std::mem::take(self.0.as_mut());
391            std::task::Poll::Ready(Some(Ok(Frame::data(v.into()))))
392        }
393    }
394    fn size_hint(&self) -> SizeHint {
395        SizeHint::with_exact(self.0.len() as u64)
396    }
397}