use std::collections::{HashMap, VecDeque};
use crate::error::{ErrorCode, H2Error};
use crate::flowcontrol::{self, FlowControl};
use crate::frame::{self, Frame};
use crate::hpack::{Decoder, Encoder, HeaderField};
use crate::settings::Settings;
use crate::stream::{H2Stream, StreamState};
/// Client connection preface magic; `H2Connection::new` queues these bytes
/// ahead of the initial SETTINGS frame.
const CLIENT_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
/// Receive-window level below which `maybe_send_window_updates` queues a
/// WINDOW_UPDATE to bring the window back up to the initial receive window.
const WINDOW_UPDATE_THRESHOLD: i64 = 32768;
/// Events produced while processing received bytes; callers drain them with
/// `H2Connection::poll_event`.
#[derive(Debug)]
pub enum H2Event {
/// A decoded header block that contains `:status`, received while the stream
/// was `Open` or `HalfClosedLocal`.
Response {
stream_id: u32,
headers: Vec<HeaderField>,
end_stream: bool,
},
/// Payload of a received DATA frame for a known stream.
Data {
stream_id: u32,
data: Vec<u8>,
end_stream: bool,
},
/// Any decoded header block that did not qualify as a `Response`.
Trailers {
stream_id: u32,
headers: Vec<HeaderField>,
},
/// The peer sent RST_STREAM, or the stream id was above the
/// `last_stream_id` of a received GOAWAY (reported with `RefusedStream`).
StreamReset {
stream_id: u32,
error_code: ErrorCode,
},
/// The peer sent GOAWAY; fields are copied from the frame.
GoAway {
last_stream_id: u32,
error_code: ErrorCode,
debug_data: Vec<u8>,
},
/// A PING frame with the ACK flag was received; carries the frame's payload.
PingAcknowledged { opaque_data: [u8; 8] },
/// A SETTINGS frame with the ACK flag was received.
SettingsAcknowledged,
/// A received frame could not be decoded; the connection moves to `Closing`.
Error(H2Error),
}
/// Connection lifecycle state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnState {
/// Initial state; left when the first non-ACK SETTINGS frame from the peer
/// is handled.
WaitingPreface,
/// The peer's SETTINGS have been received.
Ready,
/// Entered by `send_goaway`, on a received GOAWAY, or on a frame decode
/// error. `recv` ignores input and `send_request` refuses in this state.
Closing,
/// Treated like `Closing` by `recv` and `send_request`.
/// NOTE(review): nothing in the visible part of this file assigns this
/// state — confirm where (or whether) it is set.
Closed,
}
/// Client-side HTTP/2 connection state machine that works purely on byte
/// buffers: feed received bytes to `recv`, drain output with
/// `take_pending_send`, and collect results with `poll_event`.
pub struct H2Connection {
// Lifecycle state of the connection.
state: ConnState,
// Settings passed to `new` and advertised in the initial SETTINGS frame.
local_settings: Settings,
// Most recent settings received from the peer; `Settings::default()` until then.
remote_settings: Settings,
// Streams keyed by stream id.
streams: HashMap<u32, H2Stream>,
// Id for the next request stream; starts at 1 and advances by 2.
next_stream_id: u32,
// Connection-level flow-control window for data we receive.
conn_recv_window: FlowControl,
// Connection-level flow-control window for data we send.
conn_send_window: FlowControl,
// Header compression state for outgoing header blocks.
encoder: Encoder,
// Header compression state for incoming header blocks.
decoder: Decoder,
// `Some(id)` while a header block on stream `id` is awaiting CONTINUATION frames.
continuation_stream: Option<u32>,
// Received bytes that have not yet been decoded into frames.
recv_buf: Vec<u8>,
// Encoded frames waiting to be handed out by `take_pending_send`.
send_buf: Vec<u8>,
// Events waiting to be handed out by `poll_event`.
events: VecDeque<H2Event>,
// Local `initial_window_size`: the receive window given to new streams and
// the level windows are replenished to.
initial_recv_window: i64,
}
impl H2Connection {
/// Creates a client connection and queues the connection preface.
///
/// The pending send buffer is primed with, in order: the client magic
/// (`CLIENT_PREFACE`), a non-ACK SETTINGS frame carrying `settings`, and —
/// only when `settings.initial_window_size` exceeds
/// `flowcontrol::DEFAULT_WINDOW_SIZE` — a connection-level WINDOW_UPDATE for
/// the difference.
///
/// The connection starts in `WaitingPreface` and assumes
/// `Settings::default()` for the peer until its SETTINGS frame arrives.
pub fn new(settings: Settings) -> Self {
    let mut send_buf = Vec::new();
    send_buf.extend_from_slice(CLIENT_PREFACE);
    Frame::Settings {
        ack: false,
        settings: settings.clone(),
    }
    .encode(&mut send_buf);

    let initial_recv = settings.initial_window_size as i64;
    if initial_recv > flowcontrol::DEFAULT_WINDOW_SIZE {
        // Announce the extra connection-level receive capacity up front.
        let increment = (initial_recv - flowcontrol::DEFAULT_WINDOW_SIZE) as u32;
        Frame::WindowUpdate {
            stream_id: 0,
            increment,
        }
        .encode(&mut send_buf);
    }

    let remote_settings = Settings::default();
    // The encoder's table is bounded by what the *peer* is willing to decode,
    // i.e. the remote header_table_size (this is what `handle_settings`
    // applies later), not by the size we advertise for our own decoder.
    let encoder = Encoder::new(remote_settings.header_table_size as usize);

    Self {
        state: ConnState::WaitingPreface,
        local_settings: settings,
        remote_settings,
        streams: HashMap::new(),
        next_stream_id: 1,
        conn_recv_window: FlowControl::new(initial_recv),
        conn_send_window: FlowControl::default(),
        encoder,
        // NOTE(review): the decoder is sized with a literal 4096 rather than
        // the advertised `header_table_size`; confirm whether `Decoder` needs
        // to be told about a larger local limit.
        decoder: Decoder::new(4096),
        continuation_stream: None,
        recv_buf: Vec::new(),
        send_buf,
        events: VecDeque::new(),
        initial_recv_window: initial_recv,
    }
}
/// Feeds bytes read from the transport into the connection.
///
/// The bytes are appended to the receive buffer and every complete frame in
/// it is processed. Input is silently discarded (returning `Ok`) once the
/// connection is `Closing` or `Closed`.
///
/// NOTE(review): a received GOAWAY also moves the connection to `Closing`,
/// so later frames for streams the peer is still serving are dropped here —
/// confirm that is intended.
///
/// # Errors
/// Propagates any error returned while handling a decoded frame.
pub fn recv(&mut self, data: &[u8]) -> Result<(), H2Error> {
    match self.state {
        ConnState::Closing | ConnState::Closed => Ok(()),
        ConnState::WaitingPreface | ConnState::Ready => {
            self.recv_buf.extend_from_slice(data);
            self.process_recv_buf()
        }
    }
}
/// Removes and returns the oldest queued event, or `None` when the queue is
/// empty.
pub fn poll_event(&mut self) -> Option<H2Event> {
self.events.pop_front()
}
/// Returns all bytes queued for the transport and leaves the internal send
/// buffer empty.
pub fn take_pending_send(&mut self) -> Vec<u8> {
std::mem::take(&mut self.send_buf)
}
/// Returns `true` when there are queued bytes for `take_pending_send`.
pub fn has_pending_send(&self) -> bool {
!self.send_buf.is_empty()
}
/// Opens a new client-initiated stream and queues its request header block.
///
/// `headers` are HPACK-encoded and queued as a HEADERS frame. When the
/// encoded block is larger than the peer's `max_frame_size`, it is split
/// into a HEADERS frame followed by CONTINUATION frames, with END_HEADERS on
/// the last one only. `end_stream` sets END_STREAM on the HEADERS frame and
/// registers the stream as `HalfClosedLocal`.
///
/// Returns the id of the new stream.
///
/// # Errors
/// `H2Error::ConnectionError(ErrorCode::RefusedStream)` when the connection
/// is `Closing`/`Closed`, when the number of non-closed streams has reached
/// the peer's `max_concurrent_streams`, or when the stream-id space is
/// exhausted.
pub fn send_request(
    &mut self,
    headers: &[HeaderField],
    end_stream: bool,
) -> Result<u32, H2Error> {
    // Stream identifiers are 31 bits wide.
    const MAX_STREAM_ID: u32 = 0x7FFF_FFFF;

    if matches!(self.state, ConnState::Closing | ConnState::Closed) {
        return Err(H2Error::ConnectionError(ErrorCode::RefusedStream));
    }
    if let Some(max) = self.remote_settings.max_concurrent_streams {
        let active = self
            .streams
            .values()
            .filter(|s| !matches!(s.state, StreamState::Closed))
            .count() as u32;
        if active >= max {
            return Err(H2Error::ConnectionError(ErrorCode::RefusedStream));
        }
    }

    let stream_id = self.next_stream_id;
    if stream_id > MAX_STREAM_ID {
        // No identifiers left; a new connection is required.
        return Err(H2Error::ConnectionError(ErrorCode::RefusedStream));
    }
    self.next_stream_id += 2;

    let mut encoded = Vec::new();
    self.encoder.encode(headers, &mut encoded);

    // `chunks` panics on a zero size, so clamp defensively.
    let max_frame = (self.remote_settings.max_frame_size as usize).max(1);
    if encoded.len() <= max_frame {
        // Common case: the whole block fits in one HEADERS frame.
        Frame::Headers {
            stream_id,
            encoded,
            end_stream,
            end_headers: true,
            priority: None,
        }
        .encode(&mut self.send_buf);
    } else {
        // Oversized block: HEADERS carries the first fragment, the rest go
        // out as CONTINUATION frames queued back-to-back.
        let mut fragments = encoded.chunks(max_frame).peekable();
        let mut is_first = true;
        while let Some(fragment) = fragments.next() {
            let end_headers = fragments.peek().is_none();
            let frame = if is_first {
                Frame::Headers {
                    stream_id,
                    encoded: fragment.to_vec(),
                    end_stream,
                    end_headers,
                    priority: None,
                }
            } else {
                Frame::Continuation {
                    stream_id,
                    encoded: fragment.to_vec(),
                    end_headers,
                }
            };
            frame.encode(&mut self.send_buf);
            is_first = false;
        }
    }

    let initial_send = self.remote_settings.initial_window_size as i64;
    let mut stream = H2Stream::new(self.initial_recv_window, initial_send);
    if end_stream {
        stream.state = StreamState::HalfClosedLocal;
    }
    self.streams.insert(stream_id, stream);
    Ok(stream_id)
}
/// Queues request body bytes on `stream_id`.
///
/// The payload is split into DATA frames no larger than the peer's
/// `max_frame_size`; an empty `data` still produces one empty DATA frame.
/// With `end_stream`, END_STREAM is set on the last frame only and the
/// stream moves to `HalfClosedLocal` (or `Closed` if the peer had already
/// finished).
///
/// Nothing is buffered for later: the whole payload must fit in both the
/// stream and the connection send windows, otherwise nothing is queued.
///
/// # Errors
/// * `H2Error::Internal` — `stream_id` is not a known stream.
/// * `H2Error::StreamError(_, ErrorCode::StreamClosed)` — the stream is not
///   `Open` or `HalfClosedRemote`.
/// * `H2Error::FlowControlError` — the payload exceeds an available send
///   window (including payloads whose length does not fit in `u32`).
pub fn send_data(
    &mut self,
    stream_id: u32,
    data: &[u8],
    end_stream: bool,
) -> Result<(), H2Error> {
    let stream = self
        .streams
        .get_mut(&stream_id)
        .ok_or_else(|| H2Error::Internal("unknown stream".into()))?;
    if !matches!(
        stream.state,
        StreamState::Open | StreamState::HalfClosedRemote
    ) {
        return Err(H2Error::StreamError(stream_id, ErrorCode::StreamClosed));
    }

    // A length that does not fit in u32 can never fit in a flow-control
    // window; reject it instead of letting an `as` cast truncate it.
    let len = u32::try_from(data.len()).map_err(|_| H2Error::FlowControlError)?;
    if len > 0 {
        let needed = i64::from(len);
        if stream.send_window.window() < needed || self.conn_send_window.window() < needed {
            return Err(H2Error::FlowControlError);
        }
        // Both windows were just checked, so these results are not inspected.
        let _ = stream.send_window.consume(len);
        let _ = self.conn_send_window.consume(len);
    }

    if end_stream {
        stream.state = match stream.state {
            StreamState::HalfClosedRemote => StreamState::Closed,
            _ => StreamState::HalfClosedLocal,
        };
    }

    if data.is_empty() {
        Frame::Data {
            stream_id,
            payload: Vec::new(),
            end_stream,
        }
        .encode(&mut self.send_buf);
        return Ok(());
    }

    // `chunks` panics on a zero size, so clamp defensively.
    let max_frame = (self.remote_settings.max_frame_size as usize).max(1);
    let mut chunks = data.chunks(max_frame).peekable();
    while let Some(chunk) = chunks.next() {
        let is_last_chunk = chunks.peek().is_none();
        Frame::Data {
            stream_id,
            payload: chunk.to_vec(),
            end_stream: end_stream && is_last_chunk,
        }
        .encode(&mut self.send_buf);
    }
    Ok(())
}
/// Queues an RST_STREAM frame for `stream_id` with `error_code`.
///
/// A known stream is marked `Closed`; the frame is queued whether or not
/// the stream is known.
pub fn reset_stream(&mut self, stream_id: u32, error_code: ErrorCode) {
    if let Some(entry) = self.streams.get_mut(&stream_id) {
        entry.state = StreamState::Closed;
    }
    Frame::RstStream {
        stream_id,
        error_code,
    }
    .encode(&mut self.send_buf);
}
/// Queues a PING frame (ACK flag clear) whose payload is eight zero bytes.
pub fn send_ping(&mut self) {
    let opaque_data = [0u8; 8];
    Frame::Ping {
        ack: false,
        opaque_data,
    }
    .encode(&mut self.send_buf);
}
/// Queues a GOAWAY frame with `error_code` and moves the connection to
/// `Closing`.
///
/// The frame always reports `last_stream_id` 0 and carries no debug data.
pub fn send_goaway(&mut self, error_code: ErrorCode) {
    self.state = ConnState::Closing;
    Frame::GoAway {
        last_stream_id: 0,
        error_code,
        debug_data: Vec::new(),
    }
    .encode(&mut self.send_buf);
}
/// Returns `true` only when the connection state is `Closed`; `Closing`
/// does not count.
pub fn is_closed(&self) -> bool {
    matches!(self.state, ConnState::Closed)
}
fn process_recv_buf(&mut self) -> Result<(), H2Error> {
loop {
let max_frame = self.local_settings.max_frame_size;
match frame::decode_frame(&self.recv_buf, max_frame) {
Ok(Some((frame, consumed))) => {
self.recv_buf.drain(..consumed);
self.handle_frame(frame)?;
}
Ok(None) => break,
Err(e) => {
self.recv_buf.clear();
self.state = ConnState::Closing;
self.events
.push_back(H2Event::Error(H2Error::ProtocolError(format!("{e}"))));
break;
}
}
}
Ok(())
}
/// Dispatches one decoded frame to its handler.
///
/// CONTINUATION sequencing is enforced first: while a header block is in
/// progress only a CONTINUATION on that same stream is accepted, and a
/// CONTINUATION that arrives when no header block is in progress is
/// rejected instead of being appended to a stream's header buffer.
///
/// # Errors
/// `H2Error::ProtocolError` for a CONTINUATION sequencing violation or a
/// PUSH_PROMISE received while push is disabled locally; otherwise whatever
/// the per-frame handler returns.
fn handle_frame(&mut self, frame: Frame) -> Result<(), H2Error> {
    match (self.continuation_stream, &frame) {
        (Some(expected_sid), Frame::Continuation { stream_id, .. })
            if *stream_id == expected_sid => {}
        (Some(_), _) => {
            return Err(H2Error::ProtocolError("expected CONTINUATION frame".into()));
        }
        (None, Frame::Continuation { .. }) => {
            // NOTE(review): header blocks started by PUSH_PROMISE are not
            // tracked in this file, so a CONTINUATION following one lands
            // here too — confirm whether pushed header blocks must be
            // supported.
            return Err(H2Error::ProtocolError(
                "unexpected CONTINUATION frame".into(),
            ));
        }
        (None, _) => {}
    }

    match frame {
        Frame::Settings { ack, settings } => {
            self.handle_settings(ack, settings)?;
        }
        Frame::Headers {
            stream_id,
            encoded,
            end_stream,
            end_headers,
            ..
        } => {
            self.handle_headers(stream_id, encoded, end_stream, end_headers)?;
        }
        Frame::Continuation {
            stream_id,
            encoded,
            end_headers,
        } => {
            self.handle_continuation(stream_id, encoded, end_headers)?;
        }
        Frame::Data {
            stream_id,
            payload,
            end_stream,
        } => {
            self.handle_data(stream_id, payload, end_stream)?;
        }
        Frame::RstStream {
            stream_id,
            error_code,
        } => {
            if let Some(stream) = self.streams.get_mut(&stream_id) {
                stream.state = StreamState::Closed;
            }
            // The event is reported even when the stream is unknown.
            self.events.push_back(H2Event::StreamReset {
                stream_id,
                error_code,
            });
        }
        Frame::Ping { ack, opaque_data } => {
            if ack {
                self.events
                    .push_back(H2Event::PingAcknowledged { opaque_data });
            } else {
                // Echo the payload back with the ACK flag set.
                Frame::Ping {
                    ack: true,
                    opaque_data,
                }
                .encode(&mut self.send_buf);
            }
        }
        Frame::GoAway {
            last_stream_id,
            error_code,
            debug_data,
        } => {
            self.state = ConnState::Closing;
            self.events.push_back(H2Event::GoAway {
                last_stream_id,
                error_code,
                debug_data,
            });
            // Streams above `last_stream_id` are dropped and reported as
            // refused.
            let mut to_reset: Vec<u32> = self
                .streams
                .keys()
                .copied()
                .filter(|&id| id > last_stream_id)
                .collect();
            // HashMap iteration order is arbitrary; sort so the events come
            // out in a stable, ascending order.
            to_reset.sort_unstable();
            for id in to_reset {
                self.streams.remove(&id);
                self.events.push_back(H2Event::StreamReset {
                    stream_id: id,
                    error_code: ErrorCode::RefusedStream,
                });
            }
        }
        Frame::WindowUpdate {
            stream_id,
            increment,
        } => {
            self.handle_window_update(stream_id, increment)?;
        }
        // PRIORITY frames are accepted and ignored.
        Frame::Priority { .. } => {}
        Frame::PushPromise { .. } => {
            if !self.local_settings.enable_push {
                return Err(H2Error::ProtocolError(
                    "PUSH_PROMISE received but ENABLE_PUSH=0".into(),
                ));
            }
            // With push enabled the frame is currently ignored.
        }
        // Unknown frame types are accepted and ignored.
        Frame::Unknown { .. } => {}
    }
    Ok(())
}
/// Handles a received SETTINGS frame.
///
/// An ACK only queues `H2Event::SettingsAcknowledged`. Otherwise the peer's
/// settings replace `remote_settings`, the encoder's table limit is updated,
/// the send window of every non-closed stream is shifted by the change in
/// the peer's initial window size, a SETTINGS ACK is queued, and a
/// connection still in `WaitingPreface` becomes `Ready`.
///
/// # Errors
/// Propagates a failure to adjust a stream's send window; in that case the
/// new settings are already stored but no ACK has been queued.
fn handle_settings(&mut self, ack: bool, settings: Settings) -> Result<(), H2Error> {
    if ack {
        self.events.push_back(H2Event::SettingsAcknowledged);
        return Ok(());
    }

    let previous_window = self.remote_settings.initial_window_size as i64;
    let delta = settings.initial_window_size as i64 - previous_window;

    self.remote_settings = settings;
    let table_size = self.remote_settings.header_table_size as usize;
    self.encoder.update_max_table_size(table_size);

    if delta != 0 {
        let live_streams = self
            .streams
            .values_mut()
            .filter(|s| s.state != StreamState::Closed);
        for stream in live_streams {
            stream.send_window.adjust(delta)?;
        }
    }

    Frame::Settings {
        ack: true,
        settings: Settings::default(),
    }
    .encode(&mut self.send_buf);

    if matches!(self.state, ConnState::WaitingPreface) {
        self.state = ConnState::Ready;
    }
    Ok(())
}
/// Handles a received HEADERS frame.
///
/// With END_HEADERS the block (plus anything already buffered for the
/// stream) is decoded and emitted immediately. Without it the fragment is
/// buffered on the stream and the connection starts expecting CONTINUATION
/// frames for that stream.
///
/// A complete block for an unknown stream is still run through the decoder,
/// with the result discarded: header compression state is shared across the
/// whole connection, so skipping a block would desynchronise the decoder
/// for every later stream.
///
/// # Errors
/// Propagates header decoding errors.
fn handle_headers(
    &mut self,
    stream_id: u32,
    encoded: Vec<u8>,
    end_stream: bool,
    end_headers: bool,
) -> Result<(), H2Error> {
    let stream = match self.streams.get_mut(&stream_id) {
        Some(s) => s,
        None => {
            if end_headers {
                let _discarded = self.decoder.decode(&encoded[..])?;
            }
            // NOTE(review): a fragmented block (no END_HEADERS) for an
            // unknown stream is still dropped without decoding.
            return Ok(());
        }
    };

    if end_headers {
        let mut full_block = std::mem::take(&mut stream.header_buf);
        full_block.extend_from_slice(&encoded);
        self.decode_and_emit_headers(stream_id, &full_block, end_stream)
    } else {
        stream.header_buf.extend_from_slice(&encoded);
        stream.receiving_headers = true;
        // END_STREAM is carried by HEADERS; remember it until the block
        // is complete.
        stream.headers_end_stream = end_stream;
        self.continuation_stream = Some(stream_id);
        Ok(())
    }
}
/// Handles a received CONTINUATION frame.
///
/// The fragment is appended to the stream's buffered header block. When
/// END_HEADERS is set, the connection stops expecting CONTINUATION frames
/// and the complete block is decoded and emitted, using the END_STREAM flag
/// remembered from the opening HEADERS frame. Frames for unknown streams
/// are ignored.
///
/// # Errors
/// Propagates header decoding errors.
fn handle_continuation(
    &mut self,
    stream_id: u32,
    encoded: Vec<u8>,
    end_headers: bool,
) -> Result<(), H2Error> {
    let Some(stream) = self.streams.get_mut(&stream_id) else {
        return Ok(());
    };
    stream.header_buf.extend_from_slice(&encoded);
    if !end_headers {
        return Ok(());
    }

    // Header block complete: pull everything needed off the stream before
    // handing control to the decoder.
    stream.receiving_headers = false;
    let block = std::mem::take(&mut stream.header_buf);
    let ends_stream = stream.headers_end_stream;
    self.continuation_stream = None;

    self.decode_and_emit_headers(stream_id, &block, ends_stream)
}
/// Decodes a complete header block and queues the matching event.
///
/// The block is always decoded, even if the stream turns out to be unknown
/// (in which case no event is queued). With `end_stream`, the stream's
/// remote side is closed: `HalfClosedLocal` becomes `Closed`, a stream that
/// is already `Closed` stays `Closed`, and anything else becomes
/// `HalfClosedRemote`.
///
/// A block containing `:status`, received while the stream was `Open` or
/// `HalfClosedLocal`, is reported as `H2Event::Response`; every other block
/// is reported as `H2Event::Trailers`.
///
/// # Errors
/// Propagates header decoding errors.
fn decode_and_emit_headers(
    &mut self,
    stream_id: u32,
    encoded: &[u8],
    end_stream: bool,
) -> Result<(), H2Error> {
    let headers = self.decoder.decode(encoded)?;
    let stream = match self.streams.get_mut(&stream_id) {
        Some(s) => s,
        None => return Ok(()),
    };

    // Evaluated before the state transition below.
    let is_initial_response =
        stream.state == StreamState::Open || stream.state == StreamState::HalfClosedLocal;

    if end_stream {
        stream.state = match stream.state {
            // A stream we already closed (e.g. via `reset_stream`) must not
            // be revived by a late END_STREAM from the peer.
            StreamState::HalfClosedLocal | StreamState::Closed => StreamState::Closed,
            _ => StreamState::HalfClosedRemote,
        };
    }

    let has_status = headers.iter().any(|h| h.name == b":status");
    let event = if has_status && is_initial_response {
        H2Event::Response {
            stream_id,
            headers,
            end_stream,
        }
    } else {
        H2Event::Trailers { stream_id, headers }
    };
    self.events.push_back(event);
    Ok(())
}
/// Handles a received DATA frame.
///
/// The payload length is charged against the connection receive window and,
/// for a known stream, against that stream's receive window. With
/// `end_stream` the stream's remote side is closed (a stream that is already
/// `Closed` stays `Closed`). An `H2Event::Data` is queued and receive
/// windows are replenished where needed.
///
/// DATA for an unknown stream produces no event, but the connection window
/// it consumed is still replenished so that such frames cannot drain the
/// connection-level window.
///
/// # Errors
/// Propagates a flow-control failure from either receive window.
fn handle_data(
    &mut self,
    stream_id: u32,
    payload: Vec<u8>,
    end_stream: bool,
) -> Result<(), H2Error> {
    let data_len = payload.len() as u32;
    if data_len > 0 {
        self.conn_recv_window.consume(data_len)?;
    }

    let stream = match self.streams.get_mut(&stream_id) {
        Some(s) => s,
        None => {
            // The stream lookup inside finds nothing, so only the
            // connection-level window is topped up.
            self.maybe_send_window_updates(stream_id, data_len);
            return Ok(());
        }
    };

    if data_len > 0 {
        stream.recv_window.consume(data_len)?;
    }
    if end_stream {
        stream.state = match stream.state {
            // A stream we already closed must not be revived by a late
            // END_STREAM from the peer.
            StreamState::HalfClosedLocal | StreamState::Closed => StreamState::Closed,
            _ => StreamState::HalfClosedRemote,
        };
    }

    self.events.push_back(H2Event::Data {
        stream_id,
        data: payload,
        end_stream,
    });
    self.maybe_send_window_updates(stream_id, data_len);
    Ok(())
}
/// Applies a received WINDOW_UPDATE.
///
/// Stream id 0 grows the connection send window; any other id grows that
/// stream's send window. Updates for unknown streams are ignored.
///
/// # Errors
/// Propagates a failure to increase the targeted window.
fn handle_window_update(&mut self, stream_id: u32, increment: u32) -> Result<(), H2Error> {
    match stream_id {
        0 => {
            self.conn_send_window.increase(increment)?;
        }
        id => {
            if let Some(stream) = self.streams.get_mut(&id) {
                stream.send_window.increase(increment)?;
            }
        }
    }
    Ok(())
}
/// Replenishes receive windows after `data_len` bytes of DATA were received
/// on `stream_id`.
///
/// Nothing happens for zero-length data. Each of the connection window and
/// the stream window is handled independently: when it has dropped below
/// `WINDOW_UPDATE_THRESHOLD`, a WINDOW_UPDATE is queued that restores it to
/// the initial receive window. The stream window is skipped when the stream
/// is unknown, `Closed`, or `HalfClosedRemote`.
fn maybe_send_window_updates(&mut self, stream_id: u32, data_len: u32) {
    if data_len == 0 {
        return;
    }
    let target = self.initial_recv_window;

    // Connection-level window.
    let conn_level = self.conn_recv_window.window();
    if conn_level < WINDOW_UPDATE_THRESHOLD {
        let increment = (target - conn_level) as u32;
        if increment > 0 {
            Frame::WindowUpdate {
                stream_id: 0,
                increment,
            }
            .encode(&mut self.send_buf);
            let _ = self.conn_recv_window.increase(increment);
        }
    }

    // Stream-level window; pointless once the peer can send no more data.
    let Some(stream) = self.streams.get_mut(&stream_id) else {
        return;
    };
    if matches!(
        stream.state,
        StreamState::Closed | StreamState::HalfClosedRemote
    ) {
        return;
    }
    let stream_level = stream.recv_window.window();
    if stream_level >= WINDOW_UPDATE_THRESHOLD {
        return;
    }
    let increment = (target - stream_level) as u32;
    if increment > 0 {
        Frame::WindowUpdate {
            stream_id,
            increment,
        }
        .encode(&mut self.send_buf);
        let _ = stream.recv_window.increase(increment);
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialises one frame into a fresh buffer.
    fn encode_frame(frame: Frame) -> Vec<u8> {
        let mut buf = Vec::new();
        frame.encode(&mut buf);
        buf
    }

    /// Serialises a SETTINGS frame carrying `settings`.
    fn make_settings_frame(settings: &Settings, ack: bool) -> Vec<u8> {
        encode_frame(Frame::Settings {
            ack,
            settings: settings.clone(),
        })
    }

    /// Client connection that has discarded its preface and received the
    /// server's default SETTINGS; the SETTINGS ACK is still queued.
    fn handshaken_connection() -> H2Connection {
        let mut conn = H2Connection::new(Settings::client_default());
        let _ = conn.take_pending_send();
        let server_settings = make_settings_frame(&Settings::default(), false);
        conn.recv(&server_settings).unwrap();
        conn
    }

    /// Like `handshaken_connection`, with the queued SETTINGS ACK discarded.
    fn ready_connection() -> H2Connection {
        let mut conn = handshaken_connection();
        let _ = conn.take_pending_send();
        conn
    }

    /// Serialised HEADERS frame carrying only `:status: 200`, with
    /// END_HEADERS set and END_STREAM clear.
    fn status_200_response(stream_id: u32) -> Vec<u8> {
        let mut encoder = Encoder::new(4096);
        let mut encoded = Vec::new();
        encoder.encode(&[HeaderField::new(b":status", b"200")], &mut encoded);
        encode_frame(Frame::Headers {
            stream_id,
            encoded,
            end_stream: false,
            end_headers: true,
            priority: None,
        })
    }

    /// Discards every queued event.
    fn drain_events(conn: &mut H2Connection) {
        while conn.poll_event().is_some() {}
    }

    #[test]
    fn connection_preface_includes_magic_and_settings() {
        let conn = H2Connection::new(Settings::client_default());
        let send = conn.send_buf.clone();
        assert!(send.starts_with(CLIENT_PREFACE));

        let (_, after_magic) = send.split_at(CLIENT_PREFACE.len());
        let header = frame::decode_frame_header(after_magic).unwrap();
        assert_eq!(header.frame_type, frame::FRAME_SETTINGS);
        assert_eq!(header.flags, 0);
        assert_eq!(header.stream_id, 0);
    }

    #[test]
    fn settings_exchange() {
        let mut conn = handshaken_connection();
        assert_eq!(conn.state, ConnState::Ready);

        let send = conn.take_pending_send();
        let header = frame::decode_frame_header(&send).unwrap();
        assert_eq!(header.frame_type, frame::FRAME_SETTINGS);
        assert_eq!(header.flags, frame::FLAG_ACK);
    }

    #[test]
    fn send_request_and_receive_response() {
        let mut conn = ready_connection();

        let headers = vec![
            HeaderField::new(b":method", b"GET"),
            HeaderField::new(b":path", b"/"),
            HeaderField::new(b":scheme", b"https"),
            HeaderField::new(b":authority", b"example.com"),
        ];
        let stream_id = conn.send_request(&headers, true).unwrap();
        assert_eq!(stream_id, 1);

        let send = conn.take_pending_send();
        let header = frame::decode_frame_header(&send).unwrap();
        assert_eq!(header.frame_type, frame::FRAME_HEADERS);
        assert_eq!(header.stream_id, 1);
        assert_ne!(header.flags & frame::FLAG_END_STREAM, 0);
        assert_ne!(header.flags & frame::FLAG_END_HEADERS, 0);

        conn.recv(&status_200_response(1)).unwrap();

        let event = conn.poll_event().unwrap();
        match event {
            H2Event::Response {
                stream_id: sid,
                headers: h,
                end_stream: es,
            } => {
                assert_eq!(sid, 1);
                assert!(!es);
                assert_eq!(h[0].name, b":status");
                assert_eq!(h[0].value, b"200");
            }
            _ => panic!("expected Response event, got {event:?}"),
        }
    }

    #[test]
    fn ping_response() {
        let mut conn = ready_connection();

        let payload = [1, 2, 3, 4, 5, 6, 7, 8];
        let ping_buf = encode_frame(Frame::Ping {
            ack: false,
            opaque_data: payload,
        });
        conn.recv(&ping_buf).unwrap();

        let send = conn.take_pending_send();
        let (decoded, _) = frame::decode_frame(&send, 16384).unwrap().unwrap();
        match decoded {
            Frame::Ping { ack, opaque_data } => {
                assert!(ack);
                assert_eq!(opaque_data, [1, 2, 3, 4, 5, 6, 7, 8]);
            }
            _ => panic!("expected Ping ACK"),
        }
    }

    #[test]
    fn ping_ack_event() {
        let mut conn = ready_connection();
        drain_events(&mut conn);

        conn.send_ping();
        let _ = conn.take_pending_send();

        let buf = encode_frame(Frame::Ping {
            ack: true,
            opaque_data: [0; 8],
        });
        conn.recv(&buf).unwrap();

        match conn.poll_event() {
            Some(H2Event::PingAcknowledged { opaque_data }) => {
                assert_eq!(opaque_data, [0; 8]);
            }
            other => panic!("expected PingAcknowledged, got {other:?}"),
        }
    }

    #[test]
    fn goaway_handling() {
        let mut conn = ready_connection();

        let buf = encode_frame(Frame::GoAway {
            last_stream_id: 0,
            error_code: ErrorCode::NoError,
            debug_data: Vec::new(),
        });
        conn.recv(&buf).unwrap();

        assert_eq!(conn.state, ConnState::Closing);
        match conn.poll_event().unwrap() {
            H2Event::GoAway { error_code, .. } => assert_eq!(error_code, ErrorCode::NoError),
            e => panic!("expected GoAway, got {e:?}"),
        }
    }

    #[test]
    fn stream_ids_increment() {
        let mut conn = ready_connection();

        let headers = vec![HeaderField::new(b":method", b"GET")];
        let s1 = conn.send_request(&headers, true).unwrap();
        let s2 = conn.send_request(&headers, true).unwrap();
        let s3 = conn.send_request(&headers, true).unwrap();
        assert_eq!((s1, s2, s3), (1, 3, 5));
    }

    #[test]
    fn window_update_on_data() {
        let mut conn = ready_connection();

        let headers = vec![HeaderField::new(b":method", b"GET")];
        let stream_id = conn.send_request(&headers, true).unwrap();
        let _ = conn.take_pending_send();

        conn.recv(&status_200_response(stream_id)).unwrap();
        let _ = conn.poll_event();
        let _ = conn.take_pending_send();

        // Four 10000-byte DATA frames delivered in a single read.
        let mut data_buf = Vec::new();
        for _ in 0..4 {
            Frame::Data {
                stream_id,
                payload: vec![0u8; 10000],
                end_stream: false,
            }
            .encode(&mut data_buf);
        }
        conn.recv(&data_buf).unwrap();

        let send = conn.take_pending_send();
        assert!(!send.is_empty(), "expected WINDOW_UPDATE frames");
    }

    #[test]
    fn decode_error_emits_single_event_and_transitions_to_closing() {
        let mut conn = ready_connection();
        drain_events(&mut conn);

        // SETTINGS frame on stream 0 with a 5-byte payload.
        let bad_settings = [
            0x00, 0x00, 0x05, // length
            0x04, // type
            0x00, // flags
            0x00, 0x00, 0x00, 0x00, // stream id
            0x00, 0x01, 0x02, 0x03, 0x04, // payload
        ];
        conn.recv(&bad_settings).unwrap();

        let event = conn.poll_event();
        assert!(
            matches!(event, Some(H2Event::Error(_))),
            "expected Error event, got {event:?}"
        );
        assert!(conn.poll_event().is_none(), "expected no more events");
        assert_eq!(conn.state, ConnState::Closing);

        conn.recv(&bad_settings).unwrap();
        assert!(
            conn.poll_event().is_none(),
            "expected no events after Closing"
        );
    }

    #[test]
    fn goaway_resets_streams_above_last_stream_id() {
        let mut conn = ready_connection();
        drain_events(&mut conn);

        let headers = vec![HeaderField::new(b":method", b"GET")];
        let s1 = conn.send_request(&headers, true).unwrap();
        let s3 = conn.send_request(&headers, true).unwrap();
        let s5 = conn.send_request(&headers, true).unwrap();
        let _ = conn.take_pending_send();
        assert_eq!(s1, 1);
        assert_eq!(s3, 3);
        assert_eq!(s5, 5);

        let buf = encode_frame(Frame::GoAway {
            last_stream_id: 1,
            error_code: ErrorCode::NoError,
            debug_data: Vec::new(),
        });
        conn.recv(&buf).unwrap();

        let mut got_goaway = false;
        let mut reset_ids = Vec::new();
        while let Some(event) = conn.poll_event() {
            match event {
                H2Event::GoAway { .. } => got_goaway = true,
                H2Event::StreamReset { stream_id, .. } => reset_ids.push(stream_id),
                _ => {}
            }
        }
        assert!(got_goaway, "expected GoAway event");
        reset_ids.sort();
        assert_eq!(
            reset_ids,
            vec![3, 5],
            "expected streams 3 and 5 to be reset"
        );
    }

    #[test]
    fn send_data_splits_large_payload_into_multiple_frames() {
        let mut conn = ready_connection();

        let headers = vec![
            HeaderField::new(b":method", b"POST"),
            HeaderField::new(b":path", b"/upload"),
            HeaderField::new(b":scheme", b"https"),
            HeaderField::new(b":authority", b"example.com"),
        ];
        let stream_id = conn.send_request(&headers, false).unwrap();
        let _ = conn.take_pending_send();

        let large_data = vec![0xABu8; 40000];
        conn.send_data(stream_id, &large_data, true).unwrap();
        let send = conn.take_pending_send();

        // Walk the output frame by frame using only the 9-byte frame headers.
        let mut cursor = 0;
        let mut frame_count = 0;
        let mut total_payload = 0;
        let mut last_end_stream = false;
        while cursor + 9 <= send.len() {
            let header = frame::decode_frame_header(&send[cursor..]).unwrap();
            assert_eq!(header.frame_type, frame::FRAME_DATA);
            assert_eq!(header.stream_id, stream_id);
            assert!(
                header.length <= 16384,
                "frame payload {} exceeds max_frame_size 16384",
                header.length
            );
            let payload_len = header.length as usize;
            total_payload += payload_len;
            last_end_stream = header.flags & frame::FLAG_END_STREAM != 0;
            cursor += 9 + payload_len;
            frame_count += 1;
        }

        assert!(
            frame_count >= 3,
            "expected at least 3 frames for 40000 bytes, got {frame_count}"
        );
        assert_eq!(total_payload, 40000, "total payload mismatch");
        assert!(
            last_end_stream,
            "END_STREAM should be set only on the last frame"
        );
    }
}