Skip to main content

http_collator/
lib.rs

1#![warn(missing_docs)]
2//! HTTP collation library
3//!
4//! Collates individual data events (from eBPF, pcap, etc.) into complete
5//! HTTP request/response exchanges. Supports both HTTP/1.x and HTTP/2.
6//!
7//! # Usage
8//!
9//! Implement the [`DataEvent`] trait for your data source, then feed events
10//! to the [`Collator`]:
11//!
12//! ```no_run
13//! use http_collator::{Collator, CollationEvent, DataEvent, Direction};
14//! use bytes::Bytes;
15//!
16//! struct MyEvent {
17//!     payload: Vec<u8>,
18//!     timestamp_ns: u64,
19//!     direction: Direction,
20//!     connection_id: u128,
21//!     process_id: u32,
22//!     remote_port: u16,
23//! }
24//!
25//! impl DataEvent for MyEvent {
26//!     fn payload(&self) -> &[u8] { &self.payload }
27//!     fn timestamp_ns(&self) -> u64 { self.timestamp_ns }
28//!     fn direction(&self) -> Direction { self.direction }
29//!     fn connection_id(&self) -> u128 { self.connection_id }
30//!     fn process_id(&self) -> u32 { self.process_id }
31//!     fn remote_port(&self) -> u16 { self.remote_port }
32//! }
33//!
34//! let collator = Collator::<MyEvent>::new();
35//! # let my_event = MyEvent { payload: vec![], timestamp_ns: 0, direction: Direction::Write, connection_id: 1, process_id: 1, remote_port: 80 };
36//! for event in collator.add_event(my_event) {
37//!     match event {
38//!         CollationEvent::Message { message, metadata } => {
39//!             println!("parsed message for conn {}", metadata.connection_id);
40//!         }
41//!         CollationEvent::Exchange(exchange) => {
42//!             println!("complete exchange: {exchange}");
43//!         }
44//!     }
45//! }
46//! ```
47
48mod connection;
49mod exchange;
50pub mod h1;
51mod traits;
52
53use std::marker::PhantomData;
54
55pub use connection::Protocol;
56use connection::{Connection as Conn, DataChunk};
57use dashmap::DashMap;
58pub use exchange::{CollationEvent, CollatorConfig, Exchange, MessageMetadata, ParsedHttpMessage};
59pub use h1::{HttpRequest, HttpResponse};
60use h2session::{
61    H2ConnectionState,
62    StreamId,
63    TimestampNs,
64    is_http2_preface,
65    looks_like_http2_frame,
66};
67pub use traits::{DataEvent, Direction};
68
/// Default maximum buffer size for data events: 16 KiB (2^14 bytes), the
/// maximum TLS record plaintext size.
pub const MAX_BUF_SIZE: usize = 16 * 1024;
71
72/// Collates individual data events into complete request/response exchanges.
73///
74/// Generic over the event type `E` which must implement [`DataEvent`].
75/// Uses per-connection locking via `DashMap` so concurrent HTTP/2 connections
76/// do not serialize through a single mutex.
77pub struct Collator<E: DataEvent> {
78    /// Connections tracked by conn_id (for socket events)
79    connections:     DashMap<u128, Conn>,
80    /// SSL connections tracked by process_id (no conn_id available)
81    ssl_connections: DashMap<u32, Conn>,
82    /// Configuration for what events to emit
83    config:          CollatorConfig,
84    /// Phantom data for the event type
85    _phantom:        PhantomData<E>,
86}
87
88impl<E: DataEvent> Default for Collator<E> {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl<E: DataEvent> Collator<E> {
95    /// Create a new collator with default settings (emits both messages and
96    /// exchanges)
97    pub fn new() -> Self {
98        Self::with_config(CollatorConfig::default())
99    }
100
101    /// Create a new collator with custom configuration
102    pub fn with_config(config: CollatorConfig) -> Self {
103        Self {
104            connections: DashMap::new(),
105            ssl_connections: DashMap::new(),
106            config,
107            _phantom: PhantomData,
108        }
109    }
110
111    /// Create a new collator with a custom maximum buffer size
112    pub fn with_max_buf_size(max_buf_size: usize) -> Self {
113        Self::with_config(CollatorConfig {
114            max_buf_size,
115            ..Default::default()
116        })
117    }
118
119    /// Get a reference to the current configuration
120    pub fn config(&self) -> &CollatorConfig {
121        &self.config
122    }
123
124    /// Process a data event, returning all resulting collation events.
125    ///
126    /// Takes ownership of the event to enable zero-copy transfer of the
127    /// payload into `DataChunk` via `into_payload()`.
128    ///
129    /// Returns Vec because:
130    /// - HTTP/2 can have multiple streams complete in one buffer
131    /// - A single buffer might contain complete request AND start of next
132    /// - Config might emit both messages AND exchanges
133    pub fn add_event(&self, event: E) -> Vec<CollationEvent> {
134        // Extract all scalar metadata before consuming the event
135        let direction = event.direction();
136        let timestamp_ns = TimestampNs(event.timestamp_ns());
137        let conn_id = event.connection_id();
138        let process_id = event.process_id();
139        let remote_port = event.remote_port();
140        let is_empty = event.payload().is_empty();
141
142        if is_empty {
143            return Vec::new();
144        }
145
146        // Skip non-data events
147        if direction == Direction::Other {
148            return Vec::new();
149        }
150
151        // Move payload ownership via into_payload() โ€” avoids cloning for
152        // implementors that already hold a Bytes or Vec<u8>.
153        let data = event.into_payload();
154
155        let chunk = DataChunk {
156            data,
157            timestamp_ns,
158            direction,
159        };
160
161        // DashMap entry() locks only the shard for this connection, allowing
162        // concurrent access to other connections.
163        if conn_id != 0 {
164            let mut conn = self
165                .connections
166                .entry(conn_id)
167                .or_insert_with(|| Conn::new(process_id, remote_port));
168            Self::process_event_for_conn(
169                &mut conn,
170                chunk,
171                direction,
172                timestamp_ns,
173                remote_port,
174                conn_id,
175                process_id,
176                &self.config,
177            )
178        } else {
179            let mut conn = self
180                .ssl_connections
181                .entry(process_id)
182                .or_insert_with(|| Conn::new(process_id, remote_port));
183            Self::process_event_for_conn(
184                &mut conn,
185                chunk,
186                direction,
187                timestamp_ns,
188                remote_port,
189                conn_id,
190                process_id,
191                &self.config,
192            )
193        }
194    }
195
196    /// Core event processing logic, called with a mutable reference to the
197    /// connection (obtained from a `DashMap` entry guard).
198    #[allow(clippy::too_many_arguments)]
199    fn process_event_for_conn(
200        conn: &mut Conn,
201        chunk: DataChunk,
202        direction: Direction,
203        timestamp_ns: TimestampNs,
204        remote_port: u16,
205        conn_id: u128,
206        process_id: u32,
207        config: &CollatorConfig,
208    ) -> Vec<CollationEvent> {
209        let buf: &[u8] = &chunk.data;
210
211        conn.last_activity_ns = timestamp_ns;
212        if remote_port != 0 && conn.remote_port.is_none() {
213            conn.remote_port = Some(remote_port);
214        }
215
216        // Detect protocol from first chunk if unknown
217        if conn.protocol == Protocol::Unknown {
218            conn.protocol = detect_protocol(buf);
219        }
220
221        // Detect protocol change on an established connection (FD reuse).
222        if conn.protocol != Protocol::Unknown {
223            let incoming_protocol = detect_protocol(buf);
224            if incoming_protocol != Protocol::Unknown && incoming_protocol != conn.protocol {
225                reset_connection_for_protocol_change(conn, incoming_protocol);
226            }
227        }
228
229        let mut events = Vec::new();
230
231        // Add chunk to appropriate buffer based on direction
232        match direction {
233            Direction::Write => {
234                conn.request_body_size += buf.len();
235                if conn.request_body_size > config.max_body_size {
236                    reset_connection_body_limit(conn);
237                    return Vec::new();
238                }
239
240                // Buffer for HTTP/1 and Unknown (Unknown may resolve to Http1)
241                if conn.protocol == Protocol::Http1 || conn.protocol == Protocol::Unknown {
242                    conn.h1_write_buffer.extend_from_slice(buf);
243                }
244                conn.request_chunks.push(chunk);
245
246                match conn.protocol {
247                    Protocol::Http1 => {
248                        drain_parse_emit_http1_write(
249                            conn,
250                            conn_id,
251                            process_id,
252                            config,
253                            &mut events,
254                        );
255                    },
256                    Protocol::Http2 => {
257                        parse_http2_chunks(conn, direction);
258                    },
259                    // For Unknown protocol, try HTTP/1 parsing โ€” handles non-standard
260                    // methods (WebDAV, etc.) that detect_protocol() doesn't recognize.
261                    Protocol::Unknown => {
262                        drain_parse_emit_http1_unknown_write(
263                            conn,
264                            conn_id,
265                            process_id,
266                            config,
267                            &mut events,
268                        );
269                    },
270                }
271
272                // Content-based parsing: a response can be parsed from the
273                // Write direction (server-side), so check both.
274                if is_request_complete(conn) {
275                    conn.request_complete = true;
276                }
277                if is_response_complete(conn) {
278                    conn.response_complete = true;
279                }
280            },
281            Direction::Read => {
282                conn.response_body_size += buf.len();
283                if conn.response_body_size > config.max_body_size {
284                    reset_connection_body_limit(conn);
285                    return Vec::new();
286                }
287
288                // Buffer for HTTP/1 and Unknown (Unknown may resolve to Http1)
289                if conn.protocol == Protocol::Http1 || conn.protocol == Protocol::Unknown {
290                    conn.h1_read_buffer.extend_from_slice(buf);
291                }
292                conn.response_chunks.push(chunk);
293
294                match conn.protocol {
295                    Protocol::Http1 => {
296                        drain_parse_emit_http1_read(
297                            conn,
298                            conn_id,
299                            process_id,
300                            config,
301                            &mut events,
302                        );
303                    },
304                    Protocol::Http2 => {
305                        parse_http2_chunks(conn, direction);
306                    },
307                    // For Unknown protocol, try HTTP/1 parsing โ€” handles non-standard
308                    // methods (WebDAV, etc.) that detect_protocol() doesn't recognize.
309                    Protocol::Unknown => {
310                        drain_parse_emit_http1_unknown_read(
311                            conn,
312                            conn_id,
313                            process_id,
314                            config,
315                            &mut events,
316                        );
317                    },
318                }
319
320                // Content-based parsing: a request can be parsed from the
321                // Read direction (server-side), so check both.
322                if is_request_complete(conn) {
323                    conn.request_complete = true;
324                }
325                if is_response_complete(conn) {
326                    conn.response_complete = true;
327                }
328            },
329            Direction::Other => {
330                return Vec::new();
331            },
332        }
333
334        // For HTTP/2, a complete stream pair can be detected on either event.
335        if conn.protocol == Protocol::Http2 && find_complete_h2_stream(conn).is_some() {
336            conn.request_complete = true;
337            conn.response_complete = true;
338        }
339
340        // HTTP/1 Messages were already pushed directly by the drain-parse-emit
341        // helpers above (so pipelined keep-alive requests all surface). This
342        // call is still needed for HTTP/2 messages and for the finalize path
343        // where a connection-close produces an h1_response that hasn't been
344        // emitted yet.
345        if config.emit_messages {
346            emit_message_events(conn, conn_id, process_id, &mut events);
347        }
348
349        if config.emit_exchanges && conn.request_complete && conn.response_complete {
350            if let Some(exchange) = build_exchange(conn) {
351                events.push(CollationEvent::Exchange(exchange));
352            }
353            reset_connection_after_exchange(conn);
354        }
355
356        events
357    }
358
359    /// Clean up stale connections and evict stale H2 streams.
360    ///
361    /// Callers should invoke this periodically to bound memory usage from
362    /// abandoned connections and incomplete HTTP/2 streams.
363    pub fn cleanup(&self, current_time_ns: TimestampNs) {
364        self.connections.retain(|_, conn| {
365            current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
366        });
367        self.ssl_connections.retain(|_, conn| {
368            current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
369        });
370
371        // Evict stale H2 streams within surviving connections
372        for mut entry in self.connections.iter_mut() {
373            entry.h2_write_state.evict_stale_streams(current_time_ns);
374            entry.h2_read_state.evict_stale_streams(current_time_ns);
375        }
376        for mut entry in self.ssl_connections.iter_mut() {
377            entry.h2_write_state.evict_stale_streams(current_time_ns);
378            entry.h2_read_state.evict_stale_streams(current_time_ns);
379        }
380    }
381
382    /// Remove a connection explicitly (e.g., on connection close)
383    ///
384    /// Call this when a connection is closed to free resources immediately.
385    /// If connection_id is 0, removes based on process_id from SSL connections.
386    pub fn remove_connection(&self, connection_id: u128, process_id: u32) {
387        if connection_id != 0 {
388            self.connections.remove(&connection_id);
389        } else {
390            self.ssl_connections.remove(&process_id);
391        }
392    }
393
394    /// Close a connection, finalizing any pending HTTP/1 response.
395    ///
396    /// For HTTP/1 responses without explicit framing (no Content-Length or
397    /// Transfer-Encoding), RFC 7230 ยง3.3.3 says the body extends until the
398    /// connection closes. This method finalizes such responses with whatever
399    /// body has accumulated so far, emits any resulting events, then removes
400    /// the connection.
401    pub fn close_connection(&self, connection_id: u128, process_id: u32) -> Vec<CollationEvent> {
402        let events = if connection_id != 0 {
403            match self.connections.get_mut(&connection_id) {
404                Some(mut guard) => {
405                    finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
406                },
407                None => Vec::new(),
408            }
409        } else {
410            match self.ssl_connections.get_mut(&process_id) {
411                Some(mut guard) => {
412                    finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
413                },
414                None => Vec::new(),
415            }
416        };
417
418        // Remove the connection after releasing the guard
419        if connection_id != 0 {
420            self.connections.remove(&connection_id);
421        } else {
422            self.ssl_connections.remove(&process_id);
423        }
424
425        events
426    }
427}
428
429/// Finalize any pending HTTP/1 response and emit events on connection close.
430fn finalize_and_emit(
431    conn: &mut Conn,
432    connection_id: u128,
433    process_id: u32,
434    config: &CollatorConfig,
435) -> Vec<CollationEvent> {
436    // Finalize any pending HTTP/1 response with body accumulated so far
437    if conn.protocol == Protocol::Http1
438        && conn.h1_response.is_none()
439        && !conn.h1_read_buffer.is_empty()
440    {
441        let timestamp = conn
442            .response_chunks
443            .first()
444            .map(|c| c.timestamp_ns)
445            .unwrap_or(TimestampNs(0));
446        conn.h1_response = h1::try_finalize_http1_response(&conn.h1_read_buffer, timestamp);
447        if conn.h1_response.is_some() {
448            conn.response_complete = true;
449        }
450    }
451
452    let mut events = Vec::new();
453
454    if config.emit_messages {
455        emit_message_events(conn, connection_id, process_id, &mut events);
456    }
457
458    if config.emit_exchanges
459        && conn.request_complete
460        && conn.response_complete
461        && let Some(exchange) = build_exchange(conn)
462    {
463        events.push(CollationEvent::Exchange(exchange));
464    }
465
466    events
467}
468
469/// Emit Message events for any newly parsed messages that haven't been emitted
470/// yet
471fn emit_message_events(
472    conn: &mut Conn,
473    conn_id: u128,
474    process_id: u32,
475    events: &mut Vec<CollationEvent>,
476) {
477    match conn.protocol {
478        Protocol::Http1 => {
479            // Emit request if parsed and not yet emitted
480            if let Some(ref req) = conn.h1_request
481                && !conn.h1_request_emitted
482            {
483                let metadata = MessageMetadata {
484                    connection_id: conn_id,
485                    process_id,
486                    timestamp_ns: req.timestamp_ns,
487                    stream_id: None,
488                    remote_port: conn.remote_port,
489                    protocol: conn.protocol,
490                };
491                events.push(CollationEvent::Message {
492                    message: ParsedHttpMessage::Request(req.clone()),
493                    metadata,
494                });
495                conn.h1_request_emitted = true;
496            }
497
498            // Emit response if parsed and not yet emitted
499            if let Some(ref resp) = conn.h1_response
500                && !conn.h1_response_emitted
501            {
502                let metadata = MessageMetadata {
503                    connection_id: conn_id,
504                    process_id,
505                    timestamp_ns: resp.timestamp_ns,
506                    stream_id: None,
507                    remote_port: conn.remote_port,
508                    protocol: conn.protocol,
509                };
510                events.push(CollationEvent::Message {
511                    message: ParsedHttpMessage::Response(resp.clone()),
512                    metadata,
513                });
514                conn.h1_response_emitted = true;
515            }
516        },
517        Protocol::Http2 => {
518            // Emit newly parsed HTTP/2 requests
519            for (&stream_id, msg) in &conn.pending_requests {
520                if !conn.h2_emitted_requests.contains(&stream_id)
521                    && let Some(req) = msg.to_http_request()
522                {
523                    let metadata = MessageMetadata {
524                        connection_id: conn_id,
525                        process_id,
526                        timestamp_ns: msg.end_stream_timestamp_ns,
527                        stream_id: Some(stream_id),
528                        remote_port: conn.remote_port,
529                        protocol: conn.protocol,
530                    };
531                    events.push(CollationEvent::Message {
532                        message: ParsedHttpMessage::Request(req),
533                        metadata,
534                    });
535                }
536            }
537            // Mark all current pending requests as emitted
538            conn.h2_emitted_requests
539                .extend(conn.pending_requests.keys().copied());
540
541            // Emit newly parsed HTTP/2 responses
542            for (&stream_id, msg) in &conn.pending_responses {
543                if !conn.h2_emitted_responses.contains(&stream_id)
544                    && let Some(resp) = msg.to_http_response()
545                {
546                    let metadata = MessageMetadata {
547                        connection_id: conn_id,
548                        process_id,
549                        timestamp_ns: msg.first_frame_timestamp_ns,
550                        stream_id: Some(stream_id),
551                        remote_port: conn.remote_port,
552                        protocol: conn.protocol,
553                    };
554                    events.push(CollationEvent::Message {
555                        message: ParsedHttpMessage::Response(resp),
556                        metadata,
557                    });
558                }
559            }
560            // Mark all current pending responses as emitted
561            conn.h2_emitted_responses
562                .extend(conn.pending_responses.keys().copied());
563        },
564        Protocol::Unknown => {},
565    }
566}
567
568/// Reset connection state after emitting an exchange
569fn reset_connection_after_exchange(conn: &mut Conn) {
570    conn.request_complete = false;
571    conn.response_complete = false;
572
573    if conn.protocol == Protocol::Http1 {
574        // HTTP/1: clear everything for the next exchange
575        conn.request_chunks.clear();
576        conn.response_chunks.clear();
577        conn.h1_request = None;
578        conn.h1_response = None;
579        conn.h1_write_parsed = false;
580        conn.h1_read_parsed = false;
581        conn.h1_request_emitted = false;
582        conn.h1_response_emitted = false;
583        conn.h1_write_buffer.clear();
584        conn.h1_read_buffer.clear();
585        conn.protocol = Protocol::Unknown;
586    } else if conn.protocol == Protocol::Http2 {
587        // HTTP/2: only clear chunks if no other pending messages remain.
588        // The matched pair was already removed in build_exchange().
589        // Keep h2_*_state HPACK decoder for connection persistence.
590        // h2_emitted_* sets are cleaned up in build_exchange when
591        // the stream_id is removed from pending.
592        if conn.pending_requests.is_empty() && conn.pending_responses.is_empty() {
593            conn.request_chunks.clear();
594            conn.response_chunks.clear();
595            conn.h2_write_state.clear_buffer();
596            conn.h2_read_state.clear_buffer();
597        }
598    }
599
600    // Reset body size tracking for the next exchange
601    conn.request_body_size = 0;
602    conn.response_body_size = 0;
603}
604
605/// Reset connection when accumulated body size exceeds the limit.
606/// Drops all accumulated data and parsed state for this connection.
607fn reset_connection_body_limit(conn: &mut Conn) {
608    conn.request_chunks.clear();
609    conn.response_chunks.clear();
610    conn.h1_write_buffer.clear();
611    conn.h1_read_buffer.clear();
612    conn.h1_request = None;
613    conn.h1_response = None;
614    conn.h1_write_parsed = false;
615    conn.h1_read_parsed = false;
616    conn.h1_request_emitted = false;
617    conn.h1_response_emitted = false;
618    conn.h2_write_state = H2ConnectionState::new();
619    conn.h2_read_state = H2ConnectionState::new();
620    conn.pending_requests.clear();
621    conn.pending_responses.clear();
622    conn.h2_emitted_requests.clear();
623    conn.h2_emitted_responses.clear();
624    conn.ready_streams.clear();
625    conn.request_complete = false;
626    conn.response_complete = false;
627    conn.request_body_size = 0;
628    conn.response_body_size = 0;
629    conn.protocol = Protocol::Unknown;
630}
631
632/// Reset connection when the detected protocol changes (FD reuse with
633/// different protocol, e.g., HTTP/2 followed by HTTP/1 on the same fd).
634fn reset_connection_for_protocol_change(conn: &mut Conn, new_protocol: Protocol) {
635    conn.request_chunks.clear();
636    conn.response_chunks.clear();
637    conn.h1_write_buffer.clear();
638    conn.h1_read_buffer.clear();
639    conn.h1_request = None;
640    conn.h1_response = None;
641    conn.h1_write_parsed = false;
642    conn.h1_read_parsed = false;
643    conn.h1_request_emitted = false;
644    conn.h1_response_emitted = false;
645    conn.h2_write_state = H2ConnectionState::new();
646    conn.h2_read_state = H2ConnectionState::new();
647    conn.pending_requests.clear();
648    conn.pending_responses.clear();
649    conn.h2_emitted_requests.clear();
650    conn.h2_emitted_responses.clear();
651    conn.ready_streams.clear();
652    conn.request_complete = false;
653    conn.response_complete = false;
654    conn.request_body_size = 0;
655    conn.response_body_size = 0;
656    conn.protocol = new_protocol;
657}
658
659/// Detect whether raw bytes look like HTTP/1.x or HTTP/2 traffic.
660///
661/// Checks for the HTTP/2 connection preface, HTTP/2 frame headers,
662/// and HTTP/1.x request/response patterns. Returns [`Protocol::Unknown`]
663/// if no pattern matches.
664pub fn detect_protocol(data: &[u8]) -> Protocol {
665    // Check for HTTP/2 preface
666    if is_http2_preface(data) {
667        return Protocol::Http2;
668    }
669
670    // Check for HTTP/2 frame header heuristic
671    if looks_like_http2_frame(data) {
672        return Protocol::Http2;
673    }
674
675    // Check for HTTP/1.x request
676    if h1::is_http1_request(data) || h1::is_http1_response(data) {
677        return Protocol::Http1;
678    }
679
680    Protocol::Unknown
681}
682
683/// Feed chunk to h2session, classify by content after parsing.
684///
685/// Uses separate H2ConnectionState per direction to avoid corrupting frame
686/// boundaries when Read and Write events interleave (e.g., WINDOW_UPDATEs
687/// between DATA frames). Messages are classified by their pseudo-headers
688/// (:method = request, :status = response), supporting both client-side
689/// monitoring (Write=request, Read=response) and server-side monitoring
690/// (Read=request, Write=response).
691fn parse_http2_chunks(conn: &mut Conn, direction: Direction) {
692    // Check for fd-reuse: a new h2 connection preface on a connection that
693    // already processed one means the kernel reused the file descriptor for
694    // a new TCP connection. We must reset BOTH directions' parsers since the
695    // new connection has fresh HPACK context on both sides.
696    let last_chunk_is_preface = match direction {
697        Direction::Write => conn
698            .request_chunks
699            .last()
700            .is_some_and(|c| is_http2_preface(&c.data)),
701        Direction::Read => conn
702            .response_chunks
703            .last()
704            .is_some_and(|c| is_http2_preface(&c.data)),
705        Direction::Other => false,
706    };
707    let current_state_has_preface = match direction {
708        Direction::Write => conn.h2_write_state.preface_received,
709        Direction::Read => conn.h2_read_state.preface_received,
710        Direction::Other => false,
711    };
712
713    if last_chunk_is_preface && current_state_has_preface {
714        conn.h2_write_state = H2ConnectionState::new();
715        conn.h2_read_state = H2ConnectionState::new();
716        conn.pending_requests.clear();
717        conn.pending_responses.clear();
718        conn.h2_emitted_requests.clear();
719        conn.h2_emitted_responses.clear();
720        conn.ready_streams.clear();
721    }
722
723    let (chunks, h2_state) = match direction {
724        Direction::Write => (&conn.request_chunks, &mut conn.h2_write_state),
725        Direction::Read => (&conn.response_chunks, &mut conn.h2_read_state),
726        Direction::Other => return,
727    };
728
729    let chunk = match chunks.last() {
730        Some(c) => c,
731        None => return,
732    };
733
734    // Feed to direction-specific h2 parser; errors are non-fatal
735    let _ = h2_state.feed(&chunk.data, chunk.timestamp_ns);
736
737    // Pop completed messages and classify by content, not direction.
738    // Maintain ready_streams set for O(1) complete-pair lookup.
739    while let Some((stream_id, msg)) = h2_state.try_pop() {
740        if msg.is_request() {
741            conn.pending_requests.insert(stream_id, msg);
742            if conn.pending_responses.contains_key(&stream_id) {
743                conn.ready_streams.insert(stream_id);
744            }
745        } else if msg.is_response() {
746            conn.pending_responses.insert(stream_id, msg);
747            if conn.pending_requests.contains_key(&stream_id) {
748                conn.ready_streams.insert(stream_id);
749            }
750        }
751    }
752}
753
754/// Find a stream_id that has both request and response ready (O(1) via
755/// ready_streams set)
756fn find_complete_h2_stream(conn: &Conn) -> Option<StreamId> {
757    conn.ready_streams.iter().next().copied()
758}
759
760/// Drain complete HTTP/1 messages from the write-direction buffer, emitting a
761/// `Message` event for each one. Supports HTTP/1.1 keep-alive pipelining: as
762/// long as the buffer holds another complete request or response, we slice
763/// its bytes off and keep parsing.
764///
765/// The last parsed message (if any) remains in `conn.h1_request` /
766/// `conn.h1_response` so the exchange-matching path in
767/// `process_event_for_conn` still works for the final unpaired message.
768fn drain_parse_emit_http1_write(
769    conn: &mut Conn,
770    conn_id: u128,
771    process_id: u32,
772    config: &CollatorConfig,
773    events: &mut Vec<CollationEvent>,
774) {
775    loop {
776        let timestamp = conn
777            .request_chunks
778            .last()
779            .map(|c| c.timestamp_ns)
780            .unwrap_or(TimestampNs(0));
781
782        // Try request first (client-side), then response (server-side).
783        if let Some((req, consumed)) =
784            h1::try_parse_http1_request_sized(&conn.h1_write_buffer, timestamp)
785        {
786            conn.h1_write_buffer.drain(..consumed);
787            emit_h1_request(conn, conn_id, process_id, config, events, req);
788        } else if let Some((resp, consumed)) =
789            h1::try_parse_http1_response_sized(&conn.h1_write_buffer, timestamp)
790        {
791            conn.h1_write_buffer.drain(..consumed);
792            emit_h1_response(conn, conn_id, process_id, config, events, resp);
793        } else {
794            break;
795        }
796    }
797}
798
799/// Drain complete HTTP/1 messages from the read-direction buffer, emitting a
800/// `Message` event for each one. See `drain_parse_emit_http1_write`.
801fn drain_parse_emit_http1_read(
802    conn: &mut Conn,
803    conn_id: u128,
804    process_id: u32,
805    config: &CollatorConfig,
806    events: &mut Vec<CollationEvent>,
807) {
808    loop {
809        let timestamp = conn
810            .response_chunks
811            .last()
812            .map(|c| c.timestamp_ns)
813            .unwrap_or(TimestampNs(0));
814
815        // Tries response first (client-side), then request (server-side).
816        if let Some((resp, consumed)) =
817            h1::try_parse_http1_response_sized(&conn.h1_read_buffer, timestamp)
818        {
819            conn.h1_read_buffer.drain(..consumed);
820            emit_h1_response(conn, conn_id, process_id, config, events, resp);
821        } else if let Some((req, consumed)) =
822            h1::try_parse_http1_request_sized(&conn.h1_read_buffer, timestamp)
823        {
824            conn.h1_read_buffer.drain(..consumed);
825            emit_h1_request(conn, conn_id, process_id, config, events, req);
826        } else {
827            break;
828        }
829    }
830}
831
832/// Drain HTTP/1 messages from the write buffer under Unknown protocol; on the
833/// first successful parse, promote the connection to Http1 and keep draining.
834fn drain_parse_emit_http1_unknown_write(
835    conn: &mut Conn,
836    conn_id: u128,
837    process_id: u32,
838    config: &CollatorConfig,
839    events: &mut Vec<CollationEvent>,
840) {
841    let timestamp = conn
842        .request_chunks
843        .last()
844        .map(|c| c.timestamp_ns)
845        .unwrap_or(TimestampNs(0));
846    if let Some((req, consumed)) =
847        h1::try_parse_http1_request_sized(&conn.h1_write_buffer, timestamp)
848    {
849        conn.protocol = Protocol::Http1;
850        conn.h1_write_buffer.drain(..consumed);
851        emit_h1_request(conn, conn_id, process_id, config, events, req);
852        drain_parse_emit_http1_write(conn, conn_id, process_id, config, events);
853    } else if let Some((resp, consumed)) =
854        h1::try_parse_http1_response_sized(&conn.h1_write_buffer, timestamp)
855    {
856        conn.protocol = Protocol::Http1;
857        conn.h1_write_buffer.drain(..consumed);
858        emit_h1_response(conn, conn_id, process_id, config, events, resp);
859        drain_parse_emit_http1_write(conn, conn_id, process_id, config, events);
860    }
861}
862
863/// Drain HTTP/1 messages from the read buffer under Unknown protocol; on the
864/// first successful parse, promote the connection to Http1 and keep draining.
865fn drain_parse_emit_http1_unknown_read(
866    conn: &mut Conn,
867    conn_id: u128,
868    process_id: u32,
869    config: &CollatorConfig,
870    events: &mut Vec<CollationEvent>,
871) {
872    let timestamp = conn
873        .response_chunks
874        .last()
875        .map(|c| c.timestamp_ns)
876        .unwrap_or(TimestampNs(0));
877    if let Some((resp, consumed)) =
878        h1::try_parse_http1_response_sized(&conn.h1_read_buffer, timestamp)
879    {
880        conn.protocol = Protocol::Http1;
881        conn.h1_read_buffer.drain(..consumed);
882        emit_h1_response(conn, conn_id, process_id, config, events, resp);
883        drain_parse_emit_http1_read(conn, conn_id, process_id, config, events);
884    } else if let Some((req, consumed)) =
885        h1::try_parse_http1_request_sized(&conn.h1_read_buffer, timestamp)
886    {
887        conn.protocol = Protocol::Http1;
888        conn.h1_read_buffer.drain(..consumed);
889        emit_h1_request(conn, conn_id, process_id, config, events, req);
890        drain_parse_emit_http1_read(conn, conn_id, process_id, config, events);
891    }
892}
893
894/// Push a Message event for an HTTP/1 request and store it on the connection
895/// as the "current" parsed request (for exchange matching). Sets
896/// `h1_request_emitted` so `emit_message_events` won't re-emit this one.
897fn emit_h1_request(
898    conn: &mut Conn,
899    conn_id: u128,
900    process_id: u32,
901    config: &CollatorConfig,
902    events: &mut Vec<CollationEvent>,
903    req: h1::HttpRequest,
904) {
905    if config.emit_messages {
906        let metadata = MessageMetadata {
907            connection_id: conn_id,
908            process_id,
909            timestamp_ns: req.timestamp_ns,
910            stream_id: None,
911            remote_port: conn.remote_port,
912            protocol: Protocol::Http1,
913        };
914        events.push(CollationEvent::Message {
915            message: ParsedHttpMessage::Request(req.clone()),
916            metadata,
917        });
918    }
919    conn.h1_request = Some(req);
920    conn.h1_request_emitted = true;
921}
922
923/// Push a Message event for an HTTP/1 response and store it on the connection
924/// as the "current" parsed response (for exchange matching). Sets
925/// `h1_response_emitted` so `emit_message_events` won't re-emit this one.
926fn emit_h1_response(
927    conn: &mut Conn,
928    conn_id: u128,
929    process_id: u32,
930    config: &CollatorConfig,
931    events: &mut Vec<CollationEvent>,
932    resp: h1::HttpResponse,
933) {
934    if config.emit_messages {
935        let metadata = MessageMetadata {
936            connection_id: conn_id,
937            process_id,
938            timestamp_ns: resp.timestamp_ns,
939            stream_id: None,
940            remote_port: conn.remote_port,
941            protocol: Protocol::Http1,
942        };
943        events.push(CollationEvent::Message {
944            message: ParsedHttpMessage::Response(resp.clone()),
945            metadata,
946        });
947    }
948    conn.h1_response = Some(resp);
949    conn.h1_response_emitted = true;
950}
951
952fn is_request_complete(conn: &Conn) -> bool {
953    match conn.protocol {
954        Protocol::Http1 => conn.h1_request.is_some(),
955        Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
956        Protocol::Unknown => false,
957    }
958}
959
960fn is_response_complete(conn: &Conn) -> bool {
961    match conn.protocol {
962        Protocol::Http1 => conn.h1_response.is_some(),
963        Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
964        Protocol::Unknown => false,
965    }
966}
967
968fn build_exchange(conn: &mut Conn) -> Option<Exchange> {
969    let (request, response, stream_id, latency_ns) = match conn.protocol {
970        Protocol::Http1 => {
971            // Take the already-parsed request and response
972            let req = conn.h1_request.take()?;
973            let resp = conn.h1_response.take()?;
974            let latency = resp.timestamp_ns.saturating_sub(req.timestamp_ns);
975            (req, resp, None, latency)
976        },
977        Protocol::Http2 => {
978            let sid = find_complete_h2_stream(conn)?;
979            let msg_req = conn.pending_requests.remove(&sid)?;
980            let msg_resp = conn.pending_responses.remove(&sid)?;
981
982            // Clean up emission and ready tracking for this stream
983            conn.h2_emitted_requests.remove(&sid);
984            conn.h2_emitted_responses.remove(&sid);
985            conn.ready_streams.remove(&sid);
986
987            // For HTTP/2, use per-stream timestamps from the parsed messages
988            // Request complete time: when END_STREAM was seen on request
989            // Response start time: when first frame was received on response
990            let request_complete_time = msg_req.end_stream_timestamp_ns;
991            let response_start_time = msg_resp.first_frame_timestamp_ns;
992
993            let req = msg_req.into_http_request()?;
994            let resp = msg_resp.into_http_response()?;
995
996            let latency = response_start_time.saturating_sub(request_complete_time);
997            (req, resp, Some(sid), latency)
998        },
999        Protocol::Unknown => return None,
1000    };
1001
1002    Some(Exchange {
1003        request,
1004        response,
1005        latency_ns,
1006        protocol: conn.protocol,
1007        process_id: conn.process_id,
1008        remote_port: conn.remote_port,
1009        stream_id,
1010    })
1011}
1012
1013#[cfg(test)]
1014mod tests;