use h2session::{StreamId, TimestampNs};
use crate::{
connection::Protocol,
h1::{HttpRequest, HttpResponse},
};
#[derive(Debug, Clone)]
pub enum ParsedHttpMessage {
Request(HttpRequest),
Response(HttpResponse),
}
impl ParsedHttpMessage {
pub fn is_request(&self) -> bool {
matches!(self, Self::Request(_))
}
pub fn is_response(&self) -> bool {
matches!(self, Self::Response(_))
}
pub fn as_request(&self) -> Option<&HttpRequest> {
match self {
Self::Request(req) => Some(req),
Self::Response(_) => None,
}
}
pub fn as_response(&self) -> Option<&HttpResponse> {
match self {
Self::Request(_) => None,
Self::Response(resp) => Some(resp),
}
}
}
#[derive(Debug, Clone)]
pub struct MessageMetadata {
pub connection_id: u128,
pub process_id: u32,
pub timestamp_ns: TimestampNs,
pub stream_id: Option<StreamId>,
pub remote_port: Option<u16>,
pub protocol: Protocol,
}
#[derive(Debug)]
pub enum CollationEvent {
Message {
message: ParsedHttpMessage,
metadata: MessageMetadata,
},
Exchange(Exchange),
}
impl CollationEvent {
pub fn is_message(&self) -> bool {
matches!(self, Self::Message { .. })
}
pub fn is_exchange(&self) -> bool {
matches!(self, Self::Exchange(_))
}
pub fn as_message(&self) -> Option<(&ParsedHttpMessage, &MessageMetadata)> {
match self {
Self::Message { message, metadata } => Some((message, metadata)),
Self::Exchange(_) => None,
}
}
pub fn as_exchange(&self) -> Option<&Exchange> {
match self {
Self::Message { .. } => None,
Self::Exchange(ex) => Some(ex),
}
}
}
#[derive(Debug, Clone)]
pub struct CollatorConfig {
pub emit_messages: bool,
pub emit_exchanges: bool,
pub max_buf_size: usize,
pub timeout_ns: u64,
pub max_body_size: usize,
}
impl Default for CollatorConfig {
fn default() -> Self {
Self {
emit_messages: true,
emit_exchanges: true,
max_buf_size: 16384,
timeout_ns: 5_000_000_000,
max_body_size: 10 * 1024 * 1024, }
}
}
impl CollatorConfig {
pub fn messages_only() -> Self {
Self {
emit_messages: true,
emit_exchanges: false,
..Default::default()
}
}
pub fn exchanges_only() -> Self {
Self {
emit_messages: false,
emit_exchanges: true,
..Default::default()
}
}
}
#[derive(Debug)]
pub struct Exchange {
pub request: HttpRequest,
pub response: HttpResponse,
pub latency_ns: u64,
pub protocol: Protocol,
pub process_id: u32,
pub remote_port: Option<u16>,
pub stream_id: Option<StreamId>,
}
impl std::fmt::Display for Exchange {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let proto_str = match self.protocol {
Protocol::Http1 => "HTTP/1.1",
Protocol::Http2 => "HTTP/2",
Protocol::Unknown => "Unknown",
};
let latency_ms = self.latency_ns as f64 / 1_000_000.0;
let port_str = self
.remote_port
.map_or("unavailable".to_string(), |p| p.to_string());
writeln!(
f,
"=== {} Exchange (PID: {}, Port: {}) ===",
proto_str, self.process_id, port_str
)?;
writeln!(f, "Latency: {:.2}ms", latency_ms)?;
writeln!(f)?;
writeln!(f, "--- Request ---")?;
writeln!(f, "{} {}", self.request.method, self.request.uri)?;
for (key, value) in &self.request.headers {
writeln!(f, "{}: {}", key, value.to_str().unwrap_or("<binary>"))?;
}
if !self.request.body.is_empty() {
writeln!(f)?;
writeln!(f, "{}", String::from_utf8_lossy(&self.request.body))?;
}
writeln!(f)?;
writeln!(f, "--- Response ---")?;
let reason = self.response.status.canonical_reason().unwrap_or("");
writeln!(f, "{} {}", self.response.status.as_u16(), reason)?;
for (key, value) in &self.response.headers {
writeln!(f, "{}: {}", key, value.to_str().unwrap_or("<binary>"))?;
}
if !self.response.body.is_empty() {
writeln!(f)?;
writeln!(f, "{}", String::from_utf8_lossy(&self.response.body))?;
}
Ok(())
}
}