1use eva_common::op::Op;
2use eva_common::prelude::*;
3use hyper::{Body, StatusCode, Uri, client::HttpConnector};
4use hyper_tls::HttpsConnector;
5use serde::{Deserialize, Serialize};
6use simple_pool::ResourcePool;
7use std::collections::BTreeMap;
8use std::time::Duration;
9
10type Resource = hyper::Client<HttpsConnector<HttpConnector>>;
11
/// Default redirect limit assigned to a [`Client`] by `Client::new`.
pub const MAX_REDIRECTS: usize = 10;
13
14pub struct Client {
15 pool: ResourcePool<Resource>,
16 timeout: Duration,
17 max_redirects: usize,
18 follow_redirects: bool,
19}
20
21#[derive(Serialize, Deserialize, Debug, Clone)]
22pub struct Response {
23 status: u16,
24 headers: BTreeMap<String, String>,
25 body: Vec<u8>,
26}
27
28impl Response {
29 #[inline]
30 pub fn status(&self) -> u16 {
31 self.status
32 }
33 #[inline]
34 pub fn headers(&self) -> &BTreeMap<String, String> {
35 &self.headers
36 }
37 #[inline]
38 pub fn body(&self) -> &[u8] {
39 &self.body
40 }
41}
42
43impl TryFrom<Response> for hyper::http::Response<Body> {
44 type Error = Error;
45 fn try_from(resp: Response) -> EResult<Self> {
46 let mut r = hyper::http::Response::builder();
47 for (header, value) in resp.headers {
48 r = r.header(header, value);
49 }
50 r.status(StatusCode::from_u16(resp.status).map_err(Error::failed)?)
51 .body(Body::from(resp.body))
52 .map_err(Error::failed)
53 }
54}
55
56impl Client {
57 pub fn new(pool_size: usize, timeout: Duration) -> Self {
58 let pool: ResourcePool<Resource> = <_>::default();
59 for _ in 0..=pool_size {
60 let https = HttpsConnector::new();
61 let client: hyper::Client<_> = hyper::Client::builder()
62 .pool_idle_timeout(timeout)
63 .build(https);
64 pool.append(client);
65 }
66 Self {
67 pool,
68 timeout,
69 max_redirects: MAX_REDIRECTS,
70 follow_redirects: true,
71 }
72 }
73 #[inline]
74 pub fn max_redirects(mut self, max_redirects: usize) -> Self {
75 self.max_redirects = max_redirects;
76 self
77 }
78 #[inline]
79 pub fn follow_redirects(mut self, follow: bool) -> Self {
80 self.follow_redirects = follow;
81 self
82 }
83 pub async fn get(&self, url: &str) -> EResult<hyper::Response<Body>> {
84 let op = Op::new(self.timeout);
85 let mut target_uri: Uri = {
86 if url.starts_with("http://") || url.starts_with("https://") {
87 url.parse()
88 } else {
89 format!("http://{url}").parse()
90 }
91 }
92 .map_err(|e| Error::invalid_params(format!("invalid url {}: {}", url, e)))?;
93 let client = tokio::time::timeout(op.timeout()?, self.pool.get()).await?;
94 let mut rdr = 0;
95 loop {
96 let res = tokio::time::timeout(op.timeout()?, client.get(target_uri.clone()))
97 .await?
98 .map_err(Error::io)?;
99 if self.follow_redirects
100 && (res.status() == StatusCode::MOVED_PERMANENTLY
101 || res.status() == StatusCode::TEMPORARY_REDIRECT
102 || res.status() == StatusCode::FOUND)
103 {
104 if rdr > self.max_redirects {
105 return Err(Error::io("too many redirects"));
106 }
107 rdr += 1;
108 if let Some(loc) = res.headers().get(hyper::header::LOCATION) {
109 let location_uri: Uri = loc
110 .to_str()
111 .map_err(|e| Error::invalid_params(format!("invalid redirect url: {e}")))?
112 .parse()
113 .map_err(|e| Error::invalid_params(format!("invalid redirect url: {e}")))?;
114 let loc_parts = location_uri.into_parts();
115 let mut parts = target_uri.into_parts();
116 if loc_parts.scheme.is_some() {
117 parts.scheme = loc_parts.scheme;
118 }
119 if loc_parts.authority.is_some() {
120 parts.authority = loc_parts.authority;
121 }
122 parts.path_and_query = loc_parts.path_and_query;
123 target_uri = Uri::from_parts(parts)
124 .map_err(|e| Error::invalid_params(format!("invalid redirect url: {e}")))?;
125 } else {
126 return Err(Error::io("invalid redirect"));
127 }
128 } else {
129 return Ok(res);
130 }
131 }
132 }
133 pub async fn get_response(&self, url: &str) -> EResult<Response> {
134 let op = Op::new(self.timeout);
135 let resp = self.get(url).await?;
136 let status = resp.status().as_u16();
137 let mut headers = BTreeMap::new();
138 for (header, value) in resp.headers() {
139 headers.insert(
140 header.to_string(),
141 value.to_str().unwrap_or_default().to_owned(),
142 );
143 }
144 let body = tokio::time::timeout(op.timeout()?, hyper::body::to_bytes(resp))
145 .await?
146 .map_err(Error::io)?
147 .to_vec();
148 Ok(Response {
149 status,
150 headers,
151 body,
152 })
153 }
154}