Skip to main content

http_collator/
connection.rs

1//! Connection state tracking for HTTP collation
2
3use std::collections::{HashMap, HashSet};
4
5use bytes::Bytes;
6use h2session::{H2ConnectionState, ParsedH2Message, StreamId, TimestampNs};
7
8use crate::{
9    h1::{HttpRequest, HttpResponse},
10    traits::Direction,
11};
12
/// Application-layer protocol identified on a connection.
///
/// `Connection::new` starts every connection as [`Protocol::Unknown`]; the
/// other variants record which HTTP version was recognised.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Protocol {
    /// Not yet identified.
    Unknown,
    /// HTTP/1.x — either HTTP/1.0 or HTTP/1.1.
    Http1,
    /// HTTP/2.
    Http2,
}
23
24/// A chunk of data received from a data source
25#[derive(Debug, Clone)]
26pub(crate) struct DataChunk {
27    pub(crate) data:         Bytes,
28    pub(crate) timestamp_ns: TimestampNs,
29    #[allow(dead_code)]
30    pub(crate) direction:    Direction,
31}
32
33/// Tracks state for a single connection
34pub(crate) struct Connection {
35    pub(crate) process_id:        u32,
36    /// Remote port, None if unavailable (e.g., SSL without socket fd)
37    pub(crate) remote_port:       Option<u16>,
38    pub(crate) protocol:          Protocol,
39    pub(crate) request_chunks:    Vec<DataChunk>,
40    pub(crate) response_chunks:   Vec<DataChunk>,
41    pub(crate) last_activity_ns:  TimestampNs,
42    pub(crate) request_complete:  bool,
43    pub(crate) response_complete: bool,
44
45    // HTTP/1 growable buffers — new data is appended as it arrives, avoiding
46    // repeated clone-and-concatenate of all previous chunks.
47    pub(crate) h1_request_buffer:  Vec<u8>,
48    pub(crate) h1_response_buffer: Vec<u8>,
49
50    // Accumulated body size per direction for enforcing max_body_size limit
51    pub(crate) request_body_size:  usize,
52    pub(crate) response_body_size: usize,
53
54    // HTTP/1 parsed messages (when complete)
55    pub(crate) h1_request:  Option<HttpRequest>,
56    pub(crate) h1_response: Option<HttpResponse>,
57
58    // HTTP/1 emission tracking - have we already emitted Message events for these?
59    pub(crate) h1_request_emitted:  bool,
60    pub(crate) h1_response_emitted: bool,
61
62    // HTTP/2 state: separate parsers per direction to avoid corrupting
63    // frame boundaries when Read and Write events interleave.
64    pub(crate) h2_write_state: H2ConnectionState,
65    pub(crate) h2_read_state:  H2ConnectionState,
66
67    // Completed messages from h2session, keyed by stream_id
68    pub(crate) pending_requests:  HashMap<StreamId, ParsedH2Message>,
69    pub(crate) pending_responses: HashMap<StreamId, ParsedH2Message>,
70
71    // HTTP/2 emission tracking - which stream_ids have we emitted Message events for?
72    pub(crate) h2_emitted_requests:  HashSet<StreamId>,
73    pub(crate) h2_emitted_responses: HashSet<StreamId>,
74
75    // Stream IDs that have both a pending request and pending response,
76    // enabling O(1) lookup for complete exchange pairs.
77    pub(crate) ready_streams: HashSet<StreamId>,
78}
79
80impl Connection {
81    pub(crate) fn new(process_id: u32, remote_port: u16) -> Self {
82        Self {
83            process_id,
84            // Store None for port 0 (unavailable from SSL)
85            remote_port: if remote_port == 0 {
86                None
87            } else {
88                Some(remote_port)
89            },
90            protocol: Protocol::Unknown,
91            request_chunks: Vec::new(),
92            response_chunks: Vec::new(),
93            last_activity_ns: TimestampNs(0),
94            request_complete: false,
95            response_complete: false,
96            h1_request_buffer: Vec::new(),
97            h1_response_buffer: Vec::new(),
98            request_body_size: 0,
99            response_body_size: 0,
100            h1_request: None,
101            h1_response: None,
102            h1_request_emitted: false,
103            h1_response_emitted: false,
104            h2_write_state: H2ConnectionState::new(),
105            h2_read_state: H2ConnectionState::new(),
106            pending_requests: HashMap::new(),
107            pending_responses: HashMap::new(),
108            h2_emitted_requests: HashSet::new(),
109            h2_emitted_responses: HashSet::new(),
110            ready_streams: HashSet::new(),
111        }
112    }
113}