#![allow(unused_mut)]
use rstest::rstest;
use super::*;
/// Minimal in-memory event used to feed the collator in these tests.
///
/// Every field is handed back unchanged by the matching `DataEvent`
/// accessor implemented for this type.
struct TestEvent {
    /// Identifier reported by `connection_id()`.
    conn_id: u128,
    /// Identifier reported by `process_id()`.
    process_id: u32,
    /// Port reported by `remote_port()`.
    remote_port: u16,
    /// Whether the bytes were read or written, as reported by `direction()`.
    direction: Direction,
    /// Timestamp reported by `timestamp_ns()`.
    timestamp_ns: u64,
    /// Raw bytes reported by `payload()`.
    payload: Vec<u8>,
}
/// Exposes the stored fields of `TestEvent` through the `DataEvent` trait;
/// each accessor simply returns the corresponding field.
impl DataEvent for TestEvent {
    /// Returns the stored connection identifier.
    fn connection_id(&self) -> u128 {
        self.conn_id
    }

    /// Returns the stored process identifier.
    fn process_id(&self) -> u32 {
        self.process_id
    }

    /// Returns the stored remote port.
    fn remote_port(&self) -> u16 {
        self.remote_port
    }

    /// Returns the stored direction.
    fn direction(&self) -> Direction {
        self.direction
    }

    /// Returns the stored timestamp.
    fn timestamp_ns(&self) -> u64 {
        self.timestamp_ns
    }

    /// Borrows the stored payload bytes.
    fn payload(&self) -> &[u8] {
        self.payload.as_slice()
    }
}
/// Builds a `TestEvent` from its parts, copying `payload` into an owned buffer.
fn make_event(
    direction: Direction,
    conn_id: u128,
    process_id: u32,
    remote_port: u16,
    timestamp_ns: u64,
    payload: &[u8],
) -> TestEvent {
    // Own the bytes so the event does not borrow from the caller.
    let owned_payload = Vec::from(payload);
    TestEvent {
        conn_id,
        process_id,
        remote_port,
        direction,
        timestamp_ns,
        payload: owned_payload,
    }
}
/// Feeds one Write event with connection id 0 and remote port 0, then checks
/// that the `ssl_connections` entry keyed by 1234 reports `remote_port == None`.
#[test]
fn test_ssl_port_zero_becomes_none() {
    let request: &[u8] = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n";
    let mut collator: Collator<TestEvent> = Collator::new();

    // conn_id = 0, process_id = 1234, remote_port = 0.
    let _ = collator.add_event(make_event(
        Direction::Write,
        0,
        1234,
        0,
        1_000_000,
        request,
    ));

    let tracked = collator.ssl_connections.get(&1234).unwrap();
    assert_eq!(tracked.remote_port, None, "Port 0 should become None");
}
/// Sends a request in two Write events on the same `ssl_connections` entry:
/// the first carries port 0 (expected to be stored as `None`), the second
/// carries port 8080 (expected to be stored as `Some(8080)`).
#[test]
fn test_port_updated_from_later_event() {
    let mut collator: Collator<TestEvent> = Collator::new();

    // First fragment: request line only, port unknown (0).
    let head = make_event(Direction::Write, 0, 1234, 0, 1_000_000, b"GET / HTTP/1.1\r\n");
    let _ = collator.add_event(head);
    let before = collator.ssl_connections.get(&1234).unwrap();
    assert_eq!(before.remote_port, None);

    // Second fragment: remaining headers, now with a real port.
    let tail = make_event(
        Direction::Write,
        0,
        1234,
        8080,
        2_000_000,
        b"Host: example.com\r\n\r\n",
    );
    let _ = collator.add_event(tail);
    let after = collator.ssl_connections.get(&1234).unwrap();
    assert_eq!(
        after.remote_port,
        Some(8080),
        "Port should be updated from later event"
    );
}
/// The 24-byte HTTP/2 client connection preface that the H2 tests place at
/// the start of a request stream (and again to simulate a reused fd).
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
/// Returns a 9-byte HTTP/2 frame header with zero length, frame type `0x04`
/// (SETTINGS), no flags and stream id 0 — i.e. an empty SETTINGS frame.
fn build_settings_frame() -> Vec<u8> {
    let mut frame = Vec::with_capacity(9);
    frame.extend_from_slice(&[0x00, 0x00, 0x00]); // 24-bit payload length = 0
    frame.push(0x04); // frame type
    frame.push(0x00); // flags
    frame.extend_from_slice(&[0x00; 4]); // stream id 0
    frame
}
/// Builds an HTTP/2 frame of type `0x01` (HEADERS) with flags `0x04`
/// (END_HEADERS) carrying `hpack_block` on `stream_id`.
///
/// The length field is the low 24 bits of `hpack_block.len()` in big-endian
/// order; the top bit of `stream_id` is cleared before it is written.
fn build_headers_frame(stream_id: u32, hpack_block: &[u8]) -> Vec<u8> {
    let len_be = hpack_block.len().to_be_bytes();
    let stream_be = (stream_id & 0x7FFF_FFFF).to_be_bytes();

    let mut frame = Vec::with_capacity(9 + hpack_block.len());
    // Only the three least-significant length bytes fit in the header.
    frame.extend_from_slice(&len_be[len_be.len() - 3..]);
    frame.extend_from_slice(&[0x01, 0x04]); // type, flags
    frame.extend_from_slice(&stream_be);
    frame.extend_from_slice(hpack_block);
    frame
}
/// Builds an HTTP/2 frame of type `0x00` (DATA) carrying `data` on
/// `stream_id`; the flags byte is `0x01` (END_STREAM) when `end_stream` is
/// true and `0x00` otherwise.
///
/// The length field is the low 24 bits of `data.len()` in big-endian order;
/// the top bit of `stream_id` is cleared before it is written.
fn build_data_frame(stream_id: u32, data: &[u8], end_stream: bool) -> Vec<u8> {
    let flags: u8 = match end_stream {
        true => 0x01,
        false => 0x00,
    };
    let len_be = data.len().to_be_bytes();
    let stream_be = (stream_id & 0x7FFF_FFFF).to_be_bytes();

    let mut frame = Vec::with_capacity(9 + data.len());
    // Only the three least-significant length bytes fit in the header.
    frame.extend_from_slice(&len_be[len_be.len() - 3..]);
    frame.push(0x00); // frame type
    frame.push(flags);
    frame.extend_from_slice(&stream_be);
    frame.extend_from_slice(data);
    frame
}
/// Returns a hand-encoded HPACK header block for a GET request.
///
/// Per the HPACK static table (RFC 7541, Appendix A) the bytes decode as:
/// `0x82` = `:method: GET`, `0x87` = `:scheme: https`, `0x84` = `:path: /`,
/// then `0x01 0x0b` = literal field with indexed name `:authority` and an
/// 11-byte value, followed by the value `example.com`.
fn hpack_get_request() -> Vec<u8> {
    const FIELD_PREFIX: [u8; 5] = [0x82, 0x87, 0x84, 0x01, 0x0b];
    const AUTHORITY: &[u8] = b"example.com";
    [&FIELD_PREFIX[..], AUTHORITY].concat()
}
/// Returns a one-byte HPACK header block: `0x88`, the indexed field
/// `:status: 200` in the HPACK static table (RFC 7541, Appendix A).
fn hpack_status_200() -> Vec<u8> {
    Vec::from([0x88u8])
}
/// Delivers an HTTP/2 request in three Write events (preface + SETTINGS +
/// HEADERS, then two DATA frames) and checks that the pending request on
/// stream 1 ends up with the body `helloworld` exactly once.
#[test]
fn test_h2_incremental_parsing_no_body_duplication() {
    let conn_id = 12345u128;
    let process_id = 1000u32;
    let mut collator: Collator<TestEvent> = Collator::new();

    let opening: Vec<u8> = [
        H2_PREFACE.to_vec(),
        build_settings_frame(),
        build_headers_frame(1, &hpack_get_request()),
    ]
    .concat();
    let chunks = [
        opening,
        build_data_frame(1, b"hello", false),
        build_data_frame(1, b"world", true),
    ];
    let timestamps = [1_000_000u64, 2_000_000, 3_000_000];

    // One event per chunk, in order, each with its own timestamp.
    for (chunk, ts) in chunks.iter().zip(timestamps) {
        let _ = collator.add_event(make_event(
            Direction::Write,
            conn_id,
            process_id,
            8080,
            ts,
            chunk,
        ));
    }

    let request = collator
        .connections
        .get(&conn_id)
        .unwrap()
        .pending_requests
        .get(&StreamId(1))
        .unwrap();
    assert_eq!(
        request.body, b"helloworld",
        "Body should not be duplicated when parsing incrementally"
    );
}
/// Sends a complete HTTP/2 request with a 32 KiB body in a single Write
/// event whose size exceeds `MAX_BUF_SIZE`, and checks that a request
/// message is emitted with the full, unmodified body.
#[test]
fn test_h2_large_payload_exceeding_max_buf_size() {
    const BODY_LEN: usize = 32768;
    let conn_id = 54321u128;
    let process_id = 4000u32;

    let body = vec![0x41u8; BODY_LEN];
    let payload: Vec<u8> = [
        H2_PREFACE.to_vec(),
        build_settings_frame(),
        build_headers_frame(1, &hpack_get_request()),
        build_data_frame(1, &body, true),
    ]
    .concat();
    // Guard the premise of the test before exercising the collator.
    assert!(
        payload.len() > MAX_BUF_SIZE,
        "Test payload ({} bytes) must exceed MAX_BUF_SIZE ({MAX_BUF_SIZE})",
        payload.len()
    );

    let mut collator: Collator<TestEvent> = Collator::new();
    let events = collator.add_event(make_event(
        Direction::Write,
        conn_id,
        process_id,
        8080,
        1_000_000,
        &payload,
    ));

    let request = events
        .iter()
        .find_map(|e| match e.as_message() {
            Some((msg, _)) if msg.is_request() => Some(msg.clone()),
            _ => None,
        })
        .expect("Large payload request should be parsed and emitted");

    let ParsedHttpMessage::Request(req) = request else {
        panic!("Expected a request message");
    };
    assert_eq!(
        req.body.len(),
        BODY_LEN,
        "Body should be 32KB, got {} bytes",
        req.body.len()
    );
    assert!(
        req.body.iter().all(|&b| b == 0x41),
        "Body content should be all 'A' bytes"
    );
}
/// Completes one HTTP/2 exchange on a connection id, then sends a second
/// connection preface plus a request with a 32 KiB body on the same id, and
/// checks that the second request is emitted with the full body.
#[test]
fn test_h2_fd_reuse_resets_parser_on_new_preface() {
    /// HEADERS frame on stream 1 with flags 0x05 (END_STREAM | END_HEADERS).
    fn end_stream_headers(hpack: &[u8]) -> Vec<u8> {
        let len = hpack.len();
        let mut frame = vec![
            (len >> 16) as u8,
            (len >> 8) as u8,
            len as u8,
            0x01,
            0x05,
            0x00,
            0x00,
            0x00,
            0x01,
        ];
        frame.extend_from_slice(hpack);
        frame
    }

    let conn_id = 88888u128;
    let process_id = 5000u32;
    let mut collator: Collator<TestEvent> = Collator::new();

    // First exchange: request arrives on Read, response leaves on Write.
    let mut req1 = H2_PREFACE.to_vec();
    req1.extend(build_settings_frame());
    req1.extend(end_stream_headers(&hpack_get_request()));
    let _ = collator.add_event(make_event(
        Direction::Read,
        conn_id,
        process_id,
        80,
        1_000_000,
        &req1,
    ));

    let resp1 = end_stream_headers(&hpack_status_200());
    let events1 = collator.add_event(make_event(
        Direction::Write,
        conn_id,
        process_id,
        80,
        2_000_000,
        &resp1,
    ));
    assert!(
        events1.iter().any(|e| e.is_exchange()),
        "First exchange should complete"
    );

    // Second "connection" on the same id: fresh preface and a large body.
    let body = vec![0x42u8; 32768];
    let req2: Vec<u8> = [
        H2_PREFACE.to_vec(),
        build_settings_frame(),
        build_headers_frame(1, &hpack_get_request()),
        build_data_frame(1, &body, true),
    ]
    .concat();
    let events2 = collator.add_event(make_event(
        Direction::Read,
        conn_id,
        process_id,
        80,
        3_000_000,
        &req2,
    ));

    let request = events2
        .iter()
        .find_map(|e| match e.as_message() {
            Some((msg, _)) if msg.is_request() => Some(msg.clone()),
            _ => None,
        })
        .expect("Second h2 connection on reused fd should parse successfully");

    let ParsedHttpMessage::Request(req) = request else {
        panic!("Expected a request message");
    };
    assert_eq!(
        req.body.len(),
        32768,
        "Body should be 32KB, got {} bytes",
        req.body.len()
    );
}
/// Completes one HTTP/2 exchange, sends a GOAWAY, then replays a second
/// connection (preface + POST with a 32 KiB body) on the same connection id
/// split across several Read events at fixed byte offsets. Checks that the
/// request is pending with the full body and that the following response
/// produces a POST / 200 exchange.
#[test]
fn test_h2_fd_reuse_split_chunks_with_response() {
    /// HEADERS frame on stream 1 with flags 0x05 (END_STREAM | END_HEADERS).
    fn end_stream_headers(hpack: &[u8]) -> Vec<u8> {
        let len = hpack.len();
        let mut frame = vec![
            (len >> 16) as u8,
            (len >> 8) as u8,
            len as u8,
            0x01,
            0x05,
            0x00,
            0x00,
            0x00,
            0x01,
        ];
        frame.extend_from_slice(hpack);
        frame
    }

    let conn_id = 88889u128;
    let process_id = 5001u32;
    let mut collator: Collator<TestEvent> = Collator::new();

    // --- First exchange -------------------------------------------------
    let mut req1 = H2_PREFACE.to_vec();
    req1.extend(build_settings_frame());
    req1.extend(end_stream_headers(&hpack_get_request()));
    let _ = collator.add_event(make_event(
        Direction::Read,
        conn_id,
        process_id,
        80,
        1_000_000,
        &req1,
    ));

    let resp1 = end_stream_headers(&hpack_status_200());
    let events1 = collator.add_event(make_event(
        Direction::Write,
        conn_id,
        process_id,
        80,
        2_000_000,
        &resp1,
    ));
    assert!(
        events1.iter().any(|e| e.is_exchange()),
        "First exchange should complete"
    );

    // --- GOAWAY frame on the Read side ----------------------------------
    let goaway: [u8; 17] = [
        0x00, 0x00, 0x08, // payload length = 8
        0x07, // frame type 0x07
        0x00, // flags
        0x00, 0x00, 0x00, 0x00, // stream id 0
        0x00, 0x00, 0x00, 0x01, // payload bytes 0..4
        0x00, 0x00, 0x00, 0x00, // payload bytes 4..8
    ];
    let _ = collator.add_event(make_event(
        Direction::Read,
        conn_id,
        process_id,
        80,
        2_500_000,
        &goaway,
    ));

    // --- Second connection, delivered in pieces -------------------------
    let body = vec![0x42u8; 32768];
    let (first_half, second_half) = body.split_at(16384);
    // Same block as the GET helper except 0x83 (`:method: POST`) leads.
    let post_hpack: Vec<u8> = [&[0x83u8, 0x87, 0x84, 0x01, 0x0b][..], b"example.com"].concat();
    let full_request: Vec<u8> = [
        H2_PREFACE.to_vec(),
        build_settings_frame(),
        build_headers_frame(1, &post_hpack),
        build_data_frame(1, first_half, false),
        build_data_frame(1, second_half, true),
    ]
    .concat();

    let total = full_request.len();
    let split_points = [200usize, 2000, 8000, 16400, 20000, 32000];
    let mut prev = 0;
    let mut ts = 3_000_000u64;
    // Walk the clamped split points, then the end of the buffer so the
    // trailing remainder is delivered as a final chunk.
    let boundaries = split_points
        .iter()
        .map(|&split| split.min(total))
        .chain(std::iter::once(total));
    for end in boundaries {
        if prev >= total {
            break;
        }
        let _ = collator.add_event(make_event(
            Direction::Read,
            conn_id,
            process_id,
            80,
            ts,
            &full_request[prev..end],
        ));
        prev = end;
        ts += 100_000;
    }

    // The request must now be waiting for its response with the whole body.
    let conn = collator.connections.get(&conn_id).unwrap();
    assert!(
        conn.pending_requests.contains_key(&StreamId(1)),
        "Second exchange request should be in pending_requests"
    );
    let req = conn.pending_requests.get(&StreamId(1)).unwrap();
    assert_eq!(
        req.body.len(),
        32768,
        "Body should be 32KB, got {} bytes",
        req.body.len()
    );

    // --- Response for the second exchange -------------------------------
    let resp2 = end_stream_headers(&hpack_status_200());
    let events2 = collator.add_event(make_event(
        Direction::Write,
        conn_id,
        process_id,
        80,
        ts,
        &resp2,
    ));
    let exchange = events2
        .iter()
        .find_map(|e| e.as_exchange())
        .expect("Second exchange should complete after fd-reuse with split chunks");
    assert_eq!(exchange.request.method, http::Method::POST);
    assert_eq!(
        exchange.request.body.len(),
        32768,
        "Exchange body should be 32KB"
    );
    assert_eq!(exchange.response.status, http::StatusCode::OK);
}
/// Sends an HTTP/2 request (Write) at t = 1.000 s and its response (Read) at
/// t = 1.050 s, then checks that the resulting exchange reports a latency
/// within 1 ms of 50 ms.
#[test]
fn test_h2_per_stream_latency() {
    /// HEADERS frame on stream 1 with flags 0x05 (END_STREAM | END_HEADERS).
    fn end_stream_headers(hpack: &[u8]) -> Vec<u8> {
        let len = hpack.len();
        let mut frame = vec![
            (len >> 16) as u8,
            (len >> 8) as u8,
            len as u8,
            0x01,
            0x05,
            0x00,
            0x00,
            0x00,
            0x01,
        ];
        frame.extend_from_slice(hpack);
        frame
    }

    let conn_id = 99999u128;
    let process_id = 2000u32;
    let mut collator: Collator<TestEvent> = Collator::new();

    let mut request = H2_PREFACE.to_vec();
    request.extend(build_settings_frame());
    request.extend(end_stream_headers(&hpack_get_request()));
    let _ = collator.add_event(make_event(
        Direction::Write,
        conn_id,
        process_id,
        443,
        1_000_000_000,
        &request,
    ));

    let response = end_stream_headers(&hpack_status_200());
    let events = collator.add_event(make_event(
        Direction::Read,
        conn_id,
        process_id,
        443,
        1_050_000_000,
        &response,
    ));

    let exchange = events
        .iter()
        .find_map(|e| e.as_exchange())
        .expect("Should produce a complete exchange");
    assert!(
        exchange.latency_ns > 0,
        "Latency should be > 0, got {} ns",
        exchange.latency_ns
    );

    let expected_latency = 50_000_000u64;
    let tolerated = (expected_latency - 1_000_000)..=(expected_latency + 1_000_000);
    assert!(
        tolerated.contains(&exchange.latency_ns),
        "Expected latency ~50ms, got {} ns",
        exchange.latency_ns
    );
}
/// Formats an `Exchange` via `Display` and checks that the output contains
/// the expected port text for both a missing and a present remote port.
#[rstest]
#[case::port_unavailable(None, "Port: unavailable")]
#[case::port_available(Some(8080), "Port: 8080")]
fn test_exchange_display_port(#[case] remote_port: Option<u16>, #[case] expected_text: &str) {
    let request = HttpRequest {
        method: http::Method::GET,
        uri: "/".parse().unwrap(),
        headers: http::HeaderMap::new(),
        body: Vec::new(),
        timestamp_ns: TimestampNs(0),
        version: None,
    };
    let response = HttpResponse {
        status: http::StatusCode::OK,
        headers: http::HeaderMap::new(),
        body: Vec::new(),
        timestamp_ns: TimestampNs(0),
        version: None,
        reason: None,
    };
    let exchange = Exchange {
        request,
        response,
        latency_ns: 1_000_000,
        protocol: Protocol::Http2,
        process_id: 1234,
        remote_port,
        stream_id: Some(StreamId(1)),
    };

    let display = exchange.to_string();
    assert!(
        display.contains(expected_text),
        "Display should contain '{expected_text}'"
    );
}
/// Checks the `emit_messages` / `emit_exchanges` flags produced by each
/// `CollatorConfig` constructor listed in the cases.
#[rstest]
#[case::messages_only(CollatorConfig::messages_only(), true, false)]
#[case::exchanges_only(CollatorConfig::exchanges_only(), false, true)]
#[case::default_emits_both(CollatorConfig::default(), true, true)]
fn test_collator_config(
    #[case] config: CollatorConfig,
    #[case] expect_messages: bool,
    #[case] expect_exchanges: bool,
) {
    let actual_messages = config.emit_messages;
    let actual_exchanges = config.emit_exchanges;
    assert_eq!(actual_messages, expect_messages);
    assert_eq!(actual_exchanges, expect_exchanges);
}
/// With a messages-only config, a complete HTTP/1.1 request on Write must
/// yield exactly one Message event whose metadata carries the HTTP/1
/// protocol, connection id 1 and process id 1234.
#[test]
fn test_http1_emits_request_message() {
    let raw_request: &[u8] = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n";
    let mut collator: Collator<TestEvent> = Collator::with_config(CollatorConfig::messages_only());

    let events = collator.add_event(make_event(
        Direction::Write,
        1,
        1234,
        8080,
        1_000_000,
        raw_request,
    ));

    assert_eq!(events.len(), 1);
    let (msg, metadata) = events[0].as_message().expect("Should be a Message event");
    assert!(msg.is_request());
    assert_eq!(metadata.protocol, Protocol::Http1);
    assert_eq!(metadata.connection_id, 1);
    assert_eq!(metadata.process_id, 1234);
}
/// With a messages-only config, a complete HTTP/1.1 response on Read must
/// yield exactly one Message event that is a response tagged as HTTP/1.
#[test]
fn test_http1_emits_response_message() {
    let raw_response: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello";
    let mut collator: Collator<TestEvent> = Collator::with_config(CollatorConfig::messages_only());

    let events = collator.add_event(make_event(
        Direction::Read,
        1,
        1234,
        8080,
        1_000_000,
        raw_response,
    ));

    assert_eq!(events.len(), 1);
    let (msg, metadata) = events[0].as_message().expect("Should be a Message event");
    assert!(msg.is_response());
    assert_eq!(metadata.protocol, Protocol::Http1);
}
/// With the default config, the request alone yields one Message event and
/// the matching response yields two events: a Message and an Exchange.
#[test]
fn test_http1_complete_exchange_emits_both_types() {
    let mut collator: Collator<TestEvent> = Collator::new();

    // Request half.
    let events = collator.add_event(make_event(
        Direction::Write,
        1,
        1234,
        8080,
        1_000_000,
        b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
    ));
    assert_eq!(events.len(), 1, "Request should emit 1 Message event");
    assert!(events[0].is_message());

    // Response half.
    let events = collator.add_event(make_event(
        Direction::Read,
        1,
        1234,
        8080,
        2_000_000,
        b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello",
    ));
    assert_eq!(events.len(), 2, "Response should emit Message + Exchange");
    let saw_message = events.iter().any(|e| e.is_message());
    let saw_exchange = events.iter().any(|e| e.is_exchange());
    assert!(saw_message);
    assert!(saw_exchange);
}
/// With an exchanges-only config, the request alone emits nothing and the
/// matching response emits exactly one Exchange event.
#[test]
fn test_exchanges_only_skips_message_events() {
    let mut collator: Collator<TestEvent> = Collator::with_config(CollatorConfig::exchanges_only());

    let after_request = collator.add_event(make_event(
        Direction::Write,
        1,
        1234,
        8080,
        1_000_000,
        b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
    ));
    assert!(after_request.is_empty(), "Should not emit Message events");

    let after_response = collator.add_event(make_event(
        Direction::Read,
        1,
        1234,
        8080,
        2_000_000,
        b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello",
    ));
    assert_eq!(after_response.len(), 1);
    assert!(after_response[0].is_exchange());
}
/// Splits one HTTP/1.1 request across two Write events and follows it with
/// an empty payload: nothing is emitted for the partial request, exactly one
/// event for the completed request, and nothing for the empty payload.
#[test]
fn test_message_not_emitted_twice() {
    let mut collator: Collator<TestEvent> = Collator::with_config(CollatorConfig::messages_only());

    // Request line only: still incomplete.
    let partial = make_event(Direction::Write, 1, 1234, 8080, 1_000_000, b"GET / HTTP/1.1\r\n");
    let events1 = collator.add_event(partial);
    assert!(events1.is_empty(), "Incomplete request should not emit");

    // Remaining headers and terminator: request is now complete.
    let remainder = make_event(
        Direction::Write,
        1,
        1234,
        8080,
        2_000_000,
        b"Host: example.com\r\n\r\n",
    );
    let events2 = collator.add_event(remainder);
    assert_eq!(events2.len(), 1, "Complete request should emit once");

    // A zero-length payload afterwards must not re-emit anything.
    let empty = make_event(Direction::Write, 1, 1234, 8080, 3_000_000, b"");
    let events3 = collator.add_event(empty);
    assert!(events3.is_empty(), "Empty payload should not emit");
}
/// Server-side view of an HTTP/2 exchange: the request arrives on Read and
/// the response leaves on Write, 50 ms later. Checks that the request is
/// emitted as a message and that the exchange is GET / 200 with a latency
/// within 1 ms of 50 ms.
#[test]
fn test_h2_server_side_monitoring() {
    /// HEADERS frame on stream 1 with flags 0x05 (END_STREAM | END_HEADERS).
    fn end_stream_headers(hpack: &[u8]) -> Vec<u8> {
        let len = hpack.len();
        let mut frame = vec![
            (len >> 16) as u8,
            (len >> 8) as u8,
            len as u8,
            0x01,
            0x05,
            0x00,
            0x00,
            0x00,
            0x01,
        ];
        frame.extend_from_slice(hpack);
        frame
    }

    let conn_id = 77777u128;
    let process_id = 3000u32;
    let mut collator: Collator<TestEvent> = Collator::new();

    // Request is observed on the Read side.
    let mut request = H2_PREFACE.to_vec();
    request.extend(build_settings_frame());
    request.extend(end_stream_headers(&hpack_get_request()));
    let events = collator.add_event(make_event(
        Direction::Read,
        conn_id,
        process_id,
        443,
        1_000_000_000,
        &request,
    ));
    let request_msg = events.iter().find_map(|e| match e.as_message() {
        Some((msg, _)) if msg.is_request() => Some(msg.clone()),
        _ => None,
    });
    assert!(
        request_msg.is_some(),
        "Server should see request on Read direction"
    );

    // Response is observed on the Write side.
    let response = end_stream_headers(&hpack_status_200());
    let events = collator.add_event(make_event(
        Direction::Write,
        conn_id,
        process_id,
        443,
        1_050_000_000,
        &response,
    ));
    let exchange = events
        .iter()
        .find_map(|e| e.as_exchange())
        .expect("Should produce a complete exchange with inverted directions");

    assert_eq!(
        exchange.request.method,
        http::Method::GET,
        "Request method should be GET"
    );
    assert_eq!(
        exchange.response.status,
        http::StatusCode::OK,
        "Response status should be 200 OK"
    );
    assert!(
        exchange.latency_ns > 0,
        "Latency should be > 0, got {} ns",
        exchange.latency_ns
    );

    let expected_latency = 50_000_000u64;
    let tolerated = (expected_latency - 1_000_000)..=(expected_latency + 1_000_000);
    assert!(
        tolerated.contains(&exchange.latency_ns),
        "Expected latency ~50ms, got {} ns",
        exchange.latency_ns
    );
}
/// With a 5 s timeout and one event at t = 1 s, `cleanup` at t = 3 s must
/// keep the connection and `cleanup` at t = 7 s must remove it.
#[test]
fn test_cleanup_removes_stale_connections() {
    let mut collator: Collator<TestEvent> = Collator::with_config(CollatorConfig {
        timeout_ns: 5_000_000_000,
        ..CollatorConfig::default()
    });

    let _ = collator.add_event(make_event(
        Direction::Write,
        1,
        1234,
        8080,
        1_000_000_000,
        b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
    ));
    assert_eq!(collator.connections.len(), 1);

    // 2 s of inactivity: inside the timeout window.
    collator.cleanup(TimestampNs(3_000_000_000));
    let remaining = collator.connections.len();
    assert_eq!(
        remaining,
        1,
        "Connection should survive (2s < 5s timeout)"
    );

    // 6 s of inactivity: outside the timeout window.
    collator.cleanup(TimestampNs(7_000_000_000));
    let remaining = collator.connections.len();
    assert_eq!(
        remaining,
        0,
        "Connection should be removed (6s > 5s timeout)"
    );
}
/// Opens an HTTP/2 stream at t = 1 s (HEADERS without END_STREAM), bumps the
/// connection's `last_activity_ns` to 39 s, and runs `cleanup` at 40 s.
/// Expects the connection to remain while the write-side active stream count
/// drops from 1 to 0.
// NOTE(review): the outcome depends on the default connection and stream
// timeouts in `CollatorConfig`, which are not visible here — confirm.
#[test]
fn test_cleanup_evicts_stale_h2_streams() {
    let conn_id = 42u128;
    let mut collator: Collator<TestEvent> = Collator::new();

    // HEADERS on stream 1 with flags 0x04 (END_HEADERS only), so the stream
    // stays open after this event.
    let payload: Vec<u8> = [
        H2_PREFACE.to_vec(),
        build_settings_frame(),
        build_headers_frame(1, &hpack_get_request()),
    ]
    .concat();
    let _ = collator.add_event(make_event(
        Direction::Write,
        conn_id,
        1000,
        8080,
        1_000_000_000,
        &payload,
    ));
    let streams_before = collator
        .connections
        .get(&conn_id)
        .unwrap()
        .h2_write_state
        .active_stream_count();
    assert_eq!(streams_before, 1);

    // Pretend the connection itself was active recently.
    let tracked = collator.connections.get_mut(&conn_id).unwrap();
    tracked.last_activity_ns = TimestampNs(39_000_000_000);

    collator.cleanup(TimestampNs(40_000_000_000));
    assert_eq!(
        collator.connections.len(),
        1,
        "Connection should survive (recent activity)"
    );
    let conn = collator.connections.get(&conn_id).unwrap();
    assert_eq!(
        conn.h2_write_state.active_stream_count(),
        0,
        "Stale H2 stream should be evicted by cleanup"
    );
}
/// With `max_body_size` set to 100, sends a POST declaring a 200-byte body
/// in two Write events (first 80 bytes, then the rest). Expects no events
/// from either call and the connection state to be reset: protocol
/// `Unknown`, no buffered request chunks, and a request body size of 0.
#[test]
fn test_body_size_limit_resets_connection() {
    let mut collator: Collator<TestEvent> = Collator::with_config(CollatorConfig {
        max_body_size: 100,
        ..CollatorConfig::default()
    });

    // Header block followed by 200 bytes of 'X'.
    let mut payload = b"POST / HTTP/1.1\r\nContent-Length: 200\r\n\r\n".to_vec();
    payload.resize(payload.len() + 200, b'X');
    let (first_part, second_part) = payload.split_at(80);

    let events1 = collator.add_event(make_event(
        Direction::Write,
        1,
        1234,
        8080,
        1_000_000,
        first_part,
    ));
    assert!(events1.is_empty());

    let events2 = collator.add_event(make_event(
        Direction::Write,
        1,
        1234,
        8080,
        2_000_000,
        second_part,
    ));
    assert!(
        events2.is_empty(),
        "Should not emit after body limit exceeded"
    );

    let conn = collator.connections.get(&1).unwrap();
    assert_eq!(conn.protocol, Protocol::Unknown);
    assert!(conn.request_chunks.is_empty());
    assert_eq!(conn.request_body_size, 0);
}
/// With `max_body_size` set to 1000, a body-less GET request must still be
/// parsed and emitted as a single Message event.
#[test]
fn test_body_size_limit_normal_request_works() {
    let mut collator: Collator<TestEvent> = Collator::with_config(CollatorConfig {
        max_body_size: 1000,
        ..CollatorConfig::default()
    });

    let events = collator.add_event(make_event(
        Direction::Write,
        1,
        1234,
        8080,
        1_000_000,
        b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
    ));

    assert_eq!(events.len(), 1, "Normal request should parse fine");
    assert!(events[0].is_message());
}
/// Sends an HTTP/2 request on a connection id (protocol becomes `Http2`),
/// then an HTTP/1.1 request on the same id. Expects the protocol to switch
/// to `Http1` and the HTTP/1 request to be emitted as a message.
#[test]
fn test_fd_reuse_http2_to_http1() {
    let conn_id = 55555u128;
    let mut collator: Collator<TestEvent> = Collator::new();

    // HEADERS on stream 1 with flags 0x05 (END_STREAM | END_HEADERS).
    let hpack = hpack_get_request();
    let hpack_len = hpack.len();
    let frame_header = [
        (hpack_len >> 16) as u8,
        (hpack_len >> 8) as u8,
        hpack_len as u8,
        0x01,
        0x05,
        0x00,
        0x00,
        0x00,
        0x01,
    ];
    let h2_req: Vec<u8> = [
        H2_PREFACE,
        &build_settings_frame()[..],
        &frame_header[..],
        &hpack[..],
    ]
    .concat();

    let _ = collator.add_event(make_event(
        Direction::Write,
        conn_id,
        1000,
        80,
        1_000_000,
        &h2_req,
    ));
    let protocol_after_h2 = &collator.connections.get(&conn_id).unwrap().protocol;
    assert_eq!(*protocol_after_h2, Protocol::Http2);

    // Same connection id, now speaking HTTP/1.1.
    let h1_req = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n";
    let events = collator.add_event(make_event(
        Direction::Write,
        conn_id,
        1000,
        80,
        2_000_000,
        h1_req,
    ));

    let conn = collator.connections.get(&conn_id).unwrap();
    assert_eq!(
        conn.protocol,
        Protocol::Http1,
        "Protocol should switch to HTTP/1"
    );
    assert!(
        events.iter().any(|e| e.is_message()),
        "HTTP/1 request should be parsed after protocol change"
    );
}
/// Inserts a connection whose `last_activity_ns` (10 s) lies in the future
/// relative to the `cleanup` timestamp (5 s) and checks that `cleanup`
/// neither panics nor removes the connection.
#[test]
fn test_cleanup_clock_skew_no_panic() {
    let config = CollatorConfig {
        timeout_ns: 5_000_000_000,
        ..CollatorConfig::default()
    };
    // The binding must be `mut`: the test inserts into and mutates
    // `connections` directly before calling `cleanup`.
    let mut collator: Collator<TestEvent> = Collator::with_config(config);

    collator.connections.insert(1, Conn::new(1234, 8080));
    collator.connections.get_mut(&1).unwrap().last_activity_ns = TimestampNs(10_000_000_000);

    // "Now" is earlier than the last recorded activity.
    collator.cleanup(TimestampNs(5_000_000_000));
    assert_eq!(
        collator.connections.len(),
        1,
        "Connection should survive clock skew"
    );
}
/// Sends an HTTP/1.1 request followed by a response that has neither
/// `Content-Length` nor chunked framing. The response must not be emitted
/// when it arrives; `close_connection` must then emit it (as a response
/// message or an exchange) and remove the connection.
#[test]
fn test_close_connection_finalizes_http1_response() {
    // `mut` because the collator is driven through `add_event` and
    // `close_connection`, like the other tests that feed events.
    let mut collator: Collator<TestEvent> = Collator::new();

    let req_event = make_event(
        Direction::Write,
        1,
        1234,
        8080,
        1_000_000,
        b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
    );
    let _ = collator.add_event(req_event);

    let resp_event = make_event(
        Direction::Read,
        1,
        1234,
        8080,
        2_000_000,
        b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\nHello World",
    );
    let events = collator.add_event(resp_event);
    assert!(
        !events
            .iter()
            .any(|e| { e.as_message().is_some_and(|(msg, _)| msg.is_response()) }),
        "Response without framing should not be emitted yet"
    );

    // Closing the connection marks the end of the unframed body.
    let close_events = collator.close_connection(1, 1234);
    let has_response = close_events
        .iter()
        .any(|e| e.as_message().is_some_and(|(msg, _)| msg.is_response()));
    let has_exchange = close_events.iter().any(|e| e.is_exchange());
    assert!(
        has_response || has_exchange,
        "close_connection should finalize the pending response"
    );
    assert!(
        collator.connections.get(&1).is_none(),
        "Connection should be removed after close"
    );
}
/// Server-side view: an HTTP/1.1 request arriving on the Read direction must
/// be emitted as a request message.
#[test]
fn test_h1_server_side_request_on_read() {
    // `mut` because the collator is driven through `add_event`, like the
    // other tests that feed events.
    let mut collator: Collator<TestEvent> = Collator::new();

    let req_event = make_event(
        Direction::Read,
        1,
        1234,
        8080,
        1_000_000,
        b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
    );
    let events = collator.add_event(req_event);

    let has_request = events
        .iter()
        .any(|e| e.as_message().is_some_and(|(msg, _)| msg.is_request()));
    assert!(
        has_request,
        "Server should see request arriving on Read direction"
    );
}
/// Server-side view of a full HTTP/1.1 exchange: request on Read, response
/// on Write. The response event batch must contain both a response message
/// and an exchange.
#[test]
fn test_h1_server_side_full_exchange() {
    // `mut` because the collator is driven through `add_event`, like the
    // other tests that feed events.
    let mut collator: Collator<TestEvent> = Collator::new();

    let req_event = make_event(
        Direction::Read,
        1,
        1234,
        8080,
        1_000_000,
        b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
    );
    let _ = collator.add_event(req_event);

    let resp_event = make_event(
        Direction::Write,
        1,
        1234,
        8080,
        2_000_000,
        b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello",
    );
    let events = collator.add_event(resp_event);

    let has_response = events
        .iter()
        .any(|e| e.as_message().is_some_and(|(msg, _)| msg.is_response()));
    let has_exchange = events.iter().any(|e| e.is_exchange());
    assert!(has_response, "Server should emit response on Write");
    assert!(has_exchange, "Server-side exchange should complete");
}
/// A WebDAV `PROPFIND` request must be parsed as HTTP/1: exactly one request
/// message is emitted, its metadata reports `Protocol::Http1`, and the stored
/// connection's protocol is `Http1` as well.
#[test]
fn test_unknown_protocol_webdav_propfind() {
    // `mut` because the collator is driven through `add_event`, like the
    // other tests that feed events.
    let mut collator: Collator<TestEvent> = Collator::with_config(CollatorConfig::messages_only());

    let event = make_event(
        Direction::Write,
        1,
        1234,
        8080,
        1_000_000,
        b"PROPFIND / HTTP/1.1\r\nHost: example.com\r\nDepth: 1\r\n\r\n",
    );
    let events = collator.add_event(event);

    assert_eq!(events.len(), 1, "PROPFIND should be parsed as HTTP/1");
    let (msg, metadata) = events[0].as_message().expect("Should be a Message");
    assert!(msg.is_request());
    assert_eq!(
        metadata.protocol,
        Protocol::Http1,
        "Protocol should be promoted to Http1"
    );

    let conn = collator.connections.get(&1).unwrap();
    assert_eq!(conn.protocol, Protocol::Http1);
}
/// A WebDAV `MKCOL` request must be parsed as HTTP/1 and emitted as exactly
/// one request message.
#[test]
fn test_unknown_protocol_webdav_mkcol() {
    // `mut` because the collator is driven through `add_event`, like the
    // other tests that feed events.
    let mut collator: Collator<TestEvent> = Collator::with_config(CollatorConfig::messages_only());

    let event = make_event(
        Direction::Write,
        1,
        1234,
        8080,
        1_000_000,
        b"MKCOL /newdir HTTP/1.1\r\nHost: example.com\r\n\r\n",
    );
    let events = collator.add_event(event);

    assert_eq!(events.len(), 1, "MKCOL should be parsed as HTTP/1");
    let (msg, _) = events[0].as_message().expect("Should be a Message");
    assert!(msg.is_request());
}
/// For an HTTP/1.1 request and response, checks that `version` is `Some(1)`
/// on both messages and that the response keeps its reason phrase `OK`.
#[test]
fn test_h1_version_and_reason_populated() {
    // `mut` because the collator is driven through `add_event`, like the
    // other tests that feed events.
    let mut collator: Collator<TestEvent> = Collator::with_config(CollatorConfig::messages_only());

    let req_event = make_event(
        Direction::Write,
        1,
        1234,
        8080,
        1_000_000,
        b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
    );
    let events = collator.add_event(req_event);
    let (msg, _) = events[0].as_message().unwrap();
    let req = msg.as_request().unwrap();
    assert_eq!(req.version, Some(1), "HTTP/1.1 should have version=Some(1)");

    let resp_event = make_event(
        Direction::Read,
        1,
        1234,
        8080,
        2_000_000,
        b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n",
    );
    let events = collator.add_event(resp_event);
    let (msg, _) = events[0].as_message().unwrap();
    let resp = msg.as_response().unwrap();
    assert_eq!(
        resp.version,
        Some(1),
        "HTTP/1.1 should have version=Some(1)"
    );
    assert_eq!(
        resp.reason.as_deref(),
        Some("OK"),
        "Reason phrase should be preserved"
    );
}
/// For an HTTP/1.0 response, checks that `version` is `Some(0)` and that the
/// multi-word reason phrase `Moved Permanently` is kept intact.
#[test]
fn test_h1_version_10() {
    // `mut` because the collator is driven through `add_event`, like the
    // other tests that feed events.
    let mut collator: Collator<TestEvent> = Collator::with_config(CollatorConfig::messages_only());

    let resp_event = make_event(
        Direction::Read,
        1,
        1234,
        8080,
        1_000_000,
        b"HTTP/1.0 301 Moved Permanently\r\nContent-Length: 0\r\n\r\n",
    );
    let events = collator.add_event(resp_event);

    let (msg, _) = events[0].as_message().unwrap();
    let resp = msg.as_response().unwrap();
    assert_eq!(
        resp.version,
        Some(0),
        "HTTP/1.0 should have version=Some(0)"
    );
    assert_eq!(resp.reason.as_deref(), Some("Moved Permanently"));
}
/// Sends two HTTP/1.1 requests back to back on the same connection id (Read
/// direction, no response in between) and checks that each one is emitted as
/// its own Message event with the right path.
#[test]
fn test_h1_two_pipelined_requests_on_same_connection() {
    let mut collator: Collator<TestEvent> = Collator::new();

    // First request.
    let events1 = collator.add_event(make_event(
        Direction::Read,
        42,
        1234,
        443,
        1_000_000,
        b"GET /first HTTP/1.1\r\nHost: example.com\r\n\r\n",
    ));
    assert_eq!(
        events1.len(),
        1,
        "first request must surface as a Message event, got: {events1:?}"
    );
    let (msg1, _) = events1[0].as_message().unwrap();
    let first_path = msg1.as_request().unwrap().uri.path();
    assert_eq!(first_path, "/first");

    // Second request on the same connection id.
    let events2 = collator.add_event(make_event(
        Direction::Read,
        42,
        1234,
        443,
        2_000_000,
        b"GET /second HTTP/1.1\r\nHost: example.com\r\n\r\n",
    ));
    assert_eq!(
        events2.len(),
        1,
        "second pipelined request on same conn_id must surface as a Message event, got: {events2:?}"
    );
    let (msg2, _) = events2[0].as_message().unwrap();
    let second_path = msg2.as_request().unwrap().uri.path();
    assert_eq!(second_path, "/second");
}
/// Sends ten HTTP/1.1 requests (`/req0` … `/req9`) on the same connection id
/// and checks that every one produces exactly one Message event carrying the
/// matching path.
#[test]
fn test_h1_ten_sequential_requests_on_same_connection() {
    let mut collator: Collator<TestEvent> = Collator::new();

    for i in 0u64..10 {
        let path = format!("/req{i}");
        let buf = format!("GET {path} HTTP/1.1\r\nHost: example.com\r\n\r\n");
        // Timestamps advance by 1 ms per request, starting at 1 ms.
        let timestamp_ns = (i + 1) * 1_000_000;

        let events = collator.add_event(make_event(
            Direction::Read,
            99,
            4321,
            443,
            timestamp_ns,
            buf.as_bytes(),
        ));
        assert_eq!(
            events.len(),
            1,
            "iteration {i}: expected one Message event per request on keep-alive conn, got: {events:?}"
        );

        let (msg, _) = events[0].as_message().unwrap();
        assert_eq!(
            msg.as_request().unwrap().uri.path(),
            path,
            "iteration {i}: wrong path in emitted message"
        );
    }
}