1#![forbid(unsafe_code)]
2
3pub use huginn_net_db as db;
4pub use huginn_net_db::http;
5
6pub mod akamai;
7pub mod akamai_extractor;
8pub mod filter;
9pub mod http1_parser;
10pub mod http1_process;
11pub mod http2_fingerprint_extractor;
12pub mod http2_parser;
13pub mod http2_process;
14pub mod http_common;
15pub mod http_languages;
16pub mod http_process;
17pub mod packet_parser;
18pub mod raw_filter;
19
20pub mod packet_hash;
21
22pub mod display;
23pub mod error;
24pub mod observable;
25pub mod output;
26pub mod parallel;
27pub mod process;
28pub mod signature_matcher;
29
30pub use akamai::{AkamaiFingerprint, Http2Priority, PseudoHeader, SettingId, SettingParameter};
32pub use akamai_extractor::{
33 calculate_frames_bytes_consumed, extract_akamai_fingerprint,
34 extract_akamai_fingerprint_from_bytes,
35};
36pub use error::*;
37pub use filter::*;
38pub use http1_process::{
39 build_absent_headers_from_new_parser, convert_headers_to_http_format, parse_http1_request,
40 Http1Processor,
41};
42pub use http2_fingerprint_extractor::Http2FingerprintExtractor;
43pub use http2_parser::{Http2Frame, Http2FrameType, Http2Parser, HTTP2_CONNECTION_PREFACE};
44pub use http2_process::{parse_http2_request, Http2Processor};
45pub use http_common::HttpProcessor;
46pub use http_process::*;
47pub use observable::*;
48pub use output::*;
49pub use parallel::{DispatchResult, PoolStats, WorkerPool, WorkerStats};
50pub use process::*;
51pub use signature_matcher::*;
52
53use crate::packet_parser::{parse_packet, IpPacket};
54use pcap_file::pcap::PcapReader;
55use pnet::datalink::{self, Channel, Config};
56use std::fs::File;
57use std::sync::atomic::{AtomicBool, Ordering};
58use std::sync::mpsc::Sender;
59use std::sync::Arc;
60use tracing::{debug, error};
61use ttl_cache::TtlCache;
62
/// Configuration for the parallel packet-processing worker pool.
///
/// Values are forwarded verbatim to `WorkerPool::new` by
/// `HuginnNetHttp::init_pool`; the precise semantics of each field are
/// defined by the worker-pool implementation.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Number of worker threads to spawn in the pool.
    pub num_workers: usize,
    /// Capacity of the packet dispatch queue.
    pub queue_size: usize,
    /// Number of packets a worker handles per batch.
    /// NOTE(review): exact batching behavior is defined in `parallel` — confirm there.
    pub batch_size: usize,
    /// Timeout in milliseconds used by the worker pool
    /// (presumably for batch flushing / idle waits — see `WorkerPool::new`).
    pub timeout_ms: u64,
}
79
/// HTTP/1.x and HTTP/2 traffic analyzer.
///
/// Supports two processing modes: sequential (constructed via `new`) and
/// parallel (constructed via `with_config` followed by `init_pool`).
pub struct HuginnNetHttp {
    // Per-flow TCP state, TTL-evicted, capped at `max_connections` entries
    // (capacity is set in the constructors via `TtlCache::new(max_connections)`).
    http_flows: TtlCache<FlowKey, TcpFlow>,
    // Protocol processors used when analyzing packets sequentially.
    http_processors: HttpProcessors,
    // Present only when built with `with_config`; required by `init_pool`.
    parallel_config: Option<ParallelConfig>,
    // Lazily created by `init_pool`; `Some` switches processing to parallel mode.
    worker_pool: Option<Arc<WorkerPool>>,
    // Optional signature database; when present, a `SignatureMatcher` is
    // built from it for each processed packet.
    database: Option<Arc<db::Database>>,
    // Upper bound on tracked flows; also forwarded to the worker pool.
    max_connections: usize,
    // Optional pre-parse packet filter applied in `process_packet`.
    filter_config: Option<FilterConfig>,
}
93
94impl HuginnNetHttp {
95 pub fn new(
104 database: Option<Arc<db::Database>>,
105 max_connections: usize,
106 ) -> Result<Self, HuginnNetHttpError> {
107 Ok(Self {
108 http_flows: TtlCache::new(max_connections),
109 http_processors: HttpProcessors::new(),
110 parallel_config: None,
111 worker_pool: None,
112 database,
113 max_connections,
114 filter_config: None,
115 })
116 }
117
118 pub fn with_config(
157 database: Option<Arc<db::Database>>,
158 max_connections: usize,
159 num_workers: usize,
160 queue_size: usize,
161 batch_size: usize,
162 timeout_ms: u64,
163 ) -> Result<Self, HuginnNetHttpError> {
164 Ok(Self {
165 http_flows: TtlCache::new(max_connections),
166 http_processors: HttpProcessors::new(),
167 parallel_config: Some(ParallelConfig {
168 num_workers,
169 queue_size,
170 batch_size,
171 timeout_ms,
172 }),
173 worker_pool: None,
174 database,
175 max_connections,
176 filter_config: None,
177 })
178 }
179
180 pub fn with_filter(mut self, config: FilterConfig) -> Self {
182 self.filter_config = Some(config);
183 self
184 }
185
186 pub fn init_pool(
196 &mut self,
197 result_tx: Sender<HttpAnalysisResult>,
198 ) -> Result<(), HuginnNetHttpError> {
199 if let Some(config) = &self.parallel_config {
200 let pool = WorkerPool::new(
201 config.num_workers,
202 config.queue_size,
203 config.batch_size,
204 config.timeout_ms,
205 result_tx,
206 self.database.clone(),
207 self.max_connections,
208 self.filter_config.clone(),
209 )?;
210 self.worker_pool = Some(pool);
211 Ok(())
212 } else {
213 Err(HuginnNetHttpError::Misconfiguration(
214 "Parallel config not set. Use with_config() instead of new()".to_string(),
215 ))
216 }
217 }
218
219 pub fn worker_pool(&self) -> Option<&Arc<WorkerPool>> {
221 self.worker_pool.as_ref()
222 }
223
224 pub fn stats(&self) -> Option<PoolStats> {
226 self.worker_pool.as_ref().map(|pool| pool.stats())
227 }
228
229 fn process_with<F>(
230 &mut self,
231 packet_fn: F,
232 sender: Sender<HttpAnalysisResult>,
233 cancel_signal: Option<Arc<AtomicBool>>,
234 ) -> Result<(), HuginnNetHttpError>
235 where
236 F: FnMut() -> Option<Result<Vec<u8>, HuginnNetHttpError>>,
237 {
238 if self.worker_pool.is_some() {
239 self.process_parallel(packet_fn, cancel_signal)
240 } else {
241 self.process_sequential(packet_fn, sender, cancel_signal)
242 }
243 }
244
245 fn process_sequential<F>(
246 &mut self,
247 mut packet_fn: F,
248 sender: Sender<HttpAnalysisResult>,
249 cancel_signal: Option<Arc<AtomicBool>>,
250 ) -> Result<(), HuginnNetHttpError>
251 where
252 F: FnMut() -> Option<Result<Vec<u8>, HuginnNetHttpError>>,
253 {
254 while let Some(packet_result) = packet_fn() {
255 if let Some(ref cancel) = cancel_signal {
256 if cancel.load(Ordering::Relaxed) {
257 debug!("Cancellation signal received, stopping packet processing");
258 break;
259 }
260 }
261
262 match packet_result {
263 Ok(packet) => match self.process_packet(&packet) {
264 Ok(result) => {
265 if sender.send(result).is_err() {
266 error!("Receiver dropped, stopping packet processing");
267 break;
268 }
269 }
270 Err(http_error) => {
271 debug!("Error processing packet: {}", http_error);
272 }
273 },
274 Err(e) => {
275 error!("Failed to read packet: {}", e);
276 }
277 }
278 }
279 Ok(())
280 }
281
282 fn process_parallel<F>(
283 &mut self,
284 mut packet_fn: F,
285 cancel_signal: Option<Arc<AtomicBool>>,
286 ) -> Result<(), HuginnNetHttpError>
287 where
288 F: FnMut() -> Option<Result<Vec<u8>, HuginnNetHttpError>>,
289 {
290 let pool = self.worker_pool.as_ref().ok_or_else(|| {
291 HuginnNetHttpError::Misconfiguration("Worker pool not initialized".to_string())
292 })?;
293
294 while let Some(packet_result) = packet_fn() {
295 if let Some(ref cancel) = cancel_signal {
296 if cancel.load(Ordering::Relaxed) {
297 debug!("Cancellation signal received, stopping packet processing");
298 break;
299 }
300 }
301
302 match packet_result {
303 Ok(packet) => {
304 let _ = pool.dispatch(packet);
305 }
306 Err(e) => {
307 error!("Failed to read packet: {}", e);
308 }
309 }
310 }
311 Ok(())
312 }
313
314 pub fn analyze_network(
324 &mut self,
325 interface_name: &str,
326 sender: Sender<HttpAnalysisResult>,
327 cancel_signal: Option<Arc<AtomicBool>>,
328 ) -> Result<(), HuginnNetHttpError> {
329 let interfaces = datalink::interfaces();
330 let interface = interfaces
331 .into_iter()
332 .find(|iface| iface.name == interface_name)
333 .ok_or_else(|| {
334 HuginnNetHttpError::Parse(format!(
335 "Could not find network interface: {interface_name}"
336 ))
337 })?;
338
339 debug!("Using network interface: {}", interface.name);
340
341 let config = Config { promiscuous: true, ..Config::default() };
342
343 let (_tx, mut rx) = match datalink::channel(&interface, config) {
344 Ok(Channel::Ethernet(tx, rx)) => (tx, rx),
345 Ok(_) => return Err(HuginnNetHttpError::Parse("Unhandled channel type".to_string())),
346 Err(e) => {
347 return Err(HuginnNetHttpError::Parse(format!("Unable to create channel: {e}")))
348 }
349 };
350
351 self.process_with(
352 move || match rx.next() {
353 Ok(packet) => Some(Ok(packet.to_vec())),
354 Err(e) => {
355 Some(Err(HuginnNetHttpError::Parse(format!("Error receiving packet: {e}"))))
356 }
357 },
358 sender,
359 cancel_signal,
360 )
361 }
362
363 pub fn analyze_pcap(
373 &mut self,
374 pcap_path: &str,
375 sender: Sender<HttpAnalysisResult>,
376 cancel_signal: Option<Arc<AtomicBool>>,
377 ) -> Result<(), HuginnNetHttpError> {
378 let file = File::open(pcap_path)
379 .map_err(|e| HuginnNetHttpError::Parse(format!("Failed to open PCAP file: {e}")))?;
380 let mut pcap_reader = PcapReader::new(file)
381 .map_err(|e| HuginnNetHttpError::Parse(format!("Failed to create PCAP reader: {e}")))?;
382
383 self.process_with(
384 move || match pcap_reader.next_packet() {
385 Some(Ok(packet)) => Some(Ok(packet.data.to_vec())),
386 Some(Err(e)) => {
387 Some(Err(HuginnNetHttpError::Parse(format!("Error reading PCAP packet: {e}"))))
388 }
389 None => None,
390 },
391 sender,
392 cancel_signal,
393 )
394 }
395
396 fn process_packet(&mut self, packet: &[u8]) -> Result<HttpAnalysisResult, HuginnNetHttpError> {
404 if let Some(ref filter) = self.filter_config {
405 if !raw_filter::apply(packet, filter) {
406 debug!("Filtered out packet before parsing");
407 return Ok(HttpAnalysisResult { http_request: None, http_response: None });
408 }
409 }
410
411 let matcher = self
412 .database
413 .as_ref()
414 .map(|db| SignatureMatcher::new(db.as_ref()));
415
416 match parse_packet(packet) {
417 IpPacket::Ipv4(ipv4) => process::process_ipv4_packet(
418 &ipv4,
419 &mut self.http_flows,
420 &self.http_processors,
421 matcher.as_ref(),
422 ),
423 IpPacket::Ipv6(ipv6) => process::process_ipv6_packet(
424 &ipv6,
425 &mut self.http_flows,
426 &self.http_processors,
427 matcher.as_ref(),
428 ),
429 IpPacket::None => Ok(HttpAnalysisResult { http_request: None, http_response: None }),
430 }
431 }
432}