cross-stream 0.12.0

An event stream store for personal, local-first use, specializing in event sourcing.
Documentation
use http_body_util::BodyExt;
use hyper::{Method, Request};
use hyper_util::rt::TokioIo;

use super::connect::connect;
use super::types::{BoxError, RequestParts};

/// Sends one HTTP/1.1 request and returns the response with its body still unread.
///
/// Every call opens its own connection: `addr`, `path` and `query` are turned into
/// request parts by [`RequestParts::parse`], the transport is opened by [`connect`],
/// and the hyper connection driver is spawned as a background Tokio task.
///
/// # Headers
///
/// * `User-Agent` is always sent.
/// * `Accept: */*` is sent unless `headers` already contains an `Accept` entry
///   (the name is compared ASCII case-insensitively).
/// * `Host` and `Authorization` are sent when the parsed request parts provide them.
/// * Entries from `headers` are appended last, in the order given.
///
/// # Errors
///
/// * Address parsing, connecting, the HTTP handshake, building the request and
///   sending it all propagate their errors unchanged.
/// * A `404 Not Found` response yields `crate::error::NotFound`.
/// * Any other status except `200 OK` and `204 No Content` yields an error whose
///   message is the response body decoded lossily as UTF-8. If that body is empty
///   (or only whitespace), the message falls back to the status line so the
///   caller never receives a blank error.
///
/// # Panics
///
/// Uses `tokio::spawn`, so it must be awaited from within a Tokio runtime.
pub async fn request<B>(
    addr: &str,
    method: Method,
    path: &str,
    query: Option<&str>,
    body: B,
    headers: Option<Vec<(String, String)>>,
) -> Result<hyper::Response<hyper::body::Incoming>, BoxError>
where
    B: hyper::body::Body<Data = hyper::body::Bytes> + Send + 'static,
    B::Error: Into<BoxError> + Send,
{
    let parts = RequestParts::parse(addr, path, query)?;
    let stream = connect(&parts).await?;
    let io = TokioIo::new(stream);
    let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;

    // The connection future performs the actual socket I/O; it has to be polled
    // concurrently for `send_request` below to make progress.
    tokio::spawn(async move {
        if let Err(e) = conn.await {
            eprintln!("Connection error: {e}");
        }
    });

    // Treat "no extra headers" as an empty list so it is inspected and applied uniformly.
    let extra_headers = headers.unwrap_or_default();

    let mut builder = Request::builder()
        .method(method)
        .uri(parts.uri)
        .header(hyper::header::USER_AGENT, "xs/0.1");

    // Only set the default Accept header if the caller did not provide one.
    let has_custom_accept = extra_headers
        .iter()
        .any(|(name, _)| name.eq_ignore_ascii_case("Accept"));
    if !has_custom_accept {
        builder = builder.header(hyper::header::ACCEPT, "*/*");
    }

    if let Some(host) = parts.host {
        builder = builder.header(hyper::header::HOST, host);
    }
    if let Some(auth) = parts.authorization {
        builder = builder.header(hyper::header::AUTHORIZATION, auth);
    }

    for (name, value) in extra_headers {
        builder = builder.header(name, value);
    }

    let req = builder.body(body)?;
    let res = sender.send_request(req).await?;

    let status = res.status();
    if status == hyper::StatusCode::OK || status == hyper::StatusCode::NO_CONTENT {
        return Ok(res);
    }

    // Callers distinguish "not found" by type, so it gets a dedicated error.
    if status == hyper::StatusCode::NOT_FOUND {
        return Err(Box::new(crate::error::NotFound));
    }

    // Any other status: surface the server's body as the error message.
    let body = res.collect().await?.to_bytes();
    let message = String::from_utf8_lossy(&body);
    if message.trim().is_empty() {
        // An empty body would otherwise produce a blank, undiagnosable error.
        return Err(format!("request failed with status {status}").into());
    }
    Err(message.into_owned().into())
}