1use h2session::{StreamId, TimestampNs};
4
5use crate::{
6 connection::Protocol,
7 h1::{HttpRequest, HttpResponse},
8};
9
10#[derive(Debug, Clone)]
12pub enum ParsedHttpMessage {
13 Request(HttpRequest),
15 Response(HttpResponse),
17}
18
19impl ParsedHttpMessage {
20 pub fn is_request(&self) -> bool {
22 matches!(self, Self::Request(_))
23 }
24
25 pub fn is_response(&self) -> bool {
27 matches!(self, Self::Response(_))
28 }
29
30 pub fn as_request(&self) -> Option<&HttpRequest> {
32 match self {
33 Self::Request(req) => Some(req),
34 Self::Response(_) => None,
35 }
36 }
37
38 pub fn as_response(&self) -> Option<&HttpResponse> {
40 match self {
41 Self::Request(_) => None,
42 Self::Response(resp) => Some(resp),
43 }
44 }
45}
46
47#[derive(Debug, Clone)]
49pub struct MessageMetadata {
50 pub connection_id: u64,
52 pub process_id: u32,
54 pub timestamp_ns: TimestampNs,
56 pub stream_id: Option<StreamId>,
58 pub remote_port: Option<u16>,
60 pub protocol: Protocol,
62}
63
64#[derive(Debug)]
66pub enum CollationEvent {
67 Message {
69 message: ParsedHttpMessage,
71 metadata: MessageMetadata,
73 },
74 Exchange(Exchange),
76}
77
78impl CollationEvent {
79 pub fn is_message(&self) -> bool {
81 matches!(self, Self::Message { .. })
82 }
83
84 pub fn is_exchange(&self) -> bool {
86 matches!(self, Self::Exchange(_))
87 }
88
89 pub fn as_message(&self) -> Option<(&ParsedHttpMessage, &MessageMetadata)> {
91 match self {
92 Self::Message { message, metadata } => Some((message, metadata)),
93 Self::Exchange(_) => None,
94 }
95 }
96
97 pub fn as_exchange(&self) -> Option<&Exchange> {
99 match self {
100 Self::Message { .. } => None,
101 Self::Exchange(ex) => Some(ex),
102 }
103 }
104}
105
106#[derive(Debug, Clone)]
108pub struct CollatorConfig {
109 pub emit_messages: bool,
111 pub emit_exchanges: bool,
113 pub max_buf_size: usize,
115 pub timeout_ns: u64,
117 pub max_body_size: usize,
121}
122
123impl Default for CollatorConfig {
124 fn default() -> Self {
125 Self {
126 emit_messages: true,
127 emit_exchanges: true,
128 max_buf_size: 16384,
129 timeout_ns: 5_000_000_000,
130 max_body_size: 10 * 1024 * 1024, }
132 }
133}
134
135impl CollatorConfig {
136 pub fn messages_only() -> Self {
138 Self {
139 emit_messages: true,
140 emit_exchanges: false,
141 ..Default::default()
142 }
143 }
144
145 pub fn exchanges_only() -> Self {
147 Self {
148 emit_messages: false,
149 emit_exchanges: true,
150 ..Default::default()
151 }
152 }
153}
154
155#[derive(Debug)]
157pub struct Exchange {
158 pub request: HttpRequest,
160 pub response: HttpResponse,
162 pub latency_ns: u64,
164 pub protocol: Protocol,
166 pub process_id: u32,
168 pub remote_port: Option<u16>,
170 pub stream_id: Option<StreamId>,
172}
173
174impl std::fmt::Display for Exchange {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 let proto_str = match self.protocol {
177 Protocol::Http1 => "HTTP/1.1",
178 Protocol::Http2 => "HTTP/2",
179 Protocol::Unknown => "Unknown",
180 };
181 let latency_ms = self.latency_ns as f64 / 1_000_000.0;
182 let port_str = self
183 .remote_port
184 .map_or("unavailable".to_string(), |p| p.to_string());
185
186 writeln!(
187 f,
188 "=== {} Exchange (PID: {}, Port: {}) ===",
189 proto_str, self.process_id, port_str
190 )?;
191 writeln!(f, "Latency: {:.2}ms", latency_ms)?;
192 writeln!(f)?;
193 writeln!(f, "--- Request ---")?;
194 writeln!(f, "{} {}", self.request.method, self.request.uri)?;
195 for (key, value) in &self.request.headers {
196 writeln!(f, "{}: {}", key, value.to_str().unwrap_or("<binary>"))?;
197 }
198 if !self.request.body.is_empty() {
199 writeln!(f)?;
200 writeln!(f, "{}", String::from_utf8_lossy(&self.request.body))?;
201 }
202 writeln!(f)?;
203 writeln!(f, "--- Response ---")?;
204 let reason = self.response.status.canonical_reason().unwrap_or("");
205 writeln!(f, "{} {}", self.response.status.as_u16(), reason)?;
206 for (key, value) in &self.response.headers {
207 writeln!(f, "{}: {}", key, value.to_str().unwrap_or("<binary>"))?;
208 }
209 if !self.response.body.is_empty() {
210 writeln!(f)?;
211 writeln!(f, "{}", String::from_utf8_lossy(&self.response.body))?;
212 }
213 Ok(())
214 }
215}