#[macro_use]
extern crate lazy_static;
use std::fs;
use std::sync::Arc;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::net::{IpAddr, SocketAddr};
use std::io::{Error, Result, ErrorKind};
use tokio;
use percent_encoding::{CONTROLS, percent_encode};
use reqwest::{ClientBuilder, Client, Proxy, Certificate, Identity, Method, RequestBuilder, Request, Body, Response, Version,
header::{HeaderMap, HeaderName, HeaderValue},
redirect::Policy,
multipart::{Part, Form}};
use bytes::{Buf, BufMut};
use flume::bounded;
use pi_hash::XHashMap;
lazy_static! {
    /// Multi-threaded Tokio runtime shared by this module for HTTP work.
    ///
    /// The worker-thread count is read from the `ASYNC_HTTPC_WORKER_THREADS`
    /// environment variable the first time the runtime is touched and
    /// defaults to 4. Threads are named `ASYNC-HTTPC`, at most 16 blocking
    /// threads are allowed, and all Tokio drivers are enabled.
    ///
    /// # Panics
    ///
    /// Panics on first use if the runtime cannot be built.
    static ref ASYNC_HTTPC_RUNTIME: tokio::runtime::Runtime = {
        // Tokio's builder panics when asked for zero worker threads, so a
        // zero value is treated like a missing or unparsable one and falls
        // back to the default instead of crashing on first use.
        let worker_threads = std::env::var("ASYNC_HTTPC_WORKER_THREADS")
            .ok()
            .and_then(|raw| raw.parse::<usize>().ok())
            .filter(|&count| count > 0)
            .unwrap_or(4);

        tokio::runtime::Builder::new_multi_thread()
            .thread_name("ASYNC-HTTPC")
            .worker_threads(worker_threads)
            .max_blocking_threads(16)
            .enable_all()
            .build()
            .expect("failed to build the ASYNC-HTTPC tokio runtime")
    };
}
/// Fluent builder for [`AsyncHttpc`].
///
/// Field 0 accumulates the default headers; field 1 is the wrapped reqwest
/// `ClientBuilder`. Every setter consumes the builder and returns the
/// updated one, so calls can be chained.
pub struct AsyncHttpcBuilder(HeaderMap, ClientBuilder);

impl AsyncHttpcBuilder {
    /// Creates a builder with no default headers, configured through
    /// `ClientBuilder::use_native_tls`.
    pub fn new() -> Self {
        Self(HeaderMap::default(), ClientBuilder::default().use_native_tls())
    }

    /// Creates a builder with no default headers, configured through
    /// `ClientBuilder::use_rustls_tls`.
    pub fn with_rustls() -> Self {
        Self(HeaderMap::default(), ClientBuilder::default().use_rustls_tls())
    }

    /// Sets the local IP address passed to `ClientBuilder::local_address`.
    ///
    /// # Panics
    ///
    /// Panics if `addr` does not parse as an `IpAddr`.
    pub fn bind_address<'a, V: Into<&'a str>>(self, addr: V) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        match IpAddr::from_str(addr.into()) {
            Ok(ip) => AsyncHttpcBuilder(headers, client.local_address(Some(ip))),
            Err(_) => panic!("Bind address failed, reason: invalid address"),
        }
    }

    /// Sets the `User-Agent` value via `ClientBuilder::user_agent`.
    pub fn set_default_user_agent<'a, V: Into<&'a str>>(self, value: V) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        let agent: &str = value.into();
        AsyncHttpcBuilder(headers, client.user_agent(agent))
    }

    /// Inserts a default header with `HeaderMap::insert`.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not a valid header name or `value` is not a valid
    /// header value.
    pub fn new_default_header<'a, V: Into<&'a str>>(self, key: V, value: V) -> Self {
        let AsyncHttpcBuilder(mut headers, client) = self;
        let name = HeaderName::from_str(key.into()).unwrap();
        let content = HeaderValue::from_str(value.into()).unwrap();
        headers.insert(name, content);
        AsyncHttpcBuilder(headers, client)
    }

    /// Adds a default header with `HeaderMap::append`.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not a valid header name or `value` is not a valid
    /// header value.
    pub fn add_default_header<'a, V: Into<&'a str>>(self, key: V, value: V) -> Self {
        let AsyncHttpcBuilder(mut headers, client) = self;
        let name = HeaderName::from_str(key.into()).unwrap();
        let content = HeaderValue::from_str(value.into()).unwrap();
        headers.append(name, content);
        AsyncHttpcBuilder(headers, client)
    }

    /// Forwards `b` to `ClientBuilder::cookie_store`.
    pub fn enable_cookie(self, b: bool) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        AsyncHttpcBuilder(headers, client.cookie_store(b))
    }

    /// Forwards `b` to `ClientBuilder::gzip`.
    pub fn enable_gzip(self, b: bool) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        AsyncHttpcBuilder(headers, client.gzip(b))
    }

    /// Forwards `b` to `ClientBuilder::brotli`.
    pub fn enable_brotli(self, b: bool) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        AsyncHttpcBuilder(headers, client.brotli(b))
    }

    /// Forwards `b` to `ClientBuilder::deflate`.
    pub fn enable_deflate(self, b: bool) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        AsyncHttpcBuilder(headers, client.deflate(b))
    }

    /// Forwards `b` to `ClientBuilder::referer`.
    pub fn enable_auto_referer(self, b: bool) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        AsyncHttpcBuilder(headers, client.referer(b))
    }

    /// Toggles strict TLS validation.
    ///
    /// With `b == false` the client accepts both invalid hostnames and
    /// invalid certificates; with `b == true` both are rejected.
    pub fn enable_strict(self, b: bool) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        let accept_invalid = !b;
        AsyncHttpcBuilder(
            headers,
            client
                .danger_accept_invalid_hostnames(accept_invalid)
                .danger_accept_invalid_certs(accept_invalid),
        )
    }

    /// Installs a `Policy::limited(count)` redirect policy.
    pub fn set_redirect_limit(self, count: usize) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        AsyncHttpcBuilder(headers, client.redirect(Policy::limited(count)))
    }

    /// Registers a proxy created with `Proxy::http`, optionally with basic
    /// authentication given as `(user, password)`.
    ///
    /// # Panics
    ///
    /// Panics if `Proxy::http` rejects `url`.
    pub fn set_http_proxy<'a, V: Into<&'a str>>(self, url: V, auth: Option<(V, V)>) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        let url: &str = url.into();
        let plain = Proxy::http(url).unwrap_or_else(|e| {
            panic!("Set http proxy failed, url: {}, reason: {:?}", url, e)
        });
        let proxy = match auth {
            Some((user, pwd)) => plain.basic_auth(user.into(), pwd.into()),
            None => plain,
        };
        AsyncHttpcBuilder(headers, client.proxy(proxy))
    }

    /// Sets the connect timeout; `timeout` is in milliseconds.
    pub fn set_connect_timeout(self, timeout: u64) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        AsyncHttpcBuilder(headers, client.connect_timeout(Duration::from_millis(timeout)))
    }

    /// Sets the default request timeout; `timeout` is in milliseconds.
    pub fn set_request_timeout(self, timeout: u64) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        AsyncHttpcBuilder(headers, client.timeout(Duration::from_millis(timeout)))
    }

    /// Sets the pool idle timeout in milliseconds; `None` is forwarded to
    /// `ClientBuilder::pool_idle_timeout` as `None`.
    pub fn set_connection_timeout(self, timeout: Option<u64>) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        let idle = timeout.map(Duration::from_millis);
        AsyncHttpcBuilder(headers, client.pool_idle_timeout(idle))
    }

    /// Forwards `count` to `ClientBuilder::pool_max_idle_per_host`.
    pub fn set_host_connection_limit(self, count: usize) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        AsyncHttpcBuilder(headers, client.pool_max_idle_per_host(count))
    }

    /// Reads a PEM file from `path` and registers it through
    /// `ClientBuilder::add_root_certificate`.
    ///
    /// Note that, despite the method name, the certificate is added as a
    /// root certificate.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be read or is not a valid PEM certificate.
    pub fn add_client_cert<P: AsRef<Path>>(self, path: P) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        let pem = fs::read(&path).unwrap_or_else(|e| {
            panic!("Set client cert failed, path: {:?}, reason: {:?}", path.as_ref(), e)
        });
        let cert = Certificate::from_pem(&pem).unwrap_or_else(|e| {
            panic!("Set client root cert failed, path: {:?}, reason: {:?}", path.as_ref(), e)
        });
        AsyncHttpcBuilder(headers, client.add_root_certificate(cert))
    }

    /// Reads a PKCS#12 DER archive from `path`, unlocks it with `pwd` and
    /// installs it as the client identity.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be read or `Identity::from_pkcs12_der`
    /// rejects it.
    pub fn add_client_identity<P: AsRef<Path>>(self, path: P, pwd: &str) -> Self {
        let AsyncHttpcBuilder(headers, client) = self;
        let der = fs::read(&path).unwrap_or_else(|e| {
            panic!("Set client key failed, path: {:?}, reason: {:?}", path.as_ref(), e)
        });
        let identity = Identity::from_pkcs12_der(&der, pwd).unwrap_or_else(|e| {
            panic!("Set client identity failed, path: {:?}, reason: {:?}", path.as_ref(), e)
        });
        AsyncHttpcBuilder(headers, client.identity(identity))
    }

    /// Applies the accumulated default headers and builds the client.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error describing the reqwest failure if
    /// the underlying `ClientBuilder::build` fails.
    pub fn build(self) -> Result<AsyncHttpc> {
        let AsyncHttpcBuilder(headers, client) = self;
        client
            .default_headers(headers)
            .build()
            .map(|client| AsyncHttpc(Arc::new(client)))
            .map_err(|e| {
                Error::new(
                    ErrorKind::Other,
                    format!("Build async http client failed, reason: {:?}", e),
                )
            })
    }
}
/// Cheaply cloneable handle to a shared reqwest `Client`.
#[derive(Clone)]
pub struct AsyncHttpc(Arc<Client>);

impl AsyncHttpc {
    /// Starts a request for `url` using the given `method`.
    ///
    /// The returned [`AsyncHttpRequest`] keeps a copy of the URL and the
    /// method alongside the reqwest request builder.
    pub fn build_request(&self, url: &str, method: AsyncHttpRequestMethod) -> AsyncHttpRequest {
        // Translate the local enum into the HTTP verb once, then let the
        // generic `Client::request` entry point create the builder.
        let verb = match method {
            AsyncHttpRequestMethod::Optinos => Method::OPTIONS,
            AsyncHttpRequestMethod::Head => Method::HEAD,
            AsyncHttpRequestMethod::Get => Method::GET,
            AsyncHttpRequestMethod::Post => Method::POST,
            AsyncHttpRequestMethod::Put => Method::PUT,
            AsyncHttpRequestMethod::Patch => Method::PATCH,
            AsyncHttpRequestMethod::Delete => Method::DELETE,
        };
        let builder = self.0.request(verb, url);

        AsyncHttpRequest {
            url: url.to_owned(),
            method,
            builder,
        }
    }
}
/// HTTP methods supported by the client.
#[derive(Debug, Clone)]
pub enum AsyncHttpRequestMethod {
    /// The HTTP `OPTIONS` method. The variant name is a historical
    /// misspelling that is kept so existing callers keep compiling.
    Optinos,
    /// The HTTP `HEAD` method.
    Head,
    /// The HTTP `GET` method.
    Get,
    /// The HTTP `POST` method.
    Post,
    /// The HTTP `PUT` method.
    Put,
    /// The HTTP `PATCH` method.
    Patch,
    /// The HTTP `DELETE` method.
    Delete,
}
/// Builder for a multipart form body, wrapping a reqwest `Form`.
pub struct AsyncHttpForm(Form);

impl AsyncHttpForm {
    /// Returns the multipart boundary string of the wrapped form.
    pub fn get_boundary(&self) -> &str {
        let AsyncHttpForm(form) = self;
        form.boundary()
    }

    /// Appends a text field named `key` with the given `value`.
    pub fn add_field(self, key: String, value: String) -> AsyncHttpForm {
        AsyncHttpForm(self.0.text(key, value))
    }

    /// Appends an in-memory file part named `key`.
    ///
    /// `filename` is sent as the part's file name, `mime` as its content
    /// type and `content` as its bytes.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error (and drops the form) if `mime`
    /// is rejected by `Part::mime_str`.
    pub fn add_file(
        self,
        key: String,
        filename: String,
        mime: String,
        content: Vec<u8>,
    ) -> Result<AsyncHttpForm> {
        // Capture the size up front: `content` is moved into the part.
        let size = content.len();
        let part = Part::bytes(content)
            .file_name(filename.clone())
            .mime_str(&mime)
            .map_err(|e| {
                Error::new(
                    ErrorKind::Other,
                    format!(
                        "Add file to form failed, key: {}, file: {}, mime: {}, len: {}, reason: {}",
                        key, filename, mime, size, e
                    ),
                )
            })?;

        Ok(AsyncHttpForm(self.0.part(key, part)))
    }

    /// Finishes the form and wraps it as a request body, after switching
    /// the form to `percent_encode_attr_chars` encoding.
    pub fn into_body(self) -> AsyncHttpRequestBody {
        AsyncHttpRequestBody::Form(self.0.percent_encode_attr_chars())
    }
}
/// Payload that can be attached to an [`AsyncHttpRequest`].
pub enum AsyncHttpRequestBody {
    /// A plain reqwest body (text or binary).
    Body(Body),
    /// A multipart form.
    Form(Form),
}

impl AsyncHttpRequestBody {
    /// Builds a body from a string.
    ///
    /// When `is_encode` is `true` the text is first percent-encoded with
    /// the `CONTROLS` set; otherwise it is sent unchanged.
    pub fn with_string(body: String, is_encode: bool) -> Self {
        let text = if is_encode {
            percent_encode(body.as_bytes(), CONTROLS).to_string()
        } else {
            body
        };
        AsyncHttpRequestBody::Body(Body::from(text))
    }

    /// Builds a body from raw bytes.
    pub fn with_binary(body: Vec<u8>) -> Self {
        AsyncHttpRequestBody::Body(body.into())
    }

    /// Starts an empty multipart form; finish it with
    /// `AsyncHttpForm::into_body`.
    pub fn form() -> AsyncHttpForm {
        AsyncHttpForm(Form::new())
    }
}
/// A request under construction.
///
/// `url` and `method` are kept only so that `send` can mention them in
/// error messages; `builder` is the reqwest builder that carries the real
/// request state.
pub struct AsyncHttpRequest {
    url: String,
    method: AsyncHttpRequestMethod,
    builder: RequestBuilder,
}

impl AsyncHttpRequest {
    /// With `b == false`, calls `fetch_mode_no_cors` on the request
    /// builder; with `b == true` the request is returned unchanged.
    pub fn enable_cors(self, b: bool) -> AsyncHttpRequest {
        if b {
            return self;
        }
        AsyncHttpRequest {
            builder: self.builder.fetch_mode_no_cors(),
            ..self
        }
    }

    /// Adds HTTP basic authentication. A missing password is sent as the
    /// empty string.
    pub fn set_auth<'a, V: Into<&'a str>>(self, user: V, pwd: Option<V>) -> AsyncHttpRequest {
        let name: &'a str = user.into();
        let secret: &'a str = match pwd {
            Some(given) => given.into(),
            None => "",
        };
        AsyncHttpRequest {
            builder: self.builder.basic_auth(name, Some(secret)),
            ..self
        }
    }

    /// Passes the key/value `pairs` to `RequestBuilder::query`.
    pub fn set_pairs(self, pairs: &[(&str, &str)]) -> AsyncHttpRequest {
        AsyncHttpRequest {
            builder: self.builder.query(pairs),
            ..self
        }
    }

    /// Selects the HTTP protocol version.
    ///
    /// Recognised values are `0.9`, `1.0`, `2.0` and `3.0`; every other
    /// value (including `1.1`) selects HTTP/1.1.
    pub fn set_version(self, version: f64) -> AsyncHttpRequest {
        let protocol = match version {
            0.9 => Version::HTTP_09,
            1.0 => Version::HTTP_10,
            2.0 => Version::HTTP_2,
            3.0 => Version::HTTP_3,
            _ => Version::HTTP_11,
        };
        AsyncHttpRequest {
            builder: self.builder.version(protocol),
            ..self
        }
    }

    /// Adds a header to this request via `RequestBuilder::header`.
    pub fn add_header<'a, V: Into<&'a str>>(self, key: V, value: V) -> AsyncHttpRequest {
        let name: &'a str = key.into();
        let content: &'a str = value.into();
        AsyncHttpRequest {
            builder: self.builder.header(name, content),
            ..self
        }
    }

    /// Attaches a body: plain bodies go through `RequestBuilder::body`,
    /// forms through `RequestBuilder::multipart`.
    pub fn set_body(self, body: AsyncHttpRequestBody) -> AsyncHttpRequest {
        let builder = match body {
            AsyncHttpRequestBody::Body(raw) => self.builder.body(raw),
            AsyncHttpRequestBody::Form(form) => self.builder.multipart(form),
        };
        AsyncHttpRequest { builder, ..self }
    }

    /// Sets a timeout for this request only; `timeout` is in milliseconds.
    pub fn set_timeout(self, timeout: u64) -> AsyncHttpRequest {
        AsyncHttpRequest {
            builder: self.builder.timeout(Duration::from_millis(timeout)),
            ..self
        }
    }
}
impl AsyncHttpRequest {
pub async fn send(self) -> Result<AsyncHttpResponse> {
let url = self.url;
let method = self.method;
let request = self.builder;
let(sender, receiver) = bounded(1);
ASYNC_HTTPC_RUNTIME.spawn(async move {
let result = request.send().await;
sender.send(result);
});
match receiver.recv_async().await {
Err(e) => {
Err(Error::new(ErrorKind::Other, format!("Async http request failed, method: {:?}, url: {:?}, reason: {:?}", method, url, e)))
},
Ok(result) => {
match result {
Err(e) => {
Err(Error::new(ErrorKind::Other, format!("Async http request failed, method: {:?}, url: {:?}, reason: {:?}", method, url, e)))
},
Ok(respone) => {
Ok(AsyncHttpResponse(respone))
},
}
}
}
}
}
/// A received HTTP response, wrapping a reqwest `Response`.
pub struct AsyncHttpResponse(Response);

impl AsyncHttpResponse {
    /// Returns the remote socket address reported by reqwest, if any.
    pub fn get_peer_addr(&self) -> Option<SocketAddr> {
        let AsyncHttpResponse(response) = self;
        response.remote_addr()
    }

    /// Returns the response URL rendered as a string.
    pub fn get_url(&self) -> String {
        let AsyncHttpResponse(response) = self;
        response.url().to_string()
    }

    /// Returns the numeric HTTP status code.
    pub fn get_status(&self) -> u16 {
        let AsyncHttpResponse(response) = self;
        response.status().as_u16()
    }

    /// Returns the `Debug` rendering of the response's HTTP version.
    pub fn get_version(&self) -> String {
        let AsyncHttpResponse(response) = self;
        format!("{:?}", response.version())
    }

    /// Reports whether a header named `key` is present.
    pub fn contains_key(&self, key: &str) -> bool {
        let AsyncHttpResponse(response) = self;
        response.headers().contains_key(key)
    }

    /// Returns every value of the header `key`, or `None` when there is
    /// none.
    ///
    /// A value that is not valid UTF-8 is reported as an empty string.
    pub fn get_headers(&self, key: &str) -> Option<Vec<String>> {
        let AsyncHttpResponse(response) = self;
        let values: Vec<String> = response
            .headers()
            .get_all(key)
            .iter()
            .map(|raw| String::from_utf8(raw.as_bytes().to_vec()).unwrap_or_default())
            .collect();

        if values.is_empty() {
            None
        } else {
            Some(values)
        }
    }

    /// Copies the headers into a name-to-value map.
    ///
    /// Values for which `HeaderValue::to_str` fails are skipped. Each entry
    /// is stored with the map's `insert`, so when a name occurs more than
    /// once a later value replaces the earlier one.
    pub fn to_headers(&self) -> XHashMap<String, String> {
        let AsyncHttpResponse(response) = self;
        let mut collected = XHashMap::default();
        response
            .headers()
            .iter()
            .filter_map(|(name, value)| {
                value
                    .to_str()
                    .ok()
                    .map(|text| (name.to_string(), text.to_string()))
            })
            .for_each(|(name, text)| {
                collected.insert(name, text);
            });
        collected
    }

    /// Returns the body length reported by `Response::content_length`.
    pub fn get_body_len(&self) -> Option<u64> {
        let AsyncHttpResponse(response) = self;
        response.content_length()
    }
}
impl AsyncHttpResponse {
pub async fn get_body(&mut self) -> Result<Option<Box<[u8]>>> {
ASYNC_HTTPC_RUNTIME.block_on(async move {
match self.0.chunk().await {
Err(e) => {
Err(Error::new(ErrorKind::Other, format!("Get response body failed, reason: {:?}", e)))
},
Ok(Some(chunk)) => {
Ok(Some(chunk.to_vec().into_boxed_slice()))
},
Ok(None) => Ok(None),
}
})
}
pub async fn body(&mut self) -> Result<Box<[u8]>> {
let mut body: Vec<u8> = Vec::new();
loop {
match self.get_body().await {
Err(e) => {
return Err(e);
},
Ok(Some(block)) => {
body.put_slice(&block[..]);
},
Ok(None) => break,
}
}
Ok(body.into_boxed_slice())
}
}