#![warn(missing_docs)]
mod connection;
mod exchange;
pub mod h1;
mod traits;
use std::marker::PhantomData;
pub use connection::Protocol;
use connection::{Connection as Conn, DataChunk};
use dashmap::DashMap;
pub use exchange::{CollationEvent, CollatorConfig, Exchange, MessageMetadata, ParsedHttpMessage};
pub use h1::{HttpRequest, HttpResponse};
use h2session::{
H2ConnectionState,
StreamId,
TimestampNs,
is_http2_preface,
looks_like_http2_frame,
};
pub use traits::{DataEvent, Direction};
/// Buffer size constant: 16384 bytes (16 KiB).
///
/// NOTE(review): nothing in the visible part of this file reads this constant;
/// presumably it is the default or upper bound for a single captured payload
/// buffer — confirm against `CollatorConfig` and callers.
pub const MAX_BUF_SIZE: usize = 16384;
/// Groups captured data events by connection and turns them into
/// [`CollationEvent`]s (parsed messages and/or request/response exchanges).
///
/// `E` is the event type accepted by [`Collator::add_event`]; it is only used
/// at the type level and no event is stored in the collator itself.
pub struct Collator<E: DataEvent> {
/// Per-connection state for events with a non-zero connection id, keyed by that id.
connections: DashMap<u128, Conn>,
/// Per-connection state for events whose connection id is 0, keyed by process id.
ssl_connections: DashMap<u32, Conn>,
/// Settings shared by every connection (size limits, timeout, what to emit).
config: CollatorConfig,
/// Ties the collator to its event type without holding a value of it.
_phantom: PhantomData<E>,
}
impl<E: DataEvent> Default for Collator<E> {
fn default() -> Self {
Self::new()
}
}
impl<E: DataEvent> Collator<E> {
/// Creates a collator configured with `CollatorConfig::default()`.
pub fn new() -> Self {
    let config = CollatorConfig::default();
    Self::with_config(config)
}
/// Creates a collator that uses the supplied `config`.
///
/// Both connection tables start out empty.
pub fn with_config(config: CollatorConfig) -> Self {
    let connections = DashMap::new();
    let ssl_connections = DashMap::new();
    Self {
        config,
        connections,
        ssl_connections,
        _phantom: PhantomData,
    }
}
/// Creates a collator whose configuration is the default one with only
/// `max_buf_size` overridden.
pub fn with_max_buf_size(max_buf_size: usize) -> Self {
    let config = CollatorConfig {
        max_buf_size,
        ..Default::default()
    };
    Self::with_config(config)
}
/// Returns the configuration this collator was built with.
pub fn config(&self) -> &CollatorConfig {
&self.config
}
/// Feeds one captured event into the collator and returns the events it
/// completes (possibly none).
///
/// Events with an empty payload or with [`Direction::Other`] are ignored.
/// Otherwise the payload is routed to a connection record, created on first
/// use: events with a non-zero connection id are tracked by that id, events
/// with connection id 0 are tracked by process id.
pub fn add_event(&self, event: E) -> Vec<CollationEvent> {
    let direction = event.direction();
    let timestamp_ns = TimestampNs(event.timestamp_ns());
    let conn_id = event.connection_id();
    let process_id = event.process_id();
    let remote_port = event.remote_port();

    // Nothing to collate for empty payloads or non read/write traffic.
    if event.payload().is_empty() || direction == Direction::Other {
        return Vec::new();
    }

    let chunk = DataChunk {
        data: event.into_payload(),
        timestamp_ns,
        direction,
    };

    if conn_id == 0 {
        // No connection id available: fall back to one record per process.
        let mut guard = self
            .ssl_connections
            .entry(process_id)
            .or_insert_with(|| Conn::new(process_id, remote_port));
        return Self::process_event_for_conn(
            &mut guard,
            chunk,
            direction,
            timestamp_ns,
            remote_port,
            conn_id,
            process_id,
            &self.config,
        );
    }

    let mut guard = self
        .connections
        .entry(conn_id)
        .or_insert_with(|| Conn::new(process_id, remote_port));
    Self::process_event_for_conn(
        &mut guard,
        chunk,
        direction,
        timestamp_ns,
        remote_port,
        conn_id,
        process_id,
        &self.config,
    )
}
#[allow(clippy::too_many_arguments)]
fn process_event_for_conn(
conn: &mut Conn,
chunk: DataChunk,
direction: Direction,
timestamp_ns: TimestampNs,
remote_port: u16,
conn_id: u128,
process_id: u32,
config: &CollatorConfig,
) -> Vec<CollationEvent> {
let buf: &[u8] = &chunk.data;
conn.last_activity_ns = timestamp_ns;
if remote_port != 0 && conn.remote_port.is_none() {
conn.remote_port = Some(remote_port);
}
if conn.protocol == Protocol::Unknown {
conn.protocol = detect_protocol(buf);
}
if conn.protocol != Protocol::Unknown {
let incoming_protocol = detect_protocol(buf);
if incoming_protocol != Protocol::Unknown && incoming_protocol != conn.protocol {
reset_connection_for_protocol_change(conn, incoming_protocol);
}
}
let mut events = Vec::new();
match direction {
Direction::Write => {
conn.request_body_size += buf.len();
if conn.request_body_size > config.max_body_size {
reset_connection_body_limit(conn);
return Vec::new();
}
if conn.protocol == Protocol::Http1 || conn.protocol == Protocol::Unknown {
conn.h1_write_buffer.extend_from_slice(buf);
}
conn.request_chunks.push(chunk);
match conn.protocol {
Protocol::Http1 => {
drain_parse_emit_http1_write(
conn,
conn_id,
process_id,
config,
&mut events,
);
},
Protocol::Http2 => {
parse_http2_chunks(conn, direction);
},
Protocol::Unknown => {
drain_parse_emit_http1_unknown_write(
conn,
conn_id,
process_id,
config,
&mut events,
);
},
}
if is_request_complete(conn) {
conn.request_complete = true;
}
if is_response_complete(conn) {
conn.response_complete = true;
}
},
Direction::Read => {
conn.response_body_size += buf.len();
if conn.response_body_size > config.max_body_size {
reset_connection_body_limit(conn);
return Vec::new();
}
if conn.protocol == Protocol::Http1 || conn.protocol == Protocol::Unknown {
conn.h1_read_buffer.extend_from_slice(buf);
}
conn.response_chunks.push(chunk);
match conn.protocol {
Protocol::Http1 => {
drain_parse_emit_http1_read(
conn,
conn_id,
process_id,
config,
&mut events,
);
},
Protocol::Http2 => {
parse_http2_chunks(conn, direction);
},
Protocol::Unknown => {
drain_parse_emit_http1_unknown_read(
conn,
conn_id,
process_id,
config,
&mut events,
);
},
}
if is_request_complete(conn) {
conn.request_complete = true;
}
if is_response_complete(conn) {
conn.response_complete = true;
}
},
Direction::Other => {
return Vec::new();
},
}
if conn.protocol == Protocol::Http2 && find_complete_h2_stream(conn).is_some() {
conn.request_complete = true;
conn.response_complete = true;
}
if config.emit_messages {
emit_message_events(conn, conn_id, process_id, &mut events);
}
if config.emit_exchanges && conn.request_complete && conn.response_complete {
if let Some(exchange) = build_exchange(conn) {
events.push(CollationEvent::Exchange(exchange));
}
reset_connection_after_exchange(conn);
}
events
}
pub fn cleanup(&self, current_time_ns: TimestampNs) {
self.connections.retain(|_, conn| {
current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
});
self.ssl_connections.retain(|_, conn| {
current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
});
for mut entry in self.connections.iter_mut() {
entry.h2_write_state.evict_stale_streams(current_time_ns);
entry.h2_read_state.evict_stale_streams(current_time_ns);
}
for mut entry in self.ssl_connections.iter_mut() {
entry.h2_write_state.evict_stale_streams(current_time_ns);
entry.h2_read_state.evict_stale_streams(current_time_ns);
}
}
/// Drops the state of one connection without emitting anything.
///
/// A `connection_id` of 0 selects the table keyed by `process_id`; any other
/// value selects the table keyed by connection id. Unknown keys are ignored.
pub fn remove_connection(&self, connection_id: u128, process_id: u32) {
    if connection_id == 0 {
        self.ssl_connections.remove(&process_id);
        return;
    }
    self.connections.remove(&connection_id);
}
/// Removes a connection and returns any events that can still be produced
/// from what it had buffered.
///
/// A `connection_id` of 0 selects the table keyed by `process_id`; any other
/// value selects the table keyed by connection id. An unknown key yields an
/// empty vector.
///
/// The entry is taken out of the table first and finalized afterwards, so the
/// lookup happens once and no other caller can append data to a connection
/// that is about to be discarded.
pub fn close_connection(&self, connection_id: u128, process_id: u32) -> Vec<CollationEvent> {
    let removed = if connection_id != 0 {
        self.connections.remove(&connection_id).map(|(_, conn)| conn)
    } else {
        self.ssl_connections.remove(&process_id).map(|(_, conn)| conn)
    };
    match removed {
        Some(mut conn) => finalize_and_emit(&mut conn, connection_id, process_id, &self.config),
        None => Vec::new(),
    }
}
}
/// Produces the last events for a connection that is being closed.
///
/// For an HTTP/1 connection that has buffered read bytes but no parsed
/// response, `h1::try_finalize_http1_response` is given the buffer (stamped
/// with the first response chunk's timestamp, or 0 if there is none); a
/// successful result marks the response complete.
/// NOTE(review): presumably this covers responses that are only delimited by
/// the connection closing — confirm in the `h1` module.
///
/// Afterwards message events are emitted when `config.emit_messages` is set,
/// and an exchange is emitted when `config.emit_exchanges` is set and both
/// halves are complete.
fn finalize_and_emit(
    conn: &mut Conn,
    connection_id: u128,
    process_id: u32,
    config: &CollatorConfig,
) -> Vec<CollationEvent> {
    let has_unparsed_response = conn.protocol == Protocol::Http1
        && conn.h1_response.is_none()
        && !conn.h1_read_buffer.is_empty();
    if has_unparsed_response {
        let first_seen = conn
            .response_chunks
            .first()
            .map_or(TimestampNs(0), |c| c.timestamp_ns);
        conn.h1_response = h1::try_finalize_http1_response(&conn.h1_read_buffer, first_seen);
        conn.response_complete |= conn.h1_response.is_some();
    }

    let mut events = Vec::new();
    if config.emit_messages {
        emit_message_events(conn, connection_id, process_id, &mut events);
    }

    let both_halves_done =
        config.emit_exchanges && conn.request_complete && conn.response_complete;
    if both_halves_done {
        events.extend(build_exchange(conn).map(CollationEvent::Exchange));
    }
    events
}
/// Appends a `CollationEvent::Message` for every parsed message on `conn`
/// that has not been reported yet.
///
/// * HTTP/1: the stored request and response are each reported once, guarded
///   by `h1_request_emitted` / `h1_response_emitted`.
/// * HTTP/2: every pending request/response whose stream id is not yet in the
///   matching "emitted" set is reported if it converts to an HTTP message;
///   afterwards all pending stream ids are recorded as emitted, including
///   those that did not convert.
/// * Unknown protocol: nothing is emitted.
fn emit_message_events(
    conn: &mut Conn,
    conn_id: u128,
    process_id: u32,
    events: &mut Vec<CollationEvent>,
) {
    match conn.protocol {
        Protocol::Unknown => {},
        Protocol::Http1 => {
            if !conn.h1_request_emitted {
                if let Some(req) = conn.h1_request.as_ref() {
                    events.push(CollationEvent::Message {
                        message: ParsedHttpMessage::Request(req.clone()),
                        metadata: MessageMetadata {
                            connection_id: conn_id,
                            process_id,
                            timestamp_ns: req.timestamp_ns,
                            stream_id: None,
                            remote_port: conn.remote_port,
                            protocol: conn.protocol,
                        },
                    });
                    conn.h1_request_emitted = true;
                }
            }
            if !conn.h1_response_emitted {
                if let Some(resp) = conn.h1_response.as_ref() {
                    events.push(CollationEvent::Message {
                        message: ParsedHttpMessage::Response(resp.clone()),
                        metadata: MessageMetadata {
                            connection_id: conn_id,
                            process_id,
                            timestamp_ns: resp.timestamp_ns,
                            stream_id: None,
                            remote_port: conn.remote_port,
                            protocol: conn.protocol,
                        },
                    });
                    conn.h1_response_emitted = true;
                }
            }
        },
        Protocol::Http2 => {
            for (&stream_id, msg) in &conn.pending_requests {
                if conn.h2_emitted_requests.contains(&stream_id) {
                    continue;
                }
                let Some(req) = msg.to_http_request() else {
                    continue;
                };
                events.push(CollationEvent::Message {
                    message: ParsedHttpMessage::Request(req),
                    metadata: MessageMetadata {
                        connection_id: conn_id,
                        process_id,
                        // Requests are stamped with the time their stream ended.
                        timestamp_ns: msg.end_stream_timestamp_ns,
                        stream_id: Some(stream_id),
                        remote_port: conn.remote_port,
                        protocol: conn.protocol,
                    },
                });
            }
            conn.h2_emitted_requests
                .extend(conn.pending_requests.keys().copied());

            for (&stream_id, msg) in &conn.pending_responses {
                if conn.h2_emitted_responses.contains(&stream_id) {
                    continue;
                }
                let Some(resp) = msg.to_http_response() else {
                    continue;
                };
                events.push(CollationEvent::Message {
                    message: ParsedHttpMessage::Response(resp),
                    metadata: MessageMetadata {
                        connection_id: conn_id,
                        process_id,
                        // Responses are stamped with the time of their first frame.
                        timestamp_ns: msg.first_frame_timestamp_ns,
                        stream_id: Some(stream_id),
                        remote_port: conn.remote_port,
                        protocol: conn.protocol,
                    },
                });
            }
            conn.h2_emitted_responses
                .extend(conn.pending_responses.keys().copied());
        },
    }
}
/// Prepares `conn` for the next exchange after one has been emitted.
///
/// The completion flags and byte counters are always cleared. In addition:
/// * HTTP/1 drops all chunks, buffers, parsed messages and flags, and sets the
///   protocol back to `Unknown` so it is detected again on the next chunk.
/// * HTTP/2 drops chunks and parser buffers only when no request or response
///   is still pending.
fn reset_connection_after_exchange(conn: &mut Conn) {
    conn.request_complete = false;
    conn.response_complete = false;
    conn.request_body_size = 0;
    conn.response_body_size = 0;

    match conn.protocol {
        Protocol::Http1 => {
            conn.request_chunks.clear();
            conn.response_chunks.clear();
            conn.h1_write_buffer.clear();
            conn.h1_read_buffer.clear();
            conn.h1_request = None;
            conn.h1_response = None;
            conn.h1_write_parsed = false;
            conn.h1_read_parsed = false;
            conn.h1_request_emitted = false;
            conn.h1_response_emitted = false;
            conn.protocol = Protocol::Unknown;
        },
        Protocol::Http2 => {
            let nothing_pending =
                conn.pending_requests.is_empty() && conn.pending_responses.is_empty();
            if nothing_pending {
                conn.request_chunks.clear();
                conn.response_chunks.clear();
                conn.h2_write_state.clear_buffer();
                conn.h2_read_state.clear_buffer();
            }
        },
        Protocol::Unknown => {},
    }
}
fn reset_connection_body_limit(conn: &mut Conn) {
conn.request_chunks.clear();
conn.response_chunks.clear();
conn.h1_write_buffer.clear();
conn.h1_read_buffer.clear();
conn.h1_request = None;
conn.h1_response = None;
conn.h1_write_parsed = false;
conn.h1_read_parsed = false;
conn.h1_request_emitted = false;
conn.h1_response_emitted = false;
conn.h2_write_state = H2ConnectionState::new();
conn.h2_read_state = H2ConnectionState::new();
conn.pending_requests.clear();
conn.pending_responses.clear();
conn.h2_emitted_requests.clear();
conn.h2_emitted_responses.clear();
conn.ready_streams.clear();
conn.request_complete = false;
conn.response_complete = false;
conn.request_body_size = 0;
conn.response_body_size = 0;
conn.protocol = Protocol::Unknown;
}
/// Wipes all per-connection parsing state and sets the protocol to
/// `new_protocol`.
///
/// Clears raw chunks, HTTP/1 buffers and parsed messages, replaces both
/// HTTP/2 states with fresh ones, empties all HTTP/2 bookkeeping, and zeroes
/// the completion flags and byte counters.
fn reset_connection_for_protocol_change(conn: &mut Conn, new_protocol: Protocol) {
    // Raw captured data.
    conn.request_chunks.clear();
    conn.response_chunks.clear();

    // HTTP/1 state.
    conn.h1_write_buffer.clear();
    conn.h1_read_buffer.clear();
    conn.h1_request = None;
    conn.h1_response = None;
    conn.h1_write_parsed = false;
    conn.h1_read_parsed = false;
    conn.h1_request_emitted = false;
    conn.h1_response_emitted = false;

    // HTTP/2 state.
    conn.h2_write_state = H2ConnectionState::new();
    conn.h2_read_state = H2ConnectionState::new();
    conn.pending_requests.clear();
    conn.pending_responses.clear();
    conn.h2_emitted_requests.clear();
    conn.h2_emitted_responses.clear();
    conn.ready_streams.clear();

    // Progress tracking.
    conn.request_complete = false;
    conn.response_complete = false;
    conn.request_body_size = 0;
    conn.response_body_size = 0;

    conn.protocol = new_protocol;
}
/// Guesses the HTTP version carried by `data`.
///
/// Returns `Protocol::Http2` when the bytes are the HTTP/2 connection preface
/// or look like an HTTP/2 frame, `Protocol::Http1` when they look like an
/// HTTP/1 request or response, and `Protocol::Unknown` otherwise. The HTTP/2
/// checks are tried first.
pub fn detect_protocol(data: &[u8]) -> Protocol {
    if is_http2_preface(data) || looks_like_http2_frame(data) {
        Protocol::Http2
    } else if h1::is_http1_request(data) || h1::is_http1_response(data) {
        Protocol::Http1
    } else {
        Protocol::Unknown
    }
}
/// Feeds the most recent chunk for `direction` into that direction's HTTP/2
/// state and collects every message it completes.
///
/// If the newest chunk is an HTTP/2 connection preface and the state for this
/// direction has already seen one, all HTTP/2 state on the connection is
/// discarded first. Completed messages are stored as pending requests or
/// responses by stream id; a stream becomes "ready" once both halves are
/// pending. Errors reported by the frame parser are ignored.
/// `Direction::Other` is a no-op.
fn parse_http2_chunks(conn: &mut Conn, direction: Direction) {
    // A second preface in the same direction means the old HTTP/2 session is gone.
    let preface_seen_again = match direction {
        Direction::Write => {
            conn.request_chunks
                .last()
                .is_some_and(|c| is_http2_preface(&c.data))
                && conn.h2_write_state.preface_received
        },
        Direction::Read => {
            conn.response_chunks
                .last()
                .is_some_and(|c| is_http2_preface(&c.data))
                && conn.h2_read_state.preface_received
        },
        Direction::Other => false,
    };
    if preface_seen_again {
        conn.h2_write_state = H2ConnectionState::new();
        conn.h2_read_state = H2ConnectionState::new();
        conn.pending_requests.clear();
        conn.pending_responses.clear();
        conn.h2_emitted_requests.clear();
        conn.h2_emitted_responses.clear();
        conn.ready_streams.clear();
    }

    let (chunks, h2_state) = match direction {
        Direction::Write => (&conn.request_chunks, &mut conn.h2_write_state),
        Direction::Read => (&conn.response_chunks, &mut conn.h2_read_state),
        Direction::Other => return,
    };
    let Some(chunk) = chunks.last() else {
        return;
    };

    // Best effort: a parse error must not stop already completed messages
    // from being drained below.
    let _ = h2_state.feed(&chunk.data, chunk.timestamp_ns);

    while let Some((stream_id, msg)) = h2_state.try_pop() {
        if msg.is_request() {
            conn.pending_requests.insert(stream_id, msg);
            if conn.pending_responses.contains_key(&stream_id) {
                conn.ready_streams.insert(stream_id);
            }
        } else if msg.is_response() {
            conn.pending_responses.insert(stream_id, msg);
            if conn.pending_requests.contains_key(&stream_id) {
                conn.ready_streams.insert(stream_id);
            }
        }
    }
}
/// Returns the id of one stream from `ready_streams` (the first one its
/// iterator yields), or `None` when no stream is ready.
fn find_complete_h2_stream(conn: &Conn) -> Option<StreamId> {
    let mut ready = conn.ready_streams.iter().copied();
    ready.next()
}
/// Parses as many complete HTTP/1 messages as possible from the write buffer.
///
/// Each pass tries a request first and a response second; the parsed bytes
/// are removed from the buffer and the message is recorded (and reported, if
/// enabled) before the next pass. Parsing stops at the first pass that yields
/// neither. Messages are stamped with the newest request chunk's timestamp,
/// or 0 if there is no chunk.
fn drain_parse_emit_http1_write(
    conn: &mut Conn,
    conn_id: u128,
    process_id: u32,
    config: &CollatorConfig,
    events: &mut Vec<CollationEvent>,
) {
    // The chunk list is not modified while draining, so the stamp is fixed.
    let timestamp = conn
        .request_chunks
        .last()
        .map_or(TimestampNs(0), |c| c.timestamp_ns);

    loop {
        if let Some((req, consumed)) =
            h1::try_parse_http1_request_sized(&conn.h1_write_buffer, timestamp)
        {
            conn.h1_write_buffer.drain(..consumed);
            emit_h1_request(conn, conn_id, process_id, config, events, req);
            continue;
        }
        let Some((resp, consumed)) =
            h1::try_parse_http1_response_sized(&conn.h1_write_buffer, timestamp)
        else {
            break;
        };
        conn.h1_write_buffer.drain(..consumed);
        emit_h1_response(conn, conn_id, process_id, config, events, resp);
    }
}
/// Parses as many complete HTTP/1 messages as possible from the read buffer.
///
/// Each pass tries a response first and a request second; the parsed bytes
/// are removed from the buffer and the message is recorded (and reported, if
/// enabled) before the next pass. Parsing stops at the first pass that yields
/// neither. Messages are stamped with the newest response chunk's timestamp,
/// or 0 if there is no chunk.
fn drain_parse_emit_http1_read(
    conn: &mut Conn,
    conn_id: u128,
    process_id: u32,
    config: &CollatorConfig,
    events: &mut Vec<CollationEvent>,
) {
    // The chunk list is not modified while draining, so the stamp is fixed.
    let timestamp = conn
        .response_chunks
        .last()
        .map_or(TimestampNs(0), |c| c.timestamp_ns);

    loop {
        if let Some((resp, consumed)) =
            h1::try_parse_http1_response_sized(&conn.h1_read_buffer, timestamp)
        {
            conn.h1_read_buffer.drain(..consumed);
            emit_h1_response(conn, conn_id, process_id, config, events, resp);
            continue;
        }
        let Some((req, consumed)) =
            h1::try_parse_http1_request_sized(&conn.h1_read_buffer, timestamp)
        else {
            break;
        };
        conn.h1_read_buffer.drain(..consumed);
        emit_h1_request(conn, conn_id, process_id, config, events, req);
    }
}
/// Tries to recognise HTTP/1 on a connection whose protocol is still unknown,
/// using the write buffer.
///
/// If a complete request (tried first) or response can be parsed, the
/// connection is switched to HTTP/1, the message is recorded, and the rest of
/// the buffer is drained with `drain_parse_emit_http1_write`. If nothing
/// parses, the connection is left untouched.
fn drain_parse_emit_http1_unknown_write(
    conn: &mut Conn,
    conn_id: u128,
    process_id: u32,
    config: &CollatorConfig,
    events: &mut Vec<CollationEvent>,
) {
    let timestamp = conn
        .request_chunks
        .last()
        .map_or(TimestampNs(0), |c| c.timestamp_ns);

    if let Some((req, consumed)) =
        h1::try_parse_http1_request_sized(&conn.h1_write_buffer, timestamp)
    {
        conn.protocol = Protocol::Http1;
        conn.h1_write_buffer.drain(..consumed);
        emit_h1_request(conn, conn_id, process_id, config, events, req);
    } else if let Some((resp, consumed)) =
        h1::try_parse_http1_response_sized(&conn.h1_write_buffer, timestamp)
    {
        conn.protocol = Protocol::Http1;
        conn.h1_write_buffer.drain(..consumed);
        emit_h1_response(conn, conn_id, process_id, config, events, resp);
    } else {
        // Still not enough data to tell; wait for more.
        return;
    }

    // The first message confirmed HTTP/1; pick up anything pipelined behind it.
    drain_parse_emit_http1_write(conn, conn_id, process_id, config, events);
}
/// Tries to recognise HTTP/1 on a connection whose protocol is still unknown,
/// using the read buffer.
///
/// If a complete response (tried first) or request can be parsed, the
/// connection is switched to HTTP/1, the message is recorded, and the rest of
/// the buffer is drained with `drain_parse_emit_http1_read`. If nothing
/// parses, the connection is left untouched.
fn drain_parse_emit_http1_unknown_read(
    conn: &mut Conn,
    conn_id: u128,
    process_id: u32,
    config: &CollatorConfig,
    events: &mut Vec<CollationEvent>,
) {
    let timestamp = conn
        .response_chunks
        .last()
        .map_or(TimestampNs(0), |c| c.timestamp_ns);

    if let Some((resp, consumed)) =
        h1::try_parse_http1_response_sized(&conn.h1_read_buffer, timestamp)
    {
        conn.protocol = Protocol::Http1;
        conn.h1_read_buffer.drain(..consumed);
        emit_h1_response(conn, conn_id, process_id, config, events, resp);
    } else if let Some((req, consumed)) =
        h1::try_parse_http1_request_sized(&conn.h1_read_buffer, timestamp)
    {
        conn.protocol = Protocol::Http1;
        conn.h1_read_buffer.drain(..consumed);
        emit_h1_request(conn, conn_id, process_id, config, events, req);
    } else {
        // Still not enough data to tell; wait for more.
        return;
    }

    // The first message confirmed HTTP/1; pick up anything pipelined behind it.
    drain_parse_emit_http1_read(conn, conn_id, process_id, config, events);
}
/// Records `req` as the connection's current HTTP/1 request and, when
/// `config.emit_messages` is set, reports it as a message event.
///
/// The request is marked as emitted in either case, so it is not reported a
/// second time by `emit_message_events`.
fn emit_h1_request(
    conn: &mut Conn,
    conn_id: u128,
    process_id: u32,
    config: &CollatorConfig,
    events: &mut Vec<CollationEvent>,
    req: h1::HttpRequest,
) {
    if config.emit_messages {
        events.push(CollationEvent::Message {
            metadata: MessageMetadata {
                connection_id: conn_id,
                process_id,
                timestamp_ns: req.timestamp_ns,
                stream_id: None,
                remote_port: conn.remote_port,
                protocol: Protocol::Http1,
            },
            message: ParsedHttpMessage::Request(req.clone()),
        });
    }
    conn.h1_request_emitted = true;
    conn.h1_request = Some(req);
}
/// Records `resp` as the connection's current HTTP/1 response and, when
/// `config.emit_messages` is set, reports it as a message event.
///
/// The response is marked as emitted in either case, so it is not reported a
/// second time by `emit_message_events`.
fn emit_h1_response(
    conn: &mut Conn,
    conn_id: u128,
    process_id: u32,
    config: &CollatorConfig,
    events: &mut Vec<CollationEvent>,
    resp: h1::HttpResponse,
) {
    if config.emit_messages {
        events.push(CollationEvent::Message {
            metadata: MessageMetadata {
                connection_id: conn_id,
                process_id,
                timestamp_ns: resp.timestamp_ns,
                stream_id: None,
                remote_port: conn.remote_port,
                protocol: Protocol::Http1,
            },
            message: ParsedHttpMessage::Response(resp.clone()),
        });
    }
    conn.h1_response_emitted = true;
    conn.h1_response = Some(resp);
}
/// Reports whether the request half of an exchange is available.
///
/// HTTP/1: a parsed request is stored. HTTP/2: some stream is ready.
/// Unknown protocol: never.
fn is_request_complete(conn: &Conn) -> bool {
    match conn.protocol {
        Protocol::Unknown => false,
        Protocol::Http1 => conn.h1_request.is_some(),
        Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
    }
}
/// Reports whether the response half of an exchange is available.
///
/// HTTP/1: a parsed response is stored. HTTP/2: some stream is ready.
/// Unknown protocol: never.
fn is_response_complete(conn: &Conn) -> bool {
    match conn.protocol {
        Protocol::Unknown => false,
        Protocol::Http1 => conn.h1_response.is_some(),
        Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
    }
}
/// Pairs a completed request with its response and removes both from `conn`.
///
/// * HTTP/1: uses the stored request and response; latency is the saturating
///   difference between their timestamps. Nothing is consumed unless both are
///   present.
/// * HTTP/2: uses one ready stream; latency is the saturating difference
///   between the response's first frame and the end of the request stream.
///   The stream is always taken off the ready set, so an inconsistent entry
///   cannot be picked again on every call; if either half is not pending the
///   other half is left pending.
/// * Unknown protocol: always `None`.
///
/// Returns `None` when no pair is available or a stored HTTP/2 message cannot
/// be converted to an HTTP request/response.
fn build_exchange(conn: &mut Conn) -> Option<Exchange> {
    let (request, response, stream_id, latency_ns) = match conn.protocol {
        Protocol::Http1 => {
            // Check both halves before taking either, so a lone request or
            // response is not thrown away on a miss.
            if conn.h1_request.is_none() || conn.h1_response.is_none() {
                return None;
            }
            let req = conn.h1_request.take()?;
            let resp = conn.h1_response.take()?;
            let latency = resp.timestamp_ns.saturating_sub(req.timestamp_ns);
            (req, resp, None, latency)
        },
        Protocol::Http2 => {
            let sid = find_complete_h2_stream(conn)?;
            // Un-ready the stream up front: whatever happens below, it must
            // not be selected again by the next call.
            conn.ready_streams.remove(&sid);
            let both_pending = conn.pending_requests.contains_key(&sid)
                && conn.pending_responses.contains_key(&sid);
            if !both_pending {
                return None;
            }
            let msg_req = conn.pending_requests.remove(&sid)?;
            let msg_resp = conn.pending_responses.remove(&sid)?;
            conn.h2_emitted_requests.remove(&sid);
            conn.h2_emitted_responses.remove(&sid);
            let request_complete_time = msg_req.end_stream_timestamp_ns;
            let response_start_time = msg_resp.first_frame_timestamp_ns;
            let req = msg_req.into_http_request()?;
            let resp = msg_resp.into_http_response()?;
            let latency = response_start_time.saturating_sub(request_complete_time);
            (req, resp, Some(sid), latency)
        },
        Protocol::Unknown => return None,
    };
    Some(Exchange {
        request,
        response,
        latency_ns,
        protocol: conn.protocol,
        process_id: conn.process_id,
        remote_port: conn.remote_port,
        stream_id,
    })
}
#[cfg(test)]
mod tests;