#![allow(clippy::vec_init_then_push)]
use rstest::rstest;
use super::*;
use crate::parse::parse_frames_stateful;
/// Builds an HTTP/2 SETTINGS frame with an empty payload and no flags set,
/// addressed to stream 0.
fn create_settings_frame() -> Vec<u8> {
    let mut frame = Vec::with_capacity(9);
    // 24-bit payload length: zero.
    frame.extend_from_slice(&[0x00, 0x00, 0x00]);
    // Frame type 0x04 (SETTINGS).
    frame.push(0x04);
    // Flags: none.
    frame.push(0x00);
    // Stream identifier 0.
    frame.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]);
    frame
}
/// Builds an HTTP/2 HEADERS frame carrying `header_block` on `stream_id`.
///
/// The frame sets flags `0x05` (`END_STREAM | END_HEADERS`), so it forms a
/// complete, body-less message on its own.
///
/// The payload length is written as a full 24-bit big-endian value and the
/// stream identifier as a 31-bit big-endian value (reserved top bit cleared),
/// so header blocks longer than 255 bytes and stream ids above 255 are encoded
/// correctly instead of being truncated to their low byte.
fn create_headers_frame(stream_id: u32, header_block: &[u8]) -> Vec<u8> {
    let len = header_block.len();
    // A frame length field is only 24 bits wide; anything larger cannot be represented.
    debug_assert!(len <= 0x00FF_FFFF, "header block exceeds 24-bit frame length");

    let mut frame = Vec::with_capacity(9 + len);
    // 24-bit big-endian payload length.
    frame.extend_from_slice(&[(len >> 16) as u8, (len >> 8) as u8, len as u8]);
    // Frame type 0x01 (HEADERS).
    frame.push(0x01);
    // Flags: END_STREAM (0x01) | END_HEADERS (0x04).
    frame.push(0x05);
    // 31-bit stream identifier; the most significant bit is reserved and kept clear.
    frame.extend_from_slice(&(stream_id & 0x7FFF_FFFF).to_be_bytes());
    frame.extend_from_slice(header_block);
    frame
}
/// A freshly constructed cache reports itself as empty.
#[test]
fn test_cache_new() {
    let fresh: H2SessionCache<String> = H2SessionCache::new();
    assert!(fresh.is_empty());
    assert_eq!(fresh.len(), 0);
}
/// Walks the cache through miss -> insert-via-`parse` -> hit -> `remove`.
#[test]
fn test_cache_operations() {
    let cache = H2SessionCache::new();
    let conn_key = "conn1".to_string();

    // Nothing is tracked before the first parse.
    assert!(!cache.contains(&conn_key));
    assert_eq!(cache.len(), 0);

    // Connection preface followed by an empty SETTINGS frame.
    let handshake = [
        frame::CONNECTION_PREFACE.to_vec(),
        create_settings_frame(),
    ]
    .concat();

    // Parsing succeeds with no messages, yet the key is now tracked.
    let outcome = cache.parse(conn_key.clone(), &handshake);
    assert!(matches!(outcome, Ok(ref m) if m.is_empty()));
    assert!(cache.contains(&conn_key));
    assert_eq!(cache.len(), 1);

    // Removal returns `Some` and leaves the cache empty again.
    let evicted = cache.remove(&conn_key);
    assert!(evicted.is_some());
    assert!(!cache.contains(&conn_key));
    assert_eq!(cache.len(), 0);
}
/// Two successive `parse` calls under the same key both succeed; the second
/// call carries only a HEADERS frame (no preface or SETTINGS).
#[test]
fn test_hpack_persistence() {
    let cache = H2SessionCache::new();
    let conn_key = "conn1".to_string();

    // First chunk: preface + SETTINGS + HEADERS on stream 1.
    // 0x82 is an HPACK indexed field (static index 2, `:method: GET`).
    let first_block = vec![0x82];
    let mut first_chunk = frame::CONNECTION_PREFACE.to_vec();
    first_chunk.extend_from_slice(&create_settings_frame());
    first_chunk.extend_from_slice(&create_headers_frame(1, &first_block));
    assert!(cache.parse(conn_key.clone(), &first_chunk).is_ok());

    // Second chunk: a lone HEADERS frame on stream 3.
    let second_block = vec![0x82];
    let second_chunk = create_headers_frame(3, &second_block);
    assert!(cache.parse(conn_key.clone(), &second_chunk).is_ok());
}
/// A single buffer containing HEADERS for streams 1 and 3 yields at least one message.
#[test]
fn test_multi_stream_tracking() {
    let cache = H2SessionCache::new();
    let conn_key = "conn1".to_string();

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    // Same one-byte header block (0x82) on two different streams.
    for stream in [1u32, 3] {
        let block = vec![0x82];
        wire.extend_from_slice(&create_headers_frame(stream, &block));
    }

    let result = cache.parse(conn_key.clone(), &wire);
    assert!(result.is_ok());
    let parsed = result.unwrap();
    assert!(!parsed.is_empty());
}
/// The cache works when keyed by `String`.
#[test]
fn test_generic_key_string() {
    let cache: H2SessionCache<String> = H2SessionCache::new();

    let block = vec![0x82];
    let wire = [
        frame::CONNECTION_PREFACE.to_vec(),
        create_settings_frame(),
        create_headers_frame(1, &block),
    ]
    .concat();

    let result = cache.parse("session_123".to_string(), &wire);
    assert!(result.is_ok());
    assert!(cache.contains(&"session_123".to_string()));
}
/// The cache works when keyed by a `(u32, u32)` tuple.
#[test]
fn test_generic_key_tuple() {
    let cache: H2SessionCache<(u32, u32)> = H2SessionCache::new();
    let tuple_key = (1234, 5678);

    let block = vec![0x82];
    let wire = [
        frame::CONNECTION_PREFACE.to_vec(),
        create_settings_frame(),
        create_headers_frame(1, &block),
    ]
    .concat();

    let result = cache.parse(tuple_key, &wire);
    assert!(result.is_ok());
    assert!(cache.contains(&tuple_key));
}
/// `parse` returns a map whose keys match the `stream_id` stored in each message.
#[test]
fn test_parse_returns_hashmap_keyed_by_stream_id() {
    let cache = H2SessionCache::new();
    let conn_key = "conn1".to_string();

    // Preface + SETTINGS + HEADERS on streams 1 and 3.
    let block_a = vec![0x82];
    let block_b = vec![0x82];
    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend_from_slice(&create_headers_frame(1, &block_a));
    wire.extend_from_slice(&create_headers_frame(3, &block_b));

    let result = cache.parse(conn_key.clone(), &wire);
    assert!(result.is_ok());
    let by_stream = result.unwrap();

    // Only one of the two streams is required to be present.
    assert!(by_stream.contains_key(&StreamId(1)) || by_stream.contains_key(&StreamId(3)));

    // Every entry must be filed under its own stream id.
    by_stream
        .iter()
        .for_each(|(stream_id, msg)| assert_eq!(*stream_id, msg.stream_id));
}
/// Builds an HPACK block of `count` literal header fields.
///
/// Field `i` is `x-header-{i:04}: value-{i:04}`, encoded as a `0x00` prefix
/// byte followed by a one-byte name length, the name, a one-byte value length,
/// and the value (no Huffman coding).
fn build_many_headers_hpack(count: usize) -> Vec<u8> {
    (0..count)
        .flat_map(|idx| {
            let name = format!("x-header-{idx:04}");
            let value = format!("value-{idx:04}");

            let mut field = Vec::with_capacity(3 + name.len() + value.len());
            field.push(0x00);
            field.push(name.len() as u8);
            field.extend_from_slice(name.as_bytes());
            field.push(value.len() as u8);
            field.extend_from_slice(value.as_bytes());
            field
        })
        .collect()
}
/// Builds a HEADERS frame (type `0x01`, flags `0x05`) around `hpack_block`.
///
/// The payload length is written as a 24-bit big-endian value and the stream
/// id as four big-endian bytes with the top (reserved) bit masked off.
fn build_test_headers_frame(stream_id: u32, hpack_block: &[u8]) -> Vec<u8> {
    let payload_len = hpack_block.len();
    let [sid0, sid1, sid2, sid3] = stream_id.to_be_bytes();

    let mut out = Vec::with_capacity(9 + payload_len);
    out.extend_from_slice(&[
        // 24-bit length, most significant byte first.
        (payload_len >> 16) as u8,
        (payload_len >> 8) as u8,
        payload_len as u8,
        // Type: HEADERS.
        0x01,
        // Flags: END_STREAM | END_HEADERS.
        0x05,
        // Stream id with the reserved bit cleared.
        sid0 & 0x7F,
        sid1,
        sid2,
        sid3,
    ]);
    out.extend_from_slice(hpack_block);
    out
}
/// A HEADERS frame with 200 literal fields is tolerated by `feed` but never
/// produces a completed stream.
#[test]
fn test_header_count_limit_exceeded() {
    let oversized_block = build_many_headers_hpack(200);

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend_from_slice(&build_test_headers_frame(1, &oversized_block));

    let mut state = H2ConnectionState::new();
    let result = state.feed(&wire, TimestampNs(1_000_000));

    assert!(result.is_ok(), "Header limit violation should be non-fatal");
    assert!(
        state.try_pop().is_none(),
        "Stream should not complete with too many headers"
    );
}
/// A HEADERS frame with `:method: GET` plus 50 literal fields completes normally.
#[test]
fn test_header_count_within_limit() {
    // 0x82 (indexed `:method: GET`) followed by 50 generated literal fields.
    let mut block = vec![0x82];
    block.extend(build_many_headers_hpack(50));

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend_from_slice(&build_test_headers_frame(1, &block));

    let mut state = H2ConnectionState::new();
    let result = state.feed(&wire, TimestampNs(1_000_000));
    assert!(result.is_ok());

    let completed = state.try_pop();
    assert!(
        completed.is_some(),
        "Stream should complete with headers within limit"
    );
}
/// Twenty literal fields with 4096-byte values are tolerated by `feed` but
/// never produce a completed stream.
///
/// NOTE(review): the resulting HEADERS frame is roughly 82 KB, which is larger
/// than the 16,384-byte default max frame size used elsewhere in this file.
/// The stream may therefore be rejected for frame size rather than header-list
/// size — confirm which limit actually fires.
#[test]
fn test_header_list_size_limit_exceeded() {
    /// Appends `len` as an HPACK string length: 7-bit prefix integer with the
    /// Huffman bit clear.
    fn push_string_len(out: &mut Vec<u8>, len: usize) {
        if len < 127 {
            out.push(len as u8);
            return;
        }
        // Prefix saturated; the remainder follows in 7-bit little-endian groups
        // with the continuation bit set on all but the last.
        out.push(0x7F);
        let mut rest = len - 127;
        while rest >= 128 {
            out.push(0x80 | (rest & 0x7F) as u8);
            rest >>= 7;
        }
        out.push(rest as u8);
    }

    // The value is identical for every field, so build it once.
    let big_value = "X".repeat(4096);

    let mut hpack = Vec::new();
    for i in 0..20 {
        let name = format!("x-big-{i:02}");
        hpack.push(0x00);
        hpack.push(name.len() as u8);
        hpack.extend_from_slice(name.as_bytes());
        push_string_len(&mut hpack, big_value.len());
        hpack.extend_from_slice(big_value.as_bytes());
    }

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend_from_slice(&build_test_headers_frame(1, &hpack));

    let mut state = H2ConnectionState::new();
    let result = state.feed(&wire, TimestampNs(1_000_000));
    assert!(
        result.is_ok(),
        "Header size limit violation should be non-fatal"
    );
    assert!(
        state.try_pop().is_none(),
        "Stream should not complete when header list too large"
    );
}
/// A single literal field with a 9000-byte value is tolerated by `feed` but
/// never produces a completed stream.
#[test]
fn test_individual_header_value_size_limit() {
    let field_name = "x-huge-value";
    let field_value = "Y".repeat(9000);

    // Literal field: 0x00 prefix, one-byte name length, name.
    let mut hpack = vec![0x00u8, field_name.len() as u8];
    hpack.extend_from_slice(field_name.as_bytes());

    // Value length as an HPACK 7-bit-prefix integer: saturated prefix, then
    // the remainder in 7-bit groups with continuation bits.
    hpack.push(0x7F);
    let mut rest = field_value.len() - 127;
    while rest >= 128 {
        hpack.push(0x80 | (rest & 0x7F) as u8);
        rest >>= 7;
    }
    hpack.push(rest as u8);
    hpack.extend_from_slice(field_value.as_bytes());

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend_from_slice(&build_test_headers_frame(1, &hpack));

    let mut state = H2ConnectionState::new();
    let result = state.feed(&wire, TimestampNs(1_000_000));
    assert!(
        result.is_ok(),
        "Header value size violation should be non-fatal"
    );
    assert!(
        state.try_pop().is_none(),
        "Stream should not complete with oversized header value"
    );
}
/// Via `feed`, a header value containing invalid UTF-8 is non-fatal but the
/// stream never completes.
#[test]
fn test_invalid_utf8_header_rejected() {
    // Indexed fields 0x82 / 0x87 / 0x84 (static indices 2, 7, 4), then a
    // literal field (0x00) whose 5-byte name is "x-bad".
    let mut hpack = vec![0x82u8, 0x87, 0x84, 0x00, 0x05];
    hpack.extend_from_slice(b"x-bad");
    // 3-byte value that is not valid UTF-8 (0xFF and 0xFE never occur in UTF-8).
    hpack.push(0x03);
    hpack.extend_from_slice(&[0xFF, 0xFE, 0x41]);

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend_from_slice(&build_test_headers_frame(1, &hpack));

    let mut state = H2ConnectionState::new();
    let result = state.feed(&wire, TimestampNs(1_000_000));
    assert!(
        result.is_ok(),
        "Invalid UTF-8 should be non-fatal via feed()"
    );
    assert!(
        state.try_pop().is_none(),
        "Stream with invalid UTF-8 header should not complete"
    );
}
/// Via `parse_frames_stateful`, a header value containing invalid UTF-8
/// surfaces as `ParseErrorKind::Http2InvalidHeaderEncoding`.
#[test]
fn test_invalid_utf8_returns_encoding_error_variant() {
    // Three indexed fields, then literal "x-bad" with a non-UTF-8 value.
    let mut hpack = vec![0x82u8, 0x87, 0x84, 0x00, 0x05];
    hpack.extend_from_slice(b"x-bad");
    hpack.push(0x03);
    hpack.extend_from_slice(&[0xFF, 0xFE, 0x41]);

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend_from_slice(&build_test_headers_frame(1, &hpack));

    let mut state = H2ConnectionState::new();
    let result = parse_frames_stateful(&wire, &mut state);
    assert!(
        matches!(result, Err(ref e) if matches!(e.kind, ParseErrorKind::Http2InvalidHeaderEncoding)),
        "Should return Http2InvalidHeaderEncoding, got: {result:?}"
    );
}
/// A literal field with plain ASCII name and value lets the stream complete.
#[test]
fn test_valid_ascii_headers_accepted() {
    // Three indexed fields, then literal "x-good: hello".
    let mut hpack = vec![0x82u8, 0x87, 0x84, 0x00, 0x06];
    hpack.extend_from_slice(b"x-good");
    hpack.push(0x05);
    hpack.extend_from_slice(b"hello");

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend_from_slice(&build_test_headers_frame(1, &hpack));

    let mut state = H2ConnectionState::new();
    let result = state.feed(&wire, TimestampNs(1_000_000));
    assert!(result.is_ok());

    let completed = state.try_pop();
    assert!(
        completed.is_some(),
        "Stream with valid ASCII headers should complete"
    );
}
/// After a SETTINGS frame that sets parameter 0x0001 to 0, a HEADERS frame
/// using a literal-with-incremental-indexing field still parses.
#[test]
fn test_settings_header_table_size_applied() {
    let mut wire = frame::CONNECTION_PREFACE.to_vec();

    // SETTINGS frame header: 6-byte payload, type 0x04, no flags, stream 0.
    wire.extend_from_slice(&[0x00, 0x00, 0x06, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]);
    // One setting: identifier 0x0001 (HEADER_TABLE_SIZE) = 0.
    wire.extend_from_slice(&[0x00u8, 0x01, 0x00, 0x00, 0x00, 0x00]);

    // 0x82 indexed field, then 0x40 (literal with incremental indexing, new
    // name) carrying "x-test: value".
    let mut hpack = vec![0x82u8, 0x40, 0x06];
    hpack.extend_from_slice(b"x-test");
    hpack.push(0x05);
    hpack.extend_from_slice(b"value");
    wire.extend_from_slice(&build_test_headers_frame(1, &hpack));

    let mut state = H2ConnectionState::new();
    let result = state.feed(&wire, TimestampNs(1_000_000));
    assert!(
        result.is_ok(),
        "Parsing should succeed with table_size=0: {result:?}"
    );
}
/// An open stream is removed from `active_streams` once a later `feed` call
/// arrives more than `stream_timeout_ns` after it was opened.
#[test]
fn test_stream_timeout_eviction() {
    let limits = H2Limits {
        stream_timeout_ns: 1_000_000_000,
        max_concurrent_streams: 100,
        ..H2Limits::default()
    };
    let mut state = H2ConnectionState::with_limits(limits);

    let hpack = vec![0x82u8];
    // HEADERS on stream 1 with only END_HEADERS (0x04): without END_STREAM
    // the stream stays open.
    let header_bytes = [
        0x00,
        0x00,
        hpack.len() as u8,
        0x01,
        0x04,
        0x00,
        0x00,
        0x00,
        0x01,
    ];

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend_from_slice(&header_bytes);
    wire.extend_from_slice(&hpack);

    // Open the stream at t = 1 s.
    let _ = state.feed(&wire, TimestampNs(1_000_000_000));
    assert_eq!(state.active_streams.len(), 1, "Stream 1 should be active");

    // An empty feed at t = 3 s is 2 s later, past the 1 s timeout.
    let _ = state.feed(&[], TimestampNs(3_000_000_000));
    assert_eq!(
        state.active_streams.len(),
        0,
        "Stream should be evicted after timeout"
    );
}
/// With `max_concurrent_streams = 2`, opening a third stream is non-fatal and
/// leaves the active-stream count at 2.
#[test]
fn test_max_concurrent_streams_enforced() {
    let mut state = H2ConnectionState::with_limits(H2Limits {
        stream_timeout_ns: 60_000_000_000,
        max_concurrent_streams: 2,
        ..H2Limits::default()
    });
    let hpack = vec![0x82u8];

    // HEADERS with only END_HEADERS (0x04), so each stream remains open.
    let open_stream = |stream_id: u32| -> Vec<u8> {
        let [s0, s1, s2, s3] = stream_id.to_be_bytes();
        let mut bytes = vec![
            0x00,
            0x00,
            hpack.len() as u8,
            0x01,
            0x04,
            s0 & 0x7F,
            s1,
            s2,
            s3,
        ];
        bytes.extend_from_slice(&hpack);
        bytes
    };

    // Fill the two available slots with streams 1 and 3.
    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    for stream_id in [1u32, 3] {
        wire.extend_from_slice(&open_stream(stream_id));
    }
    let _ = state.feed(&wire, TimestampNs(1_000_000));
    assert_eq!(
        state.active_streams.len(),
        2,
        "Should have 2 active streams"
    );

    // Stream 5 would be the third concurrent stream.
    let overflow = open_stream(5);
    let result = state.feed(&overflow, TimestampNs(2_000_000));
    assert!(
        result.is_ok(),
        "Exceeding max concurrent streams should be non-fatal"
    );
    assert_eq!(
        state.active_streams.len(),
        2,
        "Should still have only 2 streams"
    );
}
/// With `max_body_size = 10`, a 20-byte DATA payload causes the stream to be
/// dropped without failing the parse.
#[test]
fn test_body_size_limit_drops_stream() {
    let limits = H2Limits {
        max_body_size: 10,
        ..H2Limits::default()
    };
    let mut state = H2ConnectionState::with_limits(limits);

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());

    // HEADERS on stream 1, END_HEADERS only; three indexed fields.
    let hpack = vec![0x82u8, 0x87, 0x84];
    let hpack_len = hpack.len();
    wire.extend_from_slice(&[
        (hpack_len >> 16) as u8,
        (hpack_len >> 8) as u8,
        hpack_len as u8,
        0x01,
        0x04,
        0x00,
        0x00,
        0x00,
        0x01,
    ]);
    wire.extend_from_slice(&hpack);

    // DATA on stream 1 with END_STREAM (0x01); 20 bytes is twice the limit.
    let body = vec![0x41u8; 20];
    let body_len = body.len();
    wire.extend_from_slice(&[
        (body_len >> 16) as u8,
        (body_len >> 8) as u8,
        body_len as u8,
        0x00,
        0x01,
        0x00,
        0x00,
        0x00,
        0x01,
    ]);
    wire.extend_from_slice(&body);

    let result = parse_frames_stateful(&wire, &mut state);
    assert!(result.is_ok(), "Body size limit should be non-fatal");
    assert!(
        result.unwrap().is_empty(),
        "Stream exceeding body size limit should be dropped"
    );
    assert_eq!(
        state.active_streams.len(),
        0,
        "Stream should be removed after exceeding body limit"
    );
}
/// Eight threads each parse under their own key; afterwards the cache holds
/// one entry per thread.
#[test]
fn test_h2session_cache_concurrent_different_connections() {
    use std::{sync::Arc, thread};

    let cache = Arc::new(H2SessionCache::new());
    let num_threads = 8;

    let mut workers = Vec::new();
    for i in 0..num_threads {
        let cache = Arc::clone(&cache);
        workers.push(thread::spawn(move || {
            let key = format!("conn_{i}");

            // Each connection gets its own odd stream id and a one-byte block.
            let stream_id = (i * 2 + 1) as u32;
            let hpack = vec![0x82];
            let mut wire = frame::CONNECTION_PREFACE.to_vec();
            wire.extend_from_slice(&create_settings_frame());
            wire.extend(create_headers_frame(stream_id, &hpack));

            let result = cache.parse(key.clone(), &wire);
            assert!(result.is_ok(), "Thread {i} parse should succeed");
            assert!(cache.contains(&key));
        }));
    }

    for worker in workers {
        worker.join().expect("Thread should not panic");
    }
    assert_eq!(cache.len(), num_threads);
}
/// Four threads parse HEADERS frames under one shared key; every call
/// succeeds and the key is still present afterwards.
#[test]
fn test_h2session_cache_concurrent_same_connection() {
    use std::{sync::Arc, thread};

    let cache = Arc::new(H2SessionCache::new());
    let key = "shared_conn".to_string();

    // Prime the shared connection with preface + SETTINGS before spawning.
    let mut handshake = frame::CONNECTION_PREFACE.to_vec();
    handshake.extend_from_slice(&create_settings_frame());
    let _ = cache.parse(key.clone(), &handshake);

    let num_threads = 4;
    let mut workers = Vec::new();
    for i in 0..num_threads {
        let cache = Arc::clone(&cache);
        let key = key.clone();
        workers.push(thread::spawn(move || {
            // Distinct odd stream id per thread.
            let stream_id = (i * 2 + 1) as u32;
            let hpack = vec![0x82];
            let headers = create_headers_frame(stream_id, &hpack);
            let result = cache.parse(key, &headers);
            assert!(result.is_ok(), "Thread {i} same-conn parse should succeed");
        }));
    }

    for worker in workers {
        worker.join().expect("Thread should not panic");
    }
    assert!(cache.contains(&key));
}
/// With `max_buffer_size = 100`, feeding exactly 100 bytes succeeds while
/// feeding 101 bytes yields `Http2BufferTooLarge`.
#[rstest]
#[case::oversized_rejected(101, true)]
#[case::exact_limit_succeeds(100, false)]
fn test_buffer_growth_cap(#[case] chunk_size: usize, #[case] expect_error: bool) {
    let limits = H2Limits {
        max_buffer_size: 100,
        ..H2Limits::default()
    };
    let mut state = H2ConnectionState::with_limits(limits);

    let zeros = vec![0x00u8; chunk_size];
    let result = state.feed(&zeros, TimestampNs(1_000_000));

    // Success case first; everything below handles the expected-error case.
    if !expect_error {
        assert!(
            result.is_ok(),
            "Feed of {chunk_size} bytes should succeed, got: {result:?}"
        );
        return;
    }

    assert!(
        matches!(result, Err(ref e) if matches!(e.kind, ParseErrorKind::Http2BufferTooLarge)),
        "Feed of {chunk_size} bytes should return Http2BufferTooLarge, got: {result:?}"
    );
}
/// A frame header declaring the largest 24-bit length (0xFFFFFF) is rejected
/// with `Http2FrameSizeError`.
#[test]
fn test_checked_add_max_24bit_length() {
    let max_length: u32 = 0x00FF_FFFF;
    let [_, len_hi, len_mid, len_lo] = max_length.to_be_bytes();

    // DATA frame header on stream 1, no flags; no payload bytes follow.
    let header_only = [
        len_hi, len_mid, len_lo, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
    ];

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend_from_slice(&header_only);

    let mut state = H2ConnectionState::new();
    let result = parse_frames_stateful(&wire, &mut state);
    assert!(
        matches!(result, Err(ref e) if matches!(e.kind, ParseErrorKind::Http2FrameSizeError)),
        "Max 24-bit length frame should trigger FrameSizeError, got: {result:?}"
    );
}
/// A DATA frame with a 16385-byte payload is rejected with
/// `Http2FrameSizeError` when no SETTINGS raised the frame-size limit.
#[test]
fn test_max_frame_size_rejects_oversized_frame() {
    let length: u32 = 16385;

    // DATA frame on stream 1 with END_STREAM, followed by `length` bytes of 'A'.
    let mut data_frame = vec![
        (length >> 16) as u8,
        (length >> 8) as u8,
        length as u8,
        0x00,
        0x01,
        0x00,
        0x00,
        0x00,
        0x01,
    ];
    data_frame.resize(9 + length as usize, 0x41u8);

    // Preface, SETTINGS, HEADERS on stream 1, then the oversized DATA frame.
    let hpack = vec![0x82];
    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend_from_slice(&build_test_headers_frame(1, &hpack));
    wire.extend_from_slice(&data_frame);

    let mut state = H2ConnectionState::new();
    let result = parse_frames_stateful(&wire, &mut state);
    assert!(
        matches!(result, Err(ref e) if matches!(e.kind, ParseErrorKind::Http2FrameSizeError)),
        "Frame exceeding default max_frame_size should be rejected, got: {result:?}"
    );
}
/// After SETTINGS sets parameter 0x0005 to 32768, a 20000-byte DATA frame is
/// accepted and delivered as the body of stream 1.
#[test]
fn test_max_frame_size_respects_settings() {
    let mut wire = frame::CONNECTION_PREFACE.to_vec();

    // SETTINGS: 6-byte payload, identifier 0x0005 (MAX_FRAME_SIZE) = 0x00008000.
    wire.extend_from_slice(&[0x00, 0x00, 0x06, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]);
    wire.extend_from_slice(&[0x00u8, 0x05, 0x00, 0x00, 0x80, 0x00]);

    // HEADERS on stream 1 with END_HEADERS only.
    let hpack = vec![0x82u8];
    let hpack_len = hpack.len();
    wire.extend_from_slice(&[
        (hpack_len >> 16) as u8,
        (hpack_len >> 8) as u8,
        hpack_len as u8,
        0x01,
        0x04,
        0x00,
        0x00,
        0x00,
        0x01,
    ]);
    wire.extend_from_slice(&hpack);

    // DATA on stream 1 with END_STREAM and 20000 bytes of 'A'.
    let data_len: u32 = 20000;
    wire.extend_from_slice(&[
        (data_len >> 16) as u8,
        (data_len >> 8) as u8,
        data_len as u8,
        0x00,
        0x01,
        0x00,
        0x00,
        0x00,
        0x01,
    ]);
    wire.extend(vec![0x41u8; data_len as usize]);

    let mut state = H2ConnectionState::new();
    let result = parse_frames_stateful(&wire, &mut state);
    assert!(
        result.is_ok(),
        "20000-byte frame should succeed with max_frame_size=32768: {result:?}"
    );

    let messages = result.unwrap();
    assert!(
        messages.contains_key(&StreamId(1)),
        "Stream 1 should complete"
    );
    assert_eq!(
        messages[&StreamId(1)].body.len(),
        20000,
        "Body should be 20000 bytes"
    );
}
/// A DATA frame arriving after a HEADERS frame that lacks END_HEADERS yields
/// `Http2ContinuationExpected`.
#[test]
fn test_continuation_expected_but_got_data() {
    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());

    // HEADERS on stream 1 with no flags: the header block is left unfinished.
    let hpack = vec![0x82u8];
    wire.extend_from_slice(&[
        0x00,
        0x00,
        hpack.len() as u8,
        0x01,
        0x00,
        0x00,
        0x00,
        0x00,
        0x01,
    ]);
    wire.extend_from_slice(&hpack);

    // DATA on stream 1 (END_STREAM) carrying "hello" instead of a CONTINUATION.
    wire.extend_from_slice(&[0x00, 0x00, 0x05, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01]);
    wire.extend_from_slice(b"hello");

    let mut state = H2ConnectionState::new();
    let result = parse_frames_stateful(&wire, &mut state);
    assert!(
        matches!(result, Err(ref e) if matches!(e.kind, ParseErrorKind::Http2ContinuationExpected)),
        "DATA after HEADERS without END_HEADERS should trigger ContinuationExpected, got: {result:?}"
    );
}
/// A CONTINUATION frame on stream 3 following an unfinished HEADERS on
/// stream 1 yields `Http2ContinuationExpected`.
#[test]
fn test_continuation_wrong_stream_rejected() {
    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());

    // HEADERS on stream 1 with no flags (no END_HEADERS).
    let hpack = vec![0x82u8];
    wire.extend_from_slice(&[
        0x00,
        0x00,
        hpack.len() as u8,
        0x01,
        0x00,
        0x00,
        0x00,
        0x00,
        0x01,
    ]);
    wire.extend_from_slice(&hpack);

    // CONTINUATION (type 0x09, END_HEADERS) addressed to stream 3.
    let cont_payload = vec![0x84u8];
    wire.extend_from_slice(&[
        0x00,
        0x00,
        cont_payload.len() as u8,
        0x09,
        0x04,
        0x00,
        0x00,
        0x00,
        0x03,
    ]);
    wire.extend_from_slice(&cont_payload);

    let mut state = H2ConnectionState::new();
    let result = parse_frames_stateful(&wire, &mut state);
    assert!(
        matches!(result, Err(ref e) if matches!(e.kind, ParseErrorKind::Http2ContinuationExpected)),
        "CONTINUATION on wrong stream should trigger ContinuationExpected, got: {result:?}"
    );
}
/// HEADERS (END_STREAM, no END_HEADERS) followed by CONTINUATION
/// (END_HEADERS) on the same stream completes stream 1.
#[test]
fn test_continuation_correct_ordering_succeeds() {
    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());

    // First fragment: HEADERS on stream 1, flags 0x01 (END_STREAM only).
    let first_fragment = vec![0x82u8];
    wire.extend_from_slice(&[
        0x00,
        0x00,
        first_fragment.len() as u8,
        0x01,
        0x01,
        0x00,
        0x00,
        0x00,
        0x01,
    ]);
    wire.extend_from_slice(&first_fragment);

    // Second fragment: CONTINUATION on stream 1, flags 0x04 (END_HEADERS).
    let second_fragment = vec![0x84u8];
    wire.extend_from_slice(&[
        0x00,
        0x00,
        second_fragment.len() as u8,
        0x09,
        0x04,
        0x00,
        0x00,
        0x00,
        0x01,
    ]);
    wire.extend_from_slice(&second_fragment);

    let mut state = H2ConnectionState::new();
    let result = parse_frames_stateful(&wire, &mut state);
    assert!(
        result.is_ok(),
        "Correct CONTINUATION ordering should succeed: {result:?}"
    );

    let messages = result.unwrap();
    assert!(
        messages.contains_key(&StreamId(1)),
        "Stream 1 should complete with HEADERS + CONTINUATION"
    );
}
/// A SETTINGS frame whose payload is 7 bytes long yields
/// `Http2SettingsLengthError`.
#[test]
fn test_settings_payload_not_multiple_of_6() {
    // Frame header: length 7, type 0x04 (SETTINGS), no flags, stream 0.
    let header = [0x00u8, 0x00, 0x07, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00];
    // Seven payload bytes: one more than a single 6-byte setting entry.
    let payload = [0x00u8, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00];

    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&header);
    wire.extend_from_slice(&payload);

    let mut state = H2ConnectionState::new();
    let result = parse_frames_stateful(&wire, &mut state);
    assert!(
        matches!(result, Err(ref e) if matches!(e.kind, ParseErrorKind::Http2SettingsLengthError)),
        "SETTINGS with 7-byte payload should trigger SettingsLengthError, got: {result:?}"
    );
}
/// A SETTINGS frame with an empty payload parses successfully.
///
/// NOTE(review): `create_settings_frame` emits flags `0x00`, so despite the
/// test name and assertion message this exercises an empty *non-ACK* SETTINGS
/// frame. Confirm whether an ACK-flagged (0x01) frame was intended.
#[test]
fn test_settings_empty_payload_ack_succeeds() {
    let wire = [
        frame::CONNECTION_PREFACE.to_vec(),
        create_settings_frame(),
    ]
    .concat();

    let mut state = H2ConnectionState::new();
    let result = parse_frames_stateful(&wire, &mut state);
    assert!(
        result.is_ok(),
        "SETTINGS ACK (empty payload) should succeed: {result:?}"
    );
}
/// A message with `method` and `path` converts into an `HttpRequest` whose
/// timestamp is taken from `end_stream_timestamp_ns`.
#[test]
fn test_into_http_request() {
    let parsed = ParsedH2Message {
        stream_id: StreamId(1),
        // Request pseudo-headers.
        method: Some("GET".to_string()),
        path: Some("/foo".to_string()),
        authority: Some("example.com".to_string()),
        scheme: Some("https".to_string()),
        status: None,
        // Regular headers and body.
        headers: vec![("content-type".to_string(), "text/plain".to_string())],
        header_size: 100,
        body: vec![1, 2, 3],
        // Distinct timestamps so the assertion can tell which one was used.
        first_frame_timestamp_ns: TimestampNs(1000),
        end_stream_timestamp_ns: TimestampNs(2000),
    };

    let request = parsed
        .into_http_request()
        .expect("should produce an HttpRequest");

    assert_eq!(request.method, http::Method::GET);
    assert_eq!(request.uri, "/foo");
    assert_eq!(request.body, vec![1, 2, 3]);
    assert_eq!(request.timestamp_ns, TimestampNs(2000));
}
/// A message with `status` converts into an `HttpResponse` whose timestamp is
/// taken from `first_frame_timestamp_ns`.
#[test]
fn test_into_http_response() {
    let parsed = ParsedH2Message {
        stream_id: StreamId(1),
        // No request pseudo-headers; only a status.
        method: None,
        path: None,
        authority: None,
        scheme: None,
        status: Some(200),
        headers: vec![("content-type".to_string(), "application/json".to_string())],
        header_size: 50,
        body: vec![4, 5, 6],
        // Distinct timestamps so the assertion can tell which one was used.
        first_frame_timestamp_ns: TimestampNs(3000),
        end_stream_timestamp_ns: TimestampNs(4000),
    };

    let response = parsed
        .into_http_response()
        .expect("should produce an HttpResponse");

    assert_eq!(response.status, http::StatusCode::OK);
    assert_eq!(response.body, vec![4, 5, 6]);
    assert_eq!(response.timestamp_ns, TimestampNs(3000));
}
/// A response-shaped message (status set, no method or path) does not
/// convert into a request.
#[test]
fn test_into_http_request_returns_none_for_response() {
    let response_like = ParsedH2Message {
        stream_id: StreamId(1),
        status: Some(200),
        method: None,
        path: None,
        authority: None,
        scheme: None,
        headers: vec![],
        header_size: 10,
        body: vec![],
        first_frame_timestamp_ns: TimestampNs(0),
        end_stream_timestamp_ns: TimestampNs(0),
    };

    let converted = response_like.into_http_request();
    assert!(converted.is_none());
}
/// Four threads released together by a barrier parse under one key; a
/// follow-up parse on the same key afterwards still succeeds.
#[test]
fn test_per_key_mutex_preserves_hpack_state() {
    use std::{
        sync::{Arc, Barrier},
        thread,
    };

    let cache = Arc::new(H2SessionCache::new());
    let key = "shared".to_string();
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));

    // Establish the connection (preface + SETTINGS) before any contention.
    let mut handshake = frame::CONNECTION_PREFACE.to_vec();
    handshake.extend_from_slice(&create_settings_frame());
    let _ = cache.parse(key.clone(), &handshake);

    let mut workers = Vec::new();
    for i in 0..num_threads {
        let cache = Arc::clone(&cache);
        let key = key.clone();
        let barrier = Arc::clone(&barrier);
        workers.push(thread::spawn(move || {
            // Hold every thread here until all have arrived.
            barrier.wait();
            let stream_id = (i * 2 + 1) as u32;
            let hpack = vec![0x82];
            let headers = create_headers_frame(stream_id, &hpack);
            let result = cache.parse(key, &headers);
            assert!(result.is_ok(), "Thread {i} should succeed");
        }));
    }
    for worker in workers {
        worker.join().expect("Thread should not panic");
    }

    // One more parse on the same key after the concurrent phase.
    let hpack = vec![0x82];
    let trailing = create_headers_frame(99, &hpack);
    let result = cache.parse(key.clone(), &trailing);
    assert!(
        result.is_ok(),
        "Post-concurrent parse should succeed with intact HPACK state"
    );
}
/// After a CONTINUATION-sequence violation handled by `feed`, the connection
/// state is reset and a later, valid HEADERS frame still completes.
#[test]
fn test_broken_continuation_does_not_poison_connection() {
    let mut state = H2ConnectionState::new();

    // Step 1: preface + SETTINGS.
    let mut handshake = frame::CONNECTION_PREFACE.to_vec();
    handshake.extend_from_slice(&create_settings_frame());
    let result = state.feed(&handshake, TimestampNs(1_000_000));
    assert!(result.is_ok());

    // Step 2: HEADERS on stream 1 with no flags (header block unfinished),
    // immediately followed by a DATA frame on stream 3 carrying "hello".
    let hpack = vec![0x82u8];
    let mut violation = vec![
        0x00,
        0x00,
        hpack.len() as u8,
        0x01,
        0x00,
        0x00,
        0x00,
        0x00,
        0x01,
    ];
    violation.extend_from_slice(&hpack);
    violation.extend_from_slice(&[0x00, 0x00, 0x05, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03]);
    violation.extend_from_slice(b"hello");

    let result = state.feed(&violation, TimestampNs(2_000_000));
    assert!(
        result.is_ok(),
        "Broken CONTINUATION should be non-fatal via feed()"
    );
    assert!(
        state.expecting_continuation.is_none(),
        "expecting_continuation should be cleared after violation"
    );
    assert!(
        !state.active_streams.contains_key(&StreamId(1)),
        "Incomplete stream 1 should be removed"
    );

    // Step 3: a well-formed HEADERS frame on stream 5 must still complete.
    let hpack2 = vec![0x82];
    let recovery = create_headers_frame(5, &hpack2);
    let result = state.feed(&recovery, TimestampNs(3_000_000));
    assert!(
        result.is_ok(),
        "Valid HEADERS after broken CONTINUATION should succeed: {result:?}"
    );

    let popped = state.try_pop();
    assert!(
        popped.is_some(),
        "Stream 5 should complete — connection is not poisoned"
    );
    let (stream_id, _) = popped.unwrap();
    assert_eq!(stream_id, StreamId(5));
}
/// Checks how a SETTINGS value for parameter 0x0005 affects
/// `state.settings.max_frame_size`: 0 and `u32::MAX` leave it at 16384,
/// while 32768 is adopted.
#[rstest]
#[case::zero_ignored(0x00000000_u32, 16_384)]
#[case::u32_max_ignored(0xFFFFFFFF_u32, 16_384)]
#[case::valid_32768_accepted(0x00008000_u32, 32_768)]
fn test_settings_max_frame_size_validation(
    #[case] settings_value: u32,
    #[case] expected_max_frame_size: u32,
) {
    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    // SETTINGS frame header: 6-byte payload, type 0x04, no flags, stream 0.
    wire.extend_from_slice(&[0x00, 0x00, 0x06, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]);
    // Setting identifier 0x0005 followed by the big-endian value under test.
    wire.extend_from_slice(&[0x00, 0x05]);
    wire.extend_from_slice(&settings_value.to_be_bytes());

    let mut state = H2ConnectionState::new();
    let result = parse_frames_stateful(&wire, &mut state);
    assert!(result.is_ok());
    assert_eq!(
        state.settings.max_frame_size, expected_max_frame_size,
        "max_frame_size={settings_value:#010X} should yield {expected_max_frame_size}"
    );
}
/// A message popped before `evict_stale_streams` runs is still intact
/// afterwards.
///
/// NOTE(review): `sid` and `msg` are already moved out of `state` when
/// eviction runs, so the post-eviction assertions appear to re-check local
/// values only — confirm whether a stronger check on `state` was intended.
#[test]
fn test_completed_message_survives_eviction() {
    let limits = H2Limits {
        stream_timeout_ns: 1_000_000_000,
        ..H2Limits::default()
    };
    let mut state = H2ConnectionState::with_limits(limits);

    // Preface + SETTINGS + a complete HEADERS-only request on stream 1.
    let hpack = vec![0x82];
    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend(create_headers_frame(1, &hpack));
    let _ = state.feed(&wire, TimestampNs(1_000_000_000));

    // Pop the completed message before evicting.
    let popped = state.try_pop();
    assert!(popped.is_some(), "Stream 1 should complete and be poppable");
    let (sid, msg) = popped.unwrap();
    assert_eq!(sid, StreamId(1));
    assert_eq!(msg.method.as_deref(), Some("GET"));

    // Evict at t = 3 s, 2 s after the feed and past the 1 s timeout.
    state.evict_stale_streams(TimestampNs(3_000_000_000));
    assert_eq!(sid, StreamId(1));
    assert_eq!(msg.method.as_deref(), Some("GET"));
}
/// A completed message still waiting in the queue is not discarded by
/// `evict_stale_streams`.
#[test]
fn test_completed_in_queue_not_evicted() {
    let limits = H2Limits {
        stream_timeout_ns: 1_000_000_000,
        ..H2Limits::default()
    };
    let mut state = H2ConnectionState::with_limits(limits);

    // Preface + SETTINGS + a complete HEADERS-only request on stream 1.
    let hpack = vec![0x82];
    let mut wire = frame::CONNECTION_PREFACE.to_vec();
    wire.extend_from_slice(&create_settings_frame());
    wire.extend(create_headers_frame(1, &hpack));
    let _ = state.feed(&wire, TimestampNs(1_000_000_000));
    assert!(state.has_completed(), "Should have a completed message");

    // Evict at t = 3 s while the message is still queued, then pop it.
    state.evict_stale_streams(TimestampNs(3_000_000_000));
    let popped = state.try_pop();
    assert!(
        popped.is_some(),
        "Completed message in queue should survive eviction"
    );
    let (sid, _) = popped.unwrap();
    assert_eq!(sid, StreamId(1));
}