aioduct 0.1.10

Async-native HTTP client built directly on hyper 1.x — no hyper-util, no legacy
Documentation
#![cfg(feature = "blocking")]

use std::convert::Infallible;
use std::net::SocketAddr;
use std::time::Duration;

use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::http1 as server_http1;
use hyper::service::service_fn;
use hyper::{Request, Response};

use aioduct::blocking::Client;

async fn hello(_req: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    Ok(Response::new(Full::new(Bytes::from("hello blocking"))))
}

async fn echo_body(
    req: Request<hyper::body::Incoming>,
) -> Result<Response<Full<Bytes>>, Infallible> {
    use http_body_util::BodyExt;
    let body = req.collect().await.unwrap().to_bytes();
    Ok(Response::new(Full::new(body)))
}

async fn slow(_req: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    tokio::time::sleep(Duration::from_secs(5)).await;
    Ok(Response::new(Full::new(Bytes::from("slow"))))
}

fn start_server_with<F, Fut>(handler: F) -> SocketAddr
where
    F: Fn(Request<hyper::body::Incoming>) -> Fut + Send + Clone + 'static,
    Fut: std::future::Future<Output = Result<Response<Full<Bytes>>, Infallible>> + Send,
{
    let (tx, rx) = std::sync::mpsc::channel();
    std::thread::spawn(move || {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();
            tx.send(addr).unwrap();

            loop {
                let (stream, _) = listener.accept().await.unwrap();
                let io = aioduct::runtime::tokio_rt::TokioIo::new(stream);
                let handler = handler.clone();
                tokio::spawn(async move {
                    let _ = server_http1::Builder::new()
                        .serve_connection(io, service_fn(handler))
                        .await;
                });
            }
        });
    });
    rx.recv().unwrap()
}

#[test]
fn blocking_get() {
    let addr = start_server_with(hello);
    let client = Client::new();
    let resp = client
        .get(&format!("http://{addr}/"))
        .unwrap()
        .send()
        .unwrap();

    assert_eq!(resp.status(), http::StatusCode::OK);
    let body = resp.text().unwrap();
    assert_eq!(body, "hello blocking");
}

#[test]
fn blocking_post_with_body() {
    let addr = start_server_with(echo_body);
    let client = Client::new();
    let resp = client
        .post(&format!("http://{addr}/"))
        .unwrap()
        .body("request body")
        .send()
        .unwrap();

    assert_eq!(resp.status(), http::StatusCode::OK);
    let body = resp.text().unwrap();
    assert_eq!(body, "request body");
}

#[test]
fn blocking_custom_header() {
    let addr = start_server_with(|req: Request<hyper::body::Incoming>| async move {
        let val = req
            .headers()
            .get("x-custom")
            .map(|v| v.to_str().unwrap().to_owned())
            .unwrap_or_default();
        Ok::<_, Infallible>(Response::new(Full::new(Bytes::from(val))))
    });

    let client = Client::new();
    let resp = client
        .get(&format!("http://{addr}/"))
        .unwrap()
        .header(
            http::header::HeaderName::from_static("x-custom"),
            http::header::HeaderValue::from_static("test-value"),
        )
        .send()
        .unwrap();

    assert_eq!(resp.text().unwrap(), "test-value");
}

#[test]
fn blocking_timeout() {
    let addr = start_server_with(slow);
    let client = Client::builder()
        .timeout(Duration::from_millis(100))
        .build();
    let result = client.get(&format!("http://{addr}/")).unwrap().send();

    assert!(result.is_err());
}

#[test]
fn blocking_head_request() {
    let addr = start_server_with(hello);
    let client = Client::new();
    let resp = client
        .head(&format!("http://{addr}/"))
        .unwrap()
        .send()
        .unwrap();
    assert_eq!(resp.status(), http::StatusCode::OK);
}

#[test]
fn blocking_put_request() {
    let addr = start_server_with(echo_body);
    let client = Client::new();
    let resp = client
        .put(&format!("http://{addr}/"))
        .unwrap()
        .body("put data")
        .send()
        .unwrap();
    assert_eq!(resp.text().unwrap(), "put data");
}

#[test]
fn blocking_error_for_status() {
    let addr = start_server_with(|_req: Request<hyper::body::Incoming>| async move {
        Ok::<_, Infallible>(
            Response::builder()
                .status(404)
                .body(Full::new(Bytes::new()))
                .unwrap(),
        )
    });
    let client = Client::new();
    let resp = client
        .get(&format!("http://{addr}/"))
        .unwrap()
        .send()
        .unwrap();
    assert!(resp.error_for_status().is_err());
}

#[test]
fn blocking_connection_reuse() {
    let addr = start_server_with(hello);
    let client = Client::new();
    let url = format!("http://{addr}/");

    let resp1 = client.get(&url).unwrap().send().unwrap();
    assert_eq!(resp1.status(), http::StatusCode::OK);
    let _ = resp1.bytes().unwrap();

    let resp2 = client.get(&url).unwrap().send().unwrap();
    assert_eq!(resp2.status(), http::StatusCode::OK);
    assert_eq!(resp2.text().unwrap(), "hello blocking");
}

#[test]
fn blocking_content_length() {
    let addr = start_server_with(|_req: Request<hyper::body::Incoming>| async move {
        Ok::<_, Infallible>(
            Response::builder()
                .header("Content-Length", "5")
                .body(Full::new(Bytes::from("12345")))
                .unwrap(),
        )
    });
    let client = Client::new();
    let resp = client
        .get(&format!("http://{addr}/"))
        .unwrap()
        .send()
        .unwrap();
    assert_eq!(resp.content_length(), Some(5));
}

#[cfg(feature = "json")]
#[test]
fn blocking_json() {
    let addr = start_server_with(|_req: Request<hyper::body::Incoming>| async move {
        Ok::<_, Infallible>(Response::new(Full::new(Bytes::from(r#"{"key":"value"}"#))))
    });

    let client = Client::new();
    let resp = client
        .get(&format!("http://{addr}/"))
        .unwrap()
        .send()
        .unwrap();
    let data: serde_json::Value = resp.json().unwrap();
    assert_eq!(data["key"], "value");
}