eva_sdk/
http.rs

1use eva_common::op::Op;
2use eva_common::prelude::*;
3use hyper::{Body, StatusCode, Uri, client::HttpConnector};
4use hyper_tls::HttpsConnector;
5use serde::{Deserialize, Serialize};
6use simple_pool::ResourcePool;
7use std::collections::BTreeMap;
8use std::time::Duration;
9
10type Resource = hyper::Client<HttpsConnector<HttpConnector>>;
11
12pub const MAX_REDIRECTS: usize = 10;
13
14pub struct Client {
15    pool: ResourcePool<Resource>,
16    timeout: Duration,
17    max_redirects: usize,
18    follow_redirects: bool,
19}
20
21#[derive(Serialize, Deserialize, Debug, Clone)]
22pub struct Response {
23    status: u16,
24    headers: BTreeMap<String, String>,
25    body: Vec<u8>,
26}
27
28impl Response {
29    #[inline]
30    pub fn status(&self) -> u16 {
31        self.status
32    }
33    #[inline]
34    pub fn headers(&self) -> &BTreeMap<String, String> {
35        &self.headers
36    }
37    #[inline]
38    pub fn body(&self) -> &[u8] {
39        &self.body
40    }
41}
42
43impl TryFrom<Response> for hyper::http::Response<Body> {
44    type Error = Error;
45    fn try_from(resp: Response) -> EResult<Self> {
46        let mut r = hyper::http::Response::builder();
47        for (header, value) in resp.headers {
48            r = r.header(header, value);
49        }
50        r.status(StatusCode::from_u16(resp.status).map_err(Error::failed)?)
51            .body(Body::from(resp.body))
52            .map_err(Error::failed)
53    }
54}
55
56impl Client {
57    pub fn new(pool_size: usize, timeout: Duration) -> Self {
58        let pool: ResourcePool<Resource> = <_>::default();
59        for _ in 0..=pool_size {
60            let https = HttpsConnector::new();
61            let client: hyper::Client<_> = hyper::Client::builder()
62                .pool_idle_timeout(timeout)
63                .build(https);
64            pool.append(client);
65        }
66        Self {
67            pool,
68            timeout,
69            max_redirects: MAX_REDIRECTS,
70            follow_redirects: true,
71        }
72    }
73    #[inline]
74    pub fn max_redirects(mut self, max_redirects: usize) -> Self {
75        self.max_redirects = max_redirects;
76        self
77    }
78    #[inline]
79    pub fn follow_redirects(mut self, follow: bool) -> Self {
80        self.follow_redirects = follow;
81        self
82    }
83    pub async fn get(&self, url: &str) -> EResult<hyper::Response<Body>> {
84        let op = Op::new(self.timeout);
85        let mut target_uri: Uri = {
86            if url.starts_with("http://") || url.starts_with("https://") {
87                url.parse()
88            } else {
89                format!("http://{url}").parse()
90            }
91        }
92        .map_err(|e| Error::invalid_params(format!("invalid url {}: {}", url, e)))?;
93        let client = tokio::time::timeout(op.timeout()?, self.pool.get()).await?;
94        let mut rdr = 0;
95        loop {
96            let res = tokio::time::timeout(op.timeout()?, client.get(target_uri.clone()))
97                .await?
98                .map_err(Error::io)?;
99            if self.follow_redirects
100                && (res.status() == StatusCode::MOVED_PERMANENTLY
101                    || res.status() == StatusCode::TEMPORARY_REDIRECT
102                    || res.status() == StatusCode::FOUND)
103            {
104                if rdr > self.max_redirects {
105                    return Err(Error::io("too many redirects"));
106                }
107                rdr += 1;
108                if let Some(loc) = res.headers().get(hyper::header::LOCATION) {
109                    let location_uri: Uri = loc
110                        .to_str()
111                        .map_err(|e| Error::invalid_params(format!("invalid redirect url: {e}")))?
112                        .parse()
113                        .map_err(|e| Error::invalid_params(format!("invalid redirect url: {e}")))?;
114                    let loc_parts = location_uri.into_parts();
115                    let mut parts = target_uri.into_parts();
116                    if loc_parts.scheme.is_some() {
117                        parts.scheme = loc_parts.scheme;
118                    }
119                    if loc_parts.authority.is_some() {
120                        parts.authority = loc_parts.authority;
121                    }
122                    parts.path_and_query = loc_parts.path_and_query;
123                    target_uri = Uri::from_parts(parts)
124                        .map_err(|e| Error::invalid_params(format!("invalid redirect url: {e}")))?;
125                } else {
126                    return Err(Error::io("invalid redirect"));
127                }
128            } else {
129                return Ok(res);
130            }
131        }
132    }
133    pub async fn get_response(&self, url: &str) -> EResult<Response> {
134        let op = Op::new(self.timeout);
135        let resp = self.get(url).await?;
136        let status = resp.status().as_u16();
137        let mut headers = BTreeMap::new();
138        for (header, value) in resp.headers() {
139            headers.insert(
140                header.to_string(),
141                value.to_str().unwrap_or_default().to_owned(),
142            );
143        }
144        let body = tokio::time::timeout(op.timeout()?, hyper::body::to_bytes(resp))
145            .await?
146            .map_err(Error::io)?
147            .to_vec();
148        Ok(Response {
149            status,
150            headers,
151            body,
152        })
153    }
154}