Skip to main content

http_collator/
lib.rs

1#![warn(missing_docs)]
2//! HTTP collation library
3//!
4//! Collates individual data events (from eBPF, pcap, etc.) into complete
5//! HTTP request/response exchanges. Supports both HTTP/1.x and HTTP/2.
6//!
7//! # Usage
8//!
9//! Implement the [`DataEvent`] trait for your data source, then feed events
10//! to the [`Collator`]:
11//!
12//! ```no_run
13//! use http_collator::{Collator, CollationEvent, DataEvent, Direction};
14//! use bytes::Bytes;
15//!
16//! struct MyEvent {
17//!     payload: Vec<u8>,
18//!     timestamp_ns: u64,
19//!     direction: Direction,
20//!     connection_id: u64,
21//!     process_id: u32,
22//!     remote_port: u16,
23//! }
24//!
25//! impl DataEvent for MyEvent {
26//!     fn payload(&self) -> &[u8] { &self.payload }
27//!     fn timestamp_ns(&self) -> u64 { self.timestamp_ns }
28//!     fn direction(&self) -> Direction { self.direction }
29//!     fn connection_id(&self) -> u64 { self.connection_id }
30//!     fn process_id(&self) -> u32 { self.process_id }
31//!     fn remote_port(&self) -> u16 { self.remote_port }
32//! }
33//!
34//! let collator = Collator::<MyEvent>::new();
35//! # let my_event = MyEvent { payload: vec![], timestamp_ns: 0, direction: Direction::Write, connection_id: 1, process_id: 1, remote_port: 80 };
36//! for event in collator.add_event(my_event) {
37//!     match event {
38//!         CollationEvent::Message { message, metadata } => {
39//!             println!("parsed message for conn {}", metadata.connection_id);
40//!         }
41//!         CollationEvent::Exchange(exchange) => {
42//!             println!("complete exchange: {exchange}");
43//!         }
44//!     }
45//! }
46//! ```
47
48mod connection;
49mod exchange;
50pub mod h1;
51mod traits;
52
53use std::marker::PhantomData;
54
55pub use connection::Protocol;
56use connection::{Connection as Conn, DataChunk};
57use dashmap::DashMap;
58pub use exchange::{CollationEvent, CollatorConfig, Exchange, MessageMetadata, ParsedHttpMessage};
59pub use h1::{HttpRequest, HttpResponse};
60use h2session::{
61    H2ConnectionState,
62    StreamId,
63    TimestampNs,
64    is_http2_preface,
65    looks_like_http2_frame,
66};
67pub use traits::{DataEvent, Direction};
68
/// Default upper bound, in bytes, for a single data event's buffer.
///
/// 16 KiB matches the maximum TLS record payload size.
pub const MAX_BUF_SIZE: usize = 16 * 1024;
71
72/// Collates individual data events into complete request/response exchanges.
73///
74/// Generic over the event type `E` which must implement [`DataEvent`].
75/// Uses per-connection locking via `DashMap` so concurrent HTTP/2 connections
76/// do not serialize through a single mutex.
77pub struct Collator<E: DataEvent> {
78    /// Connections tracked by conn_id (for socket events)
79    connections:     DashMap<u64, Conn>,
80    /// SSL connections tracked by process_id (no conn_id available)
81    ssl_connections: DashMap<u32, Conn>,
82    /// Configuration for what events to emit
83    config:          CollatorConfig,
84    /// Phantom data for the event type
85    _phantom:        PhantomData<E>,
86}
87
88impl<E: DataEvent> Default for Collator<E> {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl<E: DataEvent> Collator<E> {
95    /// Create a new collator with default settings (emits both messages and
96    /// exchanges)
97    pub fn new() -> Self {
98        Self::with_config(CollatorConfig::default())
99    }
100
101    /// Create a new collator with custom configuration
102    pub fn with_config(config: CollatorConfig) -> Self {
103        Self {
104            connections: DashMap::new(),
105            ssl_connections: DashMap::new(),
106            config,
107            _phantom: PhantomData,
108        }
109    }
110
111    /// Create a new collator with a custom maximum buffer size
112    pub fn with_max_buf_size(max_buf_size: usize) -> Self {
113        Self::with_config(CollatorConfig {
114            max_buf_size,
115            ..Default::default()
116        })
117    }
118
119    /// Get a reference to the current configuration
120    pub fn config(&self) -> &CollatorConfig {
121        &self.config
122    }
123
124    /// Process a data event, returning all resulting collation events.
125    ///
126    /// Takes ownership of the event to enable zero-copy transfer of the
127    /// payload into `DataChunk` via `into_payload()`.
128    ///
129    /// Returns Vec because:
130    /// - HTTP/2 can have multiple streams complete in one buffer
131    /// - A single buffer might contain complete request AND start of next
132    /// - Config might emit both messages AND exchanges
133    pub fn add_event(&self, event: E) -> Vec<CollationEvent> {
134        // Extract all scalar metadata before consuming the event
135        let direction = event.direction();
136        let timestamp_ns = TimestampNs(event.timestamp_ns());
137        let conn_id = event.connection_id();
138        let process_id = event.process_id();
139        let remote_port = event.remote_port();
140        let is_empty = event.payload().is_empty();
141
142        if is_empty {
143            return Vec::new();
144        }
145
146        // Skip non-data events
147        if direction == Direction::Other {
148            return Vec::new();
149        }
150
151        // Move payload ownership via into_payload() โ€” avoids cloning for
152        // implementors that already hold a Bytes or Vec<u8>.
153        let data = event.into_payload();
154
155        let chunk = DataChunk {
156            data,
157            timestamp_ns,
158            direction,
159        };
160
161        // DashMap entry() locks only the shard for this connection, allowing
162        // concurrent access to other connections.
163        if conn_id != 0 {
164            let mut conn = self
165                .connections
166                .entry(conn_id)
167                .or_insert_with(|| Conn::new(process_id, remote_port));
168            Self::process_event_for_conn(
169                &mut conn,
170                chunk,
171                direction,
172                timestamp_ns,
173                remote_port,
174                conn_id,
175                process_id,
176                &self.config,
177            )
178        } else {
179            let mut conn = self
180                .ssl_connections
181                .entry(process_id)
182                .or_insert_with(|| Conn::new(process_id, remote_port));
183            Self::process_event_for_conn(
184                &mut conn,
185                chunk,
186                direction,
187                timestamp_ns,
188                remote_port,
189                conn_id,
190                process_id,
191                &self.config,
192            )
193        }
194    }
195
196    /// Core event processing logic, called with a mutable reference to the
197    /// connection (obtained from a `DashMap` entry guard).
198    #[allow(clippy::too_many_arguments)]
199    fn process_event_for_conn(
200        conn: &mut Conn,
201        chunk: DataChunk,
202        direction: Direction,
203        timestamp_ns: TimestampNs,
204        remote_port: u16,
205        conn_id: u64,
206        process_id: u32,
207        config: &CollatorConfig,
208    ) -> Vec<CollationEvent> {
209        let buf: &[u8] = &chunk.data;
210
211        conn.last_activity_ns = timestamp_ns;
212        if remote_port != 0 && conn.remote_port.is_none() {
213            conn.remote_port = Some(remote_port);
214        }
215
216        // Detect protocol from first chunk if unknown
217        if conn.protocol == Protocol::Unknown {
218            conn.protocol = detect_protocol(buf);
219        }
220
221        // Detect protocol change on an established connection (FD reuse).
222        if conn.protocol != Protocol::Unknown {
223            let incoming_protocol = detect_protocol(buf);
224            if incoming_protocol != Protocol::Unknown && incoming_protocol != conn.protocol {
225                reset_connection_for_protocol_change(conn, incoming_protocol);
226            }
227        }
228
229        // Add chunk to appropriate buffer based on direction
230        match direction {
231            Direction::Write => {
232                conn.request_body_size += buf.len();
233                if conn.request_body_size > config.max_body_size {
234                    reset_connection_body_limit(conn);
235                    return Vec::new();
236                }
237
238                // Buffer for HTTP/1 and Unknown (Unknown may resolve to Http1)
239                if conn.protocol == Protocol::Http1 || conn.protocol == Protocol::Unknown {
240                    conn.h1_write_buffer.extend_from_slice(buf);
241                }
242                conn.request_chunks.push(chunk);
243
244                match conn.protocol {
245                    Protocol::Http1 if !conn.h1_write_parsed => {
246                        try_parse_http1_write_chunks(conn);
247                    },
248                    Protocol::Http2 => {
249                        parse_http2_chunks(conn, direction);
250                    },
251                    // For Unknown protocol, try HTTP/1 parsing โ€” handles non-standard
252                    // methods (WebDAV, etc.) that detect_protocol() doesn't recognize.
253                    Protocol::Unknown => {
254                        try_parse_http1_unknown_write(conn);
255                    },
256                    _ => {},
257                }
258
259                // Content-based parsing: a response can be parsed from the
260                // Write direction (server-side), so check both.
261                if is_request_complete(conn) {
262                    conn.request_complete = true;
263                }
264                if is_response_complete(conn) {
265                    conn.response_complete = true;
266                }
267            },
268            Direction::Read => {
269                conn.response_body_size += buf.len();
270                if conn.response_body_size > config.max_body_size {
271                    reset_connection_body_limit(conn);
272                    return Vec::new();
273                }
274
275                // Buffer for HTTP/1 and Unknown (Unknown may resolve to Http1)
276                if conn.protocol == Protocol::Http1 || conn.protocol == Protocol::Unknown {
277                    conn.h1_read_buffer.extend_from_slice(buf);
278                }
279                conn.response_chunks.push(chunk);
280
281                match conn.protocol {
282                    Protocol::Http1 if !conn.h1_read_parsed => {
283                        try_parse_http1_read_chunks(conn);
284                    },
285                    Protocol::Http2 => {
286                        parse_http2_chunks(conn, direction);
287                    },
288                    // For Unknown protocol, try HTTP/1 parsing โ€” handles non-standard
289                    // methods (WebDAV, etc.) that detect_protocol() doesn't recognize.
290                    Protocol::Unknown => {
291                        try_parse_http1_unknown_read(conn);
292                    },
293                    _ => {},
294                }
295
296                // Content-based parsing: a request can be parsed from the
297                // Read direction (server-side), so check both.
298                if is_request_complete(conn) {
299                    conn.request_complete = true;
300                }
301                if is_response_complete(conn) {
302                    conn.response_complete = true;
303                }
304            },
305            Direction::Other => {
306                return Vec::new();
307            },
308        }
309
310        // For HTTP/2, a complete stream pair can be detected on either event.
311        if conn.protocol == Protocol::Http2 && find_complete_h2_stream(conn).is_some() {
312            conn.request_complete = true;
313            conn.response_complete = true;
314        }
315
316        let mut events = Vec::new();
317
318        if config.emit_messages {
319            emit_message_events(conn, conn_id, process_id, &mut events);
320        }
321
322        if config.emit_exchanges && conn.request_complete && conn.response_complete {
323            if let Some(exchange) = build_exchange(conn) {
324                events.push(CollationEvent::Exchange(exchange));
325            }
326            reset_connection_after_exchange(conn);
327        }
328
329        events
330    }
331
332    /// Clean up stale connections and evict stale H2 streams.
333    ///
334    /// Callers should invoke this periodically to bound memory usage from
335    /// abandoned connections and incomplete HTTP/2 streams.
336    pub fn cleanup(&self, current_time_ns: TimestampNs) {
337        self.connections.retain(|_, conn| {
338            current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
339        });
340        self.ssl_connections.retain(|_, conn| {
341            current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
342        });
343
344        // Evict stale H2 streams within surviving connections
345        for mut entry in self.connections.iter_mut() {
346            entry.h2_write_state.evict_stale_streams(current_time_ns);
347            entry.h2_read_state.evict_stale_streams(current_time_ns);
348        }
349        for mut entry in self.ssl_connections.iter_mut() {
350            entry.h2_write_state.evict_stale_streams(current_time_ns);
351            entry.h2_read_state.evict_stale_streams(current_time_ns);
352        }
353    }
354
355    /// Remove a connection explicitly (e.g., on connection close)
356    ///
357    /// Call this when a connection is closed to free resources immediately.
358    /// If connection_id is 0, removes based on process_id from SSL connections.
359    pub fn remove_connection(&self, connection_id: u64, process_id: u32) {
360        if connection_id != 0 {
361            self.connections.remove(&connection_id);
362        } else {
363            self.ssl_connections.remove(&process_id);
364        }
365    }
366
367    /// Close a connection, finalizing any pending HTTP/1 response.
368    ///
369    /// For HTTP/1 responses without explicit framing (no Content-Length or
370    /// Transfer-Encoding), RFC 7230 ยง3.3.3 says the body extends until the
371    /// connection closes. This method finalizes such responses with whatever
372    /// body has accumulated so far, emits any resulting events, then removes
373    /// the connection.
374    pub fn close_connection(&self, connection_id: u64, process_id: u32) -> Vec<CollationEvent> {
375        let events = if connection_id != 0 {
376            match self.connections.get_mut(&connection_id) {
377                Some(mut guard) => {
378                    finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
379                },
380                None => Vec::new(),
381            }
382        } else {
383            match self.ssl_connections.get_mut(&process_id) {
384                Some(mut guard) => {
385                    finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
386                },
387                None => Vec::new(),
388            }
389        };
390
391        // Remove the connection after releasing the guard
392        if connection_id != 0 {
393            self.connections.remove(&connection_id);
394        } else {
395            self.ssl_connections.remove(&process_id);
396        }
397
398        events
399    }
400}
401
402/// Finalize any pending HTTP/1 response and emit events on connection close.
403fn finalize_and_emit(
404    conn: &mut Conn,
405    connection_id: u64,
406    process_id: u32,
407    config: &CollatorConfig,
408) -> Vec<CollationEvent> {
409    // Finalize any pending HTTP/1 response with body accumulated so far
410    if conn.protocol == Protocol::Http1
411        && conn.h1_response.is_none()
412        && !conn.h1_read_buffer.is_empty()
413    {
414        let timestamp = conn
415            .response_chunks
416            .first()
417            .map(|c| c.timestamp_ns)
418            .unwrap_or(TimestampNs(0));
419        conn.h1_response = h1::try_finalize_http1_response(&conn.h1_read_buffer, timestamp);
420        if conn.h1_response.is_some() {
421            conn.response_complete = true;
422        }
423    }
424
425    let mut events = Vec::new();
426
427    if config.emit_messages {
428        emit_message_events(conn, connection_id, process_id, &mut events);
429    }
430
431    if config.emit_exchanges
432        && conn.request_complete
433        && conn.response_complete
434        && let Some(exchange) = build_exchange(conn)
435    {
436        events.push(CollationEvent::Exchange(exchange));
437    }
438
439    events
440}
441
442/// Emit Message events for any newly parsed messages that haven't been emitted
443/// yet
444fn emit_message_events(
445    conn: &mut Conn,
446    conn_id: u64,
447    process_id: u32,
448    events: &mut Vec<CollationEvent>,
449) {
450    match conn.protocol {
451        Protocol::Http1 => {
452            // Emit request if parsed and not yet emitted
453            if let Some(ref req) = conn.h1_request
454                && !conn.h1_request_emitted
455            {
456                let metadata = MessageMetadata {
457                    connection_id: conn_id,
458                    process_id,
459                    timestamp_ns: req.timestamp_ns,
460                    stream_id: None,
461                    remote_port: conn.remote_port,
462                    protocol: conn.protocol,
463                };
464                events.push(CollationEvent::Message {
465                    message: ParsedHttpMessage::Request(req.clone()),
466                    metadata,
467                });
468                conn.h1_request_emitted = true;
469            }
470
471            // Emit response if parsed and not yet emitted
472            if let Some(ref resp) = conn.h1_response
473                && !conn.h1_response_emitted
474            {
475                let metadata = MessageMetadata {
476                    connection_id: conn_id,
477                    process_id,
478                    timestamp_ns: resp.timestamp_ns,
479                    stream_id: None,
480                    remote_port: conn.remote_port,
481                    protocol: conn.protocol,
482                };
483                events.push(CollationEvent::Message {
484                    message: ParsedHttpMessage::Response(resp.clone()),
485                    metadata,
486                });
487                conn.h1_response_emitted = true;
488            }
489        },
490        Protocol::Http2 => {
491            // Emit newly parsed HTTP/2 requests
492            for (&stream_id, msg) in &conn.pending_requests {
493                if !conn.h2_emitted_requests.contains(&stream_id)
494                    && let Some(req) = msg.to_http_request()
495                {
496                    let metadata = MessageMetadata {
497                        connection_id: conn_id,
498                        process_id,
499                        timestamp_ns: msg.end_stream_timestamp_ns,
500                        stream_id: Some(stream_id),
501                        remote_port: conn.remote_port,
502                        protocol: conn.protocol,
503                    };
504                    events.push(CollationEvent::Message {
505                        message: ParsedHttpMessage::Request(req),
506                        metadata,
507                    });
508                }
509            }
510            // Mark all current pending requests as emitted
511            conn.h2_emitted_requests
512                .extend(conn.pending_requests.keys().copied());
513
514            // Emit newly parsed HTTP/2 responses
515            for (&stream_id, msg) in &conn.pending_responses {
516                if !conn.h2_emitted_responses.contains(&stream_id)
517                    && let Some(resp) = msg.to_http_response()
518                {
519                    let metadata = MessageMetadata {
520                        connection_id: conn_id,
521                        process_id,
522                        timestamp_ns: msg.first_frame_timestamp_ns,
523                        stream_id: Some(stream_id),
524                        remote_port: conn.remote_port,
525                        protocol: conn.protocol,
526                    };
527                    events.push(CollationEvent::Message {
528                        message: ParsedHttpMessage::Response(resp),
529                        metadata,
530                    });
531                }
532            }
533            // Mark all current pending responses as emitted
534            conn.h2_emitted_responses
535                .extend(conn.pending_responses.keys().copied());
536        },
537        Protocol::Unknown => {},
538    }
539}
540
541/// Reset connection state after emitting an exchange
542fn reset_connection_after_exchange(conn: &mut Conn) {
543    conn.request_complete = false;
544    conn.response_complete = false;
545
546    if conn.protocol == Protocol::Http1 {
547        // HTTP/1: clear everything for the next exchange
548        conn.request_chunks.clear();
549        conn.response_chunks.clear();
550        conn.h1_request = None;
551        conn.h1_response = None;
552        conn.h1_write_parsed = false;
553        conn.h1_read_parsed = false;
554        conn.h1_request_emitted = false;
555        conn.h1_response_emitted = false;
556        conn.h1_write_buffer.clear();
557        conn.h1_read_buffer.clear();
558        conn.protocol = Protocol::Unknown;
559    } else if conn.protocol == Protocol::Http2 {
560        // HTTP/2: only clear chunks if no other pending messages remain.
561        // The matched pair was already removed in build_exchange().
562        // Keep h2_*_state HPACK decoder for connection persistence.
563        // h2_emitted_* sets are cleaned up in build_exchange when
564        // the stream_id is removed from pending.
565        if conn.pending_requests.is_empty() && conn.pending_responses.is_empty() {
566            conn.request_chunks.clear();
567            conn.response_chunks.clear();
568            conn.h2_write_state.clear_buffer();
569            conn.h2_read_state.clear_buffer();
570        }
571    }
572
573    // Reset body size tracking for the next exchange
574    conn.request_body_size = 0;
575    conn.response_body_size = 0;
576}
577
578/// Reset connection when accumulated body size exceeds the limit.
579/// Drops all accumulated data and parsed state for this connection.
580fn reset_connection_body_limit(conn: &mut Conn) {
581    conn.request_chunks.clear();
582    conn.response_chunks.clear();
583    conn.h1_write_buffer.clear();
584    conn.h1_read_buffer.clear();
585    conn.h1_request = None;
586    conn.h1_response = None;
587    conn.h1_write_parsed = false;
588    conn.h1_read_parsed = false;
589    conn.h1_request_emitted = false;
590    conn.h1_response_emitted = false;
591    conn.h2_write_state = H2ConnectionState::new();
592    conn.h2_read_state = H2ConnectionState::new();
593    conn.pending_requests.clear();
594    conn.pending_responses.clear();
595    conn.h2_emitted_requests.clear();
596    conn.h2_emitted_responses.clear();
597    conn.ready_streams.clear();
598    conn.request_complete = false;
599    conn.response_complete = false;
600    conn.request_body_size = 0;
601    conn.response_body_size = 0;
602    conn.protocol = Protocol::Unknown;
603}
604
605/// Reset connection when the detected protocol changes (FD reuse with
606/// different protocol, e.g., HTTP/2 followed by HTTP/1 on the same fd).
607fn reset_connection_for_protocol_change(conn: &mut Conn, new_protocol: Protocol) {
608    conn.request_chunks.clear();
609    conn.response_chunks.clear();
610    conn.h1_write_buffer.clear();
611    conn.h1_read_buffer.clear();
612    conn.h1_request = None;
613    conn.h1_response = None;
614    conn.h1_write_parsed = false;
615    conn.h1_read_parsed = false;
616    conn.h1_request_emitted = false;
617    conn.h1_response_emitted = false;
618    conn.h2_write_state = H2ConnectionState::new();
619    conn.h2_read_state = H2ConnectionState::new();
620    conn.pending_requests.clear();
621    conn.pending_responses.clear();
622    conn.h2_emitted_requests.clear();
623    conn.h2_emitted_responses.clear();
624    conn.ready_streams.clear();
625    conn.request_complete = false;
626    conn.response_complete = false;
627    conn.request_body_size = 0;
628    conn.response_body_size = 0;
629    conn.protocol = new_protocol;
630}
631
632/// Detect whether raw bytes look like HTTP/1.x or HTTP/2 traffic.
633///
634/// Checks for the HTTP/2 connection preface, HTTP/2 frame headers,
635/// and HTTP/1.x request/response patterns. Returns [`Protocol::Unknown`]
636/// if no pattern matches.
637pub fn detect_protocol(data: &[u8]) -> Protocol {
638    // Check for HTTP/2 preface
639    if is_http2_preface(data) {
640        return Protocol::Http2;
641    }
642
643    // Check for HTTP/2 frame header heuristic
644    if looks_like_http2_frame(data) {
645        return Protocol::Http2;
646    }
647
648    // Check for HTTP/1.x request
649    if h1::is_http1_request(data) || h1::is_http1_response(data) {
650        return Protocol::Http1;
651    }
652
653    Protocol::Unknown
654}
655
656/// Feed chunk to h2session, classify by content after parsing.
657///
658/// Uses separate H2ConnectionState per direction to avoid corrupting frame
659/// boundaries when Read and Write events interleave (e.g., WINDOW_UPDATEs
660/// between DATA frames). Messages are classified by their pseudo-headers
661/// (:method = request, :status = response), supporting both client-side
662/// monitoring (Write=request, Read=response) and server-side monitoring
663/// (Read=request, Write=response).
664fn parse_http2_chunks(conn: &mut Conn, direction: Direction) {
665    // Check for fd-reuse: a new h2 connection preface on a connection that
666    // already processed one means the kernel reused the file descriptor for
667    // a new TCP connection. We must reset BOTH directions' parsers since the
668    // new connection has fresh HPACK context on both sides.
669    let last_chunk_is_preface = match direction {
670        Direction::Write => conn
671            .request_chunks
672            .last()
673            .is_some_and(|c| is_http2_preface(&c.data)),
674        Direction::Read => conn
675            .response_chunks
676            .last()
677            .is_some_and(|c| is_http2_preface(&c.data)),
678        Direction::Other => false,
679    };
680    let current_state_has_preface = match direction {
681        Direction::Write => conn.h2_write_state.preface_received,
682        Direction::Read => conn.h2_read_state.preface_received,
683        Direction::Other => false,
684    };
685
686    if last_chunk_is_preface && current_state_has_preface {
687        conn.h2_write_state = H2ConnectionState::new();
688        conn.h2_read_state = H2ConnectionState::new();
689        conn.pending_requests.clear();
690        conn.pending_responses.clear();
691        conn.h2_emitted_requests.clear();
692        conn.h2_emitted_responses.clear();
693        conn.ready_streams.clear();
694    }
695
696    let (chunks, h2_state) = match direction {
697        Direction::Write => (&conn.request_chunks, &mut conn.h2_write_state),
698        Direction::Read => (&conn.response_chunks, &mut conn.h2_read_state),
699        Direction::Other => return,
700    };
701
702    let chunk = match chunks.last() {
703        Some(c) => c,
704        None => return,
705    };
706
707    // Feed to direction-specific h2 parser; errors are non-fatal
708    let _ = h2_state.feed(&chunk.data, chunk.timestamp_ns);
709
710    // Pop completed messages and classify by content, not direction.
711    // Maintain ready_streams set for O(1) complete-pair lookup.
712    while let Some((stream_id, msg)) = h2_state.try_pop() {
713        if msg.is_request() {
714            conn.pending_requests.insert(stream_id, msg);
715            if conn.pending_responses.contains_key(&stream_id) {
716                conn.ready_streams.insert(stream_id);
717            }
718        } else if msg.is_response() {
719            conn.pending_responses.insert(stream_id, msg);
720            if conn.pending_requests.contains_key(&stream_id) {
721                conn.ready_streams.insert(stream_id);
722            }
723        }
724    }
725}
726
727/// Find a stream_id that has both request and response ready (O(1) via
728/// ready_streams set)
729fn find_complete_h2_stream(conn: &Conn) -> Option<StreamId> {
730    conn.ready_streams.iter().next().copied()
731}
732
733/// Try to parse HTTP/1 message from accumulated write-direction buffer.
734/// Tries request first (client-side), then response (server-side).
735fn try_parse_http1_write_chunks(conn: &mut Conn) {
736    let timestamp = conn
737        .request_chunks
738        .last()
739        .map(|c| c.timestamp_ns)
740        .unwrap_or(TimestampNs(0));
741    if let Some(req) = h1::try_parse_http1_request(&conn.h1_write_buffer, timestamp) {
742        conn.h1_request = Some(req);
743        conn.h1_write_parsed = true;
744    } else if let Some(resp) = h1::try_parse_http1_response(&conn.h1_write_buffer, timestamp) {
745        conn.h1_response = Some(resp);
746        conn.h1_write_parsed = true;
747    }
748}
749
750/// Try to parse HTTP/1 message from accumulated read-direction buffer.
751/// Tries response first (client-side), then request (server-side).
752fn try_parse_http1_read_chunks(conn: &mut Conn) {
753    let timestamp = conn
754        .response_chunks
755        .first()
756        .map(|c| c.timestamp_ns)
757        .unwrap_or(TimestampNs(0));
758    if let Some(resp) = h1::try_parse_http1_response(&conn.h1_read_buffer, timestamp) {
759        conn.h1_response = Some(resp);
760        conn.h1_read_parsed = true;
761    } else if let Some(req) = h1::try_parse_http1_request(&conn.h1_read_buffer, timestamp) {
762        conn.h1_request = Some(req);
763        conn.h1_read_parsed = true;
764    }
765}
766
767/// Try HTTP/1 parsing on Unknown-protocol write buffer. If successful,
768/// promotes the connection to Http1.
769fn try_parse_http1_unknown_write(conn: &mut Conn) {
770    let timestamp = conn
771        .request_chunks
772        .last()
773        .map(|c| c.timestamp_ns)
774        .unwrap_or(TimestampNs(0));
775    if let Some(req) = h1::try_parse_http1_request(&conn.h1_write_buffer, timestamp) {
776        conn.protocol = Protocol::Http1;
777        conn.h1_request = Some(req);
778        conn.h1_write_parsed = true;
779    } else if let Some(resp) = h1::try_parse_http1_response(&conn.h1_write_buffer, timestamp) {
780        conn.protocol = Protocol::Http1;
781        conn.h1_response = Some(resp);
782        conn.h1_write_parsed = true;
783    }
784}
785
786/// Try HTTP/1 parsing on Unknown-protocol read buffer. If successful,
787/// promotes the connection to Http1.
788fn try_parse_http1_unknown_read(conn: &mut Conn) {
789    let timestamp = conn
790        .response_chunks
791        .first()
792        .map(|c| c.timestamp_ns)
793        .unwrap_or(TimestampNs(0));
794    if let Some(resp) = h1::try_parse_http1_response(&conn.h1_read_buffer, timestamp) {
795        conn.protocol = Protocol::Http1;
796        conn.h1_response = Some(resp);
797        conn.h1_read_parsed = true;
798    } else if let Some(req) = h1::try_parse_http1_request(&conn.h1_read_buffer, timestamp) {
799        conn.protocol = Protocol::Http1;
800        conn.h1_request = Some(req);
801        conn.h1_read_parsed = true;
802    }
803}
804
805fn is_request_complete(conn: &Conn) -> bool {
806    match conn.protocol {
807        Protocol::Http1 => conn.h1_request.is_some(),
808        Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
809        Protocol::Unknown => false,
810    }
811}
812
813fn is_response_complete(conn: &Conn) -> bool {
814    match conn.protocol {
815        Protocol::Http1 => conn.h1_response.is_some(),
816        Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
817        Protocol::Unknown => false,
818    }
819}
820
821fn build_exchange(conn: &mut Conn) -> Option<Exchange> {
822    let (request, response, stream_id, latency_ns) = match conn.protocol {
823        Protocol::Http1 => {
824            // Take the already-parsed request and response
825            let req = conn.h1_request.take()?;
826            let resp = conn.h1_response.take()?;
827            let latency = resp.timestamp_ns.saturating_sub(req.timestamp_ns);
828            (req, resp, None, latency)
829        },
830        Protocol::Http2 => {
831            let sid = find_complete_h2_stream(conn)?;
832            let msg_req = conn.pending_requests.remove(&sid)?;
833            let msg_resp = conn.pending_responses.remove(&sid)?;
834
835            // Clean up emission and ready tracking for this stream
836            conn.h2_emitted_requests.remove(&sid);
837            conn.h2_emitted_responses.remove(&sid);
838            conn.ready_streams.remove(&sid);
839
840            // For HTTP/2, use per-stream timestamps from the parsed messages
841            // Request complete time: when END_STREAM was seen on request
842            // Response start time: when first frame was received on response
843            let request_complete_time = msg_req.end_stream_timestamp_ns;
844            let response_start_time = msg_resp.first_frame_timestamp_ns;
845
846            let req = msg_req.into_http_request()?;
847            let resp = msg_resp.into_http_response()?;
848
849            let latency = response_start_time.saturating_sub(request_complete_time);
850            (req, resp, Some(sid), latency)
851        },
852        Protocol::Unknown => return None,
853    };
854
855    Some(Exchange {
856        request,
857        response,
858        latency_ns,
859        protocol: conn.protocol,
860        process_id: conn.process_id,
861        remote_port: conn.remote_port,
862        stream_id,
863    })
864}
865
866#[cfg(test)]
867mod tests;