use super::fixture::*;
use crate::{
Headers, HttpConfig, KnownHeaderName, Method, Status,
h2::{H2ErrorCode, frame::Frame},
headers::hpack::PseudoHeaders,
};
use futures_lite::io::AsyncRead;
use std::{
pin::Pin,
task::{Context, Poll},
};
fn goaway_with(frames: &[Frame], code: H2ErrorCode) -> bool {
frames
.iter()
.any(|f| matches!(f, Frame::Goaway { error_code, .. } if *error_code == code))
}
fn rst_with(frames: &[Frame], stream_id: u32, code: H2ErrorCode) -> bool {
frames.iter().any(|f| {
matches!(f, Frame::RstStream { stream_id: s, error_code }
if *s == stream_id && *error_code == code)
})
}
fn post_with_content_length(content_length: u64) -> (PseudoHeaders<'static>, Headers) {
let pseudos = PseudoHeaders::default()
.with_method(Method::Post)
.with_path("/")
.with_scheme("http")
.with_authority("test");
let mut fields = Headers::new();
fields.insert(KnownHeaderName::ContentLength, content_length.to_string());
(pseudos, fields)
}
#[test]
fn continuation_flood_past_max_header_list_size_enhances_calm() {
let mut fx =
DriverFixture::new_server_with_config(HttpConfig::default().with_max_header_list_size(100));
fx.complete_handshake();
fx.peer_open_stream_no_end_headers(1, Method::Post, "/", false);
fx.peer_continuation(1, &[0u8; 200], false);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
goaway_with(&frames, H2ErrorCode::EnhanceYourCalm),
"a header block exceeding MAX_HEADER_LIST_SIZE across CONTINUATION frames must \
GOAWAY(ENHANCE_YOUR_CALM); got {frames:?}",
);
}
#[test]
fn request_header_block_split_across_continuation_yields_conn() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_open_stream_split(1, Method::Get, "/split", true, 3);
match fx.tick() {
Poll::Ready(Some(Ok(conn))) => {
assert_eq!(conn.method(), Method::Get);
assert_eq!(conn.path(), "/split");
}
other => panic!(
"a header block reassembled from HEADERS + CONTINUATION should yield the request \
Conn; got {other:?}",
),
}
}
#[test]
fn continuation_without_open_header_block_is_protocol_error() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_continuation(1, &[0u8; 4], true);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
goaway_with(&frames, H2ErrorCode::ProtocolError),
"a CONTINUATION with no in-progress header block must be a connection PROTOCOL_ERROR; got \
{frames:?}",
);
}
#[test]
fn continuation_on_mismatched_stream_is_protocol_error() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_open_stream_no_end_headers(1, Method::Post, "/", false);
fx.peer_continuation(3, &[0u8; 4], true);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
goaway_with(&frames, H2ErrorCode::ProtocolError),
"a CONTINUATION on a different stream than the in-progress block must be a connection \
PROTOCOL_ERROR; got {frames:?}",
);
}
#[test]
fn non_continuation_frame_during_header_block_is_protocol_error() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_open_stream_no_end_headers(1, Method::Post, "/", false);
fx.peer_data(1, b"x", false);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
goaway_with(&frames, H2ErrorCode::ProtocolError),
"a non-CONTINUATION frame interleaving an in-progress header block must be a connection \
PROTOCOL_ERROR; got {frames:?}",
);
}
#[test]
fn exceeding_max_concurrent_streams_refuses_with_rst() {
let mut fx = DriverFixture::new_server_with_config(
HttpConfig::default().with_h2_max_concurrent_streams(1),
);
fx.complete_handshake();
fx.peer_open_stream(1, Method::Get, "/", true);
let _conn = match fx.tick() {
Poll::Ready(Some(Ok(conn))) => conn,
other => panic!("expected Conn for stream 1, got {other:?}"),
};
fx.peer_open_stream(3, Method::Get, "/", true);
let polled = fx.tick();
assert!(
!matches!(polled, Poll::Ready(Some(Ok(_)))),
"a stream past MAX_CONCURRENT_STREAMS must not yield a Conn; got {polled:?}",
);
let frames = fx.next_outbound_frames();
assert!(
frames.iter().any(|f| matches!(
f,
Frame::RstStream {
stream_id: 3,
error_code: H2ErrorCode::RefusedStream,
}
)),
"the excess stream must be refused with RST_STREAM(REFUSED_STREAM); got {frames:?}",
);
assert!(
!fx.connection.streams_lock().contains_key(&3),
"the refused stream must not be registered",
);
}
#[test]
fn malformed_request_rejected_before_handler_with_rst() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
let pseudos = PseudoHeaders::default()
.with_method(Method::Get)
.with_path("/")
.with_scheme("http")
.with_authority("test");
let mut fields = Headers::new();
fields.insert("connection", "close");
fx.peer_headers(1, pseudos, &fields, true);
let polled = fx.tick();
assert!(
!matches!(polled, Poll::Ready(Some(Ok(_)))),
"a malformed request must not yield a Conn to a handler; got {polled:?}",
);
let frames = fx.next_outbound_frames();
assert!(
frames.iter().any(|f| matches!(
f,
Frame::RstStream {
stream_id: 1,
error_code: H2ErrorCode::ProtocolError,
}
)),
"a malformed request must be rejected with RST_STREAM(PROTOCOL_ERROR); got {frames:?}",
);
assert!(
!fx.connection.streams_lock().contains_key(&1),
"the rejected stream must not be registered",
);
}
#[test]
fn headers_on_half_closed_remote_stream_is_stream_closed() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_open_stream(1, Method::Get, "/", true);
let _conn = match fx.tick() {
Poll::Ready(Some(Ok(conn))) => conn,
other => panic!("expected Conn for stream 1, got {other:?}"),
};
let _ = fx.next_outbound_frames();
let mut trailers = Headers::new();
trailers.insert("grpc-status", "0");
fx.peer_trailers(1, &trailers);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
frames.iter().any(|f| matches!(
f,
Frame::RstStream {
stream_id: 1,
error_code: H2ErrorCode::StreamClosed,
}
)),
"HEADERS on a half-closed-remote stream must earn RST_STREAM(STREAM_CLOSED); got \
{frames:?}",
);
assert!(
!fx.connection.streams_lock().contains_key(&1),
"the stream should be removed after the illegal HEADERS",
);
}
#[test]
fn even_peer_stream_id_is_protocol_error() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_open_stream(2, Method::Get, "/", true);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
goaway_with(&frames, H2ErrorCode::ProtocolError),
"an even peer stream id must be a connection PROTOCOL_ERROR; got {frames:?}",
);
}
#[test]
fn headers_on_reset_stream_is_stream_level_stream_closed() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_open_stream(1, Method::Post, "/", false);
let _conn = match fx.tick() {
Poll::Ready(Some(Ok(conn))) => conn,
other => panic!("expected Conn for stream 1, got {other:?}"),
};
fx.peer_rst_stream(1, H2ErrorCode::Cancel);
let _ = fx.tick();
assert!(!fx.connection.streams_lock().contains_key(&1));
let _ = fx.next_outbound_frames();
fx.peer_open_stream(1, Method::Post, "/", true);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
frames.iter().any(|f| matches!(
f,
Frame::RstStream {
stream_id: 1,
error_code: H2ErrorCode::StreamClosed,
}
)),
"HEADERS on a reset stream must earn a stream-level RST_STREAM(STREAM_CLOSED); got \
{frames:?}",
);
assert!(
!frames.iter().any(|f| matches!(f, Frame::Goaway { .. })),
"a stream we reset must not escalate to a connection error; got {frames:?}",
);
}
#[test]
fn headers_on_cleanly_closed_stream_goaways_stream_closed() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_open_stream(1, Method::Get, "/", true);
let _conn = match fx.tick() {
Poll::Ready(Some(Ok(conn))) => conn,
other => panic!("expected Conn for stream 1, got {other:?}"),
};
let pseudos = PseudoHeaders::default().with_status(Status::Ok);
let _submit = fx.connection.submit_send(1, pseudos, Headers::new(), None);
let _ = fx.tick();
assert!(!fx.connection.streams_lock().contains_key(&1));
let _ = fx.next_outbound_frames();
fx.peer_open_stream(1, Method::Get, "/", true);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
goaway_with(&frames, H2ErrorCode::StreamClosed),
"HEADERS on a cleanly-closed stream must be a connection STREAM_CLOSED; got {frames:?}",
);
}
#[test]
fn headers_on_never_opened_lower_id_is_protocol_error() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
fx.peer_open_stream(3, Method::Get, "/", true);
let _conn = match fx.tick() {
Poll::Ready(Some(Ok(conn))) => conn,
other => panic!("expected Conn for stream 3, got {other:?}"),
};
let _ = fx.next_outbound_frames();
fx.peer_open_stream(1, Method::Get, "/", true);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
goaway_with(&frames, H2ErrorCode::ProtocolError),
"HEADERS on a never-opened lower id must be a connection PROTOCOL_ERROR; got {frames:?}",
);
}
#[test]
fn data_exceeding_content_length_is_protocol_error() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
let (pseudos, fields) = post_with_content_length(1);
fx.peer_headers(1, pseudos, &fields, false);
let _conn = match fx.tick() {
Poll::Ready(Some(Ok(conn))) => conn,
other => panic!("expected Conn for stream 1, got {other:?}"),
};
let _ = fx.next_outbound_frames();
fx.peer_data(1, b"test", true);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
rst_with(&frames, 1, H2ErrorCode::ProtocolError),
"DATA longer than content-length must earn RST_STREAM(PROTOCOL_ERROR); got {frames:?}",
);
assert!(
!fx.connection.streams_lock().contains_key(&1),
"the stream should be removed after the content-length violation",
);
}
#[test]
fn data_short_of_content_length_at_end_stream_is_protocol_error() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
let (pseudos, fields) = post_with_content_length(10);
fx.peer_headers(1, pseudos, &fields, false);
let _conn = match fx.tick() {
Poll::Ready(Some(Ok(conn))) => conn,
other => panic!("expected Conn for stream 1, got {other:?}"),
};
let _ = fx.next_outbound_frames();
fx.peer_data(1, b"test", true);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
rst_with(&frames, 1, H2ErrorCode::ProtocolError),
"a body short of content-length must earn RST_STREAM(PROTOCOL_ERROR) at END_STREAM; got \
{frames:?}",
);
assert!(!fx.connection.streams_lock().contains_key(&1));
}
#[test]
fn multiple_data_frames_summing_past_content_length_is_protocol_error() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
let (pseudos, fields) = post_with_content_length(5);
fx.peer_headers(1, pseudos, &fields, false);
let _conn = match fx.tick() {
Poll::Ready(Some(Ok(conn))) => conn,
other => panic!("expected Conn for stream 1, got {other:?}"),
};
let _ = fx.next_outbound_frames();
fx.peer_data(1, b"test", false);
let _ = fx.tick();
fx.peer_data(1, b"test", true);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
rst_with(&frames, 1, H2ErrorCode::ProtocolError),
"DATA frames summing past content-length must earn RST_STREAM(PROTOCOL_ERROR); got \
{frames:?}",
);
assert!(!fx.connection.streams_lock().contains_key(&1));
}
#[test]
fn data_matching_content_length_is_accepted() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
let (pseudos, fields) = post_with_content_length(4);
fx.peer_headers(1, pseudos, &fields, false);
let _conn = match fx.tick() {
Poll::Ready(Some(Ok(conn))) => conn,
other => panic!("expected Conn for stream 1, got {other:?}"),
};
let _ = fx.next_outbound_frames();
fx.peer_data(1, b"test", true);
let _ = fx.tick();
let frames = fx.next_outbound_frames();
assert!(
!rst_with(&frames, 1, H2ErrorCode::ProtocolError),
"a body matching content-length must not be reset; got {frames:?}",
);
assert!(
fx.connection.streams_lock().contains_key(&1),
"a well-formed request stream should remain open for its response",
);
}
#[test]
fn request_content_length_defers_eof_until_end_stream_so_trailers_survive() {
let mut fx = DriverFixture::new_server();
fx.complete_handshake();
let (pseudos, fields) = post_with_content_length(0);
fx.peer_headers(1, pseudos, &fields, false);
let mut conn = match fx.tick() {
Poll::Ready(Some(Ok(conn))) => conn,
other => panic!("expected Conn for stream 1, got {other:?}"),
};
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let mut buf = [0u8; 16];
{
let mut body = conn.request_body();
assert!(
Pin::new(&mut body)
.poll_read(&mut cx, &mut buf)
.is_pending(),
"request body must not declare EOF on content-length alone before END_STREAM (would \
lose trailers)",
);
}
let mut trailers = Headers::new();
trailers.insert("x-trailer", "present");
fx.peer_trailers(1, &trailers);
let _ = fx.tick();
{
let mut body = conn.request_body();
assert!(
matches!(
Pin::new(&mut body).poll_read(&mut cx, &mut buf),
Poll::Ready(Ok(0))
),
"request body should reach clean EOF once END_STREAM has arrived",
);
}
assert_eq!(
conn.request_trailers
.as_ref()
.and_then(|t| t.get_str("x-trailer")),
Some("present"),
"request trailers delivered with END_STREAM must be surfaced, not dropped",
);
}