use std::{hint::black_box, time::Duration};
use assert_matches::assert_matches;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures_util::future;
use http::{request, HeaderMap, Request, Response, StatusCode};
use crate::{
client,
config::Settings,
error::{Code, ConnectionError, LocalError, StreamError},
proto::{
coding::Encode,
frame::{self, Frame, FrameType},
headers::Header,
push::PushId,
varint::VarInt,
},
qpack,
quic::ConnectionErrorIncoming,
server,
tests::get_stream_blocking,
ConnectionState,
};
use super::h3_quinn;
use super::{init_tracing, Pair};
/// Round-trips a plain GET request.
///
/// The client expects a `200 OK` response followed by the body
/// `wonderful hypertext`; the server sends exactly that, finishes the stream,
/// and then expects the connection to be closed by the peer with
/// `H3_NO_ERROR`.
#[tokio::test]
async fn get() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::new(pair.client().await).await.expect("client init");
        // Keeps polling the connection until it reports a close.
        let drive_fut = async { future::poll_fn(|cx| driver.poll_close(cx)).await };
        let req_fut = async move {
            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut stream = client.send_request(req).await.expect("request");

            let resp = stream.recv_response().await.expect("recv response");
            assert_eq!(resp.status(), StatusCode::OK);

            let payload = stream.recv_data().await.expect("recv data").expect("body");
            assert_eq!(payload.chunk(), b"wonderful hypertext");
        };
        tokio::join!(req_fut, drive_fut)
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();
        let (_request, mut stream) = get_stream_blocking(&mut h3_conn).await.expect("accept");

        let ok = Response::builder()
            .status(200)
            .body(())
            .expect("build response");
        stream.send_response(ok).await.expect("send_response");
        stream
            .send_data("wonderful hypertext".into())
            .await
            .expect("send_data");
        stream.finish().await.expect("finish");

        // Once the client is done, the next accept must report its graceful close.
        assert_matches!(
            h3_conn.accept().await.err().unwrap(),
            ConnectionError::Remote(ConnectionErrorIncoming::ApplicationClose { error_code: code, .. })
                if code == Code::H3_NO_ERROR.value()
        );
    };

    tokio::join!(server_fut, client_fut);
}
/// GET with trailers where the client drains the body until `recv_data`
/// yields `None` before asking for the trailers.
///
/// The server sends a `200` response, one body chunk and a `trailer: value`
/// trailer section; the client asserts it receives that trailer.
#[tokio::test]
async fn get_with_trailers_unknown_content_type() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::new(pair.client().await).await.expect("client init");
        let drive_fut = async { future::poll_fn(|cx| driver.poll_close(cx)).await };
        let req_fut = async move {
            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut stream = client.send_request(req).await.expect("request");

            stream.recv_response().await.expect("recv response");
            stream.recv_data().await.expect("recv data").expect("body");
            // A second read must signal the end of the body.
            assert!(stream.recv_data().await.unwrap().is_none());

            let received = stream
                .recv_trailers()
                .await
                .expect("recv trailers")
                .expect("trailers none");
            assert_eq!(received.get("trailer").unwrap(), &"value");
        };
        tokio::join!(req_fut, drive_fut);
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();
        let (_, mut stream) = get_stream_blocking(&mut h3_conn).await.expect("accept");

        let ok = Response::builder()
            .status(200)
            .body(())
            .expect("build response");
        stream.send_response(ok).await.expect("send_response");
        stream
            .send_data("wonderful hypertext".into())
            .await
            .expect("send_data");

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".parse().unwrap());
        stream.send_trailers(fields).await.expect("send_trailers");
        stream.finish().await.expect("finish");

        assert_matches!(
            h3_conn.accept().await.err().unwrap(),
            ConnectionError::Remote(ConnectionErrorIncoming::ApplicationClose { error_code: code, .. })
                if code == Code::H3_NO_ERROR.value()
        );
    };

    tokio::join!(server_fut, client_fut);
}
/// GET with trailers where the client reads a single body chunk and then asks
/// for the trailers directly, without first draining the body to `None`.
///
/// The server sends a `200` response, one body chunk and a `trailer: value`
/// trailer section; the client asserts it receives that trailer.
#[tokio::test]
async fn get_with_trailers_known_content_type() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::new(pair.client().await).await.expect("client init");
        let drive_fut = async { future::poll_fn(|cx| driver.poll_close(cx)).await };
        let req_fut = async move {
            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut stream = client.send_request(req).await.expect("request");

            stream.recv_response().await.expect("recv response");
            stream.recv_data().await.expect("recv data").expect("body");

            let received = stream
                .recv_trailers()
                .await
                .expect("recv trailers")
                .expect("trailers none");
            assert_eq!(received.get("trailer").unwrap(), &"value");
        };
        tokio::join!(req_fut, drive_fut);
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();
        let (_, mut stream) = get_stream_blocking(&mut h3_conn).await.expect("accept");

        let ok = Response::builder()
            .status(200)
            .body(())
            .expect("build response");
        stream.send_response(ok).await.expect("send_response");
        stream
            .send_data("wonderful hypertext".into())
            .await
            .expect("send_data");

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".parse().unwrap());
        stream.send_trailers(fields).await.expect("send_trailers");
        stream.finish().await.expect("finish");

        assert_matches!(
            h3_conn.accept().await.err().unwrap(),
            ConnectionError::Remote(ConnectionErrorIncoming::ApplicationClose { error_code: code, .. })
                if code == Code::H3_NO_ERROR.value()
        );
    };

    tokio::join!(server_fut, client_fut);
}
/// Round-trips a POST request carrying a body.
///
/// The client sends the request headers, the body `wonderful json`, finishes
/// its side of the stream and waits for the response. The server answers
/// `200`, asserts it received the same body, finishes, and then expects the
/// connection to be closed by the peer with `H3_NO_ERROR`.
#[tokio::test]
async fn post() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::new(pair.client().await).await.expect("client init");
        let drive_fut = async { future::poll_fn(|cx| driver.poll_close(cx)).await };
        let req_fut = async move {
            // Use the POST method so the request matches what this test is named for.
            let mut request_stream = client
                .send_request(Request::post("http://localhost/salut").body(()).unwrap())
                .await
                .expect("request");
            request_stream
                .send_data("wonderful json".into())
                .await
                .expect("send_data");
            request_stream.finish().await.expect("client finish");
            request_stream.recv_response().await.expect("recv response");
        };
        tokio::join!(req_fut, drive_fut);
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut incoming_req = server::Connection::new(conn).await.unwrap();
        let (_, mut request_stream) = get_stream_blocking(&mut incoming_req)
            .await
            .expect("accept");
        request_stream
            .send_response(
                Response::builder()
                    .status(200)
                    .body(())
                    .expect("build response"),
            )
            .await
            .expect("send_response");

        let request_body = request_stream
            .recv_data()
            .await
            .expect("recv data")
            .expect("server recv body");
        assert_eq!(request_body.chunk(), b"wonderful json");

        // This is the server's half of the stream; label a failure accordingly.
        request_stream.finish().await.expect("server finish");

        assert_matches!(
            incoming_req.accept().await.err().unwrap(),
            ConnectionError::Remote(ConnectionErrorIncoming::ApplicationClose { error_code: code, .. })
                if code == Code::H3_NO_ERROR.value()
        );
    };

    tokio::join!(server_fut, client_fut);
}
/// A server limited to a 12-byte field section rejects an oversized request.
///
/// The server side asserts that resolving the request fails with
/// `HeaderTooBig { actual_size: 42, max_size: 12 }`; the client asserts that
/// it receives a `431 Request Header Fields Too Large` response.
#[tokio::test]
async fn header_too_big_response_from_server() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::new(pair.client().await).await.expect("client init");
        let drive_fut = async { future::poll_fn(|cx| driver.poll_close(cx)).await };
        let req_fut = async move {
            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut stream = client.send_request(req).await.expect("request");
            stream.finish().await.expect("client finish");

            let resp = stream.recv_response().await.unwrap();
            assert_eq!(resp.status(), StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE);
        };
        tokio::join!(req_fut, drive_fut);
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::builder()
            .max_field_section_size(12)
            .build(conn)
            .await
            .unwrap();

        let resolver = h3_conn.accept().await.unwrap().unwrap();
        let failure = resolver
            .resolve_request()
            .await
            .err()
            .expect("should return an error");
        assert_matches!(
            failure,
            StreamError::HeaderTooBig {
                actual_size: 42,
                max_size: 12
            }
        );

        assert_matches!(
            h3_conn.accept().await.err().unwrap(),
            ConnectionError::Remote(ConnectionErrorIncoming::ApplicationClose { error_code: code, .. })
                if code == Code::H3_NO_ERROR.value()
        );
    };

    tokio::join!(server_fut, client_fut);
}
/// A server limited to a 207-byte field section rejects oversized request
/// trailers.
///
/// The client sends a body followed by a trailer whose value is 200 `A`s; the
/// server asserts that `recv_trailers` fails with
/// `HeaderTooBig { actual_size: 239, max_size: 207, .. }`.
#[tokio::test]
async fn header_too_big_response_from_server_trailers() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::new(pair.client().await).await.expect("client init");
        let drive_fut = async { future::poll_fn(|cx| driver.poll_close(cx)).await };
        let req_fut = async {
            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut stream = client.send_request(req).await.expect("request");
            stream
                .send_data("wonderful json".into())
                .await
                .expect("send_data");

            let mut fields = HeaderMap::new();
            fields.insert("trailer", "A".repeat(200).parse().unwrap());
            stream.send_trailers(fields).await.expect("send trailers");
            stream.finish().await.expect("client finish");

            // The outcome of the response is irrelevant to this test.
            let _ = stream.recv_response().await;
        };
        tokio::select! {
            biased;
            _ = req_fut => (),
            _ = drive_fut => (),
        }
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::builder()
            .max_field_section_size(207)
            .build(conn)
            .await
            .unwrap();
        let (_request, mut stream) = get_stream_blocking(&mut h3_conn).await.expect("accept");

        let _ = stream.recv_data().await.expect("recv data").expect("body");

        let failure = stream.recv_trailers().await.unwrap_err();
        assert_matches!(
            failure,
            StreamError::HeaderTooBig {
                actual_size: 239,
                max_size: 207,
                ..
            }
        );

        let _ = h3_conn.accept().await;
    };

    tokio::join!(server_fut, client_fut);
}
/// A client whose settings cap the field section at 12 bytes refuses to send
/// an oversized request.
///
/// `send_request` must fail locally with
/// `HeaderTooBig { actual_size: 179, max_size: 12, .. }`, the server must see
/// the request stream end with `H3_REQUEST_INCOMPLETE`, and the client driver
/// must observe the connection closing with `H3_NO_ERROR`.
#[tokio::test]
async fn header_too_big_client_error() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::new(pair.client().await).await.expect("client init");
        let drive_fut = async {
            assert_matches!(
                future::poll_fn(|cx| driver.poll_close(cx)).await,
                ConnectionError::Remote(ConnectionErrorIncoming::ApplicationClose {
                    error_code: code,
                    ..
                }) if code == Code::H3_NO_ERROR.value()
            );
        };
        let req_fut = async {
            let mut limits = Settings::default();
            limits.max_field_section_size = 12;
            client.set_settings(limits);

            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let failure = client.send_request(req).await.map(|_| ()).unwrap_err();
            assert_matches!(
                failure,
                StreamError::HeaderTooBig {
                    actual_size: 179,
                    max_size: 12,
                    ..
                }
            );
        };
        tokio::join!(req_fut, drive_fut)
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::builder()
            .max_field_section_size(12)
            .build(conn)
            .await
            .unwrap();

        let resolver = h3_conn.accept().await.unwrap().unwrap();
        assert_matches!(
            resolver
                .resolve_request()
                .await
                .err()
                .expect("should return an error"),
            StreamError::StreamError {
                code: Code::H3_REQUEST_INCOMPLETE,
                reason: _
            }
        );
    };

    tokio::join!(server_fut, client_fut);
}
/// A client whose settings cap the field section at 200 bytes refuses to send
/// oversized trailers.
///
/// `send_trailers` must fail locally with
/// `HeaderTooBig { actual_size: 239, max_size: 200, .. }`; the client driver
/// is expected to end with `ConnectionError::Timeout` and panics on any other
/// error.
#[tokio::test]
async fn header_too_big_client_error_trailer() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::new(pair.client().await).await.expect("client init");
        let drive_fut = async {
            let err = future::poll_fn(|cx| driver.poll_close(cx)).await;
            if !matches!(err, ConnectionError::Timeout) {
                panic!("unexpected error: {:?}", err);
            }
        };
        let req_fut = async {
            let mut limits = Settings::default();
            limits.max_field_section_size = 200;
            client.set_settings(limits);

            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut stream = client.send_request(req).await.expect("request");
            stream
                .send_data("wonderful json".into())
                .await
                .expect("send_data");

            let mut fields = HeaderMap::new();
            fields.insert("trailer", "A".repeat(200).parse().unwrap());
            let failure = stream.send_trailers(fields).await.unwrap_err();
            assert_matches!(
                failure,
                StreamError::HeaderTooBig {
                    actual_size: 239,
                    max_size: 200,
                    ..
                }
            );

            stream.finish().await.expect("client finish");
        };
        tokio::join!(req_fut, drive_fut);
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::builder()
            .max_field_section_size(207)
            .build(conn)
            .await
            .unwrap();
        let (_request, mut stream) = get_stream_blocking(&mut h3_conn).await.expect("accept");

        let _ = stream.recv_data().await.expect("recv data").expect("body");
        let _ = h3_conn.accept().await;
    };

    tokio::join!(server_fut, client_fut);
}
/// A client built with a 12-byte field-section limit and `send_settings(false)`
/// discards an oversized response.
///
/// The client asserts `recv_response` fails with
/// `HeaderTooBig { actual_size: 42, max_size: 12, .. }` and that a second
/// request's response fails as well. The server keeps sending data (up to 100
/// attempts, 2 ms apart) until a send fails, and asserts that failure is a
/// `RemoteTerminate` with `H3_REQUEST_CANCELLED`.
#[tokio::test]
async fn header_too_big_discard_from_client() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::builder()
            .max_field_section_size(12)
            .send_settings(false)
            .build::<_, _, Bytes>(pair.client().await)
            .await
            .expect("client init");
        let drive_fut = async { future::poll_fn(|cx| driver.poll_close(cx)).await };
        let req_fut = async {
            let first_req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut first = client.send_request(first_req).await.expect("request");
            first.finish().await.expect("client finish");

            let failure = first.recv_response().await.unwrap_err();
            assert_matches!(
                failure,
                StreamError::HeaderTooBig {
                    actual_size: 42,
                    max_size: 12,
                    ..
                }
            );

            let second_req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut second = client.send_request(second_req).await.expect("request");
            second.finish().await.expect("client finish");
            let _ = second.recv_response().await.unwrap_err();
        };
        tokio::select! {
            biased;
            _ = req_fut => (),
            _ = drive_fut => (),
        }
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();
        let (_request, mut stream) = get_stream_blocking(&mut h3_conn).await.expect("accept");

        let ok = Response::builder()
            .status(200)
            .body(())
            .expect("build response");
        stream.send_response(ok).await.expect("send_response");

        // Keep writing until the peer's cancellation surfaces as a send error.
        let mut send_failure = None;
        for _attempt in 0..100 {
            match stream.send_data("some data".into()).await {
                Ok(_) => tokio::time::sleep(Duration::from_millis(2)).await,
                Err(e) => {
                    send_failure = Some(e);
                    break;
                }
            }
        }
        assert_matches!(
            send_failure.as_ref().unwrap(),
            StreamError::RemoteTerminate {
                code: Code::H3_REQUEST_CANCELLED,
                ..
            }
        );

        let _ = h3_conn.accept().await;
    };

    tokio::join!(server_fut, client_fut);
}
/// A client built with a 200-byte field-section limit and `send_settings(false)`
/// rejects oversized response trailers.
///
/// The server sends a trailer whose value is `value` repeated 100 times; the
/// client asserts `recv_trailers` fails with
/// `HeaderTooBig { actual_size: 539, max_size: 200, .. }`.
#[tokio::test]
async fn header_too_big_discard_from_client_trailers() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::builder()
            .max_field_section_size(200)
            .send_settings(false)
            .build::<_, _, Bytes>(pair.client().await)
            .await
            .expect("client init");
        let drive_fut = async { future::poll_fn(|cx| driver.poll_close(cx)).await };
        let req_fut = async {
            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut stream = client.send_request(req).await.expect("request");

            stream.recv_response().await.expect("recv response");
            stream.recv_data().await.expect("recv data");

            let failure = stream.recv_trailers().await.unwrap_err();
            assert_matches!(
                failure,
                StreamError::HeaderTooBig {
                    actual_size: 539,
                    max_size: 200,
                    ..
                }
            );

            stream.finish().await.expect("client finish");
        };
        tokio::select! {
            biased;
            _ = req_fut => (),
            _ = drive_fut => (),
        }
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();
        let (_request, mut stream) = get_stream_blocking(&mut h3_conn).await.expect("accept");

        let ok = Response::builder()
            .status(200)
            .body(())
            .expect("build response");
        stream.send_response(ok).await.expect("send_response");
        stream
            .send_data("wonderful hypertext".into())
            .await
            .expect("send_data");

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".repeat(100).parse().unwrap());
        stream.send_trailers(fields).await.expect("send_trailers");
        stream.finish().await.expect("finish");

        let _ = h3_conn.accept().await;
    };

    tokio::join!(server_fut, client_fut);
}
/// A server whose settings cap the field section at 12 bytes refuses to send
/// an oversized response.
///
/// `send_response` must fail locally with
/// `HeaderTooBig { actual_size: 42, max_size: 12, .. }`. The client side only
/// issues a request and ignores whatever comes back.
#[tokio::test]
async fn header_too_big_server_error() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::new(pair.client().await)
            .await
            .expect("client init");
        let drive_fut = async { future::poll_fn(|cx| driver.poll_close(cx)).await };
        let req_fut = async {
            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut stream = client.send_request(req).await.unwrap();
            let _ = stream.recv_response().await;
        };
        tokio::select! {
            _ = req_fut => (),
            _ = drive_fut => (),
        }
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();

        let mut limits = Settings::default();
        limits.max_field_section_size = 12;
        h3_conn.set_settings(limits);

        let (_request, mut stream) = get_stream_blocking(&mut h3_conn).await.expect("accept");

        let ok = Response::builder()
            .status(200)
            .body(())
            .expect("build response");
        let failure = stream.send_response(ok).await.map(|_| ()).unwrap_err();
        assert_matches!(
            failure,
            StreamError::HeaderTooBig {
                actual_size: 42,
                max_size: 12,
                ..
            }
        );
    };

    tokio::join!(server_fut, client_fut);
}
/// A server whose settings cap the field section at 42 bytes sends its
/// response successfully but refuses to send oversized trailers.
///
/// `send_trailers` (value `value` repeated 100 times) must fail locally with
/// `HeaderTooBig { actual_size: 539, max_size: 42, .. }`.
#[tokio::test]
async fn header_too_big_server_error_trailers() {
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let (mut driver, mut client) = client::new(pair.client().await)
            .await
            .expect("client init");
        let drive_fut = async { future::poll_fn(|cx| driver.poll_close(cx)).await };
        let req_fut = async {
            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut stream = client.send_request(req).await.unwrap();
            let _ = stream.recv_response().await;
        };
        tokio::select! {
            _ = req_fut => (),
            _ = drive_fut => (),
        }
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();

        let mut limits = Settings::default();
        limits.max_field_section_size = 42;
        h3_conn.set_settings(limits);

        let (_request, mut stream) = get_stream_blocking(&mut h3_conn).await.expect("accept");

        let ok = Response::builder()
            .status(200)
            .body(())
            .expect("build response");
        stream.send_response(ok).await.unwrap();
        stream
            .send_data("wonderful hypertext".into())
            .await
            .expect("send_data");

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".repeat(100).parse().unwrap());
        let failure = stream.send_trailers(fields).await.unwrap_err();
        assert_matches!(
            failure,
            StreamError::HeaderTooBig {
                actual_size: 539,
                max_size: 42,
                ..
            }
        );
    };

    tokio::join!(server_fut, client_fut);
}
/// With a 100 ms timeout configured on the pair, a client waiting for a
/// response that never arrives sees a timeout.
///
/// The server accepts the request and then just sleeps for 500 ms; both
/// `recv_response` and the client driver must report
/// `ConnectionError::Timeout`.
#[tokio::test]
async fn get_timeout_client_recv_response() {
    init_tracing();
    let mut pair = Pair::default();
    pair.with_timeout(Duration::from_millis(100));
    let mut server = pair.server();

    let client_fut = async {
        let (mut conn, mut client) = client::new(pair.client().await).await.expect("client init");
        let request_fut = async {
            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut stream = client.send_request(req).await.expect("request");

            let outcome = stream.recv_response().await;
            assert_matches!(
                outcome.unwrap_err(),
                StreamError::ConnectionError(ConnectionError::Timeout)
            );
        };
        let drive_fut = async move {
            let closed_with = future::poll_fn(|cx| conn.poll_close(cx)).await;
            assert_matches!(closed_with, ConnectionError::Timeout);
        };
        tokio::join!(drive_fut, request_fut);
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();
        let _req = h3_conn.accept().await.expect("accept").unwrap();
        // Stay silent for longer than the configured timeout.
        tokio::time::sleep(Duration::from_millis(500)).await;
    };

    tokio::join!(server_fut, client_fut);
}
/// With a 200 ms timeout configured on the pair, a client waiting for body
/// data that never arrives sees a timeout.
///
/// The server sends only the `200` response headers and then sleeps for
/// 500 ms; both `recv_data` and the client driver must report
/// `ConnectionError::Timeout`.
#[tokio::test]
async fn get_timeout_client_recv_data() {
    init_tracing();
    let mut pair = Pair::default();
    pair.with_timeout(Duration::from_millis(200));
    let mut server = pair.server();

    let client_fut = async {
        let (mut conn, mut client) = client::new(pair.client().await).await.expect("client init");
        let request_fut = async {
            let req = Request::get("http://localhost/salut").body(()).unwrap();
            let mut stream = client.send_request(req).await.expect("request");
            let _ = stream.recv_response().await.unwrap();

            let outcome = stream.recv_data().await;
            assert_matches!(
                outcome.map(|_| ()).unwrap_err(),
                StreamError::ConnectionError(ConnectionError::Timeout)
            );
        };
        let drive_fut = async move {
            let closed_with = future::poll_fn(|cx| conn.poll_close(cx)).await;
            assert_matches!(closed_with, ConnectionError::Timeout);
        };
        tokio::join!(drive_fut, request_fut);
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();
        let (_request, mut stream) = get_stream_blocking(&mut h3_conn).await.expect("accept");

        let ok = Response::builder()
            .status(200)
            .body(())
            .expect("build response");
        stream.send_response(ok).await.expect("send_response");

        // Never send a body; outlast the configured timeout instead.
        tokio::time::sleep(Duration::from_millis(500)).await;
    };

    tokio::join!(server_fut, client_fut);
}
/// With a 200 ms timeout configured on the pair, a server waiting in `accept`
/// for a request that is never sent sees a timeout.
///
/// The client opens the connection and sleeps for 500 ms without issuing a
/// request; both the server's `accept` and the client driver must report
/// `ConnectionError::Timeout`.
#[tokio::test]
async fn get_timeout_server_accept() {
    init_tracing();
    let mut pair = Pair::default();
    pair.with_timeout(Duration::from_millis(200));
    let mut server = pair.server();

    let client_fut = async {
        let (mut conn, _client) = client::new(pair.client().await).await.expect("client init");
        let idle_fut = async {
            tokio::time::sleep(Duration::from_millis(500)).await;
        };
        let drive_fut = async move {
            let closed_with = future::poll_fn(|cx| conn.poll_close(cx)).await;
            assert_matches!(closed_with, ConnectionError::Timeout);
        };
        tokio::join!(drive_fut, idle_fut);
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();
        assert_matches!(
            h3_conn.accept().await.map(|_| ()).unwrap_err(),
            ConnectionError::Timeout
        );
    };

    tokio::join!(server_fut, client_fut);
}
/// With a 100 ms timeout configured on the pair, a server waiting for request
/// body data that never arrives sees a timeout.
///
/// The client sends only the POST request headers and then sleeps for 500 ms
/// while keeping the stream and connection handles alive; the server's
/// `recv_data` must report `ConnectionError::Timeout`.
#[tokio::test]
async fn post_timeout_server_recv_data() {
    init_tracing();
    let mut pair = Pair::default();
    pair.with_timeout(Duration::from_millis(100));
    let mut server = pair.server();

    let client_fut = async {
        let (_conn, mut client) = client::new(pair.client().await).await.expect("client init");
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        let _request_stream = client.send_request(req).await.expect("request");
        tokio::time::sleep(Duration::from_millis(500)).await;
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();
        let (_, mut stream) = get_stream_blocking(&mut h3_conn).await.expect("accept");
        assert_matches!(
            stream.recv_data().await.map(|_| ()).unwrap_err(),
            StreamError::ConnectionError(ConnectionError::Timeout)
        );
    };

    tokio::join!(server_fut, client_fut);
}
/// A request stream consisting of a single HEADERS frame is accepted.
#[tokio::test]
async fn request_valid_one_header() {
    request_sequence_ok(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);
    })
    .await;
}
/// A request stream of HEADERS followed by one DATA frame is accepted.
#[tokio::test]
async fn request_valid_header_data() {
    request_sequence_ok(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);
        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);
    })
    .await;
}
/// A request stream of HEADERS, one DATA frame and a trailer section is
/// accepted.
#[tokio::test]
async fn request_valid_header_data_trailer() {
    request_sequence_ok(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);
        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".parse().unwrap());
        trailers_encode(out, fields);
    })
    .await;
}
/// A request stream of HEADERS, three DATA frames and a trailer section is
/// accepted.
#[tokio::test]
async fn request_valid_header_multiple_data_trailer() {
    request_sequence_ok(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);

        for _ in 0..3 {
            Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);
        }

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".parse().unwrap());
        trailers_encode(out, fields);
    })
    .await;
}
/// A request stream of HEADERS directly followed by a trailer section (no
/// DATA frames) is accepted.
#[tokio::test]
async fn request_valid_header_trailer() {
    request_sequence_ok(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".parse().unwrap());
        trailers_encode(out, fields);
    })
    .await;
}
/// An unknown frame placed before the HEADERS frame is tolerated.
#[tokio::test]
async fn request_valid_unknown_frame_before() {
    request_sequence_ok(|mut out| {
        unknown_frame_encode(out);
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);
    })
    .await;
}
/// An unknown frame placed after a lone HEADERS frame is tolerated.
#[tokio::test]
async fn request_valid_unknown_frame_after_one_header() {
    request_sequence_ok(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);
        unknown_frame_encode(out);
    })
    .await;
}
/// An unknown frame placed between the HEADERS frame and the first DATA frame
/// is tolerated.
#[tokio::test]
async fn request_valid_unknown_frame_interleaved_after_header() {
    request_sequence_ok(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);
        unknown_frame_encode(out);
        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);
    })
    .await;
}
/// An unknown frame placed between two DATA frames is tolerated.
#[tokio::test]
async fn request_valid_unknown_frame_interleaved_between_data() {
    request_sequence_ok(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);
        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);
        unknown_frame_encode(out);
        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);
    })
    .await;
}
/// An unknown frame placed after the last DATA frame is tolerated.
///
/// The sequence is HEADERS, two DATA frames, then the unknown frame as the
/// final frame on the stream. This differs from
/// `request_valid_unknown_frame_interleaved_between_data`, where the unknown
/// frame sits between the two DATA frames.
#[tokio::test]
async fn request_valid_unknown_frame_interleaved_after_data() {
    request_sequence_ok(|mut buf| {
        request_encode(
            &mut buf,
            Request::post("http://localhost/salut").body(()).unwrap(),
        );
        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut buf);
        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut buf);
        // The unknown frame goes last so nothing but end-of-stream follows it.
        unknown_frame_encode(buf);
    })
    .await;
}
/// An unknown frame placed between the DATA frame and the trailer section is
/// tolerated.
#[tokio::test]
async fn request_valid_unknown_frame_interleaved_before_trailers() {
    request_sequence_ok(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);
        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);
        unknown_frame_encode(out);

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".parse().unwrap());
        trailers_encode(out, fields);
    })
    .await;
}
/// An unknown frame placed after the trailer section is tolerated.
#[tokio::test]
async fn request_valid_unknown_frame_after_trailers() {
    request_sequence_ok(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);
        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".parse().unwrap());
        trailers_encode(out, fields);

        unknown_frame_encode(out);
    })
    .await;
}
/// Returns the frames the `request_invalid_frame_*` tests inject into a
/// request stream: CANCEL_PUSH, SETTINGS, GOAWAY and MAX_PUSH_ID.
fn invalid_request_frames() -> Vec<Frame<Bytes>> {
    Vec::from([
        Frame::CancelPush(PushId(0)),
        Frame::Settings(frame::Settings::default()),
        Frame::Goaway(VarInt(1)),
        Frame::MaxPushId(PushId(1)),
    ])
}
/// Each frame from `invalid_request_frames`, sent as the very first frame of
/// a request stream, must be reported as unexpected.
#[tokio::test]
async fn request_invalid_frame_first() {
    for bad_frame in invalid_request_frames() {
        request_sequence_unexpected(|mut out| {
            bad_frame.encode(&mut out);
        })
        .await;
    }
}
/// Each frame from `invalid_request_frames`, sent right after the HEADERS
/// frame, must be reported as unexpected.
#[tokio::test]
async fn request_invalid_frame_after_header() {
    for bad_frame in invalid_request_frames() {
        request_sequence_unexpected(|mut out| {
            let req = Request::post("http://localhost/salut").body(()).unwrap();
            request_encode(&mut out, req);
            bad_frame.encode(&mut out);
        })
        .await;
    }
}
/// Each frame from `invalid_request_frames`, sent after HEADERS and one DATA
/// frame, must be reported as unexpected.
#[tokio::test]
async fn request_invalid_frame_after_data() {
    for bad_frame in invalid_request_frames() {
        request_sequence_unexpected(|mut out| {
            let req = Request::post("http://localhost/salut").body(()).unwrap();
            request_encode(&mut out, req);
            Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);
            bad_frame.encode(&mut out);
        })
        .await;
    }
}
/// Each frame from `invalid_request_frames`, sent after HEADERS, DATA and a
/// trailer section, must be reported as unexpected.
#[tokio::test]
async fn request_invalid_frame_after_trailers() {
    for bad_frame in invalid_request_frames() {
        request_sequence_unexpected(|mut out| {
            let req = Request::post("http://localhost/salut").body(()).unwrap();
            request_encode(&mut out, req);
            Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);

            let mut fields = HeaderMap::new();
            fields.insert("trailer", "value".parse().unwrap());
            trailers_encode(out, fields);

            bad_frame.encode(&mut out);
        })
        .await;
    }
}
/// A DATA frame sent after the trailer section must be reported as
/// unexpected.
#[tokio::test]
async fn request_invalid_data_after_trailers() {
    request_sequence_unexpected(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".parse().unwrap());
        trailers_encode(out, fields);

        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);
    })
    .await;
}
/// A DATA frame sent before any HEADERS frame must be reported as unexpected.
#[tokio::test]
async fn request_invalid_data_first() {
    request_sequence_unexpected(|mut out| {
        let body = Bytes::from("fada");
        Frame::Data(body).encode_with_payload(&mut out);
    })
    .await;
}
/// Two consecutive trailer sections on one request stream must be reported as
/// unexpected.
#[tokio::test]
async fn request_invalid_two_trailers() {
    request_sequence_unexpected(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);
        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".parse().unwrap());
        let duplicate = fields.clone();
        trailers_encode(out, duplicate);
        trailers_encode(out, fields);
    })
    .await;
}
/// A stray `0xFF` byte appended after an otherwise valid HEADERS / DATA /
/// trailers sequence must be reported as a frame error.
#[tokio::test]
async fn request_invalid_trailing_byte() {
    request_sequence_frame_error(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);
        Frame::Data(Bytes::from("fada")).encode_with_payload(&mut out);

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".parse().unwrap());
        trailers_encode(out, fields);

        out.put_u8(255);
    })
    .await;
}
/// A hand-built DATA frame that declares a length of 5 but carries only the
/// 4-byte payload `fada`, followed by a trailer section, must be reported as
/// a frame error.
#[tokio::test]
async fn request_invalid_data_frame_length_too_large() {
    request_sequence_frame_error(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);

        // Frame header written by hand so the declared length can disagree
        // with the payload that follows.
        FrameType::DATA.encode(&mut out);
        VarInt::from(5u32).encode(&mut out);
        out.put_slice(b"fada");

        let mut fields = HeaderMap::new();
        fields.insert("trailer", "value".parse().unwrap());
        trailers_encode(out, fields);
    })
    .await;
}
/// A hand-built DATA frame that declares a length of 3 but carries the 4-byte
/// payload `fada` must be reported as a frame error.
#[tokio::test]
async fn request_invalid_data_frame_length_too_short() {
    request_sequence_frame_error(|mut out| {
        let req = Request::post("http://localhost/salut").body(()).unwrap();
        request_encode(&mut out, req);

        // Frame header written by hand so the declared length can disagree
        // with the payload that follows.
        FrameType::DATA.encode(&mut out);
        VarInt::from(3u32).encode(&mut out);
        out.put_slice(b"fada");
    })
    .await;
}
/// Appends `req` to `buf` as one HEADERS frame.
///
/// The request's method, URI, headers and extensions are converted with
/// `Header::request`, encoded with `qpack::encode_stateless`, and written as a
/// frame including its payload. The request body is ignored.
///
/// # Panics
///
/// Panics if building the header or encoding it fails.
fn request_encode<B: BufMut>(buf: &mut B, req: http::Request<()>) {
    let (
        request::Parts {
            method,
            uri,
            headers,
            extensions,
            ..
        },
        _body,
    ) = req.into_parts();

    let fields = Header::request(method, uri, headers, extensions).unwrap();
    let mut encoded = BytesMut::new();
    qpack::encode_stateless(&mut encoded, fields).unwrap();
    Frame::headers(encoded).encode_with_payload(buf);
}
/// Appends `fields` to `buf` as one HEADERS frame holding a trailer section.
///
/// The map is wrapped with `Header::trailer` and encoded with
/// `qpack::encode_stateless`.
///
/// # Panics
///
/// Panics if encoding fails.
fn trailers_encode<B: BufMut>(buf: &mut B, fields: HeaderMap) {
    let mut encoded = BytesMut::new();
    qpack::encode_stateless(&mut encoded, Header::trailer(fields)).unwrap();
    Frame::headers(encoded).encode_with_payload(buf);
}
/// Appends the six raw bytes `[22, 4, 0, 255, 128, 0]` to `buf`.
///
/// NOTE(review): presumably this is a frame of an unassigned type (22) with a
/// declared length of 4 followed by four payload bytes — confirm against the
/// frame codec.
fn unknown_frame_encode<B: BufMut>(buf: &mut B) {
    const RAW_FRAME: [u8; 6] = [22, 4, 0, 255, 128, 0];
    buf.put_slice(&RAW_FRAME);
}
/// Runs `request_sequence_check` for a byte sequence that is expected to be
/// accepted without any error code.
async fn request_sequence_ok<F: Fn(&mut BytesMut)>(request: F) {
    let no_error = None;
    request_sequence_check(request, no_error).await;
}
/// Runs `request_sequence_check` for a byte sequence that is expected to fail
/// with `H3_FRAME_UNEXPECTED`.
async fn request_sequence_unexpected<F: Fn(&mut BytesMut)>(request: F) {
    let expected = Some(Code::H3_FRAME_UNEXPECTED);
    request_sequence_check(request, expected).await;
}
/// Runs `request_sequence_check` for a byte sequence that is expected to fail
/// with `H3_FRAME_ERROR`.
async fn request_sequence_frame_error<F: Fn(&mut BytesMut)>(request: F) {
    let expected = Some(Code::H3_FRAME_ERROR);
    request_sequence_check(request, expected).await;
}
/// Sends a hand-crafted byte sequence on a raw request stream and checks how
/// both endpoints react.
///
/// `request` fills a buffer with the bytes to write on a freshly opened
/// bidirectional QUIC stream. `expected_error_code` selects the assertions run
/// once both sides have finished:
///
/// * `None` — the client driver must end with a local `H3_NO_ERROR`, the
///   server driver must see the peer close with `H3_NO_ERROR`, and the
///   server-side read of the request must succeed.
/// * `Some(code)` — the server driver and the server-side stream must both
///   fail with a local application error `code`, and the client driver must
///   see the peer close with `code`.
///
/// In either case, if reading the raw stream on the client fails, that failure
/// must be a connection loss caused by an application close carrying the
/// expected code.
async fn request_sequence_check<F>(request: F, expected_error_code: Option<Code>)
where
    F: Fn(&mut BytesMut),
{
    init_tracing();
    let mut pair = Pair::default();
    let mut server = pair.server();

    let client_fut = async {
        let quic = pair.client_inner().await;
        let (mut h3_driver, send_request) =
            client::new(h3_quinn::Connection::new(quic.clone()))
                .await
                .unwrap();
        let (mut raw_send, mut raw_recv) = quic.open_bi().await.unwrap();

        let raw_stream = async move {
            let mut buf = BytesMut::new();
            request(&mut buf);
            raw_send.write_all(&buf[..]).await.unwrap();
            raw_send.finish().unwrap();
            tokio::time::sleep(Duration::from_millis(100)).await;

            // Drain whatever the server sends back until the stream ends or
            // a read error is reported.
            while let Some(read) = raw_recv.read(&mut buf).await? {
                black_box(read);
            }

            drop(send_request);
            Result::<(), quinn::ReadError>::Ok(())
        };
        let h3_close = async {
            Result::<(), ConnectionError>::Err(
                future::poll_fn(|cx| h3_driver.poll_close(cx)).await,
            )
        };
        tokio::join!(raw_stream, h3_close)
    };

    let server_fut = async {
        let conn = server.next().await;
        let mut h3_conn = server::Connection::new(conn).await.unwrap();
        let resolver = h3_conn
            .accept()
            .await
            .unwrap()
            .expect("request stream end unexpected");

        let accept_next = async move {
            h3_conn.accept().await?;
            Result::<(), ConnectionError>::Ok(())
        };
        let read_request = async {
            let (_, mut stream) = resolver.resolve_request().await?;
            while stream.recv_data().await?.is_some() {}
            stream.recv_trailers().await?;
            Result::<(), StreamError>::Ok(())
        };
        tokio::join!(accept_next, read_request)
    };

    let (
        (server_result_driver, server_result_stream),
        (client_result_stream, client_result_driver),
    ) = tokio::join!(server_fut, client_fut);

    if let Err(err) = client_result_stream {
        assert_matches!(err, quinn::ReadError::ConnectionLost(quinn::ConnectionError::ApplicationClosed(code))
            if code.error_code.into_inner() == expected_error_code.expect("If this is a error an error was expected").value());
    }

    match expected_error_code {
        Some(expected) => {
            assert_matches!(
                server_result_driver,
                Err(ConnectionError::Local { error: LocalError::Application { code: err, .. } }) if err == expected
            );
            assert_matches!(
                client_result_driver,
                Err(ConnectionError::Remote(ConnectionErrorIncoming::ApplicationClose { error_code: err })) if err == expected.value()
            );
            assert_matches!(
                server_result_stream,
                Err(StreamError::ConnectionError(ConnectionError::Local { error: LocalError::Application { code: err, .. } })) if err == expected
            );
        }
        None => {
            assert_matches!(
                client_result_driver,
                Err(ConnectionError::Local {
                    error: LocalError::Application {
                        code: Code::H3_NO_ERROR,
                        ..
                    },
                })
            );
            assert_matches!(
                server_result_driver,
                Err(ConnectionError::Remote(
                    ConnectionErrorIncoming::ApplicationClose {
                        error_code: err
                    }
                )) if err == Code::H3_NO_ERROR.value()
            );
            assert_matches!(server_result_stream, Ok(()));
        }
    }
}