Skip to main content

http_collator/
lib.rs

1#![warn(missing_docs)]
2//! HTTP collation library
3//!
4//! Collates individual data events (from eBPF, pcap, etc.) into complete
5//! HTTP request/response exchanges. Supports both HTTP/1.x and HTTP/2.
6//!
7//! # Usage
8//!
9//! Implement the [`DataEvent`] trait for your data source, then feed events
10//! to the [`Collator`]:
11//!
12//! ```no_run
13//! use http_collator::{Collator, CollationEvent, DataEvent, Direction};
14//! use bytes::Bytes;
15//!
16//! struct MyEvent {
17//!     payload: Vec<u8>,
18//!     timestamp_ns: u64,
19//!     direction: Direction,
20//!     connection_id: u64,
21//!     process_id: u32,
22//!     remote_port: u16,
23//! }
24//!
25//! impl DataEvent for MyEvent {
26//!     fn payload(&self) -> &[u8] { &self.payload }
27//!     fn timestamp_ns(&self) -> u64 { self.timestamp_ns }
28//!     fn direction(&self) -> Direction { self.direction }
29//!     fn connection_id(&self) -> u64 { self.connection_id }
30//!     fn process_id(&self) -> u32 { self.process_id }
31//!     fn remote_port(&self) -> u16 { self.remote_port }
32//! }
33//!
34//! let collator = Collator::<MyEvent>::new();
35//! # let my_event = MyEvent { payload: vec![], timestamp_ns: 0, direction: Direction::Write, connection_id: 1, process_id: 1, remote_port: 80 };
36//! for event in collator.add_event(my_event) {
37//!     match event {
38//!         CollationEvent::Message { message, metadata } => {
39//!             println!("parsed message for conn {}", metadata.connection_id);
40//!         }
41//!         CollationEvent::Exchange(exchange) => {
42//!             println!("complete exchange: {exchange}");
43//!         }
44//!     }
45//! }
46//! ```
47
48mod connection;
49mod exchange;
50pub mod h1;
51mod traits;
52
53use std::marker::PhantomData;
54
55pub use connection::Protocol;
56use connection::{Connection as Conn, DataChunk};
57use dashmap::DashMap;
58pub use exchange::{CollationEvent, CollatorConfig, Exchange, MessageMetadata, ParsedHttpMessage};
59pub use h1::{HttpRequest, HttpResponse};
60use h2session::{
61    H2ConnectionState,
62    StreamId,
63    TimestampNs,
64    is_http2_preface,
65    looks_like_http2_frame,
66};
67pub use traits::{DataEvent, Direction};
68
69/// Default maximum buffer size for data events (TLS record size)
70pub const MAX_BUF_SIZE: usize = 16384;
71
72/// Collates individual data events into complete request/response exchanges.
73///
74/// Generic over the event type `E` which must implement [`DataEvent`].
75/// Uses per-connection locking via `DashMap` so concurrent HTTP/2 connections
76/// do not serialize through a single mutex.
77pub struct Collator<E: DataEvent> {
78    /// Connections tracked by conn_id (for socket events)
79    connections:     DashMap<u64, Conn>,
80    /// SSL connections tracked by process_id (no conn_id available)
81    ssl_connections: DashMap<u32, Conn>,
82    /// Configuration for what events to emit
83    config:          CollatorConfig,
84    /// Phantom data for the event type
85    _phantom:        PhantomData<E>,
86}
87
88impl<E: DataEvent> Default for Collator<E> {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl<E: DataEvent> Collator<E> {
95    /// Create a new collator with default settings (emits both messages and
96    /// exchanges)
97    pub fn new() -> Self {
98        Self::with_config(CollatorConfig::default())
99    }
100
101    /// Create a new collator with custom configuration
102    pub fn with_config(config: CollatorConfig) -> Self {
103        Self {
104            connections: DashMap::new(),
105            ssl_connections: DashMap::new(),
106            config,
107            _phantom: PhantomData,
108        }
109    }
110
111    /// Create a new collator with a custom maximum buffer size
112    pub fn with_max_buf_size(max_buf_size: usize) -> Self {
113        Self::with_config(CollatorConfig {
114            max_buf_size,
115            ..Default::default()
116        })
117    }
118
119    /// Get a reference to the current configuration
120    pub fn config(&self) -> &CollatorConfig {
121        &self.config
122    }
123
124    /// Process a data event, returning all resulting collation events.
125    ///
126    /// Takes ownership of the event to enable zero-copy transfer of the
127    /// payload into `DataChunk` via `into_payload()`.
128    ///
129    /// Returns Vec because:
130    /// - HTTP/2 can have multiple streams complete in one buffer
131    /// - A single buffer might contain complete request AND start of next
132    /// - Config might emit both messages AND exchanges
133    pub fn add_event(&self, event: E) -> Vec<CollationEvent> {
134        // Extract all scalar metadata before consuming the event
135        let direction = event.direction();
136        let timestamp_ns = TimestampNs(event.timestamp_ns());
137        let conn_id = event.connection_id();
138        let process_id = event.process_id();
139        let remote_port = event.remote_port();
140        let is_empty = event.payload().is_empty();
141
142        if is_empty {
143            return Vec::new();
144        }
145
146        // Skip non-data events
147        if direction == Direction::Other {
148            return Vec::new();
149        }
150
151        // Move payload ownership via into_payload() — avoids cloning for
152        // implementors that already hold a Bytes or Vec<u8>.
153        let data = event.into_payload();
154
155        let chunk = DataChunk {
156            data,
157            timestamp_ns,
158            direction,
159        };
160
161        // DashMap entry() locks only the shard for this connection, allowing
162        // concurrent access to other connections.
163        if conn_id != 0 {
164            let mut conn = self
165                .connections
166                .entry(conn_id)
167                .or_insert_with(|| Conn::new(process_id, remote_port));
168            Self::process_event_for_conn(
169                &mut conn,
170                chunk,
171                direction,
172                timestamp_ns,
173                remote_port,
174                conn_id,
175                process_id,
176                &self.config,
177            )
178        } else {
179            let mut conn = self
180                .ssl_connections
181                .entry(process_id)
182                .or_insert_with(|| Conn::new(process_id, remote_port));
183            Self::process_event_for_conn(
184                &mut conn,
185                chunk,
186                direction,
187                timestamp_ns,
188                remote_port,
189                conn_id,
190                process_id,
191                &self.config,
192            )
193        }
194    }
195
196    /// Core event processing logic, called with a mutable reference to the
197    /// connection (obtained from a `DashMap` entry guard).
198    #[allow(clippy::too_many_arguments)]
199    fn process_event_for_conn(
200        conn: &mut Conn,
201        chunk: DataChunk,
202        direction: Direction,
203        timestamp_ns: TimestampNs,
204        remote_port: u16,
205        conn_id: u64,
206        process_id: u32,
207        config: &CollatorConfig,
208    ) -> Vec<CollationEvent> {
209        let buf: &[u8] = &chunk.data;
210
211        conn.last_activity_ns = timestamp_ns;
212        if remote_port != 0 && conn.remote_port.is_none() {
213            conn.remote_port = Some(remote_port);
214        }
215
216        // Detect protocol from first chunk if unknown
217        if conn.protocol == Protocol::Unknown {
218            conn.protocol = detect_protocol(buf);
219        }
220
221        // Detect protocol change on an established connection (FD reuse).
222        if conn.protocol != Protocol::Unknown {
223            let incoming_protocol = detect_protocol(buf);
224            if incoming_protocol != Protocol::Unknown && incoming_protocol != conn.protocol {
225                reset_connection_for_protocol_change(conn, incoming_protocol);
226            }
227        }
228
229        // Add chunk to appropriate buffer based on direction
230        match direction {
231            Direction::Write => {
232                conn.request_body_size += buf.len();
233                if conn.request_body_size > config.max_body_size {
234                    reset_connection_body_limit(conn);
235                    return Vec::new();
236                }
237
238                if conn.protocol == Protocol::Http1 {
239                    conn.h1_request_buffer.extend_from_slice(buf);
240                }
241                conn.request_chunks.push(chunk);
242
243                match conn.protocol {
244                    Protocol::Http1 if conn.h1_request.is_none() => {
245                        try_parse_http1_request_chunks(conn);
246                    },
247                    Protocol::Http2 => {
248                        parse_http2_chunks(conn, direction);
249                    },
250                    _ => {},
251                }
252
253                if is_request_complete(conn) {
254                    conn.request_complete = true;
255                }
256            },
257            Direction::Read => {
258                conn.response_body_size += buf.len();
259                if conn.response_body_size > config.max_body_size {
260                    reset_connection_body_limit(conn);
261                    return Vec::new();
262                }
263
264                if conn.protocol == Protocol::Http1 {
265                    conn.h1_response_buffer.extend_from_slice(buf);
266                }
267                conn.response_chunks.push(chunk);
268
269                match conn.protocol {
270                    Protocol::Http1 if conn.h1_response.is_none() => {
271                        try_parse_http1_response_chunks(conn);
272                    },
273                    Protocol::Http2 => {
274                        parse_http2_chunks(conn, direction);
275                    },
276                    _ => {},
277                }
278
279                if is_response_complete(conn) {
280                    conn.response_complete = true;
281                }
282            },
283            Direction::Other => {
284                return Vec::new();
285            },
286        }
287
288        // For HTTP/2, a complete stream pair can be detected on either event.
289        if conn.protocol == Protocol::Http2 && find_complete_h2_stream(conn).is_some() {
290            conn.request_complete = true;
291            conn.response_complete = true;
292        }
293
294        let mut events = Vec::new();
295
296        if config.emit_messages {
297            emit_message_events(conn, conn_id, process_id, &mut events);
298        }
299
300        if config.emit_exchanges && conn.request_complete && conn.response_complete {
301            if let Some(exchange) = build_exchange(conn) {
302                events.push(CollationEvent::Exchange(exchange));
303            }
304            reset_connection_after_exchange(conn);
305        }
306
307        events
308    }
309
310    /// Clean up stale connections and evict stale H2 streams.
311    ///
312    /// Callers should invoke this periodically to bound memory usage from
313    /// abandoned connections and incomplete HTTP/2 streams.
314    pub fn cleanup(&self, current_time_ns: TimestampNs) {
315        self.connections.retain(|_, conn| {
316            current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
317        });
318        self.ssl_connections.retain(|_, conn| {
319            current_time_ns.saturating_sub(conn.last_activity_ns) < self.config.timeout_ns
320        });
321
322        // Evict stale H2 streams within surviving connections
323        for mut entry in self.connections.iter_mut() {
324            entry.h2_write_state.evict_stale_streams(current_time_ns);
325            entry.h2_read_state.evict_stale_streams(current_time_ns);
326        }
327        for mut entry in self.ssl_connections.iter_mut() {
328            entry.h2_write_state.evict_stale_streams(current_time_ns);
329            entry.h2_read_state.evict_stale_streams(current_time_ns);
330        }
331    }
332
333    /// Remove a connection explicitly (e.g., on connection close)
334    ///
335    /// Call this when a connection is closed to free resources immediately.
336    /// If connection_id is 0, removes based on process_id from SSL connections.
337    pub fn remove_connection(&self, connection_id: u64, process_id: u32) {
338        if connection_id != 0 {
339            self.connections.remove(&connection_id);
340        } else {
341            self.ssl_connections.remove(&process_id);
342        }
343    }
344
345    /// Close a connection, finalizing any pending HTTP/1 response.
346    ///
347    /// For HTTP/1 responses without explicit framing (no Content-Length or
348    /// Transfer-Encoding), RFC 7230 §3.3.3 says the body extends until the
349    /// connection closes. This method finalizes such responses with whatever
350    /// body has accumulated so far, emits any resulting events, then removes
351    /// the connection.
352    pub fn close_connection(&self, connection_id: u64, process_id: u32) -> Vec<CollationEvent> {
353        let events = if connection_id != 0 {
354            match self.connections.get_mut(&connection_id) {
355                Some(mut guard) => {
356                    finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
357                },
358                None => Vec::new(),
359            }
360        } else {
361            match self.ssl_connections.get_mut(&process_id) {
362                Some(mut guard) => {
363                    finalize_and_emit(&mut guard, connection_id, process_id, &self.config)
364                },
365                None => Vec::new(),
366            }
367        };
368
369        // Remove the connection after releasing the guard
370        if connection_id != 0 {
371            self.connections.remove(&connection_id);
372        } else {
373            self.ssl_connections.remove(&process_id);
374        }
375
376        events
377    }
378}
379
380/// Finalize any pending HTTP/1 response and emit events on connection close.
381fn finalize_and_emit(
382    conn: &mut Conn,
383    connection_id: u64,
384    process_id: u32,
385    config: &CollatorConfig,
386) -> Vec<CollationEvent> {
387    // Finalize any pending HTTP/1 response with body accumulated so far
388    if conn.protocol == Protocol::Http1
389        && conn.h1_response.is_none()
390        && !conn.h1_response_buffer.is_empty()
391    {
392        let timestamp = conn
393            .response_chunks
394            .first()
395            .map(|c| c.timestamp_ns)
396            .unwrap_or(TimestampNs(0));
397        conn.h1_response = h1::try_finalize_http1_response(&conn.h1_response_buffer, timestamp);
398        if conn.h1_response.is_some() {
399            conn.response_complete = true;
400        }
401    }
402
403    let mut events = Vec::new();
404
405    if config.emit_messages {
406        emit_message_events(conn, connection_id, process_id, &mut events);
407    }
408
409    if config.emit_exchanges
410        && conn.request_complete
411        && conn.response_complete
412        && let Some(exchange) = build_exchange(conn)
413    {
414        events.push(CollationEvent::Exchange(exchange));
415    }
416
417    events
418}
419
420/// Emit Message events for any newly parsed messages that haven't been emitted
421/// yet
422fn emit_message_events(
423    conn: &mut Conn,
424    conn_id: u64,
425    process_id: u32,
426    events: &mut Vec<CollationEvent>,
427) {
428    match conn.protocol {
429        Protocol::Http1 => {
430            // Emit request if parsed and not yet emitted
431            if let Some(ref req) = conn.h1_request
432                && !conn.h1_request_emitted
433            {
434                let metadata = MessageMetadata {
435                    connection_id: conn_id,
436                    process_id,
437                    timestamp_ns: req.timestamp_ns,
438                    stream_id: None,
439                    remote_port: conn.remote_port,
440                    protocol: conn.protocol,
441                };
442                events.push(CollationEvent::Message {
443                    message: ParsedHttpMessage::Request(req.clone()),
444                    metadata,
445                });
446                conn.h1_request_emitted = true;
447            }
448
449            // Emit response if parsed and not yet emitted
450            if let Some(ref resp) = conn.h1_response
451                && !conn.h1_response_emitted
452            {
453                let metadata = MessageMetadata {
454                    connection_id: conn_id,
455                    process_id,
456                    timestamp_ns: resp.timestamp_ns,
457                    stream_id: None,
458                    remote_port: conn.remote_port,
459                    protocol: conn.protocol,
460                };
461                events.push(CollationEvent::Message {
462                    message: ParsedHttpMessage::Response(resp.clone()),
463                    metadata,
464                });
465                conn.h1_response_emitted = true;
466            }
467        },
468        Protocol::Http2 => {
469            // Emit newly parsed HTTP/2 requests
470            for (&stream_id, msg) in &conn.pending_requests {
471                if !conn.h2_emitted_requests.contains(&stream_id)
472                    && let Some(req) = msg.to_http_request()
473                {
474                    let metadata = MessageMetadata {
475                        connection_id: conn_id,
476                        process_id,
477                        timestamp_ns: msg.end_stream_timestamp_ns,
478                        stream_id: Some(stream_id),
479                        remote_port: conn.remote_port,
480                        protocol: conn.protocol,
481                    };
482                    events.push(CollationEvent::Message {
483                        message: ParsedHttpMessage::Request(req),
484                        metadata,
485                    });
486                }
487            }
488            // Mark all current pending requests as emitted
489            conn.h2_emitted_requests
490                .extend(conn.pending_requests.keys().copied());
491
492            // Emit newly parsed HTTP/2 responses
493            for (&stream_id, msg) in &conn.pending_responses {
494                if !conn.h2_emitted_responses.contains(&stream_id)
495                    && let Some(resp) = msg.to_http_response()
496                {
497                    let metadata = MessageMetadata {
498                        connection_id: conn_id,
499                        process_id,
500                        timestamp_ns: msg.first_frame_timestamp_ns,
501                        stream_id: Some(stream_id),
502                        remote_port: conn.remote_port,
503                        protocol: conn.protocol,
504                    };
505                    events.push(CollationEvent::Message {
506                        message: ParsedHttpMessage::Response(resp),
507                        metadata,
508                    });
509                }
510            }
511            // Mark all current pending responses as emitted
512            conn.h2_emitted_responses
513                .extend(conn.pending_responses.keys().copied());
514        },
515        Protocol::Unknown => {},
516    }
517}
518
519/// Reset connection state after emitting an exchange
520fn reset_connection_after_exchange(conn: &mut Conn) {
521    conn.request_complete = false;
522    conn.response_complete = false;
523
524    if conn.protocol == Protocol::Http1 {
525        // HTTP/1: clear everything for the next exchange
526        conn.request_chunks.clear();
527        conn.response_chunks.clear();
528        conn.h1_request = None;
529        conn.h1_response = None;
530        conn.h1_request_emitted = false;
531        conn.h1_response_emitted = false;
532        conn.h1_request_buffer.clear();
533        conn.h1_response_buffer.clear();
534        conn.protocol = Protocol::Unknown;
535    } else if conn.protocol == Protocol::Http2 {
536        // HTTP/2: only clear chunks if no other pending messages remain.
537        // The matched pair was already removed in build_exchange().
538        // Keep h2_*_state HPACK decoder for connection persistence.
539        // h2_emitted_* sets are cleaned up in build_exchange when
540        // the stream_id is removed from pending.
541        if conn.pending_requests.is_empty() && conn.pending_responses.is_empty() {
542            conn.request_chunks.clear();
543            conn.response_chunks.clear();
544            conn.h2_write_state.clear_buffer();
545            conn.h2_read_state.clear_buffer();
546        }
547    }
548
549    // Reset body size tracking for the next exchange
550    conn.request_body_size = 0;
551    conn.response_body_size = 0;
552}
553
554/// Reset connection when accumulated body size exceeds the limit.
555/// Drops all accumulated data and parsed state for this connection.
556fn reset_connection_body_limit(conn: &mut Conn) {
557    conn.request_chunks.clear();
558    conn.response_chunks.clear();
559    conn.h1_request_buffer.clear();
560    conn.h1_response_buffer.clear();
561    conn.h1_request = None;
562    conn.h1_response = None;
563    conn.h1_request_emitted = false;
564    conn.h1_response_emitted = false;
565    conn.h2_write_state = H2ConnectionState::new();
566    conn.h2_read_state = H2ConnectionState::new();
567    conn.pending_requests.clear();
568    conn.pending_responses.clear();
569    conn.h2_emitted_requests.clear();
570    conn.h2_emitted_responses.clear();
571    conn.ready_streams.clear();
572    conn.request_complete = false;
573    conn.response_complete = false;
574    conn.request_body_size = 0;
575    conn.response_body_size = 0;
576    conn.protocol = Protocol::Unknown;
577}
578
579/// Reset connection when the detected protocol changes (FD reuse with
580/// different protocol, e.g., HTTP/2 followed by HTTP/1 on the same fd).
581fn reset_connection_for_protocol_change(conn: &mut Conn, new_protocol: Protocol) {
582    conn.request_chunks.clear();
583    conn.response_chunks.clear();
584    conn.h1_request_buffer.clear();
585    conn.h1_response_buffer.clear();
586    conn.h1_request = None;
587    conn.h1_response = None;
588    conn.h1_request_emitted = false;
589    conn.h1_response_emitted = false;
590    conn.h2_write_state = H2ConnectionState::new();
591    conn.h2_read_state = H2ConnectionState::new();
592    conn.pending_requests.clear();
593    conn.pending_responses.clear();
594    conn.h2_emitted_requests.clear();
595    conn.h2_emitted_responses.clear();
596    conn.ready_streams.clear();
597    conn.request_complete = false;
598    conn.response_complete = false;
599    conn.request_body_size = 0;
600    conn.response_body_size = 0;
601    conn.protocol = new_protocol;
602}
603
604/// Detect whether raw bytes look like HTTP/1.x or HTTP/2 traffic.
605///
606/// Checks for the HTTP/2 connection preface, HTTP/2 frame headers,
607/// and HTTP/1.x request/response patterns. Returns [`Protocol::Unknown`]
608/// if no pattern matches.
609pub fn detect_protocol(data: &[u8]) -> Protocol {
610    // Check for HTTP/2 preface
611    if is_http2_preface(data) {
612        return Protocol::Http2;
613    }
614
615    // Check for HTTP/2 frame header heuristic
616    if looks_like_http2_frame(data) {
617        return Protocol::Http2;
618    }
619
620    // Check for HTTP/1.x request
621    if h1::is_http1_request(data) || h1::is_http1_response(data) {
622        return Protocol::Http1;
623    }
624
625    Protocol::Unknown
626}
627
628/// Feed chunk to h2session, classify by content after parsing.
629///
630/// Uses separate H2ConnectionState per direction to avoid corrupting frame
631/// boundaries when Read and Write events interleave (e.g., WINDOW_UPDATEs
632/// between DATA frames). Messages are classified by their pseudo-headers
633/// (:method = request, :status = response), supporting both client-side
634/// monitoring (Write=request, Read=response) and server-side monitoring
635/// (Read=request, Write=response).
636fn parse_http2_chunks(conn: &mut Conn, direction: Direction) {
637    // Check for fd-reuse: a new h2 connection preface on a connection that
638    // already processed one means the kernel reused the file descriptor for
639    // a new TCP connection. We must reset BOTH directions' parsers since the
640    // new connection has fresh HPACK context on both sides.
641    let last_chunk_is_preface = match direction {
642        Direction::Write => conn
643            .request_chunks
644            .last()
645            .is_some_and(|c| is_http2_preface(&c.data)),
646        Direction::Read => conn
647            .response_chunks
648            .last()
649            .is_some_and(|c| is_http2_preface(&c.data)),
650        Direction::Other => false,
651    };
652    let current_state_has_preface = match direction {
653        Direction::Write => conn.h2_write_state.preface_received,
654        Direction::Read => conn.h2_read_state.preface_received,
655        Direction::Other => false,
656    };
657
658    if last_chunk_is_preface && current_state_has_preface {
659        conn.h2_write_state = H2ConnectionState::new();
660        conn.h2_read_state = H2ConnectionState::new();
661        conn.pending_requests.clear();
662        conn.pending_responses.clear();
663        conn.h2_emitted_requests.clear();
664        conn.h2_emitted_responses.clear();
665        conn.ready_streams.clear();
666    }
667
668    let (chunks, h2_state) = match direction {
669        Direction::Write => (&conn.request_chunks, &mut conn.h2_write_state),
670        Direction::Read => (&conn.response_chunks, &mut conn.h2_read_state),
671        Direction::Other => return,
672    };
673
674    let chunk = match chunks.last() {
675        Some(c) => c,
676        None => return,
677    };
678
679    // Feed to direction-specific h2 parser; errors are non-fatal
680    let _ = h2_state.feed(&chunk.data, chunk.timestamp_ns);
681
682    // Pop completed messages and classify by content, not direction.
683    // Maintain ready_streams set for O(1) complete-pair lookup.
684    while let Some((stream_id, msg)) = h2_state.try_pop() {
685        if msg.is_request() {
686            conn.pending_requests.insert(stream_id, msg);
687            if conn.pending_responses.contains_key(&stream_id) {
688                conn.ready_streams.insert(stream_id);
689            }
690        } else if msg.is_response() {
691            conn.pending_responses.insert(stream_id, msg);
692            if conn.pending_requests.contains_key(&stream_id) {
693                conn.ready_streams.insert(stream_id);
694            }
695        }
696    }
697}
698
699/// Find a stream_id that has both request and response ready (O(1) via
700/// ready_streams set)
701fn find_complete_h2_stream(conn: &Conn) -> Option<StreamId> {
702    conn.ready_streams.iter().next().copied()
703}
704
705/// Try to parse HTTP/1 request from accumulated buffer.
706/// If complete, stores the parsed request in conn.h1_request.
707fn try_parse_http1_request_chunks(conn: &mut Conn) {
708    let timestamp = conn
709        .request_chunks
710        .last()
711        .map(|c| c.timestamp_ns)
712        .unwrap_or(TimestampNs(0));
713    conn.h1_request = h1::try_parse_http1_request(&conn.h1_request_buffer, timestamp);
714}
715
716/// Try to parse HTTP/1 response from accumulated buffer.
717/// If complete, stores the parsed response in conn.h1_response.
718fn try_parse_http1_response_chunks(conn: &mut Conn) {
719    let timestamp = conn
720        .response_chunks
721        .first()
722        .map(|c| c.timestamp_ns)
723        .unwrap_or(TimestampNs(0));
724    conn.h1_response = h1::try_parse_http1_response(&conn.h1_response_buffer, timestamp);
725}
726
727fn is_request_complete(conn: &Conn) -> bool {
728    match conn.protocol {
729        Protocol::Http1 => conn.h1_request.is_some(),
730        Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
731        Protocol::Unknown => false,
732    }
733}
734
735fn is_response_complete(conn: &Conn) -> bool {
736    match conn.protocol {
737        Protocol::Http1 => conn.h1_response.is_some(),
738        Protocol::Http2 => find_complete_h2_stream(conn).is_some(),
739        Protocol::Unknown => false,
740    }
741}
742
743fn build_exchange(conn: &mut Conn) -> Option<Exchange> {
744    let (request, response, stream_id, latency_ns) = match conn.protocol {
745        Protocol::Http1 => {
746            // Take the already-parsed request and response
747            let req = conn.h1_request.take()?;
748            let resp = conn.h1_response.take()?;
749            let latency = resp.timestamp_ns.saturating_sub(req.timestamp_ns);
750            (req, resp, None, latency)
751        },
752        Protocol::Http2 => {
753            let sid = find_complete_h2_stream(conn)?;
754            let msg_req = conn.pending_requests.remove(&sid)?;
755            let msg_resp = conn.pending_responses.remove(&sid)?;
756
757            // Clean up emission and ready tracking for this stream
758            conn.h2_emitted_requests.remove(&sid);
759            conn.h2_emitted_responses.remove(&sid);
760            conn.ready_streams.remove(&sid);
761
762            // For HTTP/2, use per-stream timestamps from the parsed messages
763            // Request complete time: when END_STREAM was seen on request
764            // Response start time: when first frame was received on response
765            let request_complete_time = msg_req.end_stream_timestamp_ns;
766            let response_start_time = msg_resp.first_frame_timestamp_ns;
767
768            let req = msg_req.into_http_request()?;
769            let resp = msg_resp.into_http_response()?;
770
771            let latency = response_start_time.saturating_sub(request_complete_time);
772            (req, resp, Some(sid), latency)
773        },
774        Protocol::Unknown => return None,
775    };
776
777    Some(Exchange {
778        request,
779        response,
780        latency_ns,
781        protocol: conn.protocol,
782        process_id: conn.process_id,
783        remote_port: conn.remote_port,
784        stream_id,
785    })
786}
787
788#[cfg(test)]
789mod tests;