#![cfg(feature = "http")]
mod cache;
mod ctx;
mod curl;
mod file_cache;
mod util;
pub use cache::{CacheKey, CacheMode, CachePolicy};
pub use ctx::{HttpCache, HttpClient, http_cache, http_client, set_http_cache, set_http_client, set_request_default};
pub use curl::CurlProcessClient;
pub use file_cache::FileSystemCache;
pub type Error = Box<dyn std::error::Error + Send + Sync>;
pub use http::{
StatusCode, header,
method::{self, Method},
uri::{self, Uri},
};
use serde::{Deserialize, Serialize};
use zng_var::{Var, const_var};
use std::time::Duration;
use std::{fmt, mem};
use crate::{channel::IpcBytes, http::ctx::REQUEST_DEFAULT, io::Metrics};
use super::io::AsyncRead;
use zng_txt::{ToTxt, Txt};
use zng_unit::*;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct Request {
#[serde(with = "http_serde::uri")]
pub uri: Uri,
#[serde(with = "http_serde::method")]
pub method: Method,
#[serde(with = "http_serde::header_map")]
pub headers: http::HeaderMap,
pub timeout: Duration,
pub connect_timeout: Duration,
pub low_speed_timeout: (Duration, ByteLength),
pub redirect_limit: u16,
#[cfg(feature = "http_compression")]
pub auto_decompress: bool,
pub max_upload_speed: ByteLength,
pub max_download_speed: ByteLength,
pub require_length: bool,
pub max_length: ByteLength,
pub cache: CacheMode,
#[cfg(feature = "http_cookie")]
pub cookies: bool,
pub metrics: bool,
pub body: IpcBytes,
}
impl Request {
pub fn new(method: Method, uri: Uri) -> Self {
match REQUEST_DEFAULT.lock().clone() {
Some(mut r) => {
r.method = method;
r.uri = uri;
r
}
None => Self {
uri,
method,
require_length: false,
max_length: ByteLength::MAX,
headers: header::HeaderMap::new(),
timeout: Duration::MAX,
connect_timeout: 90.secs(),
low_speed_timeout: (Duration::MAX, 0.bytes()),
redirect_limit: 20,
#[cfg(feature = "http_compression")]
auto_decompress: true,
max_upload_speed: ByteLength::MAX,
max_download_speed: ByteLength::MAX,
cache: CacheMode::Default,
#[cfg(feature = "http_cookie")]
cookies: false,
metrics: true,
body: IpcBytes::default(),
},
}
}
pub fn get<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
Ok(Self::new(Method::GET, uri.try_into()?))
}
pub fn put<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
Ok(Self::new(Method::PUT, uri.try_into()?))
}
pub fn post<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
Ok(Self::new(Method::POST, uri.try_into()?))
}
pub fn delete<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
Ok(Self::new(Method::DELETE, uri.try_into()?))
}
pub fn patch<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
Ok(Self::new(Method::PATCH, uri.try_into()?))
}
pub fn head<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
Ok(Self::new(Method::HEAD, uri.try_into()?))
}
pub fn header<K, V>(mut self, name: K, value: V) -> Result<Self, Error>
where
K: TryInto<header::HeaderName>,
V: TryInto<header::HeaderValue>,
Error: From<<K as TryInto<header::HeaderName>>::Error>,
Error: From<<V as TryInto<header::HeaderValue>>::Error>,
{
self.headers.insert(name.try_into()?, value.try_into()?);
Ok(self)
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = timeout;
self
}
pub fn low_speed_timeout(mut self, timeout: Duration, bytes_per_sec: ByteLength) -> Self {
self.low_speed_timeout = (timeout, bytes_per_sec);
self
}
pub fn redirect_limit(mut self, count: u16) -> Self {
self.redirect_limit = count;
self
}
#[cfg(feature = "http_compression")]
pub fn auto_decompress(mut self, enabled: bool) -> Self {
self.auto_decompress = enabled;
self
}
pub fn require_length(mut self, enabled: bool) -> Self {
self.require_length = enabled;
self
}
pub fn max_length(mut self, max: ByteLength) -> Self {
self.max_length = max;
self
}
pub fn max_upload_speed(mut self, bytes_per_sec: ByteLength) -> Self {
self.max_upload_speed = bytes_per_sec;
self
}
pub fn max_download_speed(mut self, bytes_per_sec: ByteLength) -> Self {
self.max_download_speed = bytes_per_sec;
self
}
#[cfg(feature = "http_compression")]
pub fn cookies(mut self, enable: bool) -> Self {
self.cookies = enable;
self
}
pub fn metrics(mut self, enabled: bool) -> Self {
self.metrics = enabled;
self
}
pub fn body(mut self, body: IpcBytes) -> Self {
self.body = body;
self
}
pub fn body_text(mut self, body: &str) -> Result<Self, Error> {
if !self.headers.contains_key("Content-Type") {
self = self.header("Content-Type", "text/plain; charset=utf-8")?;
}
Ok(self.body(IpcBytes::from_slice_blocking(body.as_bytes())?))
}
pub fn body_json<T: Serialize>(mut self, body: &T) -> Result<Self, Error> {
if !self.headers.contains_key("Content-Type") {
self = self.header("Content-Type", "text/json; charset=utf-8")?;
}
let body = serde_json::to_vec(body)?;
Ok(self.body(IpcBytes::from_vec_blocking(body)?))
}
}
impl From<Request> for http::Request<IpcBytes> {
fn from(mut r: Request) -> Self {
let mut b = http::Request::builder().uri(mem::take(&mut r.uri)).method(r.method.clone());
if !r.headers.is_empty() {
*b.headers_mut().unwrap() = mem::take(&mut r.headers);
}
let body = mem::take(&mut r.body);
let b = b.extension(r);
b.body(body).unwrap()
}
}
impl From<http::Request<IpcBytes>> for Request {
fn from(value: http::Request<IpcBytes>) -> Self {
let (mut parts, body) = value.into_parts();
if let Some(mut r) = parts.extensions.remove::<Request>() {
r.method = parts.method;
r.uri = parts.uri;
r.headers = parts.headers;
r.body = body;
r
} else {
let mut r = Request::new(parts.method, parts.uri);
r.headers = parts.headers;
r.body = body;
r
}
}
}
pub struct Response {
status: StatusCode,
headers: header::HeaderMap,
effective_uri: Uri,
body: ResponseBody,
metrics: Var<Metrics>,
}
impl fmt::Debug for Response {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Response")
.field("status", &self.status)
.field("effective_uri", &self.effective_uri)
.field("header", &self.headers)
.field("metrics", &self.metrics.get())
.finish_non_exhaustive()
}
}
enum ResponseBody {
Done { bytes: IpcBytes },
Read { read: Box<dyn AsyncRead + Send> },
}
impl Response {
pub fn from_read(
status: StatusCode,
header: header::HeaderMap,
effective_uri: Uri,
metrics: Var<Metrics>,
read: Box<dyn AsyncRead + Send>,
) -> Self {
Self {
status,
headers: header,
effective_uri,
metrics,
body: ResponseBody::Read { read },
}
}
pub fn from_done(status: StatusCode, mut headers: header::HeaderMap, effective_uri: Uri, metrics: Metrics, body: IpcBytes) -> Self {
if !headers.contains_key(header::CONTENT_LENGTH) {
headers.insert(header::CONTENT_LENGTH, body.len().into());
}
Self {
status,
headers,
effective_uri,
metrics: const_var(metrics),
body: ResponseBody::Done { bytes: body },
}
}
pub fn from_msg(status: StatusCode, msg: impl ToTxt) -> Self {
Self::from_done(
status,
header::HeaderMap::new(),
Uri::from_static("/"),
Metrics::zero(),
IpcBytes::from_slice_blocking(msg.to_txt().as_bytes()).unwrap(),
)
}
pub fn status(&self) -> StatusCode {
self.status
}
pub fn header(&self) -> &header::HeaderMap {
&self.headers
}
pub fn effective_uri(&self) -> &Uri {
&self.effective_uri
}
pub fn content_len(&self) -> Option<ByteLength> {
match &self.body {
ResponseBody::Done { bytes, .. } => Some(bytes.len().bytes()),
ResponseBody::Read { .. } => {
let len = self
.headers
.get(header::CONTENT_LENGTH)?
.to_str()
.ok()?
.parse::<usize>()
.ok()?
.bytes();
Some(len)
}
}
}
pub async fn download(&mut self) -> Result<(), Error> {
if let ResponseBody::Done { .. } = &self.body {
return Ok(());
}
let downloader = match mem::replace(
&mut self.body,
ResponseBody::Done {
bytes: IpcBytes::default(),
},
) {
ResponseBody::Read { read: downloader } => downloader,
ResponseBody::Done { .. } => unreachable!(),
};
let mut downloader = Box::into_pin(downloader);
let body = IpcBytes::from_read(downloader.as_mut()).await?;
self.body = ResponseBody::Done { bytes: body };
Ok(())
}
pub async fn body(&mut self) -> Result<IpcBytes, Error> {
self.download().await?;
match &self.body {
ResponseBody::Done { bytes, .. } => Ok(bytes.clone()),
ResponseBody::Read { .. } => unreachable!(),
}
}
pub async fn body_text(&mut self) -> Result<Txt, Error> {
let content_type = self
.headers
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<mime::Mime>().ok());
let encoding_name = content_type
.as_ref()
.and_then(|mime| mime.get_param("charset").map(|charset| charset.as_str()))
.unwrap_or("utf-8");
let bytes = self.body().await?;
let encoding = encoding_rs::Encoding::for_label(encoding_name.as_bytes()).unwrap_or(encoding_rs::UTF_8);
let (text, _, _) = encoding.decode(&bytes);
Ok(Txt::from_str(&text))
}
pub async fn body_json<O>(&mut self) -> Result<O, Error>
where
O: serde::de::DeserializeOwned + std::marker::Unpin,
{
let bytes = self.body().await?;
let r = serde_json::from_slice(&bytes)?;
Ok(r)
}
pub fn metrics(&self) -> Var<Metrics> {
self.metrics.read_only()
}
}
pub async fn get<U>(uri: U) -> Result<Response, Error>
where
U: TryInto<Uri>,
Error: From<<U as TryInto<Uri>>::Error>,
{
send(Request::get(uri)?).await
}
pub async fn get_txt<U>(uri: U) -> Result<Txt, Error>
where
U: TryInto<Uri>,
Error: From<<U as TryInto<Uri>>::Error>,
{
send(Request::get(uri)?).await?.body_text().await
}
pub async fn get_bytes<U>(uri: U) -> Result<IpcBytes, Error>
where
U: TryInto<Uri>,
Error: From<<U as TryInto<Uri>>::Error>,
{
send(Request::get(uri)?).await?.body().await
}
pub async fn get_json<U, O>(uri: U) -> Result<O, Error>
where
U: TryInto<Uri>,
Error: From<<U as TryInto<Uri>>::Error>,
O: serde::de::DeserializeOwned + std::marker::Unpin,
{
send(Request::get(uri)?).await?.body_json().await
}
pub async fn head<U>(uri: U) -> Result<Response, Error>
where
U: TryInto<Uri>,
Error: From<<U as TryInto<Uri>>::Error>,
{
send(Request::head(uri)?).await
}
pub async fn put<U>(uri: U, body: IpcBytes) -> Result<Response, Error>
where
U: TryInto<Uri>,
Error: From<<U as TryInto<Uri>>::Error>,
{
send(Request::put(uri)?.body(body)).await
}
pub async fn post<U>(uri: U, body: IpcBytes) -> Result<Response, Error>
where
U: TryInto<Uri>,
Error: From<<U as TryInto<Uri>>::Error>,
{
send(Request::post(uri)?.body(body)).await
}
pub async fn delete<U>(uri: U) -> Result<Response, Error>
where
U: TryInto<Uri>,
Error: From<<U as TryInto<Uri>>::Error>,
{
send(Request::delete(uri)?).await
}
pub async fn send(request: Request) -> Result<Response, Error> {
let client = http_client();
if client.is_cache_manager() {
client.send(request).await
} else {
match request.cache {
CacheMode::NoCache => client.send(request).await,
CacheMode::Default => cache::send_cache(client, request).await,
CacheMode::Permanent => cache::send_cache_perm(client, request).await,
}
}
}