#![cfg(all(feature = "std", feature = "async"))]
extern crate bitreq;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::task::JoinSet;
async fn spawn_keep_alive_max_one_server() -> std::net::SocketAddr {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {
let (mut sock, _) = match listener.accept().await {
Ok(x) => x,
Err(_) => return,
};
tokio::spawn(async move {
let mut buf = [0u8; 4096];
let mut acc: Vec<u8> = Vec::new();
loop {
let n = match sock.read(&mut buf).await {
Ok(0) | Err(_) => return,
Ok(n) => n,
};
acc.extend_from_slice(&buf[..n]);
while let Some(end) = find_double_crlf(&acc) {
acc.drain(..end);
let response = b"HTTP/1.1 200 OK\r\n\
Content-Length: 0\r\n\
Keep-Alive: max=1\r\n\
Connection: keep-alive\r\n\
\r\n";
if sock.write_all(response).await.is_err() {
return;
}
}
}
});
}
});
addr
}
fn find_double_crlf(buf: &[u8]) -> Option<usize> {
buf.windows(4).position(|w| w == b"\r\n\r\n").map(|i| i + 4)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn pipelined_requests_on_keep_alive_max_one() {
const PIPELINED_REQUESTS: usize = 100;
const POOL_SIZE: usize = 20;
let addr = spawn_keep_alive_max_one_server().await;
let url = format!("http://{}/", addr);
let client = bitreq::Client::new(POOL_SIZE);
let _ = client
.send_async(bitreq::Request::new(bitreq::Method::Get, &url))
.await
.expect("priming request succeeds");
let mut set = JoinSet::new();
for i in 0..PIPELINED_REQUESTS {
let client = client.clone();
let url = url.clone();
set.spawn(async move {
println!("Launching request {}", i);
let req = bitreq::Request::new(bitreq::Method::Get, url).with_pipelining();
let res = client.send_async(req).await.expect("pipelined request succeeds");
println!("Got response {}", i);
res
});
}
let collect = async {
let mut results = Vec::with_capacity(PIPELINED_REQUESTS);
while let Some(res) = set.join_next().await {
results.push(res.expect("task panicked"));
}
results
};
let results =
tokio::time::timeout(Duration::from_secs(10), collect).await.unwrap_or_else(|_| {
panic!("{PIPELINED_REQUESTS} pipelined requests did not finish within 10s")
});
assert_eq!(results.len(), PIPELINED_REQUESTS);
}