aioduct 0.1.10

Async-native HTTP client built directly on hyper 1.x — no hyper-util, no legacy
Documentation
use super::*;
use crate::runtime::CompioRuntime;
use crate::runtime::compio_rt::CompioIo;

/// Builds a `PooledConnection` over a real loopback TCP socket pair.
///
/// A listener is bound on an ephemeral `127.0.0.1` port, a client socket is
/// connected to it, and the accepted peer is handed to a detached task that
/// keeps reading until a read yields zero bytes or fails. The client side is
/// then put through hyper's HTTP/1 handshake; the connection driver is spawned
/// on the runtime and the request sender is wrapped with
/// `PooledConnection::new_h1`.
///
/// # Panics
///
/// Panics if any socket setup step or the HTTP/1 handshake fails.
async fn make_h1_conn() -> PooledConnection<CompioRuntime> {
    let std_listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let local = std_listener.local_addr().unwrap();
    std_listener.set_nonblocking(true).unwrap();
    let acceptor = async_io::Async::new(std_listener).unwrap();

    let client = async_io::Async::<std::net::TcpStream>::connect(local)
        .await
        .unwrap();
    let (accepted, _) = acceptor.accept().await.unwrap();

    // The peer socket has to outlive this function, so a detached task owns it
    // and simply discards whatever arrives. It is deliberately left wrapped in
    // `async_io::Async` (no `into_inner`) so the reactor keeps the fd registered.
    compio_runtime::spawn(async move {
        use futures_io::AsyncRead;
        let mut peer = accepted;
        let mut scratch = [0u8; 1024];
        loop {
            let received = std::future::poll_fn(|cx| {
                std::pin::Pin::new(&mut peer).poll_read(cx, &mut scratch)
            })
            .await
            .unwrap_or(0); // a read error is treated like EOF
            if received == 0 {
                break;
            }
        }
    })
    .detach();

    let (sender, driver) = hyper::client::conn::http1::handshake(CompioIo::new(client))
        .await
        .expect("h1 handshake should succeed");

    // Drive the connection in the background; its final result is irrelevant here.
    CompioRuntime::spawn(async move {
        let _ = driver.await;
    });

    PooledConnection::new_h1(sender)
}

/// Builds an HTTP-scheme `PoolKey` for `host`.
///
/// # Panics
///
/// Panics if `host` does not parse as an `Authority`.
fn key(host: &str) -> PoolKey {
    let authority: Authority = host.parse().expect("valid authority");
    PoolKey::new(Scheme::HTTP, authority)
}

/// Polls the pool until the newest idle connection stored under `k` is ready.
///
/// Makes up to ten attempts, sleeping 5 ms before each one so that async-io's
/// background reactor has a chance to drive the connection driver and the
/// cross-reactor wakeup can land. Only the entry at the back of the idle queue
/// is inspected.
///
/// Returns `true` as soon as that entry reports ready, `false` if every
/// attempt is exhausted first.
async fn wait_for_ready(pool: &ConnectionPool<CompioRuntime>, k: &PoolKey) -> bool {
    for _attempt in 0..10 {
        CompioRuntime::sleep(Duration::from_millis(5)).await;

        // The lock guard is a temporary of this statement, so it is released
        // before the next sleep.
        let newest_is_ready = pool
            .inner
            .lock()
            .unwrap()
            .idle
            .get(k)
            .and_then(|queue| queue.back())
            .is_some_and(|entry| entry.connection.is_ready());

        if newest_is_ready {
            return true;
        }
    }
    false
}

/// Smoke test: constructing a pool via `new_no_reaper` must not panic.
#[test]
fn pool_creates_with_given_parameters() {
    let idle_timeout = Duration::from_secs(30);
    let _pool = ConnectionPool::<CompioRuntime>::new_no_reaper(8, idle_timeout);
}

/// A pool that never received a connection has nothing to hand out.
#[test]
fn checkout_returns_none_on_empty_pool() {
    let idle_timeout = Duration::from_secs(30);
    let pool = ConnectionPool::<CompioRuntime>::new_no_reaper(8, idle_timeout);
    let k = key("example.com:80");
    assert!(pool.checkout(&k).is_none());
}

/// A connection checked in under a key can be checked out again under that
/// same key once it has become ready.
#[test]
fn checkin_then_checkout_returns_connection() {
    let rt = compio_runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let pool = ConnectionPool::<CompioRuntime>::new_no_reaper(8, Duration::from_secs(30));
        let k = key("example.com:80");

        pool.checkin(k.clone(), make_h1_conn().await);

        let became_ready = wait_for_ready(&pool, &k).await;
        assert!(became_ready, "connection should become ready");

        let reclaimed = pool.checkout(&k);
        assert!(
            reclaimed.is_some(),
            "checkout should return the checked-in connection"
        );
    });
}

/// Connections are keyed: one stored for `a.example.com` must not be returned
/// when `b.example.com` is requested.
#[test]
fn checkout_with_different_key_returns_none() {
    let rt = compio_runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let pool = ConnectionPool::<CompioRuntime>::new_no_reaper(8, Duration::from_secs(30));
        let stored_under = key("a.example.com:80");
        let other = key("b.example.com:80");

        pool.checkin(stored_under.clone(), make_h1_conn().await);

        let became_ready = wait_for_ready(&pool, &stored_under).await;
        assert!(became_ready, "connection should become ready");

        assert!(
            pool.checkout(&other).is_none(),
            "checkout with a different key should return None"
        );
    });
}

/// Checkout hands back the most recently checked-in connection first.
///
/// Each connection is tagged with a distinct `remote_addr` so the one returned
/// by `checkout` can be identified.
#[test]
fn checkin_checkout_is_lifo() {
    let rt = compio_runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let pool = ConnectionPool::<CompioRuntime>::new_no_reaper(8, Duration::from_secs(30));
        let k = key("example.com:80");

        let oldest = std::net::SocketAddr::from(([1, 1, 1, 1], 80));
        let newest = std::net::SocketAddr::from(([2, 2, 2, 2], 80));

        // Check the connections in one after another, oldest tag first.
        for tag in [oldest, newest] {
            let mut conn = make_h1_conn().await;
            conn.remote_addr = Some(tag);
            pool.checkin(k.clone(), conn);
        }

        let became_ready = wait_for_ready(&pool, &k).await;
        assert!(became_ready, "connections should become ready");

        let out = pool.checkout(&k).expect("should get a connection");
        assert_eq!(
            out.remote_addr,
            Some(newest),
            "LIFO: most recent connection first"
        );
    });
}

/// Checking in three connections to a pool created with a max-idle value of 2
/// must leave exactly two available for checkout.
#[test]
fn pool_respects_max_idle_per_host() {
    let rt = compio_runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let max_idle = 2;
        let idle_timeout = Duration::from_secs(30);
        let pool = ConnectionPool::<CompioRuntime>::new_no_reaper(max_idle, idle_timeout);
        let k = key("example.com:80");

        // One more check-in than the pool is configured to keep.
        for _ in 0..3 {
            pool.checkin(k.clone(), make_h1_conn().await);
        }

        let became_ready = wait_for_ready(&pool, &k).await;
        assert!(became_ready, "connections should become ready");

        assert!(pool.checkout(&k).is_some(), "1st checkout should succeed");
        assert!(pool.checkout(&k).is_some(), "2nd checkout should succeed");
        assert!(
            pool.checkout(&k).is_none(),
            "3rd checkout should return None (capacity was 2)"
        );
    });
}

/// A connection that has sat in the pool for longer than the 50 ms duration
/// the pool was created with must not be returned by `checkout`.
#[test]
fn checkout_expired_connection_returns_none() {
    let rt = compio_runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let idle_timeout = Duration::from_millis(50);
        let pool = ConnectionPool::<CompioRuntime>::new_no_reaper(8, idle_timeout);
        let k = key("example.com:80");

        pool.checkin(k.clone(), make_h1_conn().await);

        // Sleep for twice the configured duration before checking out.
        CompioRuntime::sleep(Duration::from_millis(100)).await;

        assert!(
            pool.checkout(&k).is_none(),
            "expired connection should be discarded"
        );
    });
}

/// With a pool built through `ConnectionPool::new` (rather than
/// `new_no_reaper`), a connection left in the pool past the 50 ms duration is
/// expected to have been removed by the reaper by the time of checkout.
#[test]
fn reaper_removes_expired_connections() {
    let rt = compio_runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let idle_timeout = Duration::from_millis(50);
        let pool = ConnectionPool::<CompioRuntime>::new(1, idle_timeout);
        let k = key("example.com:80");

        pool.checkin(k.clone(), make_h1_conn().await);

        // Sleep for three times the configured duration before checking out.
        CompioRuntime::sleep(Duration::from_millis(150)).await;

        assert!(
            pool.checkout(&k).is_none(),
            "reaper should have removed the expired connection"
        );
    });
}