Skip to main content

http_collator/
exchange.rs

1//! HTTP exchange (request/response pair) and collation events
2
3use h2session::{StreamId, TimestampNs};
4
5use crate::{
6    connection::Protocol,
7    h1::{HttpRequest, HttpResponse},
8};
9
10/// Classification of parsed HTTP message
11#[derive(Debug, Clone)]
12pub enum ParsedHttpMessage {
13    /// A parsed HTTP request
14    Request(HttpRequest),
15    /// A parsed HTTP response
16    Response(HttpResponse),
17}
18
19impl ParsedHttpMessage {
20    /// Returns true if this is a request
21    pub fn is_request(&self) -> bool {
22        matches!(self, Self::Request(_))
23    }
24
25    /// Returns true if this is a response
26    pub fn is_response(&self) -> bool {
27        matches!(self, Self::Response(_))
28    }
29
30    /// Get the request if this is a request, None otherwise
31    pub fn as_request(&self) -> Option<&HttpRequest> {
32        match self {
33            Self::Request(req) => Some(req),
34            Self::Response(_) => None,
35        }
36    }
37
38    /// Get the response if this is a response, None otherwise
39    pub fn as_response(&self) -> Option<&HttpResponse> {
40        match self {
41            Self::Request(_) => None,
42            Self::Response(resp) => Some(resp),
43        }
44    }
45}
46
47/// Metadata about a parsed message
48#[derive(Debug, Clone)]
49pub struct MessageMetadata {
50    /// Connection identifier (0 if unavailable, falls back to process_id)
51    pub connection_id: u64,
52    /// Process ID for connection tracking
53    pub process_id:    u32,
54    /// Timestamp in nanoseconds
55    pub timestamp_ns:  TimestampNs,
56    /// Stream ID for HTTP/2 (None for HTTP/1)
57    pub stream_id:     Option<StreamId>,
58    /// Remote port (None if unavailable)
59    pub remote_port:   Option<u16>,
60    /// Protocol detected for this connection
61    pub protocol:      Protocol,
62}
63
64/// Events emitted by the collator
65#[derive(Debug)]
66pub enum CollationEvent {
67    /// Individual message parsed and ready for processing
68    Message {
69        /// The parsed HTTP message (request or response)
70        message:  ParsedHttpMessage,
71        /// Connection and timing metadata for this message
72        metadata: MessageMetadata,
73    },
74    /// Complete exchange with latency (request + response matched)
75    Exchange(Exchange),
76}
77
78impl CollationEvent {
79    /// Returns true if this is a Message event
80    pub fn is_message(&self) -> bool {
81        matches!(self, Self::Message { .. })
82    }
83
84    /// Returns true if this is an Exchange event
85    pub fn is_exchange(&self) -> bool {
86        matches!(self, Self::Exchange(_))
87    }
88
89    /// Get the message if this is a Message event
90    pub fn as_message(&self) -> Option<(&ParsedHttpMessage, &MessageMetadata)> {
91        match self {
92            Self::Message { message, metadata } => Some((message, metadata)),
93            Self::Exchange(_) => None,
94        }
95    }
96
97    /// Get the exchange if this is an Exchange event
98    pub fn as_exchange(&self) -> Option<&Exchange> {
99        match self {
100            Self::Message { .. } => None,
101            Self::Exchange(ex) => Some(ex),
102        }
103    }
104}
105
/// Settings that control which events the collator emits and its limits.
#[derive(Debug, Clone)]
pub struct CollatorConfig {
    /// When set, a Message event is emitted for each parsed request/response
    pub emit_messages:  bool,
    /// When set, an Exchange event is emitted for each completed
    /// request/response pair
    pub emit_exchanges: bool,
    /// Maximum buffer size per chunk
    pub max_buf_size:   usize,
    /// Connection timeout used for cleanup, in nanoseconds
    pub timeout_ns:     u64,
    /// Upper bound on the accumulated body size per direction; exceeding it
    /// resets the connection. Guards against unbounded memory growth from
    /// stalled or malicious connections. Default: 10 MiB.
    pub max_body_size:  usize,
}

impl Default for CollatorConfig {
    /// Emits both event kinds, with a 16 KiB chunk buffer, a 5 second
    /// timeout and a 10 MiB body limit.
    fn default() -> Self {
        const KIB: usize = 1024;
        const NANOS_PER_SEC: u64 = 1_000_000_000;

        Self {
            emit_messages:  true,
            emit_exchanges: true,
            max_buf_size:   16 * KIB,
            timeout_ns:     5 * NANOS_PER_SEC,
            max_body_size:  10 * KIB * KIB, // 10 MiB
        }
    }
}

impl CollatorConfig {
    /// Default config with exchange events switched off, leaving only
    /// message events (for immediate adjudication).
    pub fn messages_only() -> Self {
        // `emit_messages` is already `true` in the default config.
        Self {
            emit_exchanges: false,
            ..Self::default()
        }
    }

    /// Default config with message events switched off, leaving only
    /// exchange events (for monitoring/APM).
    pub fn exchanges_only() -> Self {
        // `emit_exchanges` is already `true` in the default config.
        Self {
            emit_messages: false,
            ..Self::default()
        }
    }
}
154
155/// A complete request/response exchange
156#[derive(Debug)]
157pub struct Exchange {
158    /// The HTTP request
159    pub request:     HttpRequest,
160    /// The matched HTTP response
161    pub response:    HttpResponse,
162    /// Time between request completion and response start, in nanoseconds
163    pub latency_ns:  u64,
164    /// Protocol used for this exchange
165    pub protocol:    Protocol,
166    /// OS process ID that handled this connection
167    pub process_id:  u32,
168    /// Remote port, None if unavailable (e.g., SSL without socket fd)
169    pub remote_port: Option<u16>,
170    /// Stream ID for HTTP/2 (None for HTTP/1)
171    pub stream_id:   Option<StreamId>,
172}
173
174impl std::fmt::Display for Exchange {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        let proto_str = match self.protocol {
177            Protocol::Http1 => "HTTP/1.1",
178            Protocol::Http2 => "HTTP/2",
179            Protocol::Unknown => "Unknown",
180        };
181        let latency_ms = self.latency_ns as f64 / 1_000_000.0;
182        let port_str = self
183            .remote_port
184            .map_or("unavailable".to_string(), |p| p.to_string());
185
186        writeln!(
187            f,
188            "=== {} Exchange (PID: {}, Port: {}) ===",
189            proto_str, self.process_id, port_str
190        )?;
191        writeln!(f, "Latency: {:.2}ms", latency_ms)?;
192        writeln!(f)?;
193        writeln!(f, "--- Request ---")?;
194        writeln!(f, "{} {}", self.request.method, self.request.uri)?;
195        for (key, value) in &self.request.headers {
196            writeln!(f, "{}: {}", key, value.to_str().unwrap_or("<binary>"))?;
197        }
198        if !self.request.body.is_empty() {
199            writeln!(f)?;
200            writeln!(f, "{}", String::from_utf8_lossy(&self.request.body))?;
201        }
202        writeln!(f)?;
203        writeln!(f, "--- Response ---")?;
204        let reason = self.response.status.canonical_reason().unwrap_or("");
205        writeln!(f, "{} {}", self.response.status.as_u16(), reason)?;
206        for (key, value) in &self.response.headers {
207            writeln!(f, "{}: {}", key, value.to_str().unwrap_or("<binary>"))?;
208        }
209        if !self.response.body.is_empty() {
210            writeln!(f)?;
211            writeln!(f, "{}", String::from_utf8_lossy(&self.response.body))?;
212        }
213        Ok(())
214    }
215}