huginn_net_http/
http_process.rs

1use crate::error::HuginnNetHttpError;
2use crate::http_common::HttpProcessor;
3use crate::observable::{ObservableHttpRequest, ObservableHttpResponse};
4use crate::{http1_process, http2_process};
5use pnet::packet::ip::IpNextHeaderProtocols;
6use pnet::packet::ipv4::Ipv4Packet;
7use pnet::packet::ipv6::Ipv6Packet;
8use pnet::packet::tcp::TcpPacket;
9use pnet::packet::Packet;
10use std::net::IpAddr;
11use std::time::Duration;
12use tracing::debug;
13use ttl_cache::TtlCache;
14
/// Identifies a tracked TCP connection as `(client IP, server IP, client port, server port)`.
///
/// A flow is inserted under the addressing of the SYN segment that created it, so a
/// segment travelling in the opposite direction has to be looked up with source and
/// destination swapped.
pub type FlowKey = (IpAddr, IpAddr, u16, u16);
17
18use crate::http_common::HttpParser;
19
20/// HTTP parser that automatically detects and processes different HTTP versions
21pub struct HttpProcessors {
22    parsers: Vec<Box<dyn HttpParser>>,
23}
24
25impl HttpProcessors {
26    pub fn new() -> Self {
27        Self {
28            parsers: vec![
29                Box::new(Http1ParserAdapter::new()),
30                Box::new(Http2ParserAdapter::new()),
31            ],
32        }
33    }
34
35    /// Parse HTTP request data using the appropriate parser
36    pub fn parse_request(&self, data: &[u8]) -> Option<ObservableHttpRequest> {
37        for parser in &self.parsers {
38            if parser.can_parse(data) {
39                if let Some(result) = parser.parse_request(data) {
40                    return Some(result);
41                }
42            }
43        }
44        None
45    }
46
47    /// Parse HTTP response data using the appropriate parser
48    pub fn parse_response(&self, data: &[u8]) -> Option<ObservableHttpResponse> {
49        for parser in &self.parsers {
50            if parser.can_parse(data) {
51                if let Some(result) = parser.parse_response(data) {
52                    return Some(result);
53                }
54            }
55        }
56        None
57    }
58
59    /// Get all supported HTTP versions
60    pub fn supported_versions(&self) -> Vec<crate::http::Version> {
61        self.parsers.iter().map(|p| p.supported_version()).collect()
62    }
63}
64
65/// Adapter that bridges HTTP/1.x processor to the unified HttpParser interface
66struct Http1ParserAdapter {
67    processor: http1_process::Http1Processor,
68}
69
70impl Http1ParserAdapter {
71    fn new() -> Self {
72        Self {
73            processor: http1_process::Http1Processor::new(),
74        }
75    }
76}
77
78impl HttpParser for Http1ParserAdapter {
79    fn supported_version(&self) -> crate::http::Version {
80        crate::http::Version::V11
81    }
82
83    fn can_parse(&self, data: &[u8]) -> bool {
84        self.processor.can_process_request(data) || self.processor.can_process_response(data)
85    }
86
87    fn name(&self) -> &'static str {
88        "HTTP/1.x"
89    }
90
91    fn parse_request(&self, data: &[u8]) -> Option<ObservableHttpRequest> {
92        self.processor.process_request(data).ok().flatten()
93    }
94
95    fn parse_response(&self, data: &[u8]) -> Option<ObservableHttpResponse> {
96        self.processor.process_response(data).ok().flatten()
97    }
98}
99
100/// Adapter that bridges HTTP/2 processor to the unified HttpParser interface
101struct Http2ParserAdapter {
102    processor: http2_process::Http2Processor,
103}
104
105impl Http2ParserAdapter {
106    fn new() -> Self {
107        Self {
108            processor: http2_process::Http2Processor::new(),
109        }
110    }
111}
112
113impl HttpParser for Http2ParserAdapter {
114    fn supported_version(&self) -> crate::http::Version {
115        crate::http::Version::V20
116    }
117
118    fn can_parse(&self, data: &[u8]) -> bool {
119        self.processor.can_process_request(data) || self.processor.can_process_response(data)
120    }
121
122    fn name(&self) -> &'static str {
123        "HTTP/2"
124    }
125
126    fn parse_request(&self, data: &[u8]) -> Option<ObservableHttpRequest> {
127        self.processor.process_request(data).ok().flatten()
128    }
129
130    fn parse_response(&self, data: &[u8]) -> Option<ObservableHttpResponse> {
131        self.processor.process_response(data).ok().flatten()
132    }
133}
134
135impl Default for HttpProcessors {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
/// Outcome of running a single packet through the HTTP pipeline.
///
/// Both fields stay `None` unless the packet completed a request or a response that one
/// of the parsers could handle.
pub struct ObservableHttpPackage {
    /// Request parsed from the client-to-server data, if this packet completed one.
    pub http_request: Option<ObservableHttpRequest>,
    /// Response parsed from the server-to-client data, if this packet completed one.
    pub http_response: Option<ObservableHttpResponse>,
}
145
/// Payload of one captured TCP segment together with its position in the stream.
#[derive(Clone)]
struct TcpData {
    /// Sequence number taken from the segment's TCP header.
    sequence: u32,
    /// Copy of the segment payload (empty for segments that carry no data).
    data: Vec<u8>,
}
151
/// Per-connection reassembly state for one tracked TCP flow.
///
/// The endpoint that sent the first recorded segment is treated as the client.
pub struct TcpFlow {
    /// Address of the endpoint that opened the flow.
    client_ip: IpAddr,
    /// Address of the peer endpoint.
    server_ip: IpAddr,
    /// Source port of the endpoint that opened the flow.
    client_port: u16,
    /// Port of the peer endpoint.
    server_port: u16,
    /// Segments received from the client, in arrival order.
    client_data: Vec<TcpData>,
    /// Segments received from the server, in arrival order.
    server_data: Vec<TcpData>,
    /// Set once a request has been parsed; later client payload is discarded.
    client_http_parsed: bool,
    /// Set once a response has been parsed; later server payload is discarded.
    server_http_parsed: bool,
}
162
163/// Quick check if HTTP data is complete for parsing (supports HTTP/1.x and HTTP/2)
164fn has_complete_http_data(data: &[u8], processors: &HttpProcessors) -> bool {
165    // Strategy: Don't make early decisions about protocol due to TCP fragmentation
166    // Wait until we have enough data to make a reliable determination
167
168    if data.len() < 4 {
169        // Not enough data yet, wait for more TCP packets
170        return false;
171    }
172
173    // Try to parse with any available parser - if successful, data is complete
174    processors.parse_request(data).is_some() || processors.parse_response(data).is_some()
175}
176
177impl TcpFlow {
178    fn init(
179        src_ip: IpAddr,
180        src_port: u16,
181        dst_ip: IpAddr,
182        dst_port: u16,
183        tcp_data: TcpData,
184    ) -> TcpFlow {
185        TcpFlow {
186            client_ip: src_ip,
187            server_ip: dst_ip,
188            client_port: src_port,
189            server_port: dst_port,
190            client_data: vec![tcp_data],
191            server_data: Vec::new(),
192            client_http_parsed: false,
193            server_http_parsed: false,
194        }
195    }
196    /// Traversing all the data in sequence in the correct order to build the full data
197    ///
198    /// # Parameters
199    /// - `is_client`: If the data comes from the client.
200    fn get_full_data(&self, is_client: bool) -> Vec<u8> {
201        let data: &Vec<TcpData> = if is_client {
202            &self.client_data
203        } else {
204            &self.server_data
205        };
206
207        let mut sorted_data = data.clone();
208
209        sorted_data.sort_by_key(|tcp_data| tcp_data.sequence);
210
211        let mut full_data = Vec::new();
212        for tcp_data in sorted_data {
213            full_data.extend_from_slice(&tcp_data.data);
214        }
215        full_data
216    }
217}
218
219pub fn process_http_ipv4(
220    packet: &Ipv4Packet,
221    http_flows: &mut TtlCache<FlowKey, TcpFlow>,
222    processors: &HttpProcessors,
223) -> Result<ObservableHttpPackage, HuginnNetHttpError> {
224    if packet.get_next_level_protocol() != IpNextHeaderProtocols::Tcp {
225        return Err(HuginnNetHttpError::UnsupportedProtocol("IPv4".to_string()));
226    }
227    if let Some(tcp) = TcpPacket::new(packet.payload()) {
228        process_tcp_packet(
229            http_flows,
230            tcp,
231            IpAddr::V4(packet.get_source()),
232            IpAddr::V4(packet.get_destination()),
233            processors,
234        )
235    } else {
236        Ok(ObservableHttpPackage {
237            http_request: None,
238            http_response: None,
239        })
240    }
241}
242
243pub fn process_http_ipv6(
244    packet: &Ipv6Packet,
245    http_flows: &mut TtlCache<FlowKey, TcpFlow>,
246    processors: &HttpProcessors,
247) -> Result<ObservableHttpPackage, HuginnNetHttpError> {
248    if packet.get_next_header() != IpNextHeaderProtocols::Tcp {
249        return Err(HuginnNetHttpError::UnsupportedProtocol("IPv6".to_string()));
250    }
251    if let Some(tcp) = TcpPacket::new(packet.payload()) {
252        process_tcp_packet(
253            http_flows,
254            tcp,
255            IpAddr::V6(packet.get_source()),
256            IpAddr::V6(packet.get_destination()),
257            processors,
258        )
259    } else {
260        Ok(ObservableHttpPackage {
261            http_request: None,
262            http_response: None,
263        })
264    }
265}
266
267fn process_tcp_packet(
268    http_flows: &mut TtlCache<FlowKey, TcpFlow>,
269    tcp: TcpPacket,
270    src_ip: IpAddr,
271    dst_ip: IpAddr,
272    processors: &HttpProcessors,
273) -> Result<ObservableHttpPackage, HuginnNetHttpError> {
274    let src_port: u16 = tcp.get_source();
275    let dst_port: u16 = tcp.get_destination();
276    let mut observable_http_package = ObservableHttpPackage {
277        http_request: None,
278        http_response: None,
279    };
280
281    let flow_key: FlowKey = (src_ip, dst_ip, src_port, dst_port);
282    let (tcp_flow, is_client) = {
283        if let Some(flow) = http_flows.get_mut(&flow_key) {
284            (Some(flow), true)
285        } else {
286            let reversed_key: FlowKey = (dst_ip, src_ip, dst_port, src_port);
287            if let Some(flow) = http_flows.get_mut(&reversed_key) {
288                (Some(flow), false)
289            } else {
290                (None, false)
291            }
292        }
293    };
294
295    if let Some(flow) = tcp_flow {
296        if !tcp.payload().is_empty() {
297            let tcp_data = TcpData {
298                sequence: tcp.get_sequence(),
299                data: Vec::from(tcp.payload()),
300            };
301
302            if is_client && src_ip == flow.client_ip && src_port == flow.client_port {
303                // Only add data and parse if not already parsed
304                if !flow.client_http_parsed {
305                    flow.client_data.push(tcp_data);
306                    let full_data = flow.get_full_data(is_client);
307
308                    // Quick check before expensive parsing (supports HTTP/1.x and HTTP/2)
309                    if has_complete_http_data(&full_data, processors) {
310                        match parse_http_request(&full_data, processors) {
311                            Ok(Some(http_request_parsed)) => {
312                                observable_http_package.http_request = Some(http_request_parsed);
313                                flow.client_http_parsed = true;
314                            }
315                            Ok(None) => {}
316                            Err(_e) => {}
317                        }
318                    }
319                } else {
320                    debug!("CLIENT: HTTP already parsed, discarding additional data");
321                }
322            } else if src_ip == flow.server_ip && src_port == flow.server_port {
323                // Only add data and parse if not already parsed
324                if !flow.server_http_parsed {
325                    flow.server_data.push(tcp_data);
326                    let full_data = flow.get_full_data(is_client);
327
328                    // Quick check before expensive parsing (supports HTTP/1.x and HTTP/2)
329                    if has_complete_http_data(&full_data, processors) {
330                        match parse_http_response(&full_data, processors) {
331                            Ok(Some(http_response_parsed)) => {
332                                observable_http_package.http_response = Some(http_response_parsed);
333                                flow.server_http_parsed = true;
334                            }
335                            Ok(None) => {}
336                            Err(_e) => {}
337                        }
338                    } else {
339                        debug!("SERVER: Data not complete yet, waiting for more");
340                    }
341                } else {
342                    debug!("SERVER: HTTP already parsed, discarding additional data");
343                }
344            }
345
346            // Remove from http_flows if both request and response are parsed
347            if flow.client_http_parsed && flow.server_http_parsed {
348                debug!("Both HTTP request and response parsed, removing from http_flows early");
349                http_flows.remove(&flow_key);
350                return Ok(observable_http_package);
351            }
352
353            // Clean up on connection close
354            if tcp.get_flags()
355                & (pnet::packet::tcp::TcpFlags::FIN | pnet::packet::tcp::TcpFlags::RST)
356                != 0
357            {
358                debug!("Connection closed or reset");
359                http_flows.remove(&flow_key);
360            }
361        }
362    } else if tcp.get_flags() & pnet::packet::tcp::TcpFlags::SYN != 0 {
363        let tcp_data: TcpData = TcpData {
364            sequence: tcp.get_sequence(),
365            data: Vec::from(tcp.payload()),
366        };
367        let flow: TcpFlow = TcpFlow::init(src_ip, src_port, dst_ip, dst_port, tcp_data);
368        http_flows.insert(flow_key, flow, Duration::new(60, 0));
369    }
370
371    Ok(observable_http_package)
372}
373
374fn parse_http_request(
375    data: &[u8],
376    processors: &HttpProcessors,
377) -> Result<Option<ObservableHttpRequest>, HuginnNetHttpError> {
378    match processors.parse_request(data) {
379        Some(request) => {
380            debug!("Successfully parsed HTTP request using polymorphic parser");
381            Ok(Some(request))
382        }
383        None => {
384            debug!("No HTTP parser could handle request data");
385            Ok(None)
386        }
387    }
388}
389
390fn parse_http_response(
391    data: &[u8],
392    processors: &HttpProcessors,
393) -> Result<Option<ObservableHttpResponse>, HuginnNetHttpError> {
394    match processors.parse_response(data) {
395        Some(response) => {
396            debug!("Successfully parsed HTTP response using polymorphic parser");
397            Ok(Some(response))
398        }
399        None => {
400            debug!("No HTTP parser could handle response data");
401            Ok(None)
402        }
403    }
404}
405
// Unit tests for HTTP/1.x parsing and the diagnostic helper.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::http;
    use crate::http1_process;

    /// A complete browser-style GET request must yield language, user agent, version,
    /// header order, absent headers and the expected software string.
    #[test]
    fn test_parse_http1_request() {
        let valid_request = b"GET / HTTP/1.1\r\n\
        Host: example.com\r\n\
        Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7\r\n\
        Accept-Language: en-US,en;q=0.9,es;q=0.8\r\n\
        Cache-Control: max-age=0\r\n\
        Connection: keep-alive\r\n\
        If-Modified-Since: Thu, 17 Oct 2019 07:18:26 GMT\r\n\
        If-None-Match: \"3147526947\"\r\n\
        Upgrade-Insecure-Requests: 1\r\n\
        User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36\r\n\
        \r\n";
        match http1_process::Http1Processor::new().process_request(valid_request) {
            Ok(Some(request)) => {
                assert_eq!(request.lang, Some("English".to_string()));
                assert_eq!(request.user_agent, Some("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36".to_string()));
                assert_eq!(request.matching.version, http::Version::V11);

                // Headers in the order they appear in the request above.
                let expected_horder = vec![
                    http::Header::new("Host"),
                    http::Header::new("Accept").with_value("text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"),
                    http::Header::new("Accept-Language").with_value("en-US,en;q=0.9,es;q=0.8"),
                    http::Header::new("Cache-Control").optional(),
                    http::Header::new("Connection").with_value("keep-alive"),
                    http::Header::new("If-Modified-Since").optional(),
                    http::Header::new("If-None-Match").optional(),
                    http::Header::new("Upgrade-Insecure-Requests").with_value("1"),
                    http::Header::new("User-Agent"),
                ];
                assert_eq!(request.matching.horder, expected_horder);

                // Headers that the request above does not contain.
                let expected_habsent = vec![
                    http::Header::new("Accept-Encoding"),
                    http::Header::new("Accept-Charset"),
                    http::Header::new("Keep-Alive"),
                ];
                assert_eq!(request.matching.habsent, expected_habsent);

                assert_eq!(request.matching.expsw, "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36");
            }
            Ok(None) => panic!("Incomplete HTTP request"),
            Err(e) => panic!("Failed to parse HTTP request: {e}"),
        }
    }

    /// A complete 200 response must yield the server software, version, header order
    /// and absent headers.
    #[test]
    fn test_parse_http1_response() {
        let valid_response = b"HTTP/1.1 200 OK\r\n\
        Server: Apache\r\n\
        Content-Type: text/html; charset=UTF-8\r\n\
        Content-Length: 112\r\n\
        Connection: keep-alive\r\n\
        \r\n\
        <html><body><h1>It works!</h1></body></html>";

        match http1_process::Http1Processor::new().process_response(valid_response) {
            Ok(Some(response)) => {
                assert_eq!(response.matching.expsw, "Apache");
                assert_eq!(response.matching.version, http::Version::V11);

                // Headers in the order they appear in the response above.
                let expected_horder = vec![
                    http::Header::new("Server"),
                    http::Header::new("Content-Type"),
                    http::Header::new("Content-Length").optional(),
                    http::Header::new("Connection").with_value("keep-alive"),
                ];
                assert_eq!(response.matching.horder, expected_horder);

                // Headers that the response above does not contain.
                let expected_absent = vec![
                    http::Header::new("Keep-Alive"),
                    http::Header::new("Accept-Ranges"),
                    http::Header::new("Date"),
                ];
                assert_eq!(response.matching.habsent, expected_absent);
            }
            Ok(None) => panic!("Incomplete HTTP response"),
            Err(e) => panic!("Failed to parse HTTP response: {e}"),
        }
    }

    /// With no inputs at all, the diagnostic helper must report `Anonymous`.
    #[test]
    fn test_get_diagnostic_for_empty_sw() {
        let diagnosis: http::HttpDiagnosis = crate::http_common::get_diagnostic(None, None, None);
        assert_eq!(diagnosis, http::HttpDiagnosis::Anonymous);
    }
}