huginn_net_tls/
lib.rs

1pub mod error;
2pub mod filter;
3pub mod observable;
4pub mod output;
5pub mod packet_hash;
6pub mod packet_parser;
7pub mod parallel;
8pub mod process;
9pub mod raw_filter;
10pub mod tls;
11pub mod tls_client_hello_reader;
12pub mod tls_process;
13
14// Re-exports
15pub use error::*;
16pub use filter::*;
17pub use observable::*;
18pub use output::*;
19pub use parallel::{DispatchResult, PoolStats, WorkerPool, WorkerStats};
20pub use process::*;
21pub use tls::*;
22pub use tls_client_hello_reader::TlsClientHelloReader;
23pub use tls_process::{
24    parse_tls_client_hello, parse_tls_client_hello_ja4, process_tls_ipv4, process_tls_ipv6,
25};
26
27use crate::packet_parser::{parse_packet, IpPacket};
28use pcap_file::pcap::PcapReader;
29use pnet::datalink::{self, Channel, Config};
30use std::fs::File;
31use std::net::IpAddr;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::mpsc::Sender;
34use std::sync::Arc;
35use tracing::{debug, error};
36use ttl_cache::TtlCache;
37
/// Tuning knobs for the worker pool used in parallel mode.
///
/// Filled in by the `with_config*` constructors and handed, field by field,
/// to `WorkerPool::new` when the pool is created.
#[derive(Clone, Debug)]
struct ParallelConfig {
    /// How many worker threads the pool spawns.
    num_workers: usize,
    /// Capacity of each worker's packet queue (affects memory usage and backpressure).
    queue_size: usize,
    /// Upper bound on the packets handled in one batch before checking for new work.
    /// Larger values favor throughput, smaller values favor latency (typical: 16-64).
    batch_size: usize,
    /// Worker receive timeout, expressed in milliseconds.
    /// Shorter values give faster shutdown, longer values better throughput (typical: 5-50).
    timeout_ms: u64,
}
54
/// Identifies a TCP flow as `(source IP, destination IP, source port, destination port)`.
pub type FlowKey = (IpAddr, IpAddr, u16, u16);
57
58/// A TLS-focused passive fingerprinting analyzer using JA4 methodology.
59///
60/// The `HuginnNetTls` struct handles TLS packet analysis and JA4 fingerprinting
61/// following the official FoxIO specification.
62pub struct HuginnNetTls {
63    tcp_flows: TtlCache<FlowKey, TlsClientHelloReader>,
64    parallel_config: Option<ParallelConfig>,
65    worker_pool: Option<Arc<WorkerPool>>,
66    filter_config: Option<FilterConfig>,
67    max_connections: usize,
68}
69
70impl HuginnNetTls {
71    /// Creates a new instance of `HuginnNetTls` in sequential mode (single-threaded).
72    ///
73    /// # Parameters
74    /// - `max_connections`: Maximum number of TCP flows to track
75    ///
76    /// # Returns
77    /// A new `HuginnNetTls` instance ready for TLS analysis.
78    pub fn new(max_connections: usize) -> Self {
79        Self::with_max_connections(max_connections)
80    }
81}
82
83impl HuginnNetTls {
84    /// Creates a new instance with a specified maximum number of connections.
85    ///
86    /// # Parameters
87    /// - `max_connections`: Maximum number of TCP flows to track
88    ///
89    /// # Returns
90    /// A new `HuginnNetTls` instance ready for TLS analysis.
91    pub fn with_max_connections(max_connections: usize) -> Self {
92        Self {
93            tcp_flows: TtlCache::new(max_connections),
94            parallel_config: None,
95            worker_pool: None,
96            filter_config: None,
97            max_connections,
98        }
99    }
100
101    /// Configure packet filtering (builder pattern)
102    pub fn with_filter(mut self, config: FilterConfig) -> Self {
103        self.filter_config = Some(config);
104        self
105    }
106
107    /// Creates a new instance with full parallel configuration.
108    ///
109    /// # Parameters
110    /// - `num_workers`: Number of worker threads (recommended: 2-4 on 8-core systems)
111    /// - `queue_size`: Size of packet queue per worker (typical: 100-200)
112    /// - `batch_size`: Maximum packets to process in one batch (typical: 16-64, recommended: 32)
113    /// - `timeout_ms`: Worker receive timeout in milliseconds (typical: 5-50, recommended: 10)
114    ///
115    /// # Configuration Guide
116    ///
117    /// ## batch_size
118    /// - **Low (8-16)**: Lower latency, more responsive, higher overhead
119    /// - **Medium (32)**: Balanced throughput and latency *(recommended)*
120    /// - **High (64-128)**: Maximum throughput, higher latency
121    ///
122    /// ## timeout_ms
123    /// - **Low (5-10ms)**: Fast shutdown, slightly lower throughput *(recommended: 10)*
124    /// - **Medium (20-50ms)**: Better throughput, slower shutdown
125    /// - **High (100ms+)**: Maximum throughput, slow shutdown
126    ///
127    /// # Example
128    /// ```rust,no_run
129    /// use huginn_net_tls::HuginnNetTls;
130    ///
131    /// // Balanced configuration (recommended)
132    /// let tls = HuginnNetTls::with_config(4, 100, 32, 10);
133    ///
134    /// // Low latency
135    /// let low_latency = HuginnNetTls::with_config(2, 100, 8, 5);
136    ///
137    /// // High throughput
138    /// let high_throughput = HuginnNetTls::with_config(4, 200, 64, 20);
139    /// ```
140    ///
141    /// # Returns
142    /// A new `HuginnNetTls` instance with parallel configuration.
143    pub fn with_config(
144        num_workers: usize,
145        queue_size: usize,
146        batch_size: usize,
147        timeout_ms: u64,
148    ) -> Self {
149        Self::with_config_and_max_connections(
150            num_workers,
151            queue_size,
152            batch_size,
153            timeout_ms,
154            10000,
155        )
156    }
157
158    /// Creates a new instance with full parallel configuration and max connections.
159    ///
160    /// # Parameters
161    /// - `num_workers`: Number of worker threads
162    /// - `queue_size`: Size of packet queue per worker
163    /// - `batch_size`: Maximum packets to process in one batch
164    /// - `timeout_ms`: Worker receive timeout in milliseconds
165    /// - `max_connections`: Maximum number of TCP flows to track
166    ///
167    /// # Returns
168    /// A new `HuginnNetTls` instance with parallel configuration.
169    pub fn with_config_and_max_connections(
170        num_workers: usize,
171        queue_size: usize,
172        batch_size: usize,
173        timeout_ms: u64,
174        max_connections: usize,
175    ) -> Self {
176        Self {
177            tcp_flows: TtlCache::new(max_connections),
178            parallel_config: Some(ParallelConfig {
179                num_workers,
180                queue_size,
181                batch_size,
182                timeout_ms,
183            }),
184            worker_pool: None,
185            filter_config: None,
186            max_connections,
187        }
188    }
189
190    /// Get worker pool statistics (only available in parallel mode, after analyze_* is called)
191    ///
192    /// # Returns
193    /// `Some(PoolStats)` if parallel mode is active, `None` otherwise
194    pub fn stats(&self) -> Option<PoolStats> {
195        self.worker_pool.as_ref().map(|pool| pool.stats())
196    }
197
198    /// Get a reference to the worker pool (only available in parallel mode, after analyze_* is called)
199    ///
200    /// # Returns
201    /// `Some(Arc<WorkerPool>)` if parallel mode is active, `None` otherwise
202    pub fn worker_pool(&self) -> Option<Arc<WorkerPool>> {
203        self.worker_pool.clone()
204    }
205
206    /// Initialize the worker pool (only for parallel mode, called automatically by analyze_*)
207    ///
208    /// This can be called explicitly to get the pool reference before starting analysis
209    ///
210    /// # Errors
211    /// - If the worker pool creation fails.
212    ///
213    pub fn init_pool(&mut self, sender: Sender<TlsClientOutput>) -> Result<(), HuginnNetTlsError> {
214        if let Some(config) = &self.parallel_config {
215            if self.worker_pool.is_none() {
216                let worker_pool = Arc::new(WorkerPool::new(
217                    config.num_workers,
218                    config.queue_size,
219                    config.batch_size,
220                    config.timeout_ms,
221                    sender,
222                    self.max_connections,
223                    self.filter_config.clone(),
224                )?);
225                self.worker_pool = Some(worker_pool);
226            }
227        }
228        Ok(())
229    }
230
231    fn process_with<F>(
232        &mut self,
233        packet_fn: F,
234        sender: Sender<TlsClientOutput>,
235        cancel_signal: Option<Arc<AtomicBool>>,
236    ) -> Result<(), HuginnNetTlsError>
237    where
238        F: FnMut() -> Option<Result<Vec<u8>, HuginnNetTlsError>>,
239    {
240        if self.parallel_config.is_some() {
241            self.process_parallel(packet_fn, sender, cancel_signal)
242        } else {
243            self.process_sequential(packet_fn, sender, cancel_signal)
244        }
245    }
246
247    fn process_parallel<F>(
248        &mut self,
249        mut packet_fn: F,
250        sender: Sender<TlsClientOutput>,
251        cancel_signal: Option<Arc<AtomicBool>>,
252    ) -> Result<(), HuginnNetTlsError>
253    where
254        F: FnMut() -> Option<Result<Vec<u8>, HuginnNetTlsError>>,
255    {
256        let config = self
257            .parallel_config
258            .as_ref()
259            .ok_or_else(|| HuginnNetTlsError::Parse("Parallel config not found".to_string()))?;
260
261        if self.worker_pool.is_none() {
262            let worker_pool = Arc::new(WorkerPool::new(
263                config.num_workers,
264                config.queue_size,
265                config.batch_size,
266                config.timeout_ms,
267                sender,
268                self.max_connections,
269                self.filter_config.clone(),
270            )?);
271            self.worker_pool = Some(worker_pool);
272        }
273
274        let worker_pool = self
275            .worker_pool
276            .as_ref()
277            .ok_or_else(|| HuginnNetTlsError::Parse("Worker pool not initialized".to_string()))?
278            .clone();
279
280        while let Some(packet_result) = packet_fn() {
281            if let Some(ref cancel) = cancel_signal {
282                if cancel.load(Ordering::Relaxed) {
283                    debug!("Cancellation signal received, stopping packet processing");
284                    break;
285                }
286            }
287
288            match packet_result {
289                Ok(packet) => {
290                    let _ = worker_pool.dispatch(packet);
291                }
292                Err(e) => {
293                    error!("Failed to read packet: {e}");
294                }
295            }
296        }
297
298        Ok(())
299    }
300
301    fn process_sequential<F>(
302        &mut self,
303        mut packet_fn: F,
304        sender: Sender<TlsClientOutput>,
305        cancel_signal: Option<Arc<AtomicBool>>,
306    ) -> Result<(), HuginnNetTlsError>
307    where
308        F: FnMut() -> Option<Result<Vec<u8>, HuginnNetTlsError>>,
309    {
310        while let Some(packet_result) = packet_fn() {
311            if let Some(ref cancel) = cancel_signal {
312                if cancel.load(Ordering::Relaxed) {
313                    debug!("Cancellation signal received, stopping packet processing");
314                    break;
315                }
316            }
317
318            match packet_result {
319                Ok(packet) => match self.process_packet(&packet) {
320                    Ok(Some(result)) => {
321                        if sender.send(result).is_err() {
322                            error!("Receiver dropped, stopping packet processing");
323                            break;
324                        }
325                    }
326                    Ok(None) => {
327                        debug!("No TLS found, continuing packet processing");
328                    }
329                    Err(tls_error) => {
330                        debug!("Error processing packet: {tls_error}");
331                    }
332                },
333                Err(e) => {
334                    error!("Failed to read packet: {e}");
335                }
336            }
337        }
338        Ok(())
339    }
340
341    /// Captures and analyzes packets on the specified network interface.
342    ///
343    /// Sends `TlsClientOutput` through the provided channel.
344    ///
345    /// # Parameters
346    /// - `interface_name`: The name of the network interface to analyze.
347    /// - `sender`: A `Sender` to send `TlsClientOutput` objects back to the caller.
348    /// - `cancel_signal`: Optional `Arc<AtomicBool>` to signal graceful shutdown.
349    ///
350    /// # Errors
351    /// - If the network interface cannot be found or a channel cannot be created.
352    pub fn analyze_network(
353        &mut self,
354        interface_name: &str,
355        sender: Sender<TlsClientOutput>,
356        cancel_signal: Option<Arc<AtomicBool>>,
357    ) -> Result<(), HuginnNetTlsError> {
358        let interfaces = datalink::interfaces();
359        let interface = interfaces
360            .into_iter()
361            .find(|iface| iface.name == interface_name)
362            .ok_or_else(|| {
363                HuginnNetTlsError::Parse(format!(
364                    "Could not find network interface: {interface_name}"
365                ))
366            })?;
367
368        debug!("Using network interface: {}", interface.name);
369
370        let config = Config { promiscuous: true, ..Config::default() };
371
372        let (_tx, mut rx) = match datalink::channel(&interface, config) {
373            Ok(Channel::Ethernet(tx, rx)) => (tx, rx),
374            Ok(_) => return Err(HuginnNetTlsError::Parse("Unhandled channel type".to_string())),
375            Err(e) => {
376                return Err(HuginnNetTlsError::Parse(format!("Unable to create channel: {e}")))
377            }
378        };
379
380        self.process_with(
381            move || match rx.next() {
382                Ok(packet) => Some(Ok(packet.to_vec())),
383                Err(e) => {
384                    Some(Err(HuginnNetTlsError::Parse(format!("Error receiving packet: {e}"))))
385                }
386            },
387            sender,
388            cancel_signal,
389        )
390    }
391
392    /// Analyzes packets from a PCAP file.
393    ///
394    /// # Parameters
395    /// - `pcap_path`: The path to the PCAP file to analyze.
396    /// - `sender`: A `Sender` to send `TlsClientOutput` objects back to the caller.
397    /// - `cancel_signal`: Optional `Arc<AtomicBool>` to signal graceful shutdown.
398    ///
399    /// # Errors
400    /// - If the PCAP file cannot be opened or read.
401    pub fn analyze_pcap(
402        &mut self,
403        pcap_path: &str,
404        sender: Sender<TlsClientOutput>,
405        cancel_signal: Option<Arc<AtomicBool>>,
406    ) -> Result<(), HuginnNetTlsError> {
407        let file = File::open(pcap_path)
408            .map_err(|e| HuginnNetTlsError::Parse(format!("Failed to open PCAP file: {e}")))?;
409        let mut pcap_reader = PcapReader::new(file)
410            .map_err(|e| HuginnNetTlsError::Parse(format!("Failed to create PCAP reader: {e}")))?;
411
412        self.process_with(
413            move || match pcap_reader.next_packet() {
414                Some(Ok(packet)) => Some(Ok(packet.data.to_vec())),
415                Some(Err(e)) => {
416                    Some(Err(HuginnNetTlsError::Parse(format!("Error reading PCAP packet: {e}"))))
417                }
418                None => None,
419            },
420            sender,
421            cancel_signal,
422        )
423    }
424
425    /// Processes a single packet and extracts TLS information if present.
426    ///
427    /// # Parameters
428    /// - `packet`: The raw packet data.
429    ///
430    /// # Returns
431    /// A `Result` containing an optional `TlsClientOutput` or an error.
432    fn process_packet(
433        &mut self,
434        packet: &[u8],
435    ) -> Result<Option<TlsClientOutput>, HuginnNetTlsError> {
436        if let Some(ref filter) = self.filter_config {
437            if !raw_filter::apply(packet, filter) {
438                debug!("Filtered out packet before parsing");
439                return Ok(None);
440            }
441        }
442
443        match parse_packet(packet) {
444            IpPacket::Ipv4(ipv4) => process_ipv4_packet(&ipv4, &mut self.tcp_flows),
445            IpPacket::Ipv6(ipv6) => process_ipv6_packet(&ipv6, &mut self.tcp_flows),
446            IpPacket::None => Ok(None),
447        }
448    }
449}