use super::fixture::*;
use crate::{
Body, Headers, Method, Status,
h2::{
H2ErrorCode,
acceptor::types::{CloseOutcome, DriverState},
frame::Frame,
settings::H2Settings,
},
headers::hpack::PseudoHeaders,
};
use std::task::Poll;
/// Returns how many `Frame::RstStream` entries in `frames` carry the given `stream_id`.
///
/// Frames of any other variant, and resets aimed at other streams, are ignored.
fn count_rst(frames: &[Frame], stream_id: u32) -> usize {
    let mut resets = 0;
    for frame in frames {
        if let Frame::RstStream { stream_id: id, .. } = frame {
            if *id == stream_id {
                resets += 1;
            }
        }
    }
    resets
}
/// A graceful close issued while stream 1 is still in flight must keep the driver in
/// `DriverState::Closing` until the stream is closed in both directions; only then may it
/// advance to `DriverState::Drained`.
#[test]
fn closing_to_drained_waits_for_in_flight_stream() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();

    // The peer opens stream 1 and the driver hands the handler its Conn.
    fx.peer_open_stream(1, Method::Post, "/", false);
    let handler_conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Phase 1: close requested while the request body is still open.
    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Closing,
        "in-flight stream's open recv side should hold the driver in Closing",
    );

    // Phase 2: the peer finishes the request body with an empty, final DATA frame.
    fx.peer_data(1, &[], true);
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Closing,
        "recv-closed alone must not drain: the handler still owes a response (send half open)",
    );

    // Phase 3: the handler lets go of the stream, which releases the drain gate.
    drop(handler_conn);
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Drained,
        "with the in-flight stream fully closed (recv + send), Closing should advance to Drained",
    );
}
/// Once the driver has finished with an error (the peer closed the transport), a response
/// submitted afterwards for stream 1 must resolve to `Err` on its first poll instead of staying
/// pending.
#[test]
fn submit_after_connection_death_resolves_instead_of_hanging() {
    // `Poll` is already imported at module level; only the extra names are pulled in here so the
    // same type is not referred to under two different names within one test.
    use std::{future::Future, task::Context};

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    // Held for the rest of the test so the handler side of stream 1 stays alive.
    let _conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Kill the connection from the peer side and let the driver observe it.
    fx.peer.close();
    match fx.tick() {
        Poll::Ready(Some(Err(_))) => {}
        other => panic!("expected the driver to finish with an i/o error, got {other:?}"),
    }

    // Submit a response only after the driver has exited.
    let pseudos = PseudoHeaders::default().with_status(Status::Ok);
    let mut submit = Box::pin(fx.connection.submit_send(
        1,
        pseudos,
        Headers::new(),
        Some(Body::new_static(b"ok" as &[u8])),
    ));

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    assert!(
        matches!(submit.as_mut().poll(&mut cx), Poll::Ready(Err(_))),
        "SubmitSend on a connection whose driver has exited must resolve with an error, not park \
         forever — a parked handler holds its swansong guard and hangs graceful shutdown",
    );
}
/// A response submission that is already pending when the connection dies must have its waker
/// invoked and must resolve to `Err` on the next poll.
///
/// The peer advertises an initial window size of 0, which is what leaves the submission pending
/// before the transport is closed.
#[test]
fn connection_death_wakes_parked_submit() {
    // `Poll` is already imported at module level; only the extra names are pulled in here so the
    // same type is not referred to under two different names within one test.
    use std::{future::Future, task::Context};

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake_with_peer_settings(H2Settings::default().with_initial_window_size(0));
    fx.peer_open_stream(1, Method::Get, "/", true);
    // Held for the rest of the test so the handler side of stream 1 stays alive.
    let _conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Queue a response whose body cannot be sent while the window is zero.
    let pseudos = PseudoHeaders::default().with_status(Status::Ok);
    let mut submit = Box::pin(fx.connection.submit_send(
        1,
        pseudos,
        Headers::new(),
        Some(Body::new_static(b"stalled-body" as &[u8])),
    ));
    let _ = fx.tick();

    // Park the submission with a waker that records how often it is woken.
    let (counting, waker) = counting_waker();
    let mut cx = Context::from_waker(&waker);
    assert!(
        matches!(submit.as_mut().poll(&mut cx), Poll::Pending),
        "window-stalled submit should be Pending before the connection dies",
    );
    assert_eq!(counting.count(), 0, "no wake yet");

    // Kill the connection from the peer side and let the driver observe it.
    fx.peer.close();
    match fx.tick() {
        Poll::Ready(Some(Err(_))) => {}
        other => panic!("expected the driver to finish with an i/o error, got {other:?}"),
    }

    assert!(
        counting.count() >= 1,
        "connection death must wake a parked SubmitSend's completion waker",
    );
    assert!(
        matches!(submit.as_mut().poll(&mut cx), Poll::Ready(Err(_))),
        "after the connection dies the parked submit must resolve with an error",
    );
}
/// A peer `RST_STREAM` on stream 1 must mark the stream recv-closed and fire the waker
/// registered on the stream's receive side.
#[test]
fn stream_reset_wakes_handler_parked_reading_body() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    // Held for the rest of the test so the handler side of stream 1 stays alive.
    let _handler_conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Grab the shared per-stream state so it can be inspected after the reset.
    let stream = fx
        .connection
        .streams_lock()
        .get(&1)
        .cloned()
        .expect("stream 1 registered");

    // Stand in for a handler waiting on the request body.
    let (wake_counter, body_waker) = counting_waker();
    stream.recv.waker.register(&body_waker);

    fx.peer_rst_stream(1, H2ErrorCode::Cancel);
    let _ = fx.tick();

    assert!(
        stream.lifecycle_lock().recv_closed(),
        "a reset stream must report recv-closed so the body read sees EOF",
    );
    assert!(
        wake_counter.count() >= 1,
        "tearing the stream down must wake a handler parked reading the request body — otherwise \
         it never observes EOF and leaks its swansong guard",
    );
}
/// A peer `WINDOW_UPDATE` of `i32::MAX` on stream 1 must be answered with exactly one
/// `RST_STREAM` for that stream, and must leave the stream recv-closed.
#[test]
fn flow_control_rst_leaves_stream_terminal() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    // Held for the rest of the test so the handler side of stream 1 stays alive.
    let _handler_conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Grab the shared per-stream state so it can be inspected after the reset.
    let stream = fx
        .connection
        .streams_lock()
        .get(&1)
        .cloned()
        .expect("stream 1 registered");

    // The largest increment a WINDOW_UPDATE can carry; the assertion below expects it to
    // overflow the stream's send window.
    let oversized_increment = i32::MAX as u32;
    fx.peer_window_update(1, oversized_increment);
    let _ = fx.tick();

    let frames = fx.next_outbound_frames();
    assert_eq!(
        count_rst(&frames, 1),
        1,
        "a stream send-window overflow must emit a stream-level RST_STREAM; got {frames:?}",
    );
    assert!(
        stream.lifecycle_lock().recv_closed(),
        "a flow-control RST must leave the stream recv-closed so a handler polling the body after \
         the reset sees EOF instead of parking forever (the http2/6.9 guard leak)",
    );
}
/// A response submitted before a graceful close must still be written out on the next tick:
/// the outbound frames must contain HEADERS, DATA, and a DATA frame with `end_stream` set for
/// stream 1.
#[test]
fn send_pump_emits_response_in_closing() {
    use crate::headers::hpack::PseudoHeaders;

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Get, "/", true);
    // Held for the rest of the test so the handler side of stream 1 stays alive.
    let _handler_conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Queue the response first, then request the close, then tick once.
    let _pending_send = fx.connection.submit_send(
        1,
        PseudoHeaders::default().with_status(Status::Ok),
        Headers::new(),
        Some(Body::new_static(b"hi" as &[u8])),
    );
    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();

    let frames = fx.next_outbound_frames();

    let saw_headers = frames
        .iter()
        .any(|frame| matches!(frame, Frame::Headers { stream_id: 1, .. }));
    assert!(
        saw_headers,
        "send pump should emit response HEADERS for stream 1 while Closing; got {frames:?}",
    );

    let saw_data = frames
        .iter()
        .any(|frame| matches!(frame, Frame::Data { stream_id: 1, .. }));
    assert!(
        saw_data,
        "send pump should emit DATA for stream 1 while Closing; got {frames:?}",
    );

    let saw_final_data = frames.iter().any(|frame| {
        matches!(
            frame,
            Frame::Data {
                stream_id: 1,
                end_stream: true,
                ..
            }
        )
    });
    assert!(
        saw_final_data,
        "send pump should terminate stream 1 with END_STREAM; got {frames:?}",
    );
}
/// Trailers the peer sends on stream 1 after a graceful close has begun must still be decoded
/// and stored in the stream's `recv.trailers` slot.
#[test]
fn recv_pump_decodes_trailing_headers_in_closing() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    // Held for the rest of the test so the handler side of stream 1 stays alive.
    let _handler_conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Grab the shared per-stream state so the trailers slot can be read afterwards.
    let stream = fx
        .connection
        .streams_lock()
        .get(&1)
        .cloned()
        .expect("stream 1 registered");

    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(fx.driver.state, DriverState::Closing);

    // The peer sends trailers while the driver is already Closing.
    let mut sent_trailers = Headers::new();
    sent_trailers.insert("grpc-status", "0");
    sent_trailers.insert("grpc-message", "ok");
    fx.peer_trailers(1, &sent_trailers);
    let _ = fx.tick();

    let received = stream
        .recv
        .trailers
        .lock()
        .expect("recv.trailers mutex poisoned")
        .clone()
        .expect("driver should have stashed trailers from the post-GOAWAY frame");
    assert_eq!(received.get_str("grpc-status"), Some("0"));
    assert_eq!(received.get_str("grpc-message"), Some("ok"));
}
/// While the driver is `Closing`, HEADERS from the peer that would open a new stream (stream 3)
/// must not produce a `Conn` from the driver.
#[test]
fn closing_discards_new_stream_headers() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();

    // Stream 1 stays in flight so the driver remains in Closing rather than draining.
    fx.peer_open_stream(1, Method::Post, "/", false);
    let first_stream = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(fx.driver.state, DriverState::Closing);
    // Discard whatever the close put on the wire; it is not under test here.
    let _ = fx.next_outbound_bytes();

    // A late request arrives on a fresh stream id.
    fx.peer_open_stream(3, Method::Get, "/late", true);
    let polled = fx.tick();
    assert!(
        !matches!(polled, Poll::Ready(Some(Ok(_)))),
        "post-GOAWAY HEADERS opening a new stream must not yield a Conn; got {polled:?}",
    );

    drop(first_stream);
}
/// Calling `begin_close` a second time must not queue another GOAWAY.
///
/// With no streams open, the first (graceful) call takes the driver from `Running` to `Drained`
/// and emits exactly one GOAWAY carrying `NoError`; a later call with a protocol error emits
/// none.
#[test]
fn begin_close_is_idempotent() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    assert_eq!(fx.driver.state, DriverState::Running);

    // First close: graceful, nothing in flight.
    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(fx.driver.state, DriverState::Drained);

    let first_round = fx.next_outbound_frames();
    assert_eq!(
        count_goaways(&first_round),
        1,
        "graceful begin_close should emit exactly one GOAWAY; got {first_round:?}",
    );

    let first_goaway_code = first_round.iter().find_map(|frame| {
        if let Frame::Goaway { error_code, .. } = frame {
            Some(*error_code)
        } else {
            None
        }
    });
    assert_eq!(
        first_goaway_code,
        Some(H2ErrorCode::NoError),
        "graceful close should queue NoError, got {first_goaway_code:?}",
    );

    // Second close: a different outcome, but the connection is already closed.
    fx.driver
        .begin_close(CloseOutcome::Protocol(H2ErrorCode::InternalError));
    let _ = fx.tick();

    let second_round = fx.next_outbound_frames();
    assert_eq!(
        count_goaways(&second_round),
        0,
        "second begin_close after Closing/Drained must not re-queue GOAWAY; got {second_round:?}",
    );
}
/// When the only in-flight stream is reset by the peer during `Closing`, the driver must
/// advance to `Drained`.
#[test]
fn peer_rst_clears_closing_drain_gate() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    // Held for the rest of the test so the handler side of stream 1 stays alive.
    let _handler_conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Close is requested while stream 1 is still open.
    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Closing,
        "in-flight stream's open recv side should hold the driver in Closing",
    );

    // The peer abandons the stream.
    fx.peer_rst_stream(1, H2ErrorCode::Cancel);
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Drained,
        "peer RST removing the last in-flight stream should let Closing advance to Drained",
    );
}
/// Dropping the handler's `Conn` while the driver is `Closing` must, with no further input from
/// the peer, emit exactly one `RST_STREAM` for stream 1 and let the driver reach `Drained`.
#[test]
fn handler_drop_during_closing_resets_and_drains_without_peer_frame() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Post, "/", false);
    let handler_conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Close is requested while stream 1 is still open.
    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Closing,
        "in-flight stream's open recv side should hold the driver in Closing",
    );
    // Clear the outbound buffer so only frames caused by the drop are inspected below.
    let _ = fx.next_outbound_bytes();

    // The handler walks away without responding; no peer frame follows.
    drop(handler_conn);
    let _ = fx.tick();

    let frames = fx.next_outbound_frames();
    assert_eq!(
        count_rst(&frames, 1),
        1,
        "a handler-dropped stream must emit RST_STREAM so the peer learns; got {frames:?}",
    );
    assert_eq!(
        fx.driver.state,
        DriverState::Drained,
        "a locally-abandoned stream must clear the drain gate without an inbound peer frame",
    );
}
/// With the peer's initial window size set to 0, a submitted response must keep the driver in
/// `Closing` across ticks until the peer sends a `WINDOW_UPDATE`, after which the driver reaches
/// `Drained`.
#[test]
fn window_stalled_send_holds_closing_until_inbound_frame() {
    let mut fx = DriverFixture::new_server();
    fx.complete_handshake_with_peer_settings(H2Settings::default().with_initial_window_size(0));
    fx.peer_open_stream(1, Method::Get, "/", true);
    // Held for the rest of the test so the handler side of stream 1 stays alive.
    let _handler_conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Queue a response body that cannot be sent while the window is zero.
    let body = Body::new_static(b"hello" as &[u8]);
    let _pending_send = fx.connection.submit_send(
        1,
        PseudoHeaders::default().with_status(Status::Ok),
        Headers::new(),
        Some(body),
    );
    let _ = fx.tick();

    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Closing,
        "a window-stalled in-flight send should hold the driver in Closing",
    );

    // Ticking again without any peer input must not change the state.
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Closing,
        "with no inbound frame, the window-stalled send keeps the gate closed",
    );

    // The peer finally grants send credit on stream 1.
    fx.peer_window_update(1, 100);
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Drained,
        "an inbound WINDOW_UPDATE should unblock the send and let Closing advance to Drained",
    );
}
/// A stream whose request is complete but whose response has not been submitted must keep the
/// driver in `Closing`; once the response is submitted and ticked, HEADERS for stream 1 must be
/// written and the driver must reach `Drained`.
#[test]
fn half_closed_remote_holds_closing_until_response_sent() {
    use crate::headers::hpack::PseudoHeaders;

    let mut fx = DriverFixture::new_server();
    fx.complete_handshake();
    fx.peer_open_stream(1, Method::Get, "/", true);
    // Held for the rest of the test so the handler side of stream 1 stays alive.
    let _handler_conn = match fx.tick() {
        Poll::Ready(Some(Ok(conn))) => conn,
        other => panic!("expected Conn yielded for stream 1, got {other:?}"),
    };

    // Close is requested before the handler has produced any response.
    fx.driver.begin_close(CloseOutcome::Graceful);
    let _ = fx.tick();
    assert_eq!(
        fx.driver.state,
        DriverState::Closing,
        "a half-closed-remote stream with no response yet must hold the driver in Closing — \
         draining here would orphan the response the handler is about to submit",
    );

    // Repeated ticks with nothing new to do must leave the state untouched.
    for _ in 0..3 {
        let _ = fx.tick();
        assert_eq!(fx.driver.state, DriverState::Closing);
    }

    // The handler now submits its response.
    let body = Body::new_static(b"ok" as &[u8]);
    let _pending_send = fx.connection.submit_send(
        1,
        PseudoHeaders::default().with_status(Status::Ok),
        Headers::new(),
        Some(body),
    );
    let _ = fx.tick();

    let frames = fx.next_outbound_frames();
    assert!(
        frames
            .iter()
            .any(|frame| matches!(frame, Frame::Headers { stream_id: 1, .. })),
        "the response HEADERS must be framed while Closing, not dropped; got {frames:?}",
    );
    assert_eq!(
        fx.driver.state,
        DriverState::Drained,
        "once the response is sent (send half closed), Closing should advance to Drained",
    );
}