httpclienty 0.1.1

HTTP client library based on hyper and tokio, with support for TLS/HTTPS.
Documentation
pub use hyper;
pub use hyper_util;
pub use futures;
pub use tokio;
pub use tokio_util;
pub use tokio_stream;
pub use bytes;
pub use http_body_util;
pub use miniz_oxide;
#[cfg(feature = "tls")]
pub use rustls_native_certs;
#[cfg(feature = "tls")]
pub use tokio_rustls;

use std::net::SocketAddr;
#[cfg(feature = "tls")]
use std::sync::Arc;
use std::pin::Pin;
use hyper::body::Body;
use hyper_util::rt::TokioIo;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use bstr::ByteSlice;

pub use hyper::{Request, Response};
pub use hyper::header;

pub use http_body_util::{Full,BodyExt,Limited,StreamBody,combinators::BoxBody};

pub type BoxedBody = BoxBody<bytes::Bytes,String>;

/// Type-erases a `Bytes` body into a [`BoxedBody`].
///
/// The body's error is converted with its `Display` implementation, so bodies with
/// different error types can all share the `String` error type of [`BoxedBody`].
pub fn box_body<T,E: std::fmt::Display>(body: T) -> BoxedBody
where T: Body<Data = bytes::Bytes, Error = E> + Send + Sync + 'static,
{
    let stringified = body.map_err(|error: E| error.to_string());
    stringified.boxed()
}

pub type BodyVec = Full<Bytes>;

/// Size in bytes of the output buffer handed to each inflate call; it caps the size of a
/// single decompressed data frame.
const BUFFER_SIZE: usize = 64 * 1024;

/// Client state passed to `perform_http_request`; without the `tls` feature there is
/// nothing to share between requests.
#[cfg(not(feature = "tls"))]
pub type HTTPClientState = ();

/// Returns the (empty) client state used when TLS support is compiled out.
#[cfg(not(feature = "tls"))]
pub fn get_http_client_state() -> HTTPClientState {}

/// Client state passed to `perform_http_request`: the trust anchors used to verify HTTPS
/// servers.
#[cfg(feature = "tls")]
pub type HTTPClientState = Arc<tokio_rustls::rustls::RootCertStore>;

/// Builds the client state from the platform's native root certificates.
///
/// It also tries to install rustls' `ring` crypto provider as the process default.
/// Certificates from the platform store that rustls cannot parse are skipped and reported
/// through `log::warn!`. This way, one malformed system certificate does not abort the
/// program.
///
/// # Panics
///
/// Panics when the platform certificate store cannot be loaded at all.
#[cfg(feature = "tls")]
pub fn get_http_client_state() -> HTTPClientState {
    // The result is deliberately ignored: installing fails when a default provider is
    // already set, which is fine.
    let _ = tokio_rustls::rustls::crypto::ring::default_provider().install_default();
    let mut roots = tokio_rustls::rustls::RootCertStore::empty();
    let native = rustls_native_certs::load_native_certs().expect("could not load platform certificates");
    let (added, ignored) = roots.add_parsable_certificates(native);
    if ignored > 0 {
        log::warn!("ignored {ignored} of {} platform certificates that could not be parsed", added + ignored);
    }
    roots.into()
}

/// Sends `client_req` over a new HTTP/1.1 connection and returns the response.
///
/// The request URI must be absolute (`http://…` or `https://…`).
///
/// Before sending:
/// * The URI is reduced to its path and query.
/// * The host (plus any non-default port) moves into a `Host` header, unless the caller
///   already set one.
/// * A default `User-Agent` is added when missing.
///
/// When the caller did not set `Accept-Encoding`, `deflate, gzip` is requested and a
/// compressed response body is decoded transparently. `Content-Encoding` and
/// `Content-Length` are then removed from the returned headers, because they described the
/// encoded bytes.
///
/// * `root_cert_store` – trust anchors used to verify HTTPS servers (unused without the
///   `tls` feature).
/// * `connections` – runtime on which the connection driver task is spawned; defaults to
///   `tokio::runtime::Handle::current()`.
///
/// # Errors
///
/// Fails when:
/// * the URI has no host or no path;
/// * name resolution fails, or every connection attempt fails;
/// * the TLS handshake, the HTTP handshake, or the request itself fails;
/// * HTTPS is requested while the `tls` feature is disabled;
/// * this function requested compression and the server answers with a content encoding
///   other than `gzip`, `x-gzip`, `deflate` or `identity`.
#[allow(unused_variables)]
pub async fn perform_http_request<'a,E>(mut client_req: Request<impl Body<Data = Bytes, Error = E> + Send + 'static>, root_cert_store: HTTPClientState, connections: Option<tokio::runtime::Handle>) -> Result<Response<BoxedBody>, Box<dyn std::error::Error + Send + Sync>>
where E: Into<Box<dyn std::error::Error + Send + Sync>>
{
    let url = client_req.uri();
    let url_copy = url.to_string();
    // A relative URI is a caller error: report it instead of panicking.
    let host = url.host().ok_or("uri has no host")?.to_string();
    // `Uri::host` keeps the brackets of an IPv6 literal ("[::1]"). The Host header needs
    // them, but address resolution and the TLS server name do not.
    let bare_host = host.trim_start_matches('[').trim_end_matches(']').to_string();
    let https = url.scheme_str() == Some("https");
    let port = url.port_u16().unwrap_or(if https { 443 } else { 80 });

    // This client speaks HTTP/1.1, whose request target carries only the path and query;
    // the host travels in a separate Host header.
    let mut path_and_query = url.path_and_query().ok_or("no path/query in URL")?.to_string();
    if path_and_query.is_empty() {
        path_and_query = "/".to_string();
    }
    *client_req.uri_mut() = hyper::Uri::builder().path_and_query(path_and_query).build()?;

    let headers = client_req.headers_mut();
    if !headers.contains_key(hyper::header::HOST) {
        if (https && port == 443) || (!https && port == 80) {
            headers.insert(hyper::header::HOST, hyper::header::HeaderValue::from_str(&host)?);
        } else {
            let host_header = format!("{host}:{port}");
            headers.insert(hyper::header::HOST, hyper::header::HeaderValue::from_str(&host_header)?);
        }
    }
    // Only decode the response here when this function asked for compression itself.
    let deflate = !headers.contains_key(hyper::header::ACCEPT_ENCODING);
    if deflate {
        headers.insert(hyper::header::ACCEPT_ENCODING, hyper::header::HeaderValue::from_static("deflate, gzip"));
    }
    // some sites like https://www.rijksoverheid.nl need an explicit user-agent set.
    if !headers.contains_key(hyper::header::USER_AGENT) {
        headers.insert(hyper::header::USER_AGENT, hyper::header::HeaderValue::from_str(&format!("indigo {}-{}-{}", buildinfy::build_project_name().unwrap_or("httpclient"), buildinfy::build_reference().unwrap_or("dev"), buildinfy::build_pipeline_id_per_project().unwrap_or("0")))?);
    }

    // std's resolver blocks, so it runs on the blocking thread pool.
    let resolve_host = bare_host.clone();
    let addrs = tokio::task::spawn_blocking(move || -> Result<Vec<SocketAddr>,std::io::Error> {
        Ok(std::net::ToSocketAddrs::to_socket_addrs(&(resolve_host,port))?.collect())
    }).await??;
    // Try every resolved address in order; the error of the last attempt is returned.
    let mut connection = None;
    for (i,a) in addrs.iter().enumerate() {
        match tokio::net::TcpStream::connect(a).await {
            Ok(s) => {
                connection = Some((a, s));
                break;
            },
            Err(err) => {
                if i == addrs.len()-1 {
                    return Err(err.into());
                }
            },
        }
    };
    let (addr, stream) = connection.ok_or(std::io::Error::from(std::io::ErrorKind::NotFound))?;
    log::trace!("making http(s) request to {url_copy} with address {addr}");
    let resp = if https {
        #[cfg(feature = "tls")]
        {
            let mut config = tokio_rustls::rustls::ClientConfig::builder()
                .with_root_certificates(root_cert_store)
                .with_no_client_auth();
            config.alpn_protocols = vec![b"http/1.1".to_vec()];
            let connector = tokio_rustls::TlsConnector::from(Arc::new(config));
            let domain = tokio_rustls::rustls::pki_types::ServerName::try_from(bare_host)
                .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid DNS name"))?
                .to_owned();
            let stream = connector.connect(domain, stream).await.map_err(|x| format!("perform_http_request TLS connect error: {}", x))?;
            let stream = TokioIo::new(stream);
            let mut client = hyper::client::conn::http1::Builder::new();
            client.title_case_headers(true);
            let (mut sender, conn) = client.handshake(stream).await.map_err(|x| format!("perform_http_request client handshake error: {}", x))?;
            // The connection future drives the socket; it must run while the request is sent.
            connections.unwrap_or_else(|| tokio::runtime::Handle::current()).spawn(async move {
                if let Err(err) = conn.await {
                    log::error!("HTTPS connection failed: {:?}", err);
                }
            });
            sender.send_request(client_req)
        }
        #[cfg(not(feature = "tls"))]
        return Err("HTTPS not supported".into());
    } else {
        let stream = TokioIo::new(stream);
        let mut client = hyper::client::conn::http1::Builder::new();
        client.title_case_headers(true);
        let (mut sender, conn) = client.handshake(stream).await?;
        // The connection future drives the socket; it must run while the request is sent.
        connections.unwrap_or_else(|| tokio::runtime::Handle::current()).spawn(async move {
            if let Err(err) = conn.await {
                log::error!("HTTP connection failed: {:?}", err);
            }
        });
        sender.send_request(client_req)
    };
    let resp = resp.await.map_err(|x| format!("perform_http_request: {}", x))?;
    let (mut parts, body) = resp.into_parts();
    // Responses without a payload (HEAD, 204, 304, ...) may still carry Content-Encoding;
    // there is nothing to decode in them.
    if deflate && !body.is_end_stream() {
        // Content codings are case-insensitive, and "x-gzip" is an alias of "gzip"
        // (RFC 9110, section 8.4.1).
        let encoding = parts
            .headers
            .get(hyper::header::CONTENT_ENCODING)
            .map(|value| String::from_utf8_lossy(value.as_bytes()).trim().to_ascii_lowercase());
        if let Some(encoding) = encoding {
            match encoding.as_str() {
                "gzip" | "x-gzip" | "deflate" => {
                    // These headers describe the encoded bytes, not the decoded body.
                    parts.headers.remove(hyper::header::CONTENT_ENCODING);
                    parts.headers.remove(hyper::header::CONTENT_LENGTH);
                    return Ok(Response::from_parts(parts, box_body(DecompressBody::new(box_body(body)))));
                }
                "identity" => {}
                _ => return Err(format!("unsupported content encoding {encoding:?} sent by {url_copy}").into()),
            }
        }
    }
    Ok(Response::from_parts(parts, box_body(body)))
}

/// A [`Body`] adapter that decompresses a gzip, zlib or raw deflate encoded inner body.
///
/// Compressed frames from `inner` are buffered in `current` and fed to a miniz_oxide
/// inflate stream. Each batch of decoded bytes, at most `BUFFER_SIZE` long, is yielded as
/// a data frame.
pub struct DecompressBody {
    /// The compressed body being decoded.
    inner: Pin<Box<BoxedBody>>,
    /// Compressed bytes received from `inner` that the inflate stream has not consumed yet.
    current: BytesMut,
    /// Set once `inner` has reported its end, or once the compressed stream has ended.
    stopping: bool,
    /// Set once the inflate stream reported the end of the compressed data.
    finished: bool,
    /// Inflate (decompression) state. It is created lazily, once the stream format has
    /// been detected from the first bytes.
    compressor: Option<Box<miniz_oxide::inflate::stream::InflateState>>,
}

impl std::fmt::Debug for DecompressBody {
    /// Prints only the type name; buffered bytes and the inflate state are not shown.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("DecompressBody")
    }
}

impl DecompressBody {
    /// Upper bound on the bytes buffered while waiting for a gzip header to complete.
    /// It is large enough for a maximal `FEXTRA` field (2 + 65535 bytes) plus name and
    /// comment fields, and it stops an unterminated header from growing the buffer forever.
    const MAX_GZIP_HEADER: usize = 2 * BUFFER_SIZE;

    /// Wraps `inner`, whose bytes are decoded as gzip, zlib or raw deflate.
    fn new(inner: BoxedBody) -> Self {
        Self {
            inner: Box::pin(inner),
            current: BytesMut::new(),
            stopping: false,
            finished: false,
            compressor: None,
        }
    }

    /// Returns the length of the gzip member header at the start of `input`, which must
    /// begin with the gzip magic bytes.
    ///
    /// Returns `None` while `input` does not yet contain the complete header.
    fn gzip_header_len(input: &[u8]) -> Option<usize> {
        const FHCRC: u8 = 1 << 1;
        const FEXTRA: u8 = 1 << 2;
        const FNAME: u8 = 1 << 3;
        const FCOMMENT: u8 = 1 << 4;
        // Fixed part: ID1 ID2 CM FLG MTIME(4) XFL OS.
        const FIXED_LEN: usize = 10;
        if input.len() < FIXED_LEN {
            return None;
        }
        let flags = input[3];
        let mut len = FIXED_LEN;
        if flags & FEXTRA != 0 {
            // Little-endian XLEN, followed by XLEN bytes of extra data.
            let xlen = input.get(len..len + 2)?;
            len += 2 + usize::from(u16::from_le_bytes([xlen[0], xlen[1]]));
        }
        for field in [FNAME, FCOMMENT] {
            if flags & field != 0 {
                // Zero-terminated string. `get` also covers FEXTRA having run past the input.
                len += input.get(len..)?.iter().position(|&byte| byte == 0)? + 1;
            }
        }
        if flags & FHCRC != 0 {
            len += 2;
        }
        (len <= input.len()).then_some(len)
    }

    /// Chooses the inflate format from the first bytes of the compressed stream.
    ///
    /// Returns one of:
    /// * the format plus the number of leading (gzip header) bytes to skip;
    /// * `Ok(None)` when more data is needed;
    /// * an error for a gzip header that is truncated (`at_end`) or oversized.
    ///
    /// Data that is neither zlib nor gzip is treated as raw deflate. Detection bytes from
    /// https://thushw.blogspot.com/2014/05/decoding-html-pages-with-content.html
    fn sniff_format(input: &[u8], at_end: bool) -> Result<Option<(miniz_oxide::DataFormat, usize)>, String> {
        // Wait for a few bytes before deciding, unless no more will come.
        if input.len() <= 2 && !at_end {
            return Ok(None);
        }
        match input {
            // zlib: CMF 0x78 (deflate, 32K window) with the valid FLG byte of each FLEVEL.
            [0x78, 0x01 | 0x5e | 0x9c | 0xda, ..] => Ok(Some((miniz_oxide::DataFormat::Zlib, 0))),
            // gzip: ID1 ID2 and CM = 8 (deflate); the header is skipped and the rest is raw deflate.
            [0x1f, 0x8b, 8, ..] => match Self::gzip_header_len(input) {
                Some(header_len) => Ok(Some((miniz_oxide::DataFormat::Raw, header_len))),
                None if at_end => Err(format!("gzip header truncated: body ended after {} bytes", input.len())),
                None if input.len() > Self::MAX_GZIP_HEADER => Err(format!("gzip header exceeds {} bytes", Self::MAX_GZIP_HEADER)),
                None => Ok(None),
            },
            _ => Ok(Some((miniz_oxide::DataFormat::Raw, 0))),
        }
    }

    /// Runs one decoding step over the buffered input.
    ///
    /// `at_end` is true once the inner body has no more data. In that case, format
    /// detection must decide with whatever is buffered, and inflate is asked to finish.
    /// Returns `Pending` when more compressed input is needed.
    fn inflate_step(self: &mut Pin<&mut Self>, at_end: bool) -> std::task::Poll<Option<Result<hyper::body::Frame<Bytes>, String>>> {
        let mut decompressor = match self.compressor.take() {
            Some(decompressor) => decompressor,
            None => match Self::sniff_format(&self.current, at_end) {
                Ok(Some((format, header_len))) => {
                    self.current.advance(header_len);
                    Box::new(miniz_oxide::inflate::stream::InflateState::new(format))
                }
                Ok(None) => return std::task::Poll::Pending,
                Err(message) => return std::task::Poll::Ready(Some(Err(message))),
            },
        };
        // The buffer can be empty after skipping a gzip header: wait for deflate data.
        if self.current.is_empty() && !at_end {
            self.compressor = Some(decompressor);
            return std::task::Poll::Pending;
        }
        let flush = if at_end { miniz_oxide::MZFlush::Finish } else { miniz_oxide::MZFlush::None };
        let mut output = vec![0u8; BUFFER_SIZE];
        let miniz_oxide::StreamResult { bytes_consumed, bytes_written, status } =
            miniz_oxide::inflate::stream::inflate(&mut decompressor, self.current.as_bytes(), &mut output, flush);
        self.compressor = Some(decompressor);
        self.current.advance(bytes_consumed);
        output.truncate(bytes_written);
        self.process_result(output, status)
    }

    /// Decodes as much buffered input as fits in one output frame.
    ///
    /// Returns `Pending` when more compressed input is needed; the caller then polls the
    /// inner body.
    fn process(self: &mut Pin<&mut Self>) -> std::task::Poll<Option<Result<hyper::body::Frame<Bytes>, String>>> {
        self.inflate_step(false)
    }

    /// Converts the output and status of one inflate call into a poll result.
    ///
    /// * An error status is returned as an error frame.
    /// * `StreamEnd` marks the body finished.
    /// * Empty output means more input is needed, or the end of the body once finished.
    fn process_result(self: &mut Pin<&mut Self>, output: Vec<u8>, status: Result<miniz_oxide::MZStatus,miniz_oxide::MZError>) -> std::task::Poll<Option<Result<hyper::body::Frame<Bytes>, String>>> {
        if let Err(err) = status {
            return std::task::Poll::Ready(Some(Err(format!("inflate error {}/{:?}", err as usize, err))));
        }
        if Ok(miniz_oxide::MZStatus::StreamEnd) == status {
            self.stopping = true;
            self.finished = true;
        }
        if output.is_empty() {
            if self.finished {
                std::task::Poll::Ready(None)
            } else {
                std::task::Poll::Pending
            }
        } else {
            let result = Bytes::from(output);
            let frame = hyper::body::Frame::data(result);
            std::task::Poll::Ready(Some(Ok(frame)))
        }
    }

    /// Finishes decoding after the inner body has ended.
    ///
    /// Leftover buffered bytes are decoded rather than asserted away, because the body may
    /// end with too few bytes to detect the format or with a truncated gzip header. In those
    /// cases an error frame is returned instead of a panic.
    fn process_end(self: &mut Pin<&mut Self>) -> std::task::Poll<Option<Result<hyper::body::Frame<Bytes>, String>>> {
        assert!(!self.finished);
        if self.compressor.is_none() && self.current.is_empty() {
            return std::task::Poll::Ready(Some(Err("inflate end error: cannot deflate empty string".to_string())));
        }
        match self.inflate_step(true) {
            std::task::Poll::Ready(v) => std::task::Poll::Ready(v),
            // Reached only when a finishing inflate produced no output and no end marker.
            std::task::Poll::Pending => std::task::Poll::Ready(Some(Err("decompressor end missing".to_string()))),
        }
    }
}

impl Body for DecompressBody {
    type Data = <BoxedBody as Body>::Data;
    type Error = <BoxedBody as Body>::Error;

    /// Yields the next decompressed data frame.
    ///
    /// Buffered input is decoded first. The inner body is polled for more compressed bytes
    /// only when that yields nothing, and never again after it has returned `None`. Errors
    /// from the inner body are passed through; its non-data frames (trailers) are dropped.
    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
        if self.finished {
            return std::task::Poll::Ready(None);
        }
        loop {
            let retval = Self::process(&mut self);
            if retval.is_ready() {
                return retval;
            }
            // The inner body already ended: flush what the decompressor still holds
            // instead of polling a finished body again.
            if self.stopping {
                return Self::process_end(&mut self);
            }
            match self.inner.as_mut().poll_frame(cx) {
                std::task::Poll::Ready(frame) => {
                    match frame {
                        Some(Ok(frame)) => {
                            if let Ok(data) = frame.into_data() {
                                self.current.put(data);
                            }
                        },
                        Some(Err(_)) => {
                            // propagate error
                            return std::task::Poll::Ready(frame);
                        },
                        None => {
                            // end of stream
                            self.stopping = true;
                            return Self::process_end(&mut self);
                        },
                    }
                },
                // no progress is being made, so more data needed, waiting
                std::task::Poll::Pending => return std::task::Poll::Pending,
            }
        }
    }

    fn is_end_stream(&self) -> bool {
        self.finished
    }

    /// No bound can be derived from the compressed length, in either direction.
    ///
    /// Gzip headers may carry arbitrarily long optional fields, and stored deflate blocks
    /// add framing, so the decoded body can be shorter than "compressed length minus a
    /// constant". It can also be far longer. Only the unbounded default hint is always correct.
    fn size_hint(&self) -> hyper::body::SizeHint {
        hyper::body::SizeHint::default()
    }
}