use crate::component::Body;
use crate::component::{utils, Request, Response};
use crate::request::Exts;
use crate::response::InnerResponse;
use crate::response::MetaResponse;
use futures_util::{future::join_all, Future};
use http::Extensions;
use hyper::body::Buf;
use hyper::client::HttpConnector;
#[cfg(feature = "proxy")]
use hyper_proxy::ProxyConnector;
use hyper_tls::HttpsConnector;
use std::collections::HashMap;
use std::io::{BufReader, Read};
/// hyper client whose connector is `HttpsConnector<HttpConnector>`; this is the type
/// stored in `ClientType::Plain`.
type ClientPlain = hyper::Client<HttpsConnector<HttpConnector>>;
/// hyper client whose connector is `ProxyConnector<HttpConnector>`; only compiled when
/// the `proxy` feature is enabled. This is the type stored in `ClientType::Proxy`.
#[cfg(feature = "proxy")]
type ClientProxy = hyper::Client<ProxyConnector<HttpConnector>>;
/// The concrete hyper client wrapped by a `Client`.
pub enum ClientType {
/// Client built on `HttpsConnector<HttpConnector>`.
Plain(ClientPlain),
/// Client built on `ProxyConnector<HttpConnector>`; only available with the `proxy` feature.
#[cfg_attr(docsrs, doc(cfg(feature = "proxy")))]
#[cfg(feature = "proxy")]
Proxy(ClientProxy),
}
/// Process-wide pool of clients keyed by `Client::id`; `None` until the first client is
/// stored (`Client::new_plain` stores its client under id `0`).
///
/// This is a `static mut`: every access requires `unsafe` and nothing here synchronizes
/// readers and writers.
// NOTE(review): callers presumably initialise the pool before spawning concurrent work — confirm.
pub static mut CLIENTPOOL: Option<HashMap<u64, Client>> = None;
/// An HTTP client together with the id under which it is stored in `CLIENTPOOL`.
pub struct Client {
/// Key of this client inside `CLIENTPOOL`.
pub id: u64,
/// The underlying hyper client (plain, or proxied when the `proxy` feature is enabled).
pub(crate) inner: ClientType,
}
impl Client {
pub fn new_plain() -> &'static Client {
let id = 0;
unsafe {
if let Some(d) = CLIENTPOOL.as_ref().and_then(|pool| pool.get(&id)) {
return d;
}
}
let https = HttpsConnector::new();
let client = hyper::Client::builder().build::<_, hyper::Body>(https);
let downloader = Client {
id,
inner: ClientType::Plain(client),
};
unsafe {
match CLIENTPOOL {
None => {
let mut pool = HashMap::new();
pool.insert(id, downloader);
CLIENTPOOL = Some(pool);
}
Some(ref mut pool) => {
pool.insert(id, downloader);
}
}
CLIENTPOOL.as_ref().unwrap().get(&id).unwrap()
}
}
pub async fn request(&self, req: Request) -> Result<Response, MetaResponse> {
let (mta, req, ext_t, ext_p) = req.into();
let mut mta = MetaResponse::from(mta);
let tic = utils::now();
let result = match self.inner {
ClientType::Plain(ref client) => client.request(req).await,
#[cfg(feature = "proxy")]
ClientType::Proxy(ref client) => client.request(req).await,
};
let toc = utils::now();
match result {
Ok(response) => {
let (parts, body_future) = response.into_parts();
let bod = hyper::body::aggregate(body_future).await;
match bod {
Ok(body) => {
let mut reader = BufReader::new(body.reader());
let mut data = Vec::new();
if let Some(t) = parts.headers.get("content-encoding") {
match t.to_str() {
#[cfg(feature = "compression")]
Ok("gzip") | Ok("deflate") => {
let mut gz = flate2::read::GzDecoder::new(reader);
gz.read_to_end(&mut data).unwrap();
}
#[cfg(feature = "compression")]
Ok("br") => {
let mut br = brotli2::read::BrotliDecoder::new(reader);
br.read_to_end(&mut data).unwrap();
}
_ => {
reader.read_to_end(&mut data).unwrap();
}
}
} else {
reader.read_to_end(&mut data).unwrap();
}
let body = Body::from(data);
let inn = InnerResponse {
status: parts.status,
version: parts.version,
headers: parts.headers,
extensions: Exts(ext_t, ext_p, Extensions::new(), parts.extensions),
};
mta.info.gap = toc - tic;
let ret = Response::from_parts(inn, body, mta);
Ok(ret)
}
Err(_) => Err(mta),
}
}
Err(e) => {
if format!("{:?}", e).contains("Cancelled") {
log::error!("Timeout request: {:?}", e);
} else {
log::error!("Failed request: {:?}", e);
}
Err(mta)
}
}
}
/// Awaits every future produced by `i` and returns their outputs as a `Vec`.
///
/// This forwards to the `join_all` free function imported from `futures_util`; the
/// unqualified call below resolves to that import, not to this associated function.
pub async fn join_all<I>(i: I) -> Vec<<<I as IntoIterator>::Item as Future>::Output>
where
    I: IntoIterator,
    I::Item: Future,
{
    let pending = join_all(i);
    pending.await
}
}