//! huginn_net_http — crate root (lib.rs)
1#![forbid(unsafe_code)]
2
3pub use huginn_net_db as db;
4pub use huginn_net_db::http;
5
6pub mod akamai;
7pub mod akamai_extractor;
8pub mod filter;
9pub mod http1_parser;
10pub mod http1_process;
11pub mod http2_fingerprint_extractor;
12pub mod http2_parser;
13pub mod http2_process;
14pub mod http_common;
15pub mod http_languages;
16pub mod http_process;
17pub mod packet_parser;
18pub mod raw_filter;
19
20pub mod packet_hash;
21
22pub mod display;
23pub mod error;
24pub mod observable;
25pub mod output;
26pub mod parallel;
27pub mod process;
28pub mod signature_matcher;
29
30// Re-exports
31pub use akamai::{AkamaiFingerprint, Http2Priority, PseudoHeader, SettingId, SettingParameter};
32pub use akamai_extractor::{
33    calculate_frames_bytes_consumed, extract_akamai_fingerprint,
34    extract_akamai_fingerprint_from_bytes,
35};
36pub use error::*;
37pub use filter::*;
38pub use http1_process::{
39    build_absent_headers_from_new_parser, convert_headers_to_http_format, parse_http1_request,
40    Http1Processor,
41};
42pub use http2_fingerprint_extractor::Http2FingerprintExtractor;
43pub use http2_parser::{Http2Frame, Http2FrameType, Http2Parser, HTTP2_CONNECTION_PREFACE};
44pub use http2_process::{parse_http2_request, Http2Processor};
45pub use http_common::HttpProcessor;
46pub use http_process::*;
47pub use observable::*;
48pub use output::*;
49pub use parallel::{DispatchResult, PoolStats, WorkerPool, WorkerStats};
50pub use process::*;
51pub use signature_matcher::*;
52
53use crate::packet_parser::{parse_packet, IpPacket};
54use pcap_file::pcap::PcapReader;
55use pnet::datalink::{self, Channel, Config};
56use std::fs::File;
57use std::sync::atomic::{AtomicBool, Ordering};
58use std::sync::mpsc::Sender;
59use std::sync::Arc;
60use tracing::{debug, error};
61use ttl_cache::TtlCache;
62
/// Configuration for parallel processing
///
/// Controls the behavior of worker threads in parallel mode. Built by
/// `HuginnNetHttp::with_config` and consumed by `HuginnNetHttp::init_pool`
/// when the `WorkerPool` is constructed.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Number of worker threads to spawn
    pub num_workers: usize,
    /// Size of packet queue per worker (affects memory usage and backpressure)
    pub queue_size: usize,
    /// Maximum packets to process in one batch before checking for new work
    /// Higher = better throughput, lower = better latency (typical: 8-32)
    pub batch_size: usize,
    /// Worker receive timeout in milliseconds
    /// Lower = faster shutdown, higher = better throughput (typical: 5-20)
    pub timeout_ms: u64,
}
79
/// An HTTP-focused passive fingerprinting analyzer.
///
/// The `HuginnNetHttp` struct handles HTTP packet analysis for browser fingerprinting,
/// web server detection, and HTTP protocol analysis using p0f-style methodologies.
pub struct HuginnNetHttp {
    /// Per-flow state keyed by `FlowKey`; capacity-bounded by `max_connections`.
    http_flows: TtlCache<FlowKey, TcpFlow>,
    /// Shared processors handed to `process_ipv4_packet`/`process_ipv6_packet`.
    http_processors: HttpProcessors,
    /// Set by `with_config`; `None` means sequential mode.
    parallel_config: Option<ParallelConfig>,
    /// Created by `init_pool`; when present, packets are dispatched here
    /// instead of being processed inline.
    worker_pool: Option<Arc<WorkerPool>>,
    /// Optional signature database; a `SignatureMatcher` is built from it per packet.
    database: Option<Arc<db::Database>>,
    /// Capacity used for `http_flows` and forwarded to the worker pool.
    max_connections: usize,
    /// Optional raw-packet filter applied before any parsing (see `process_packet`).
    filter_config: Option<FilterConfig>,
}
93
94impl HuginnNetHttp {
95    /// Creates a new instance of `HuginnNetHttp` in sequential mode.
96    ///
97    /// # Parameters
98    /// - `database`: Optional signature database for HTTP matching
99    /// - `max_connections`: Maximum number of HTTP flows to track
100    ///
101    /// # Returns
102    /// A new `HuginnNetHttp` instance ready for HTTP analysis.
103    pub fn new(
104        database: Option<Arc<db::Database>>,
105        max_connections: usize,
106    ) -> Result<Self, HuginnNetHttpError> {
107        Ok(Self {
108            http_flows: TtlCache::new(max_connections),
109            http_processors: HttpProcessors::new(),
110            parallel_config: None,
111            worker_pool: None,
112            database,
113            max_connections,
114            filter_config: None,
115        })
116    }
117
118    /// Creates a new instance of `HuginnNetHttp` with full parallel configuration.
119    ///
120    /// # Parameters
121    /// - `database`: Optional signature database for HTTP matching
122    /// - `max_connections`: Maximum number of HTTP flows to track per worker (typical: 1000-10000)
123    /// - `num_workers`: Number of worker threads (recommended: 2 for HTTP due to flow tracking)
124    /// - `queue_size`: Size of each worker's packet queue (typical: 100-200)
125    /// - `batch_size`: Maximum packets to process in one batch (typical: 8-32, recommended: 16)
126    /// - `timeout_ms`: Worker receive timeout in milliseconds (typical: 5-20, recommended: 10)
127    ///
128    /// # Configuration Guide
129    ///
130    /// ## batch_size
131    /// - **Low (8)**: Lower latency, more responsive, higher overhead
132    /// - **Medium (16)**: Balanced throughput and latency *(recommended)*
133    /// - **High (32-64)**: Maximum throughput, higher latency
134    ///
135    /// ## timeout_ms
136    /// - **Low (5-10ms)**: Fast shutdown, slightly lower throughput *(recommended: 10)*
137    /// - **Medium (15-20ms)**: Better throughput, slower shutdown
138    /// - **High (50ms+)**: Maximum throughput, slow shutdown
139    ///
140    /// # Example
141    /// ```rust,no_run
142    /// use huginn_net_http::HuginnNetHttp;
143    ///
144    /// // Balanced configuration (recommended for HTTP)
145    /// let http = HuginnNetHttp::with_config(None, 1000, 2, 100, 16, 10);
146    ///
147    /// // Low latency
148    /// let low_latency = HuginnNetHttp::with_config(None, 1000, 2, 100, 8, 5);
149    ///
150    /// // High throughput
151    /// let high_throughput = HuginnNetHttp::with_config(None, 5000, 2, 200, 32, 15);
152    /// ```
153    ///
154    /// # Returns
155    /// A new `HuginnNetHttp` instance configured for parallel processing.
156    pub fn with_config(
157        database: Option<Arc<db::Database>>,
158        max_connections: usize,
159        num_workers: usize,
160        queue_size: usize,
161        batch_size: usize,
162        timeout_ms: u64,
163    ) -> Result<Self, HuginnNetHttpError> {
164        Ok(Self {
165            http_flows: TtlCache::new(max_connections),
166            http_processors: HttpProcessors::new(),
167            parallel_config: Some(ParallelConfig {
168                num_workers,
169                queue_size,
170                batch_size,
171                timeout_ms,
172            }),
173            worker_pool: None,
174            database,
175            max_connections,
176            filter_config: None,
177        })
178    }
179
180    /// Configure packet filtering (builder pattern)
181    pub fn with_filter(mut self, config: FilterConfig) -> Self {
182        self.filter_config = Some(config);
183        self
184    }
185
186    /// Initializes the worker pool for parallel processing.
187    ///
188    /// Must be called after `with_config` and before calling `analyze_network` or `analyze_pcap`.
189    ///
190    /// # Parameters
191    /// - `result_tx`: Channel sender for analysis results
192    ///
193    /// # Returns
194    /// `Ok(())` if pool initialized successfully, error otherwise.
195    pub fn init_pool(
196        &mut self,
197        result_tx: Sender<HttpAnalysisResult>,
198    ) -> Result<(), HuginnNetHttpError> {
199        if let Some(config) = &self.parallel_config {
200            let pool = WorkerPool::new(
201                config.num_workers,
202                config.queue_size,
203                config.batch_size,
204                config.timeout_ms,
205                result_tx,
206                self.database.clone(),
207                self.max_connections,
208                self.filter_config.clone(),
209            )?;
210            self.worker_pool = Some(pool);
211            Ok(())
212        } else {
213            Err(HuginnNetHttpError::Misconfiguration(
214                "Parallel config not set. Use with_config() instead of new()".to_string(),
215            ))
216        }
217    }
218
219    /// Returns a reference to the worker pool if initialized.
220    pub fn worker_pool(&self) -> Option<&Arc<WorkerPool>> {
221        self.worker_pool.as_ref()
222    }
223
224    /// Returns current worker pool statistics if parallel mode is active.
225    pub fn stats(&self) -> Option<PoolStats> {
226        self.worker_pool.as_ref().map(|pool| pool.stats())
227    }
228
229    fn process_with<F>(
230        &mut self,
231        packet_fn: F,
232        sender: Sender<HttpAnalysisResult>,
233        cancel_signal: Option<Arc<AtomicBool>>,
234    ) -> Result<(), HuginnNetHttpError>
235    where
236        F: FnMut() -> Option<Result<Vec<u8>, HuginnNetHttpError>>,
237    {
238        if self.worker_pool.is_some() {
239            self.process_parallel(packet_fn, cancel_signal)
240        } else {
241            self.process_sequential(packet_fn, sender, cancel_signal)
242        }
243    }
244
245    fn process_sequential<F>(
246        &mut self,
247        mut packet_fn: F,
248        sender: Sender<HttpAnalysisResult>,
249        cancel_signal: Option<Arc<AtomicBool>>,
250    ) -> Result<(), HuginnNetHttpError>
251    where
252        F: FnMut() -> Option<Result<Vec<u8>, HuginnNetHttpError>>,
253    {
254        while let Some(packet_result) = packet_fn() {
255            if let Some(ref cancel) = cancel_signal {
256                if cancel.load(Ordering::Relaxed) {
257                    debug!("Cancellation signal received, stopping packet processing");
258                    break;
259                }
260            }
261
262            match packet_result {
263                Ok(packet) => match self.process_packet(&packet) {
264                    Ok(result) => {
265                        if sender.send(result).is_err() {
266                            error!("Receiver dropped, stopping packet processing");
267                            break;
268                        }
269                    }
270                    Err(http_error) => {
271                        debug!("Error processing packet: {}", http_error);
272                    }
273                },
274                Err(e) => {
275                    error!("Failed to read packet: {}", e);
276                }
277            }
278        }
279        Ok(())
280    }
281
282    fn process_parallel<F>(
283        &mut self,
284        mut packet_fn: F,
285        cancel_signal: Option<Arc<AtomicBool>>,
286    ) -> Result<(), HuginnNetHttpError>
287    where
288        F: FnMut() -> Option<Result<Vec<u8>, HuginnNetHttpError>>,
289    {
290        let pool = self.worker_pool.as_ref().ok_or_else(|| {
291            HuginnNetHttpError::Misconfiguration("Worker pool not initialized".to_string())
292        })?;
293
294        while let Some(packet_result) = packet_fn() {
295            if let Some(ref cancel) = cancel_signal {
296                if cancel.load(Ordering::Relaxed) {
297                    debug!("Cancellation signal received, stopping packet processing");
298                    break;
299                }
300            }
301
302            match packet_result {
303                Ok(packet) => {
304                    let _ = pool.dispatch(packet);
305                }
306                Err(e) => {
307                    error!("Failed to read packet: {}", e);
308                }
309            }
310        }
311        Ok(())
312    }
313
314    /// Analyzes network traffic from a live network interface for HTTP packets.
315    ///
316    /// # Parameters
317    /// - `interface_name`: The name of the network interface to capture from.
318    /// - `sender`: A channel sender to send analysis results.
319    /// - `cancel_signal`: Optional atomic boolean to signal cancellation.
320    ///
321    /// # Returns
322    /// A `Result` indicating success or failure.
323    pub fn analyze_network(
324        &mut self,
325        interface_name: &str,
326        sender: Sender<HttpAnalysisResult>,
327        cancel_signal: Option<Arc<AtomicBool>>,
328    ) -> Result<(), HuginnNetHttpError> {
329        let interfaces = datalink::interfaces();
330        let interface = interfaces
331            .into_iter()
332            .find(|iface| iface.name == interface_name)
333            .ok_or_else(|| {
334                HuginnNetHttpError::Parse(format!(
335                    "Could not find network interface: {interface_name}"
336                ))
337            })?;
338
339        debug!("Using network interface: {}", interface.name);
340
341        let config = Config { promiscuous: true, ..Config::default() };
342
343        let (_tx, mut rx) = match datalink::channel(&interface, config) {
344            Ok(Channel::Ethernet(tx, rx)) => (tx, rx),
345            Ok(_) => return Err(HuginnNetHttpError::Parse("Unhandled channel type".to_string())),
346            Err(e) => {
347                return Err(HuginnNetHttpError::Parse(format!("Unable to create channel: {e}")))
348            }
349        };
350
351        self.process_with(
352            move || match rx.next() {
353                Ok(packet) => Some(Ok(packet.to_vec())),
354                Err(e) => {
355                    Some(Err(HuginnNetHttpError::Parse(format!("Error receiving packet: {e}"))))
356                }
357            },
358            sender,
359            cancel_signal,
360        )
361    }
362
363    /// Analyzes HTTP packets from a PCAP file.
364    ///
365    /// # Parameters
366    /// - `pcap_path`: Path to the PCAP file to analyze.
367    /// - `sender`: A channel sender to send analysis results.
368    /// - `cancel_signal`: Optional atomic boolean to signal cancellation.
369    ///
370    /// # Returns
371    /// A `Result` indicating success or failure.
372    pub fn analyze_pcap(
373        &mut self,
374        pcap_path: &str,
375        sender: Sender<HttpAnalysisResult>,
376        cancel_signal: Option<Arc<AtomicBool>>,
377    ) -> Result<(), HuginnNetHttpError> {
378        let file = File::open(pcap_path)
379            .map_err(|e| HuginnNetHttpError::Parse(format!("Failed to open PCAP file: {e}")))?;
380        let mut pcap_reader = PcapReader::new(file)
381            .map_err(|e| HuginnNetHttpError::Parse(format!("Failed to create PCAP reader: {e}")))?;
382
383        self.process_with(
384            move || match pcap_reader.next_packet() {
385                Some(Ok(packet)) => Some(Ok(packet.data.to_vec())),
386                Some(Err(e)) => {
387                    Some(Err(HuginnNetHttpError::Parse(format!("Error reading PCAP packet: {e}"))))
388                }
389                None => None,
390            },
391            sender,
392            cancel_signal,
393        )
394    }
395
396    /// Processes a single packet and extracts HTTP information if present.
397    ///
398    /// # Parameters
399    /// - `packet`: The raw packet data.
400    ///
401    /// # Returns
402    /// A `Result` containing an `HttpAnalysisResult` or an error.
403    fn process_packet(&mut self, packet: &[u8]) -> Result<HttpAnalysisResult, HuginnNetHttpError> {
404        if let Some(ref filter) = self.filter_config {
405            if !raw_filter::apply(packet, filter) {
406                debug!("Filtered out packet before parsing");
407                return Ok(HttpAnalysisResult { http_request: None, http_response: None });
408            }
409        }
410
411        let matcher = self
412            .database
413            .as_ref()
414            .map(|db| SignatureMatcher::new(db.as_ref()));
415
416        match parse_packet(packet) {
417            IpPacket::Ipv4(ipv4) => process::process_ipv4_packet(
418                &ipv4,
419                &mut self.http_flows,
420                &self.http_processors,
421                matcher.as_ref(),
422            ),
423            IpPacket::Ipv6(ipv6) => process::process_ipv6_packet(
424                &ipv6,
425                &mut self.http_flows,
426                &self.http_processors,
427                matcher.as_ref(),
428            ),
429            IpPacket::None => Ok(HttpAnalysisResult { http_request: None, http_response: None }),
430        }
431    }
432}