Skip to main content

http_collator/
connection.rs

1//! Connection state tracking for HTTP collation
2
3use std::collections::{HashMap, HashSet};
4
5use bytes::Bytes;
6use h2session::{H2ConnectionState, ParsedH2Message, StreamId, TimestampNs};
7
8use crate::{
9    h1::{HttpRequest, HttpResponse},
10    traits::Direction,
11};
12
13/// Protocol detected for a connection
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub enum Protocol {
16    /// Protocol has not been identified yet
17    Unknown,
18    /// HTTP/1.x (HTTP/1.0 or HTTP/1.1)
19    Http1,
20    /// HTTP/2
21    Http2,
22}
23
24/// A chunk of data received from a data source
25#[derive(Debug, Clone)]
26pub(crate) struct DataChunk {
27    pub(crate) data:         Bytes,
28    pub(crate) timestamp_ns: TimestampNs,
29    #[allow(dead_code)]
30    pub(crate) direction:    Direction,
31}
32
33/// Tracks state for a single connection
34pub(crate) struct Connection {
35    pub(crate) process_id:        u32,
36    /// Remote port, None if unavailable (e.g., SSL without socket fd)
37    pub(crate) remote_port:       Option<u16>,
38    pub(crate) protocol:          Protocol,
39    pub(crate) request_chunks:    Vec<DataChunk>,
40    pub(crate) response_chunks:   Vec<DataChunk>,
41    pub(crate) last_activity_ns:  TimestampNs,
42    pub(crate) request_complete:  bool,
43    pub(crate) response_complete: bool,
44
45    // HTTP/1 growable buffers — new data is appended as it arrives, avoiding
46    // repeated clone-and-concatenate of all previous chunks.
47    pub(crate) h1_write_buffer: Vec<u8>,
48    pub(crate) h1_read_buffer:  Vec<u8>,
49
50    // Accumulated body size per direction for enforcing max_body_size limit
51    pub(crate) request_body_size:  usize,
52    pub(crate) response_body_size: usize,
53
54    // HTTP/1 parsed messages (when complete)
55    pub(crate) h1_request:  Option<HttpRequest>,
56    pub(crate) h1_response: Option<HttpResponse>,
57
58    // HTTP/1 per-direction parse tracking — have we extracted a message from
59    // this direction's buffer? Prevents redundant re-parsing on subsequent chunks.
60    pub(crate) h1_write_parsed: bool,
61    pub(crate) h1_read_parsed:  bool,
62
63    // HTTP/1 emission tracking - have we already emitted Message events for these?
64    pub(crate) h1_request_emitted:  bool,
65    pub(crate) h1_response_emitted: bool,
66
67    // HTTP/2 state: separate parsers per direction to avoid corrupting
68    // frame boundaries when Read and Write events interleave.
69    pub(crate) h2_write_state: H2ConnectionState,
70    pub(crate) h2_read_state:  H2ConnectionState,
71
72    // Completed messages from h2session, keyed by stream_id
73    pub(crate) pending_requests:  HashMap<StreamId, ParsedH2Message>,
74    pub(crate) pending_responses: HashMap<StreamId, ParsedH2Message>,
75
76    // HTTP/2 emission tracking - which stream_ids have we emitted Message events for?
77    pub(crate) h2_emitted_requests:  HashSet<StreamId>,
78    pub(crate) h2_emitted_responses: HashSet<StreamId>,
79
80    // Stream IDs that have both a pending request and pending response,
81    // enabling O(1) lookup for complete exchange pairs.
82    pub(crate) ready_streams: HashSet<StreamId>,
83}
84
85impl Connection {
86    pub(crate) fn new(process_id: u32, remote_port: u16) -> Self {
87        Self {
88            process_id,
89            // Store None for port 0 (unavailable from SSL)
90            remote_port: if remote_port == 0 {
91                None
92            } else {
93                Some(remote_port)
94            },
95            protocol: Protocol::Unknown,
96            request_chunks: Vec::new(),
97            response_chunks: Vec::new(),
98            last_activity_ns: TimestampNs(0),
99            request_complete: false,
100            response_complete: false,
101            h1_write_buffer: Vec::new(),
102            h1_read_buffer: Vec::new(),
103            request_body_size: 0,
104            response_body_size: 0,
105            h1_request: None,
106            h1_response: None,
107            h1_write_parsed: false,
108            h1_read_parsed: false,
109            h1_request_emitted: false,
110            h1_response_emitted: false,
111            h2_write_state: H2ConnectionState::new(),
112            h2_read_state: H2ConnectionState::new(),
113            pending_requests: HashMap::new(),
114            pending_responses: HashMap::new(),
115            h2_emitted_requests: HashSet::new(),
116            h2_emitted_responses: HashSet::new(),
117            ready_streams: HashSet::new(),
118        }
119    }
120}