1pub mod request;
48
49pub use bytes::Bytes;
54
55pub use hyper::header;
56pub use hyper::header::{HeaderName, HeaderValue};
57pub use hyper::{HeaderMap, Method, Uri};
58
59pub use request::HttpRequest;
60
61use bytes::{BufMut, BytesMut};
62use http_body_util::{BodyExt, Full};
63use hyper::{Request, Response, body::Incoming};
64use hyper_rustls::HttpsConnectorBuilder;
65use hyper_util::client::legacy::{Client, connect::HttpConnector};
66use hyper_util::rt::TokioExecutor;
67use std::time::Duration;
68use thiserror::Error;
69use tokio::time::timeout;
70
71#[derive(Debug, Error)]
75pub enum HttpError {
76 #[error("invalid uri: {0}")]
78 InvalidUri(String),
79
80 #[error("http status: {0}")]
82 Status(u16),
83
84 #[error("timeout")]
86 Timeout,
87
88 #[error("response body too large (limit {limit_bytes} bytes)")]
90 BodyTooLarge { limit_bytes: usize },
91
92 #[error("content decode failed: {0}")]
94 DecodeBody(String),
95
96 #[error("json parse error: {0}")]
98 Json(String),
99
100 #[error("json5 parse error: {0}")]
102 Json5(String),
103
104 #[error("typed decode error: {0}")]
106 Decode(String),
107
108 #[error("urlencoded encode error: {0}")]
110 UrlEncoded(String),
111
112 #[error("http client error: {0}")]
114 Client(#[from] hyper_util::client::legacy::Error),
115
116 #[error("hyper error: {0}")]
118 Hyper(#[from] hyper::Error),
119
120 #[error("io error: {0}")]
122 Io(#[from] std::io::Error),
123
124 #[error("tls error: {0}")]
126 Tls(String),
127}
128
129#[derive(Clone)]
133pub struct HttpClient {
134 host: String,
135 client: Client<hyper_rustls::HttpsConnector<HttpConnector>, Full<Bytes>>,
136 io_timeout: Duration,
137 max_body_bytes: usize,
138}
139
140impl HttpClient {
141 pub const DEFAULT_MAX_BODY_BYTES: usize = 50 * 1024 * 1024;
143
144 pub fn new_https(host: impl Into<String>) -> Result<Self, HttpError> {
151 let host = host.into();
152
153 let mut http = HttpConnector::new();
155 http.enforce_http(false);
156
157 let tls_config = platform_tls_config()?;
158
159 let https = HttpsConnectorBuilder::new()
160 .with_tls_config(tls_config)
161 .https_only()
162 .enable_http1()
163 .enable_http2()
164 .build();
165
166 let client = Client::builder(TokioExecutor::new())
167 .pool_idle_timeout(Duration::from_secs(60))
168 .pool_max_idle_per_host(64)
169 .build(https);
170
171 Ok(Self {
172 host,
173 client,
174 io_timeout: Duration::from_secs(60),
175 max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
176 })
177 }
178
179 pub fn host(&self) -> &str {
181 &self.host
182 }
183
184 pub fn set_timeout(&mut self, d: Duration) {
188 self.io_timeout = d;
189 }
190
191 pub fn set_max_body_bytes(&mut self, max: usize) {
195 self.max_body_bytes = max;
196 }
197
198 fn uri(&self, path_and_query: &str) -> Result<Uri, HttpError> {
199 if !path_and_query.starts_with('/') {
200 return Err(HttpError::InvalidUri(format!(
201 "path_and_query must start with '/': {}",
202 path_and_query
203 )));
204 }
205 let full = format!("https://{}{}", self.host, path_and_query);
206 full.parse::<Uri>().map_err(|_| HttpError::InvalidUri(full))
207 }
208
209 pub async fn request_bytes(
218 &self,
219 method: Method,
220 path_and_query: &str,
221 mut headers: HeaderMap,
222 body: Bytes,
223 ) -> Result<(u16, HeaderMap, Bytes), HttpError> {
224 let uri = self.uri(path_and_query)?;
225
226 if !headers.contains_key(header::ACCEPT_ENCODING) {
227 headers.insert(
228 header::ACCEPT_ENCODING,
229 HeaderValue::from_static("gzip, deflate, br"),
230 );
231 }
232
233 if !headers.contains_key(header::USER_AGENT) {
234 const UA: &str = concat!("reqkit/", env!("CARGO_PKG_VERSION"));
236 headers.insert(header::USER_AGENT, HeaderValue::from_static(UA));
237 }
238
239 async fn attempt(
240 client: &Client<hyper_rustls::HttpsConnector<HttpConnector>, Full<Bytes>>,
241 io_timeout: Duration,
242 max_body_bytes: usize,
243 method: Method,
244 uri: Uri,
245 headers: HeaderMap,
246 body: Bytes,
247 ) -> Result<(u16, HeaderMap, Bytes), HttpError> {
248 let mut builder = Request::builder().method(method).uri(uri);
249 {
250 let h = builder.headers_mut().expect("builder headers");
251 *h = headers;
252 }
253
254 let req = builder
255 .body(Full::new(body))
256 .map_err(|e| HttpError::InvalidUri(e.to_string()))?;
257
258 let resp: Response<Incoming> = match timeout(io_timeout, client.request(req)).await {
259 Ok(Ok(r)) => r,
260 Ok(Err(e)) => return Err(HttpError::Client(e)),
261 Err(_) => return Err(HttpError::Timeout),
262 };
263
264 let status = resp.status().as_u16();
265 let headers = resp.headers().clone();
266
267 let bytes = collect_limited(resp.into_body(), io_timeout, max_body_bytes).await?;
268 Ok((status, headers, bytes))
269 }
270
271 attempt(
272 &self.client,
273 self.io_timeout,
274 self.max_body_bytes,
275 method,
276 uri,
277 headers,
278 body,
279 )
280 .await
281 }
282}
283
284async fn collect_limited(
288 mut body: Incoming,
289 io_timeout: Duration,
290 max: usize,
291) -> Result<Bytes, HttpError> {
292 let fut = async move {
293 let mut out = BytesMut::new();
294 while let Some(frame) = body.frame().await {
295 let frame = frame?;
296 if let Some(chunk) = frame.data_ref() {
297 if out.len().saturating_add(chunk.len()) > max {
298 return Err(HttpError::BodyTooLarge { limit_bytes: max });
299 }
300 out.put_slice(chunk);
301 }
302 }
303 Ok::<Bytes, HttpError>(out.freeze())
304 };
305
306 match timeout(io_timeout, fut).await {
307 Ok(r) => r,
308 Err(_) => Err(HttpError::Timeout),
309 }
310}
311
312pub async fn decode_content(
320 headers: &HeaderMap,
321 body: Bytes,
322 max_out: usize,
323) -> Result<Bytes, HttpError> {
324 if body.is_empty() {
325 return Ok(body);
326 }
327
328 let encs: Vec<String> = headers
329 .get_all(header::CONTENT_ENCODING)
330 .iter()
331 .filter_map(|v| v.to_str().ok())
332 .flat_map(|s| s.split(','))
333 .map(|t| t.trim().to_ascii_lowercase())
334 .filter(|t| !t.is_empty() && t != "identity")
335 .collect();
336
337 if encs.is_empty() {
338 return Ok(body);
339 }
340
341 let body_vec = body.to_vec();
342 let res = tokio::task::spawn_blocking(move || {
343 let mut cur = body_vec;
344
345 for enc in encs.into_iter().rev() {
346 cur = match enc.as_str() {
347 "gzip" => decode_gzip_limited(&cur, max_out)?,
348 "deflate" => decode_deflate_limited(&cur, max_out)?,
349 "br" => decode_brotli_limited(&cur, max_out)?,
350 other => {
351 return Err(HttpError::DecodeBody(format!(
352 "unsupported content-encoding: {}",
353 other
354 )));
355 }
356 };
357 }
358
359 Ok::<Vec<u8>, HttpError>(cur)
360 })
361 .await;
362
363 match res {
364 Ok(Ok(v)) => Ok(Bytes::from(v)),
365 Ok(Err(e)) => Err(e),
366 Err(e) => Err(HttpError::DecodeBody(format!("decoder task failed: {}", e))),
367 }
368}
369
370fn decode_gzip_limited(input: &[u8], max_out: usize) -> Result<Vec<u8>, HttpError> {
371 use flate2::read::GzDecoder;
372 let mut dec = GzDecoder::new(input);
373 read_to_end_limited(&mut dec, max_out)
374}
375
376fn decode_deflate_limited(input: &[u8], max_out: usize) -> Result<Vec<u8>, HttpError> {
377 use flate2::read::{DeflateDecoder, ZlibDecoder};
378
379 let mut z = ZlibDecoder::new(input);
381 match read_to_end_limited(&mut z, max_out) {
382 Ok(v) => Ok(v),
383 Err(_) => {
384 let mut d = DeflateDecoder::new(input);
385 read_to_end_limited(&mut d, max_out)
386 }
387 }
388}
389
390fn decode_brotli_limited(input: &[u8], max_out: usize) -> Result<Vec<u8>, HttpError> {
391 use brotli::Decompressor;
392 let mut dec = Decompressor::new(input, 4096);
393 read_to_end_limited(&mut dec, max_out)
394}
395
396fn read_to_end_limited<R: std::io::Read>(r: &mut R, max_out: usize) -> Result<Vec<u8>, HttpError> {
397 let mut out = Vec::new();
398 let mut buf = [0u8; 8192];
399
400 loop {
401 let n = r
402 .read(&mut buf)
403 .map_err(|e| HttpError::DecodeBody(e.to_string()))?;
404 if n == 0 {
405 break;
406 }
407 if out.len().saturating_add(n) > max_out {
408 return Err(HttpError::BodyTooLarge {
409 limit_bytes: max_out,
410 });
411 }
412 out.extend_from_slice(&buf[..n]);
413 }
414
415 Ok(out)
416}
417
418fn platform_tls_config() -> Result<rustls::ClientConfig, HttpError> {
422 use rustls::crypto::CryptoProvider;
423 use rustls_platform_verifier::BuilderVerifierExt;
424
425 let provider: std::sync::Arc<CryptoProvider> = if let Some(p) = CryptoProvider::get_default() {
426 p.clone()
427 } else {
428 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
429 CryptoProvider::get_default()
430 .ok_or_else(|| {
431 HttpError::Tls("CryptoProvider still None after install_default()".into())
432 })?
433 .clone()
434 };
435
436 let builder = rustls::ClientConfig::builder_with_provider(provider)
437 .with_safe_default_protocol_versions()
438 .map_err(|e| HttpError::Tls(format!("protocol versions init failed: {e:?}")))?;
439
440 let cfg = builder
441 .with_platform_verifier()
442 .map_err(|e| HttpError::Tls(format!("with_platform_verifier failed: {e:?}")))?
443 .with_no_client_auth();
444
445 Ok(cfg)
446}