use super::*;
use crate::body::RequestBodySend;
use crate::runtime::TokioRuntime;
use crate::runtime::tokio_rt::TokioIo;
async fn make_h1_conn() -> PooledConnection<RequestBodySend> {
let (client_io, mut server_io) = tokio::io::duplex(1024);
tokio::spawn(async move {
use tokio::io::AsyncReadExt;
let mut buf = [0u8; 1024];
loop {
match server_io.read(&mut buf).await {
Ok(0) | Err(_) => break,
_ => {}
}
}
});
let io = TokioIo::new(client_io);
let (sender, conn) = hyper::client::conn::http1::handshake(io)
.await
.expect("h1 handshake should succeed on duplex");
tokio::spawn(async move {
let _ = conn.await;
});
PooledConnection::new_h1(sender)
}
fn key(host: &str) -> PoolKey {
PoolKey::new(
Scheme::HTTP,
host.parse::<Authority>().expect("valid authority"),
)
}
#[test]
fn checkout_returns_none_on_empty_pool() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
assert!(pool.checkout(&key("example.com:80")).is_none());
}
#[tokio::test]
async fn checkin_then_checkout_returns_connection() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key("example.com:80");
let conn = make_h1_conn().await;
pool.checkin(k.clone(), conn);
tokio::task::yield_now().await;
let out = pool.checkout(&k);
assert!(
out.is_some(),
"checkout should return the checked-in connection"
);
}
#[tokio::test]
async fn checkout_with_different_key_returns_none() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let conn = make_h1_conn().await;
pool.checkin(key("a.example.com:80"), conn);
tokio::task::yield_now().await;
assert!(
pool.checkout(&key("b.example.com:80")).is_none(),
"checkout with a different key should return None"
);
}
#[tokio::test]
async fn pool_respects_max_idle_per_host() {
let max_idle = 2;
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(max_idle, Duration::from_secs(30));
let k = key("example.com:80");
for _ in 0..3 {
let conn = make_h1_conn().await;
pool.checkin(k.clone(), conn);
}
tokio::task::yield_now().await;
assert!(pool.checkout(&k).is_some(), "1st checkout should succeed");
assert!(pool.checkout(&k).is_some(), "2nd checkout should succeed");
assert!(
pool.checkout(&k).is_none(),
"3rd checkout should return None (capacity was 2)"
);
}
#[tokio::test]
async fn checkin_checkout_is_lifo() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key("example.com:80");
let conn1 = make_h1_conn().await;
let addr1 = std::net::SocketAddr::from(([1, 1, 1, 1], 80));
let mut conn1 = conn1;
conn1.remote_addr = Some(addr1);
pool.checkin(k.clone(), conn1);
let conn2 = make_h1_conn().await;
let addr2 = std::net::SocketAddr::from(([2, 2, 2, 2], 80));
let mut conn2 = conn2;
conn2.remote_addr = Some(addr2);
pool.checkin(k.clone(), conn2);
tokio::task::yield_now().await;
let out = pool.checkout(&k).expect("should get a connection");
assert_eq!(
out.remote_addr,
Some(addr2),
"LIFO: most recent connection first"
);
}
#[tokio::test]
async fn checkout_expired_connection_returns_none() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_millis(50));
let k = key("example.com:80");
let conn = make_h1_conn().await;
pool.checkin(k.clone(), conn);
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(
pool.checkout(&k).is_none(),
"expired connection should be discarded"
);
}
#[tokio::test]
async fn reaper_removes_expired_connections() {
let pool = ConnectionPool::<RequestBodySend>::new(1, Duration::from_millis(50));
pool.ensure_reaper::<TokioRuntime>();
let k = key("example.com:80");
let conn = make_h1_conn().await;
pool.checkin(k.clone(), conn);
tokio::time::sleep(Duration::from_millis(150)).await;
assert!(
pool.checkout(&k).is_none(),
"reaper should have removed the expired connection"
);
}
use std::net::IpAddr;
async fn make_h2_conn() -> PooledConnection<RequestBodySend> {
let (client_io, server_io) = tokio::io::duplex(65536);
tokio::spawn(async move {
use hyper::server::conn::http2::Builder;
use hyper::service::service_fn;
let io = TokioIo::new(server_io);
let _ = Builder::new(crate::runtime::executor::poll_executor::<TokioRuntime>())
.serve_connection(
io,
service_fn(|_req| async {
Ok::<_, std::convert::Infallible>(hyper::Response::new(
http_body_util::Empty::<bytes::Bytes>::new(),
))
}),
)
.await;
});
let io = TokioIo::new(client_io);
let (sender, conn) = hyper::client::conn::http2::handshake(
crate::runtime::executor::poll_executor::<TokioRuntime>(),
io,
)
.await
.expect("h2 handshake should succeed on duplex");
tokio::spawn(async move {
let _ = conn.await;
});
PooledConnection::new_h2(sender)
}
fn key_https(host: &str) -> PoolKey {
PoolKey::new(
Scheme::HTTPS,
host.parse::<Authority>().expect("valid authority"),
)
}
#[tokio::test]
async fn checkout_coalesced_finds_by_san() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec![
"origin.example.com".into(),
"cdn.example.com".into(),
"api.example.com".into(),
]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
tokio::task::yield_now().await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("cdn.example.com", Some(ip));
assert!(result.is_some(), "should find coalesced connection via SAN");
}
#[tokio::test]
async fn checkout_coalesced_rejects_h1() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let mut conn = make_h1_conn().await;
conn.sans = std::sync::Arc::from(vec!["origin.example.com".into(), "cdn.example.com".into()]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
tokio::task::yield_now().await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("cdn.example.com", Some(ip));
assert!(result.is_none(), "h1 connections should not be coalesced");
}
#[tokio::test]
async fn checkout_coalesced_rejects_different_ip() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec!["origin.example.com".into(), "cdn.example.com".into()]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
tokio::task::yield_now().await;
let different_ip: IpAddr = [10, 0, 0, 2].into();
let result = pool.checkout_coalesced("cdn.example.com", Some(different_ip));
assert!(result.is_none(), "different IP should prevent coalescing");
}
#[tokio::test]
async fn checkout_coalesced_skips_expired() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_millis(50));
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec!["origin.example.com".into(), "cdn.example.com".into()]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
tokio::time::sleep(Duration::from_millis(100)).await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("cdn.example.com", Some(ip));
assert!(
result.is_none(),
"expired connection should not be returned"
);
}
#[test]
fn checkout_coalesced_empty_pool_returns_none() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("cdn.example.com", Some(ip));
assert!(result.is_none(), "empty pool should return None");
}
#[test]
fn mark_connecting_h2_returns_false_first_time() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key("example.com:80");
assert!(!pool.mark_connecting_h2(&k));
}
#[test]
fn mark_connecting_h2_returns_true_when_already_present() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key("example.com:80");
assert!(!pool.mark_connecting_h2(&k));
assert!(pool.mark_connecting_h2(&k));
}
#[test]
fn unmark_connecting_h2_allows_re_mark() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key("example.com:80");
assert!(!pool.mark_connecting_h2(&k));
pool.unmark_connecting_h2(&k);
assert!(!pool.mark_connecting_h2(&k));
}
#[test]
fn pool_key_new_vs_with_hint_default() {
let k1 = PoolKey::new(Scheme::HTTP, "example.com:80".parse().unwrap());
let k2 = PoolKey::with_hint(
Scheme::HTTP,
"example.com:80".parse().unwrap(),
ProtocolHint::Auto,
);
assert_eq!(k1, k2);
}
#[test]
fn pool_key_different_hints_not_equal() {
let k1 = PoolKey::with_hint(
Scheme::HTTP,
"example.com:80".parse().unwrap(),
ProtocolHint::Auto,
);
let k2 = PoolKey::with_hint(
Scheme::HTTP,
"example.com:80".parse().unwrap(),
ProtocolHint::H2c,
);
assert_ne!(k1, k2);
}
#[test]
fn protocol_hint_debug() {
assert_eq!(format!("{:?}", ProtocolHint::Auto), "Auto");
assert_eq!(format!("{:?}", ProtocolHint::H2c), "H2c");
assert_eq!(format!("{:?}", ProtocolHint::AdaptiveH2c), "AdaptiveH2c");
}
#[tokio::test]
async fn checkout_h2_clone_for_multiplex() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("example.com:443");
let conn = make_h2_conn().await;
pool.checkin(k.clone(), conn);
tokio::task::yield_now().await;
let out1 = pool.checkout(&k);
assert!(out1.is_some(), "first checkout should succeed");
let out2 = pool.checkout(&k);
assert!(
out2.is_some(),
"second checkout should succeed (H2 multiplex clone)"
);
}
#[tokio::test]
async fn checkout_coalesced_without_ip_check() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec!["origin.example.com".into(), "cdn.example.com".into()]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
tokio::task::yield_now().await;
let result = pool.checkout_coalesced("cdn.example.com", None);
assert!(
result.is_some(),
"should find coalesced connection when resolved_ip is None"
);
}
#[tokio::test]
async fn checkout_coalesced_san_not_found() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec!["origin.example.com".into()]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
tokio::task::yield_now().await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("unknown.example.com", Some(ip));
assert!(result.is_none(), "SAN not in cert should return None");
}
#[tokio::test]
async fn reaper_cleans_san_index_for_expired_connections() {
let pool = ConnectionPool::<RequestBodySend>::new(1, Duration::from_millis(50));
pool.ensure_reaper::<TokioRuntime>();
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec!["origin.example.com".into(), "cdn.example.com".into()]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
tokio::time::sleep(Duration::from_millis(150)).await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("cdn.example.com", Some(ip));
assert!(
result.is_none(),
"reaper should have cleaned expired connections and SAN index"
);
}
#[tokio::test]
async fn reaper_retains_live_connections() {
let pool = ConnectionPool::<RequestBodySend>::new(4, Duration::from_secs(10));
pool.ensure_reaper::<TokioRuntime>();
let k = key("example.com:80");
let conn = make_h1_conn().await;
pool.checkin(k.clone(), conn);
tokio::time::sleep(Duration::from_millis(50)).await;
let result = pool.checkout(&k);
assert!(
result.is_some(),
"reaper should retain connections that haven't expired"
);
}
#[tokio::test]
async fn pooled_connection_new_h1_defaults() {
let conn = make_h1_conn().await;
assert!(conn.remote_addr.is_none());
assert!(conn.tls_info.is_none());
assert!(conn.tls_handshake_duration.is_none());
assert!(conn.sans.is_empty());
assert_eq!(conn.requests_served, 0);
assert_eq!(conn.bytes_sent, 0);
assert_eq!(conn.bytes_received, 0);
assert!(!conn.is_multiplex_clone);
assert!(!conn.is_h2_or_h3());
}
#[tokio::test]
async fn pooled_connection_new_h2_defaults() {
let conn = make_h2_conn().await;
assert!(conn.remote_addr.is_none());
assert!(conn.tls_info.is_none());
assert!(conn.tls_handshake_duration.is_none());
assert!(conn.sans.is_empty());
assert_eq!(conn.requests_served, 0);
assert_eq!(conn.bytes_sent, 0);
assert_eq!(conn.bytes_received, 0);
assert!(!conn.is_multiplex_clone);
assert!(conn.is_h2_or_h3());
assert!(conn.is_ready());
}
#[tokio::test]
async fn clone_for_multiplex_returns_none_for_h1() {
let conn = make_h1_conn().await;
assert!(conn.clone_for_multiplex().is_none());
}
#[tokio::test]
async fn clone_for_multiplex_returns_some_for_h2() {
let mut conn = make_h2_conn().await;
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
conn.requests_served = 5;
conn.bytes_sent = 1024;
conn.bytes_received = 4096;
let cloned = conn.clone_for_multiplex();
assert!(cloned.is_some());
let cloned = cloned.unwrap();
assert!(cloned.is_multiplex_clone);
assert_eq!(
cloned.remote_addr,
Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)))
);
assert_eq!(cloned.requests_served, 0);
assert_eq!(cloned.bytes_sent, 0);
assert_eq!(cloned.bytes_received, 0);
assert!(cloned.is_h2_or_h3());
}
#[test]
fn pool_key_debug_format() {
let k = PoolKey::new(Scheme::HTTPS, "example.com:443".parse().unwrap());
let debug = format!("{:?}", k);
assert!(debug.contains("example.com:443"));
assert!(debug.contains("Auto"));
}
#[test]
fn pool_key_with_hint_h2c_debug() {
let k = PoolKey::with_hint(
Scheme::HTTP,
"example.com:80".parse().unwrap(),
ProtocolHint::H2c,
);
let debug = format!("{:?}", k);
assert!(debug.contains("H2c"));
}
#[test]
fn pool_key_with_hint_adaptive_debug() {
let k = PoolKey::with_hint(
Scheme::HTTP,
"example.com:80".parse().unwrap(),
ProtocolHint::AdaptiveH2c,
);
let debug = format!("{:?}", k);
assert!(debug.contains("AdaptiveH2c"));
}
#[tokio::test]
async fn pool_clone_shares_state() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let pool2 = pool.clone();
let k = key("example.com:80");
let conn = make_h1_conn().await;
pool.checkin(k.clone(), conn);
tokio::task::yield_now().await;
let out = pool2.checkout(&k);
assert!(out.is_some(), "cloned pool should share internal state");
}
#[tokio::test]
async fn checkin_evicts_oldest_when_at_capacity() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(1, Duration::from_secs(30));
let k = key("example.com:80");
let mut conn1 = make_h1_conn().await;
conn1.remote_addr = Some(std::net::SocketAddr::from(([1, 1, 1, 1], 80)));
pool.checkin(k.clone(), conn1);
let mut conn2 = make_h1_conn().await;
conn2.remote_addr = Some(std::net::SocketAddr::from(([2, 2, 2, 2], 80)));
pool.checkin(k.clone(), conn2);
tokio::task::yield_now().await;
let out = pool.checkout(&k).expect("should have a connection");
assert_eq!(
out.remote_addr,
Some(std::net::SocketAddr::from(([2, 2, 2, 2], 80))),
"should get the newer connection after eviction"
);
assert!(
pool.checkout(&k).is_none(),
"only one connection should remain"
);
}
#[tokio::test]
async fn checkin_populates_san_index() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec!["origin.example.com".into(), "alt.example.com".into()]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
tokio::task::yield_now().await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("alt.example.com", Some(ip));
assert!(result.is_some(), "SAN index should be populated on checkin");
}
#[tokio::test]
async fn checkout_coalesced_multiple_sans_finds_any() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec![
"origin.example.com".into(),
"first.example.com".into(),
"second.example.com".into(),
"third.example.com".into(),
]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
tokio::task::yield_now().await;
let ip: IpAddr = [10, 0, 0, 1].into();
assert!(
pool.checkout_coalesced("first.example.com", Some(ip))
.is_some()
);
assert!(
pool.checkout_coalesced("second.example.com", Some(ip))
.is_some()
);
assert!(
pool.checkout_coalesced("third.example.com", Some(ip))
.is_some()
);
}
#[test]
fn mark_connecting_h2_independent_keys() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k1 = key("a.example.com:80");
let k2 = key("b.example.com:80");
assert!(!pool.mark_connecting_h2(&k1));
assert!(!pool.mark_connecting_h2(&k2));
assert!(pool.mark_connecting_h2(&k1));
assert!(pool.mark_connecting_h2(&k2));
pool.unmark_connecting_h2(&k1);
assert!(!pool.mark_connecting_h2(&k1));
assert!(pool.mark_connecting_h2(&k2));
}
#[tokio::test]
async fn checkout_coalesced_cleans_stale_san_index() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_millis(50));
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec![
"origin.example.com".into(),
"stale.example.com".into(),
]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
tokio::time::sleep(Duration::from_millis(100)).await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("stale.example.com", Some(ip));
assert!(
result.is_none(),
"expired connection should not be returned"
);
let result2 = pool.checkout_coalesced("stale.example.com", Some(ip));
assert!(result2.is_none());
}
async fn make_h2_conn_with_shutdown() -> (
PooledConnection<RequestBodySend>,
tokio::sync::oneshot::Sender<()>,
) {
let (client_io, server_io) = tokio::io::duplex(65536);
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
use hyper::server::conn::http2::Builder;
use hyper::service::service_fn;
let io = TokioIo::new(server_io);
let conn = Builder::new(crate::runtime::executor::poll_executor::<TokioRuntime>())
.serve_connection(
io,
service_fn(|_req| async {
Ok::<_, std::convert::Infallible>(hyper::Response::new(
http_body_util::Empty::<bytes::Bytes>::new(),
))
}),
);
tokio::select! {
_ = conn => {},
_ = shutdown_rx => {},
}
});
let io = TokioIo::new(client_io);
let (sender, conn) = hyper::client::conn::http2::handshake(
crate::runtime::executor::poll_executor::<TokioRuntime>(),
io,
)
.await
.expect("h2 handshake should succeed on duplex");
tokio::spawn(async move {
let _ = conn.await;
});
(PooledConnection::new_h2(sender), shutdown_tx)
}
#[tokio::test]
async fn checkout_coalesced_removes_not_ready_connection() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let (mut conn, shutdown_tx) = make_h2_conn_with_shutdown().await;
conn.sans = std::sync::Arc::from(vec!["origin.example.com".into(), "cdn.example.com".into()]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
let _ = shutdown_tx.send(());
tokio::time::sleep(Duration::from_millis(50)).await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("cdn.example.com", Some(ip));
assert!(
result.is_none(),
"not-ready connection should be removed and return None"
);
}
#[tokio::test]
async fn checkout_coalesced_cleans_san_index_when_no_connections_remain() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let (mut conn, shutdown_tx) = make_h2_conn_with_shutdown().await;
conn.sans = std::sync::Arc::from(vec!["origin.example.com".into(), "cdn.example.com".into()]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k.clone(), conn);
let _ = shutdown_tx.send(());
tokio::time::sleep(Duration::from_millis(50)).await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("cdn.example.com", Some(ip));
assert!(result.is_none());
let result2 = pool.checkout_coalesced("cdn.example.com", Some(ip));
assert!(result2.is_none());
assert!(
pool.checkout(&k).is_none(),
"pool should have no connections left"
);
}
#[tokio::test]
async fn checkout_coalesced_san_index_stale_entry_continue_on_missing_queue() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k1 = key_https("origin1.example.com:443");
let k2 = key_https("origin2.example.com:443");
let (mut conn1, shutdown1) = make_h2_conn_with_shutdown().await;
conn1.sans = std::sync::Arc::from(vec!["origin1.example.com".into(), "cdn.example.com".into()]);
conn1.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k1.clone(), conn1);
let mut conn2 = make_h2_conn().await;
conn2.sans = std::sync::Arc::from(vec!["origin2.example.com".into(), "cdn.example.com".into()]);
conn2.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k2.clone(), conn2);
let _ = shutdown1.send(());
tokio::time::sleep(Duration::from_millis(50)).await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("cdn.example.com", Some(ip));
assert!(
result.is_some(),
"should find the second healthy connection even if first is not-ready"
);
}
#[tokio::test]
async fn pool_zero_max_idle_always_evicts() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(1, Duration::from_secs(30));
let k = key("example.com:80");
let conn = make_h1_conn().await;
pool.checkin(k.clone(), conn);
tokio::task::yield_now().await;
assert!(pool.checkout(&k).is_some());
assert!(pool.checkout(&k).is_none());
}
#[tokio::test]
async fn checkout_coalesced_san_index_has_entry_but_conn_sans_dont_match() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec![
"origin.example.com".into(),
"shared.example.com".into(),
]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k.clone(), conn);
tokio::task::yield_now().await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("shared.example.com", Some(ip));
assert!(result.is_some(), "shared SAN should match");
let result = pool.checkout_coalesced("other.example.com", Some(ip));
assert!(result.is_none(), "host not in SANs should not match");
}
#[tokio::test]
async fn checkout_coalesced_last_connection_in_queue_triggers_removal() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let mut conn = make_h1_conn().await;
conn.sans = std::sync::Arc::from(vec![
"origin.example.com".into(),
"coalesced.example.com".into(),
]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k.clone(), conn);
tokio::task::yield_now().await;
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("coalesced.example.com", Some(ip));
assert!(result.is_none(), "H1 should be rejected by coalescing");
let result = pool.checkout(&k);
assert!(result.is_some(), "H1 connection should still be in pool");
}
#[tokio::test]
async fn ensure_reaper_local_removes_expired() {
let pool = ConnectionPool::<RequestBodySend>::new(1, Duration::from_millis(50));
pool.ensure_reaper::<TokioRuntime>();
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec![
"origin.example.com".into(),
"reaper-test.example.com".into(),
]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k.clone(), conn);
tokio::time::sleep(Duration::from_millis(150)).await;
assert!(
pool.checkout(&k).is_none(),
"reaper should have removed expired connection"
);
let ip: IpAddr = [10, 0, 0, 1].into();
assert!(
pool.checkout_coalesced("reaper-test.example.com", Some(ip))
.is_none(),
"reaper should have cleaned SAN index"
);
}
#[tokio::test]
async fn checkin_populates_san_index_for_all_sans() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec![
"origin.example.com".into(),
"san1.example.com".into(),
"san2.example.com".into(),
"san3.example.com".into(),
]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k, conn);
tokio::task::yield_now().await;
let ip: IpAddr = [10, 0, 0, 1].into();
assert!(
pool.checkout_coalesced("san1.example.com", Some(ip))
.is_some()
);
assert!(
pool.checkout_coalesced("san2.example.com", Some(ip))
.is_some()
);
assert!(
pool.checkout_coalesced("san3.example.com", Some(ip))
.is_some()
);
}
#[tokio::test]
async fn checkout_skips_not_ready_h1_connection() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key("example.com:80");
let (mut conn, shutdown_tx) = make_h2_conn_with_shutdown().await;
conn.remote_addr = Some(std::net::SocketAddr::from(([1, 1, 1, 1], 80)));
let (client_io, server_io) = tokio::io::duplex(8192);
drop(server_io);
let io = TokioIo::new(client_io);
let (sender, conn_task) = hyper::client::conn::http1::handshake(io)
.await
.expect("h1 handshake");
tokio::spawn(async move {
let _ = conn_task.await;
});
let mut h1_conn = PooledConnection::new_h1(sender);
h1_conn.remote_addr = Some(std::net::SocketAddr::from(([1, 1, 1, 1], 80)));
pool.checkin(k.clone(), h1_conn);
tokio::time::sleep(Duration::from_millis(50)).await;
let result = pool.checkout(&k);
assert!(
result.is_none(),
"not-ready H1 connection should be skipped"
);
let _ = shutdown_tx.send(());
}
#[tokio::test]
async fn checkout_coalesced_san_index_key_but_idle_empty() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("origin.example.com:443");
let mut conn = make_h1_conn().await;
conn.sans = std::sync::Arc::from(vec![
"origin.example.com".into(),
"coalesce-target.example.com".into(),
]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k.clone(), conn);
tokio::task::yield_now().await;
let out = pool.checkout(&k);
assert!(out.is_some(), "regular checkout should succeed");
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("coalesce-target.example.com", Some(ip));
assert!(
result.is_none(),
"should return None when idle map has no entry for the san_index key"
);
}
#[tokio::test]
async fn checkout_coalesced_san_index_cleanup_removes_empty_set() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key_https("only-origin.example.com:443");
let mut conn = make_h1_conn().await;
conn.sans = std::sync::Arc::from(vec![
"only-origin.example.com".into(),
"cleanup-target.example.com".into(),
]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k.clone(), conn);
tokio::task::yield_now().await;
let _ = pool.checkout(&k);
let ip: IpAddr = [10, 0, 0, 1].into();
let result = pool.checkout_coalesced("cleanup-target.example.com", Some(ip));
assert!(result.is_none());
let result2 = pool.checkout_coalesced("cleanup-target.example.com", Some(ip));
assert!(
result2.is_none(),
"SAN index should have been fully cleaned"
);
}
#[tokio::test]
async fn reaper_loop_runs_multiple_cycles() {
let pool = ConnectionPool::<RequestBodySend>::new(4, Duration::from_millis(30));
pool.ensure_reaper::<TokioRuntime>();
let k = key("reaper-multi.example.com:80");
let mut conn1 = make_h1_conn().await;
conn1.remote_addr = Some(std::net::SocketAddr::from(([1, 1, 1, 1], 80)));
pool.checkin(k.clone(), conn1);
tokio::time::sleep(Duration::from_millis(80)).await;
assert!(pool.checkout(&k).is_none(), "first conn should be reaped");
let mut conn2 = make_h1_conn().await;
conn2.remote_addr = Some(std::net::SocketAddr::from(([2, 2, 2, 2], 80)));
pool.checkin(k.clone(), conn2);
tokio::time::sleep(Duration::from_millis(80)).await;
assert!(pool.checkout(&k).is_none(), "second conn should be reaped");
}
#[tokio::test]
async fn reaper_cleans_san_index_keys_not_in_idle() {
let pool = ConnectionPool::<RequestBodySend>::new(4, Duration::from_millis(30));
pool.ensure_reaper::<TokioRuntime>();
let k = key_https("reaper-san.example.com:443");
let mut conn = make_h2_conn().await;
conn.sans = std::sync::Arc::from(vec![
"reaper-san.example.com".into(),
"reaper-alt.example.com".into(),
]);
conn.remote_addr = Some(std::net::SocketAddr::from(([10, 0, 0, 1], 443)));
pool.checkin(k.clone(), conn);
tokio::time::sleep(Duration::from_millis(80)).await;
assert!(pool.checkout(&k).is_none());
let ip: IpAddr = [10, 0, 0, 1].into();
assert!(
pool.checkout_coalesced("reaper-alt.example.com", Some(ip))
.is_none()
);
}
#[tokio::test]
async fn checkout_tries_multiple_candidates_lifo() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key("example.com:80");
let mut conn1 = make_h1_conn().await;
conn1.remote_addr = Some(std::net::SocketAddr::from(([1, 1, 1, 1], 80)));
pool.checkin(k.clone(), conn1);
let mut conn2 = make_h1_conn().await;
conn2.remote_addr = Some(std::net::SocketAddr::from(([2, 2, 2, 2], 80)));
pool.checkin(k.clone(), conn2);
tokio::task::yield_now().await;
let out1 = pool.checkout(&k).expect("first checkout");
assert_eq!(
out1.remote_addr,
Some(std::net::SocketAddr::from(([2, 2, 2, 2], 80)))
);
let out2 = pool.checkout(&k).expect("second checkout");
assert_eq!(
out2.remote_addr,
Some(std::net::SocketAddr::from(([1, 1, 1, 1], 80)))
);
assert!(pool.checkout(&k).is_none());
}
#[tokio::test]
async fn evict_removes_all_connections_for_key() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k = key("example.com:80");
let conn1 = make_h1_conn().await;
pool.checkin(k.clone(), conn1);
let conn2 = make_h1_conn().await;
pool.checkin(k.clone(), conn2);
tokio::task::yield_now().await;
pool.evict(&k);
assert!(
pool.checkout(&k).is_none(),
"evict should remove all connections for the key"
);
}
#[tokio::test]
async fn evict_does_not_affect_other_keys() {
let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
let k1 = key("a.example.com:80");
let k2 = key("b.example.com:80");
let conn1 = make_h1_conn().await;
pool.checkin(k1.clone(), conn1);
let conn2 = make_h1_conn().await;
pool.checkin(k2.clone(), conn2);
tokio::task::yield_now().await;
pool.evict(&k1);
assert!(pool.checkout(&k1).is_none());
assert!(
pool.checkout(&k2).is_some(),
"evict should not affect other keys"
);
}