pub use hyper;
pub use hyper_util;
pub use futures;
pub use tokio;
pub use tokio_util;
pub use tokio_stream;
pub use bytes;
pub use http_body_util;
pub use miniz_oxide;
#[cfg(feature = "tls")]
pub use rustls_native_certs;
#[cfg(feature = "tls")]
pub use tokio_rustls;
use std::net::SocketAddr;
#[cfg(feature = "tls")]
use std::sync::Arc;
use std::pin::Pin;
use hyper::body::Body;
use hyper_util::rt::TokioIo;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use bstr::ByteSlice;
pub use hyper::{Request, Response};
pub use hyper::header;
pub use http_body_util::{Full,BodyExt,Limited,StreamBody,combinators::BoxBody};
pub type BoxedBody = BoxBody<bytes::Bytes,String>;
pub fn box_body<T,E: std::fmt::Display>(body: T) -> BoxedBody
where T: Body<Data = bytes::Bytes, Error = E> + Send + Sync + 'static,
{
BoxBody::new(body.map_err(|err| err.to_string()))
}
pub type BodyVec = Full<Bytes>;
const BUFFER_SIZE : usize = 65536;
#[cfg(not(feature = "tls"))]
pub type HTTPClientState = ();
#[cfg(not(feature = "tls"))]
pub fn get_http_client_state() {
}
#[cfg(feature = "tls")]
pub type HTTPClientState = Arc<tokio_rustls::rustls::RootCertStore>;
#[cfg(feature = "tls")]
pub fn get_http_client_state() -> HTTPClientState {
let _ = tokio_rustls::rustls::crypto::ring::default_provider().install_default();
let mut roots = tokio_rustls::rustls::RootCertStore::empty();
for cert in rustls_native_certs::load_native_certs().expect("could not load platform certificates") {
roots.add(cert).unwrap();
}
roots.into()
}
#[allow(unused_variables)]
pub async fn perform_http_request<'a,E>(mut client_req: Request<impl Body<Data = Bytes, Error = E> + Send + 'static>, root_cert_store: HTTPClientState, connections: Option<tokio::runtime::Handle>) -> Result<Response<BoxedBody>, Box<dyn std::error::Error + Send + Sync>>
where E: Into<Box<dyn std::error::Error + Send + Sync>>
{
let url = client_req.uri();
let url_copy = url.to_string();
let host = url.host().expect("uri has no host").to_string();
let https = url.scheme_str() == Some("https");
let port = url.port_u16().unwrap_or(if https { 443 } else { 80 });
let mut path_and_query = url.path_and_query().ok_or("no path/query in URL")?.to_string();
if path_and_query.is_empty() {
path_and_query = "/".to_string();
}
*client_req.uri_mut() = hyper::Uri::builder().path_and_query(path_and_query).build()?;
if !client_req.headers_mut().contains_key(hyper::header::HOST) {
if (https && port == 443) || (!https && port == 80) {
client_req.headers_mut().insert(hyper::header::HOST, hyper::header::HeaderValue::from_str(&host)?);
} else {
let host_header = format!("{host}:{port}");
client_req.headers_mut().insert(hyper::header::HOST, hyper::header::HeaderValue::from_str(&host_header)?);
}
}
let mut deflate = false;
if !client_req.headers_mut().contains_key(hyper::header::ACCEPT_ENCODING) {
deflate = true;
client_req.headers_mut().insert(hyper::header::ACCEPT_ENCODING, hyper::header::HeaderValue::from_str("deflate, gzip")?);
}
if !client_req.headers_mut().contains_key(hyper::header::USER_AGENT) {
client_req.headers_mut().insert(hyper::header::USER_AGENT, hyper::header::HeaderValue::from_str(&format!("indigo {}-{}-{}", buildinfy::build_project_name().unwrap_or("httpclient"), buildinfy::build_reference().unwrap_or("dev"), buildinfy::build_pipeline_id_per_project().unwrap_or("0")))?);
}
let host_str = host.clone();
let addrs = tokio::task::spawn_blocking(move || -> Result<Vec<SocketAddr>,std::io::Error> {
Ok(std::net::ToSocketAddrs::to_socket_addrs(&(host_str,port))?.collect())
}).await??;
let mut connection = None;
for (i,a) in addrs.iter().enumerate() {
match tokio::net::TcpStream::connect(a).await {
Ok(s) => {
connection = Some((a, s));
break;
},
Err(err) => {
if i == addrs.len()-1 {
return Err(err.into());
}
},
}
};
let (addr, stream) = connection.ok_or(std::io::Error::from(std::io::ErrorKind::NotFound))?;
log::trace!("making http(s) request to {url_copy} with address {addr}");
let resp = if https {
#[cfg(feature = "tls")]
{
let mut config = tokio_rustls::rustls::ClientConfig::builder()
.with_root_certificates(root_cert_store)
.with_no_client_auth();
config.alpn_protocols = vec![b"http/1.1".to_vec()];
let connector = tokio_rustls::TlsConnector::from(Arc::new(config));
let domain = tokio_rustls::rustls::pki_types::ServerName::try_from(host)
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid DNS name"))?
.to_owned();
let stream = connector.connect(domain, stream).await.map_err(|x| format!("perform_http_request TLS connect error: {}", x))?;
let stream = TokioIo::new(stream);
let mut client = hyper::client::conn::http1::Builder::new();
client.title_case_headers(true);
let (mut sender, conn) = client.handshake(stream).await.map_err(|x| format!("perform_http_request client handshake error: {}", x))?;
connections.unwrap_or_else(|| tokio::runtime::Handle::current()).spawn(async move {
if let Err(err) = conn.await {
log::error!("HTTPS connection failed: {:?}", err);
}
});
sender.send_request(client_req)
}
#[cfg(not(feature = "tls"))]
return Err("HTTPS not supported".into());
} else {
let stream = TokioIo::new(stream);
let mut client = hyper::client::conn::http1::Builder::new();
client.title_case_headers(true);
let (mut sender, conn) = client.handshake(stream).await?;
connections.unwrap_or_else(|| tokio::runtime::Handle::current()).spawn(async move {
if let Err(err) = conn.await {
log::error!("HTTP connection failed: {:?}", err);
}
});
sender.send_request(client_req)
};
let resp = resp.await.map_err(|x| format!("perform_http_request: {}", x))?;
if deflate {
if let Some(content_encoding) = resp.headers().get(hyper::header::CONTENT_ENCODING) {
if content_encoding == "gzip" || content_encoding == "deflate" {
let (parts, body) = resp.into_parts();
return Ok(Response::from_parts(parts, box_body(DecompressBody::new(box_body(body)))))
} else {
return Err(format!("unsupported content encoding send by {url_copy}").into())
}
}
}
let (parts, body) = resp.into_parts();
Ok(Response::from_parts(parts, box_body(body)))
}
pub struct DecompressBody {
inner: Pin<Box<BoxedBody>>,
current: BytesMut,
stopping: bool,
finished: bool,
compressor: Option<Box<miniz_oxide::inflate::stream::InflateState>>,
}
impl std::fmt::Debug for DecompressBody {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "DecompressBody")
}
}
impl DecompressBody {
fn new(inner: BoxedBody) -> Self {
Self {
inner: Box::pin(inner),
current: BytesMut::new(),
stopping: false,
finished: false,
compressor: None,
}
}
fn process(self: &mut Pin<&mut Self>) -> std::task::Poll<Option<Result<hyper::body::Frame<Bytes>, String>>> {
let mut compressor = if let Some(compressor) = self.compressor.take() {
compressor
} else {
if self.current.len() <= 2 {
return std::task::Poll::Pending;
}
let input : &[u8] = self.current.as_bytes();
let data_format = if input.len() >= 2 && input[0] == 0x78 && matches!(input[1], 0x01 | 0x9c | 0xDA) {
miniz_oxide::DataFormat::Zlib
} else if input.len() >= 2 && input[0] == 0x1f && input[1] == 0x8b && input[2] == 8 {
if input.len() <= 9 {
return std::task::Poll::Pending;
}
static FHCRC: u8 = 1 << 1;
static FEXTRA: u8 = 1 << 2;
static FNAME: u8 = 1 << 3;
static FCOMMENT: u8 = 1 << 4;
let mut start = 10;
let flag = input[3];
let _mtime = input[4] as u32 | (input[5] as u32) << 8 | (input[6] as u32) << 16 | (input[7] as u32) << 24;
let _xfl = input[8];
let _os = input[9];
if flag & FEXTRA != 0 {
if start+2 > input.len() {
return std::task::Poll::Ready(Some(Err(format!("gzip header did not fit in first frame of {} bytes", input.len()))));
}
let xlen = input[start] as u16 | (input[start+1] as u16) << 8;
start += 2 + xlen as usize;
}
if flag & FNAME != 0 {
while start < input.len() && input[start] != b'\0' {
start += 1;
}
if start == input.len() {
return std::task::Poll::Ready(Some(Err(format!("gzip header did not fit in first frame of {} bytes", input.len()))));
}
start += 1;
}
if flag & FCOMMENT != 0 {
while start < input.len() && input[start] != b'\0' {
start += 1;
}
if start == input.len() {
return std::task::Poll::Ready(Some(Err(format!("gzip header did not fit in first frame of {} bytes", input.len()))));
}
start += 1;
}
if flag & FHCRC != 0 {
if start+2 > input.len() {
return std::task::Poll::Ready(Some(Err(format!("gzip header did not fit in first frame of {} bytes", input.len()))));
}
start += 2;
}
self.current.advance(start);
miniz_oxide::DataFormat::Raw
} else {
miniz_oxide::DataFormat::Raw
};
Box::new(miniz_oxide::inflate::stream::InflateState::new(data_format))
};
if self.current.is_empty() {
self.compressor = Some(compressor);
return std::task::Poll::Pending;
}
let mut output = vec![0u8; BUFFER_SIZE];
let mode = if self.is_end_stream() { miniz_oxide::MZFlush::Finish } else { miniz_oxide::MZFlush::None };
let miniz_oxide::StreamResult{bytes_consumed, bytes_written, status} = miniz_oxide::inflate::stream::inflate(&mut compressor, self.current.as_bytes(), &mut output, mode);
self.compressor = Some(compressor);
self.current.advance(bytes_consumed);
output.truncate(bytes_written);
self.process_result(output, status)
}
fn process_result(self: &mut Pin<&mut Self>, output: Vec<u8>, status: Result<miniz_oxide::MZStatus,miniz_oxide::MZError>) -> std::task::Poll<Option<Result<hyper::body::Frame<Bytes>, String>>> {
if let Err(err) = status {
return std::task::Poll::Ready(Some(Err(format!("inflate error {}/{:?}", err as usize, err))));
}
if Ok(miniz_oxide::MZStatus::StreamEnd) == status {
self.stopping = true;
self.finished = true;
}
if output.is_empty() {
if self.finished {
std::task::Poll::Ready(None)
} else {
std::task::Poll::Pending
}
} else {
let result = Bytes::from(output);
let frame = hyper::body::Frame::data(result);
std::task::Poll::Ready(Some(Ok(frame)))
}
}
fn process_end(self: &mut Pin<&mut Self>) -> std::task::Poll<Option<Result<hyper::body::Frame<Bytes>, String>>> {
assert!(!self.finished);
assert!(self.current.is_empty());
if self.compressor.is_none() {
return std::task::Poll::Ready(Some(Err("inflate end error: cannot deflate empty string".to_string())));
}
let mut output = vec![0u8; BUFFER_SIZE];
let miniz_oxide::StreamResult{bytes_written, status, ..} = miniz_oxide::inflate::stream::inflate(self.compressor.as_mut().unwrap(), &[], &mut output, miniz_oxide::MZFlush::Finish);
output.truncate(bytes_written);
match self.process_result(output, status) {
std::task::Poll::Ready(v) => std::task::Poll::Ready(v),
std::task::Poll::Pending => std::task::Poll::Ready(Some(Err("decompressor end missing".to_string()))),
}
}
}
impl Body for DecompressBody {
type Data = <BoxedBody as Body>::Data;
type Error = <BoxedBody as Body>::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
if self.finished {
return std::task::Poll::Ready(None);
}
loop {
let retval = Self::process(&mut self);
if retval.is_ready() {
return retval;
}
match self.inner.as_mut().poll_frame(cx) {
std::task::Poll::Ready(frame) => {
match frame {
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data() {
self.current.put(data);
}
},
Some(Err(_)) => {
return std::task::Poll::Ready(frame);
},
None => {
self.stopping = true;
return Self::process_end(&mut self);
},
}
},
std::task::Poll::Pending => return std::task::Poll::Pending,
}
}
}
fn is_end_stream(&self) -> bool {
self.finished
}
fn size_hint(&self) -> hyper::body::SizeHint {
let inner = self.inner.size_hint();
let mut retval = hyper::body::SizeHint::default();
if inner.lower() <= 32 {
retval.set_lower(0);
} else {
retval.set_lower(inner.lower() - 32);
}
retval
}
}