1pub mod error;
2pub mod filter;
3pub mod observable;
4pub mod output;
5pub mod packet_hash;
6pub mod packet_parser;
7pub mod parallel;
8pub mod process;
9pub mod raw_filter;
10pub mod tls;
11pub mod tls_client_hello_reader;
12pub mod tls_process;
13
14pub use error::*;
16pub use filter::*;
17pub use observable::*;
18pub use output::*;
19pub use parallel::{DispatchResult, PoolStats, WorkerPool, WorkerStats};
20pub use process::*;
21pub use tls::*;
22pub use tls_client_hello_reader::TlsClientHelloReader;
23pub use tls_process::{
24 parse_tls_client_hello, parse_tls_client_hello_ja4, process_tls_ipv4, process_tls_ipv6,
25};
26
27use crate::packet_parser::{parse_packet, IpPacket};
28use pcap_file::pcap::PcapReader;
29use pnet::datalink::{self, Channel, Config};
30use std::fs::File;
31use std::net::IpAddr;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::mpsc::Sender;
34use std::sync::Arc;
35use tracing::{debug, error};
36use ttl_cache::TtlCache;
37
38#[derive(Debug, Clone)]
42struct ParallelConfig {
43 num_workers: usize,
45 queue_size: usize,
47 batch_size: usize,
50 timeout_ms: u64,
53}
54
55pub type FlowKey = (IpAddr, IpAddr, u16, u16);
57
58pub struct HuginnNetTls {
63 tcp_flows: TtlCache<FlowKey, TlsClientHelloReader>,
64 parallel_config: Option<ParallelConfig>,
65 worker_pool: Option<Arc<WorkerPool>>,
66 filter_config: Option<FilterConfig>,
67 max_connections: usize,
68}
69
70impl HuginnNetTls {
71 pub fn new(max_connections: usize) -> Self {
79 Self::with_max_connections(max_connections)
80 }
81}
82
83impl HuginnNetTls {
84 pub fn with_max_connections(max_connections: usize) -> Self {
92 Self {
93 tcp_flows: TtlCache::new(max_connections),
94 parallel_config: None,
95 worker_pool: None,
96 filter_config: None,
97 max_connections,
98 }
99 }
100
101 pub fn with_filter(mut self, config: FilterConfig) -> Self {
103 self.filter_config = Some(config);
104 self
105 }
106
107 pub fn with_config(
144 num_workers: usize,
145 queue_size: usize,
146 batch_size: usize,
147 timeout_ms: u64,
148 ) -> Self {
149 Self::with_config_and_max_connections(
150 num_workers,
151 queue_size,
152 batch_size,
153 timeout_ms,
154 10000,
155 )
156 }
157
158 pub fn with_config_and_max_connections(
170 num_workers: usize,
171 queue_size: usize,
172 batch_size: usize,
173 timeout_ms: u64,
174 max_connections: usize,
175 ) -> Self {
176 Self {
177 tcp_flows: TtlCache::new(max_connections),
178 parallel_config: Some(ParallelConfig {
179 num_workers,
180 queue_size,
181 batch_size,
182 timeout_ms,
183 }),
184 worker_pool: None,
185 filter_config: None,
186 max_connections,
187 }
188 }
189
190 pub fn stats(&self) -> Option<PoolStats> {
195 self.worker_pool.as_ref().map(|pool| pool.stats())
196 }
197
198 pub fn worker_pool(&self) -> Option<Arc<WorkerPool>> {
203 self.worker_pool.clone()
204 }
205
206 pub fn init_pool(&mut self, sender: Sender<TlsClientOutput>) -> Result<(), HuginnNetTlsError> {
214 if let Some(config) = &self.parallel_config {
215 if self.worker_pool.is_none() {
216 let worker_pool = Arc::new(WorkerPool::new(
217 config.num_workers,
218 config.queue_size,
219 config.batch_size,
220 config.timeout_ms,
221 sender,
222 self.max_connections,
223 self.filter_config.clone(),
224 )?);
225 self.worker_pool = Some(worker_pool);
226 }
227 }
228 Ok(())
229 }
230
231 fn process_with<F>(
232 &mut self,
233 packet_fn: F,
234 sender: Sender<TlsClientOutput>,
235 cancel_signal: Option<Arc<AtomicBool>>,
236 ) -> Result<(), HuginnNetTlsError>
237 where
238 F: FnMut() -> Option<Result<Vec<u8>, HuginnNetTlsError>>,
239 {
240 if self.parallel_config.is_some() {
241 self.process_parallel(packet_fn, sender, cancel_signal)
242 } else {
243 self.process_sequential(packet_fn, sender, cancel_signal)
244 }
245 }
246
247 fn process_parallel<F>(
248 &mut self,
249 mut packet_fn: F,
250 sender: Sender<TlsClientOutput>,
251 cancel_signal: Option<Arc<AtomicBool>>,
252 ) -> Result<(), HuginnNetTlsError>
253 where
254 F: FnMut() -> Option<Result<Vec<u8>, HuginnNetTlsError>>,
255 {
256 let config = self
257 .parallel_config
258 .as_ref()
259 .ok_or_else(|| HuginnNetTlsError::Parse("Parallel config not found".to_string()))?;
260
261 if self.worker_pool.is_none() {
262 let worker_pool = Arc::new(WorkerPool::new(
263 config.num_workers,
264 config.queue_size,
265 config.batch_size,
266 config.timeout_ms,
267 sender,
268 self.max_connections,
269 self.filter_config.clone(),
270 )?);
271 self.worker_pool = Some(worker_pool);
272 }
273
274 let worker_pool = self
275 .worker_pool
276 .as_ref()
277 .ok_or_else(|| HuginnNetTlsError::Parse("Worker pool not initialized".to_string()))?
278 .clone();
279
280 while let Some(packet_result) = packet_fn() {
281 if let Some(ref cancel) = cancel_signal {
282 if cancel.load(Ordering::Relaxed) {
283 debug!("Cancellation signal received, stopping packet processing");
284 break;
285 }
286 }
287
288 match packet_result {
289 Ok(packet) => {
290 let _ = worker_pool.dispatch(packet);
291 }
292 Err(e) => {
293 error!("Failed to read packet: {e}");
294 }
295 }
296 }
297
298 Ok(())
299 }
300
301 fn process_sequential<F>(
302 &mut self,
303 mut packet_fn: F,
304 sender: Sender<TlsClientOutput>,
305 cancel_signal: Option<Arc<AtomicBool>>,
306 ) -> Result<(), HuginnNetTlsError>
307 where
308 F: FnMut() -> Option<Result<Vec<u8>, HuginnNetTlsError>>,
309 {
310 while let Some(packet_result) = packet_fn() {
311 if let Some(ref cancel) = cancel_signal {
312 if cancel.load(Ordering::Relaxed) {
313 debug!("Cancellation signal received, stopping packet processing");
314 break;
315 }
316 }
317
318 match packet_result {
319 Ok(packet) => match self.process_packet(&packet) {
320 Ok(Some(result)) => {
321 if sender.send(result).is_err() {
322 error!("Receiver dropped, stopping packet processing");
323 break;
324 }
325 }
326 Ok(None) => {
327 debug!("No TLS found, continuing packet processing");
328 }
329 Err(tls_error) => {
330 debug!("Error processing packet: {tls_error}");
331 }
332 },
333 Err(e) => {
334 error!("Failed to read packet: {e}");
335 }
336 }
337 }
338 Ok(())
339 }
340
341 pub fn analyze_network(
353 &mut self,
354 interface_name: &str,
355 sender: Sender<TlsClientOutput>,
356 cancel_signal: Option<Arc<AtomicBool>>,
357 ) -> Result<(), HuginnNetTlsError> {
358 let interfaces = datalink::interfaces();
359 let interface = interfaces
360 .into_iter()
361 .find(|iface| iface.name == interface_name)
362 .ok_or_else(|| {
363 HuginnNetTlsError::Parse(format!(
364 "Could not find network interface: {interface_name}"
365 ))
366 })?;
367
368 debug!("Using network interface: {}", interface.name);
369
370 let config = Config { promiscuous: true, ..Config::default() };
371
372 let (_tx, mut rx) = match datalink::channel(&interface, config) {
373 Ok(Channel::Ethernet(tx, rx)) => (tx, rx),
374 Ok(_) => return Err(HuginnNetTlsError::Parse("Unhandled channel type".to_string())),
375 Err(e) => {
376 return Err(HuginnNetTlsError::Parse(format!("Unable to create channel: {e}")))
377 }
378 };
379
380 self.process_with(
381 move || match rx.next() {
382 Ok(packet) => Some(Ok(packet.to_vec())),
383 Err(e) => {
384 Some(Err(HuginnNetTlsError::Parse(format!("Error receiving packet: {e}"))))
385 }
386 },
387 sender,
388 cancel_signal,
389 )
390 }
391
392 pub fn analyze_pcap(
402 &mut self,
403 pcap_path: &str,
404 sender: Sender<TlsClientOutput>,
405 cancel_signal: Option<Arc<AtomicBool>>,
406 ) -> Result<(), HuginnNetTlsError> {
407 let file = File::open(pcap_path)
408 .map_err(|e| HuginnNetTlsError::Parse(format!("Failed to open PCAP file: {e}")))?;
409 let mut pcap_reader = PcapReader::new(file)
410 .map_err(|e| HuginnNetTlsError::Parse(format!("Failed to create PCAP reader: {e}")))?;
411
412 self.process_with(
413 move || match pcap_reader.next_packet() {
414 Some(Ok(packet)) => Some(Ok(packet.data.to_vec())),
415 Some(Err(e)) => {
416 Some(Err(HuginnNetTlsError::Parse(format!("Error reading PCAP packet: {e}"))))
417 }
418 None => None,
419 },
420 sender,
421 cancel_signal,
422 )
423 }
424
425 fn process_packet(
433 &mut self,
434 packet: &[u8],
435 ) -> Result<Option<TlsClientOutput>, HuginnNetTlsError> {
436 if let Some(ref filter) = self.filter_config {
437 if !raw_filter::apply(packet, filter) {
438 debug!("Filtered out packet before parsing");
439 return Ok(None);
440 }
441 }
442
443 match parse_packet(packet) {
444 IpPacket::Ipv4(ipv4) => process_ipv4_packet(&ipv4, &mut self.tcp_flows),
445 IpPacket::Ipv6(ipv6) => process_ipv6_packet(&ipv6, &mut self.tcp_flows),
446 IpPacket::None => Ok(None),
447 }
448 }
449}