use super::fixture::*;
use crate::{
Body, Headers, HttpConfig, Method, Status,
h2::{H2ErrorCode, frame::Frame, settings::H2Settings},
};
use std::task::Poll;
/// Totals the `data_length` of every `Frame::Data` in `frames` whose stream id equals
/// `stream_id`. Frames of any other variant, or DATA frames on other streams, add nothing.
fn data_bytes(frames: &[Frame], stream_id: u32) -> u32 {
    let mut total = 0;
    for frame in frames {
        if let Frame::Data {
            stream_id: id,
            data_length,
            ..
        } = frame
        {
            if *id == stream_id {
                total += *data_length;
            }
        }
    }
    total
}
/// With the peer advertising an initial window size of 5, a 12-byte response body on stream 1
/// is expected to go out as 5 bytes of DATA (no END_STREAM, stream still tracked), and the
/// remaining 7 bytes plus END_STREAM only after the peer sends a WINDOW_UPDATE.
#[test]
fn send_window_exhaustion_parks_then_resumes_on_window_update() {
    use crate::headers::hpack::PseudoHeaders;

    // Shared predicate for both phases: a DATA frame on stream 1 that carries END_STREAM.
    let ends_stream_one = |f: &Frame| {
        matches!(
            f,
            Frame::Data {
                stream_id: 1,
                end_stream: true,
                ..
            }
        )
    };

    let mut fx = DriverFixture::new_server();
    let peer_settings = H2Settings::default().with_initial_window_size(5);
    fx.complete_handshake_with_peer_settings(peer_settings);
    fx.peer_open_stream(1, Method::Get, "/", true);

    // Keep the yielded value bound for the rest of the test, exactly as long as before.
    let polled = fx.tick();
    let _accepted = match polled {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Queue a 12-byte response body; only 5 bytes fit in the peer's window.
    let response_head = PseudoHeaders::default().with_status(Status::Ok);
    let response_body = Body::new_static(b"hello world!" as &[u8]);
    let _pending_send =
        fx.connection
            .submit_send(1, response_head, Headers::new(), Some(response_body));

    // Phase 1: window exhausted part-way through the body.
    let _ = fx.tick();
    let first = fx.next_outbound_frames();
    let sent_before_update = data_bytes(&first, 1);
    assert_eq!(
        sent_before_update, 5,
        "send pump should frame exactly the 5-byte window's worth of DATA, then park; got \
         {first:?}",
    );
    let no_early_end = first.iter().all(|f| !ends_stream_one(f));
    assert!(
        no_early_end,
        "no END_STREAM while the window is exhausted mid-body; got {first:?}",
    );
    let still_tracked = fx.connection.streams_lock().contains_key(&1);
    assert!(
        still_tracked,
        "stream must stay live while parked on a zero send window",
    );

    // Phase 2: the peer grants more window and the rest of the body is flushed.
    fx.peer_window_update(1, 20);
    let _ = fx.tick();
    let after = fx.next_outbound_frames();
    let sent_after_update = data_bytes(&after, 1);
    assert_eq!(
        sent_after_update, 7,
        "after WINDOW_UPDATE the pump should frame the remaining 7 body bytes; got {after:?}",
    );
    let finished = after.iter().any(ends_stream_one);
    assert!(
        finished,
        "resumed send should terminate the stream with END_STREAM; got {after:?}",
    );
    let stream_gone = !fx.connection.streams_lock().contains_key(&1);
    assert!(
        stream_gone,
        "with the body fully sent and recv already closed, the server should remove the stream",
    );
}
/// A WINDOW_UPDATE of `0x7FFF_FFFF` aimed at open stream 1 is expected to produce
/// RST_STREAM(FLOW_CONTROL_ERROR) for that stream, remove it from the stream table, and
/// leave the connection without a GOAWAY.
#[test]
fn peer_window_update_overflow_resets_stream() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Get, "/", true);

    // Keep the yielded value bound until the end of the test.
    let polled = fx.tick();
    let _accepted = match polled {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    fx.peer_window_update(1, 0x7FFF_FFFF);
    let _ = fx.tick();
    let frames = fx.next_outbound_frames();

    let saw_flow_control_reset = frames.iter().any(|f| {
        matches!(
            f,
            Frame::RstStream {
                stream_id: 1,
                error_code: H2ErrorCode::FlowControlError,
            }
        )
    });
    assert!(
        saw_flow_control_reset,
        "stream-window overflow must earn RST_STREAM(FLOW_CONTROL_ERROR); got {frames:?}",
    );

    let stream_gone = !fx.connection.streams_lock().contains_key(&1);
    assert!(stream_gone, "the overflowed stream should be removed");

    let no_goaway = frames.iter().all(|f| !matches!(f, Frame::Goaway { .. }));
    assert!(
        no_goaway,
        "a stream-level flow-control error must not tear down the connection; got {frames:?}",
    );
}
/// A WINDOW_UPDATE of `0x7FFF_FFFF` on stream id 0 (the connection) is expected to make the
/// server emit a GOAWAY carrying FLOW_CONTROL_ERROR.
#[test]
fn peer_window_update_overflow_on_connection_goaways() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();

    fx.peer_window_update(0, 0x7FFF_FFFF);
    let _ = fx.tick();
    let frames = fx.next_outbound_frames();

    let is_flow_control_goaway = |f: &Frame| {
        matches!(
            f,
            Frame::Goaway {
                error_code: H2ErrorCode::FlowControlError,
                ..
            }
        )
    };
    let saw_goaway = frames.iter().any(is_flow_control_goaway);
    assert!(
        saw_goaway,
        "connection-window overflow must GOAWAY with FLOW_CONTROL_ERROR; got {frames:?}",
    );
}
/// With the server configured for a max per-stream recv window of 100, a single 150-byte DATA
/// frame on stream 1 is expected to trigger a GOAWAY carrying FLOW_CONTROL_ERROR.
#[test]
fn peer_data_past_stream_buffer_cap_is_connection_error() {
    let config = HttpConfig::default().with_h2_max_stream_recv_window_size(100);
    let mut fx = DriverFixture::new_server_with_config(config);
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);

    // Keep the yielded value bound until the end of the test.
    let polled = fx.tick();
    let _accepted = match polled {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // 150 bytes is more than the 100-byte cap configured above.
    let oversized = [0u8; 150];
    fx.peer_data(1, &oversized, false);
    let _ = fx.tick();
    let frames = fx.next_outbound_frames();

    let is_flow_control_goaway = |f: &Frame| {
        matches!(
            f,
            Frame::Goaway {
                error_code: H2ErrorCode::FlowControlError,
                ..
            }
        )
    };
    let saw_goaway = frames.iter().any(is_flow_control_goaway);
    assert!(
        saw_goaway,
        "DATA past the per-stream recv buffer cap must be a connection FLOW_CONTROL_ERROR; got \
         {frames:?}",
    );
}
/// With the connection window configured to 65_535, four 16_384-byte DATA frames (65_536 bytes
/// in total) on stream 1 are expected to trigger a GOAWAY carrying FLOW_CONTROL_ERROR within
/// at most six driver ticks.
#[test]
fn peer_data_past_connection_window_is_connection_error() {
    let config = HttpConfig::default().with_h2_initial_connection_window_size(65_535);
    let mut fx = DriverFixture::new_server_with_config(config);
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);

    // Keep the yielded value bound until the end of the test.
    let polled = fx.tick();
    let _accepted = match polled {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Four full chunks add up to one byte more than the configured connection window.
    let chunk = [0u8; 16_384];
    for _ in 0..4 {
        fx.peer_data(1, &chunk, false);
    }

    // Tick up to six times, accumulating output, and stop as soon as any GOAWAY shows up.
    let is_goaway = |f: &Frame| matches!(f, Frame::Goaway { .. });
    let mut frames = Vec::new();
    for _ in 0..6 {
        let _ = fx.tick();
        frames.extend(fx.next_outbound_frames());
        let goaway_seen = frames.iter().any(is_goaway);
        if goaway_seen {
            break;
        }
    }

    let saw_flow_control_goaway = frames.iter().any(|f| {
        matches!(
            f,
            Frame::Goaway {
                error_code: H2ErrorCode::FlowControlError,
                ..
            }
        )
    });
    assert!(
        saw_flow_control_goaway,
        "DATA past the connection recv window must be a connection FLOW_CONTROL_ERROR; got \
         {frames:?}",
    );
}
/// After a body-less response has removed stream 1 from the stream table, a late
/// WINDOW_UPDATE for that stream is expected to produce no GOAWAY and to leave the driver in
/// `DriverState::Running`.
#[test]
fn peer_window_update_on_closed_stream_is_ignored() {
    use crate::{h2::acceptor::types::DriverState, headers::hpack::PseudoHeaders};

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Get, "/", true);

    // Keep the yielded value bound until the end of the test.
    let polled = fx.tick();
    let _accepted = match polled {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Respond without a body, which the test expects to close and remove the stream.
    let response_head = PseudoHeaders::default().with_status(Status::Ok);
    let _pending_send = fx
        .connection
        .submit_send(1, response_head, Headers::new(), None);
    let _ = fx.tick();
    let stream_gone = !fx.connection.streams_lock().contains_key(&1);
    assert!(
        stream_gone,
        "body-less response should close + remove the stream",
    );

    // Throw away the frames fetched here so only output produced after the late
    // WINDOW_UPDATE is inspected (assumes `next_outbound_frames` drains — TODO confirm).
    let _ = fx.next_outbound_frames();

    fx.peer_window_update(1, 100);
    let _ = fx.tick();
    let frames = fx.next_outbound_frames();

    let no_goaway = frames.iter().all(|f| !matches!(f, Frame::Goaway { .. }));
    assert!(
        no_goaway,
        "WINDOW_UPDATE on a closed stream must be ignored, not error the connection; got \
         {frames:?}",
    );
    assert_eq!(
        fx.driver.state,
        DriverState::Running,
        "connection should still be running after a benign late WINDOW_UPDATE",
    );
}