huginn_net_http/
parallel.rs

//! Parallel processing support for HTTP analysis using worker pool architecture.
//!
//! This module provides multi-threaded packet processing with hash-based worker assignment
//! to maintain HTTP flow consistency (request/response tracking). Unlike TCP which hashes
//! only the source IP, HTTP hashes the complete flow (src_ip, dst_ip, src_port, dst_port)
//! to ensure requests and responses from the same connection are processed by the same worker.
8use crate::error::HuginnNetHttpError;
9use crate::filter::FilterConfig;
10use crate::http_process::{FlowKey, HttpProcessors, TcpFlow};
11use crate::packet_hash;
12use crate::raw_filter;
13use crate::{HttpAnalysisResult, SignatureMatcher};
14use crossbeam_channel::{bounded, Sender};
15use huginn_net_db as db;
16use std::fmt;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19use std::thread;
20use tracing::debug;
21use ttl_cache::TtlCache;
22
23/// Worker configuration parameters
24struct WorkerConfig {
25    batch_size: usize,
26    timeout_ms: u64,
27    max_connections: usize,
28}
29
30/// Worker pool for parallel HTTP packet processing.
31pub struct WorkerPool {
32    packet_senders: Arc<Vec<Sender<Vec<u8>>>>,
33    result_sender: Arc<Mutex<Option<std::sync::mpsc::Sender<HttpAnalysisResult>>>>,
34    shutdown_flag: Arc<AtomicBool>,
35    dispatched_count: Arc<AtomicU64>,
36    dropped_count: Arc<AtomicU64>,
37    worker_dropped: Vec<Arc<AtomicU64>>,
38    num_workers: usize,
39    pub batch_size: usize,
40    pub timeout_ms: u64,
41}
42
43/// Statistics for a single worker thread.
44#[derive(Debug, Clone)]
45pub struct WorkerStats {
46    pub id: usize,
47    pub queue_size: usize,
48    pub dropped: u64,
49}
50
51/// Pool-level statistics.
52#[derive(Debug, Clone)]
53pub struct PoolStats {
54    pub total_dispatched: u64,
55    pub total_dropped: u64,
56    pub workers: Vec<WorkerStats>,
57}
58
59/// Result of dispatching a packet to a worker.
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum DispatchResult {
62    /// Packet successfully queued for processing
63    Queued,
64    /// Worker queue full, packet dropped
65    Dropped,
66}
67
impl WorkerPool {
    /// Creates a new worker pool for HTTP analysis.
    ///
    /// Spawns `num_workers` OS threads (named `http-worker-N`), each owning a
    /// bounded crossbeam channel of capacity `queue_size`, its own HTTP flow
    /// cache, and its own signature matcher, so the packet path needs no locks.
    ///
    /// # Parameters
    /// - `num_workers`: Number of worker threads
    /// - `queue_size`: Size of each worker's packet queue
    /// - `batch_size`: Maximum packets to process in one batch
    /// - `timeout_ms`: Worker receive timeout in milliseconds
    /// - `result_sender`: Channel to send analysis results
    /// - `database`: Optional signature database for matching
    /// - `max_connections`: Maximum HTTP flows to track per worker
    /// - `filter_config`: Optional filter configuration for packet filtering
    ///
    /// # Returns
    /// A new `WorkerPool` or an error if creation fails.
    ///
    /// # Errors
    /// Returns [`HuginnNetHttpError::Misconfiguration`] if `num_workers` is 0
    /// or if spawning a worker thread fails.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        num_workers: usize,
        queue_size: usize,
        batch_size: usize,
        timeout_ms: u64,
        result_sender: std::sync::mpsc::Sender<HttpAnalysisResult>,
        database: Option<Arc<db::Database>>,
        max_connections: usize,
        filter_config: Option<FilterConfig>,
    ) -> Result<Arc<Self>, HuginnNetHttpError> {
        if num_workers == 0 {
            return Err(HuginnNetHttpError::Misconfiguration(
                "Worker count must be at least 1".to_string(),
            ));
        }

        let mut packet_senders = Vec::with_capacity(num_workers);
        let mut worker_dropped = Vec::with_capacity(num_workers);
        let shutdown_flag = Arc::new(AtomicBool::new(false));

        for worker_id in 0..num_workers {
            // Bounded queue: dispatch uses try_send, so a full queue drops
            // packets instead of blocking the capture thread.
            let (tx, rx) = bounded::<Vec<u8>>(queue_size);
            packet_senders.push(tx);

            // Each worker gets its own clones of the shared handles; the
            // per-worker drop counter is also kept pool-side for stats().
            let result_sender_clone = result_sender.clone();
            let db_clone = database.clone();
            let dropped = Arc::new(AtomicU64::new(0));
            worker_dropped.push(Arc::clone(&dropped));
            let shutdown_flag_clone = Arc::clone(&shutdown_flag);
            let worker_filter = filter_config.clone();

            thread::Builder::new()
                .name(format!("http-worker-{worker_id}"))
                .spawn(move || {
                    Self::worker_loop(
                        worker_id,
                        rx,
                        result_sender_clone,
                        db_clone,
                        dropped,
                        shutdown_flag_clone,
                        WorkerConfig { batch_size, timeout_ms, max_connections },
                        worker_filter,
                    )
                })
                .map_err(|e| {
                    HuginnNetHttpError::Misconfiguration(format!(
                        "Failed to spawn worker thread {worker_id}: {e}"
                    ))
                })?;
        }

        Ok(Arc::new(Self {
            packet_senders: Arc::new(packet_senders),
            result_sender: Arc::new(Mutex::new(Some(result_sender))),
            shutdown_flag,
            dispatched_count: Arc::new(AtomicU64::new(0)),
            dropped_count: Arc::new(AtomicU64::new(0)),
            worker_dropped,
            num_workers,
            batch_size,
            timeout_ms,
        }))
    }

    /// Worker thread main loop with batching support.
    ///
    /// Blocks up to `config.timeout_ms` for the first packet, then drains up
    /// to `config.batch_size - 1` more without blocking before processing the
    /// batch. The shutdown flag is re-checked at the top of every iteration
    /// and after each receive timeout, so shutdown latency is bounded by the
    /// timeout. Exits when the flag is set, the packet channel disconnects,
    /// or the result channel is closed by the receiver.
    #[allow(clippy::too_many_arguments)]
    fn worker_loop(
        worker_id: usize,
        rx: crossbeam_channel::Receiver<Vec<u8>>,
        result_sender: std::sync::mpsc::Sender<HttpAnalysisResult>,
        database: Option<Arc<db::Database>>,
        dropped: Arc<AtomicU64>,
        shutdown_flag: Arc<AtomicBool>,
        config: WorkerConfig,
        filter_config: Option<FilterConfig>,
    ) {
        use crossbeam_channel::RecvTimeoutError;
        use std::time::Duration;

        debug!("HTTP worker {} started", worker_id);

        // Per-worker state: matcher borrows the shared database; the flow
        // cache and processors are exclusively owned by this thread.
        let matcher = database
            .as_ref()
            .map(|db| SignatureMatcher::new(db.as_ref()));
        let mut http_flows = TtlCache::new(config.max_connections);
        let http_processors = HttpProcessors::new();
        let timeout = Duration::from_millis(config.timeout_ms);
        let mut batch = Vec::with_capacity(config.batch_size);

        loop {
            if shutdown_flag.load(Ordering::Relaxed) {
                debug!("HTTP worker {} received shutdown signal", worker_id);
                break;
            }

            // Receive first packet with timeout (blocking)
            match rx.recv_timeout(timeout) {
                Ok(packet) => {
                    batch.push(packet);

                    // Try to fill the batch with additional packets (non-blocking)
                    while batch.len() < config.batch_size {
                        match rx.try_recv() {
                            Ok(packet) => batch.push(packet),
                            Err(_) => break,
                        }
                    }

                    // Process all packets in the batch
                    for packet in batch.drain(..) {
                        match Self::process_packet(
                            &packet,
                            &mut http_flows,
                            &http_processors,
                            matcher.as_ref(),
                            filter_config.as_ref(),
                        ) {
                            Ok(result) => {
                                // A closed result channel means the consumer is
                                // gone; stop this worker immediately.
                                if result_sender.send(result).is_err() {
                                    debug!("HTTP worker {} result channel closed", worker_id);
                                    return;
                                }
                            }
                            Err(_) => {
                                // Packet processing error, increment dropped count
                                dropped.fetch_add(1, Ordering::Relaxed);
                            }
                        }
                    }
                }
                Err(RecvTimeoutError::Timeout) => {
                    // Idle wakeup: use it to re-check for shutdown.
                    if shutdown_flag.load(Ordering::Relaxed) {
                        debug!("HTTP worker {} received shutdown signal", worker_id);
                        break;
                    }
                    continue;
                }
                Err(RecvTimeoutError::Disconnected) => {
                    // All packet senders dropped; no more work will arrive.
                    debug!("HTTP worker {} channel disconnected", worker_id);
                    break;
                }
            }
        }

        debug!("HTTP worker {} stopped", worker_id);
    }

    /// Processes a single packet within a worker thread.
    ///
    /// Applies the optional raw-packet filter first; filtered-out packets and
    /// non-IP packets yield an empty `HttpAnalysisResult` (both fields `None`)
    /// rather than an error, so they are still sent on the result channel.
    ///
    /// # Errors
    /// Propagates any error from the IPv4/IPv6 processing path.
    fn process_packet(
        packet: &[u8],
        http_flows: &mut TtlCache<FlowKey, TcpFlow>,
        http_processors: &HttpProcessors,
        matcher: Option<&SignatureMatcher>,
        filter: Option<&FilterConfig>,
    ) -> Result<HttpAnalysisResult, HuginnNetHttpError> {
        if let Some(filter_cfg) = filter {
            if !raw_filter::apply(packet, filter_cfg) {
                debug!("Filtered out packet before parsing");
                return Ok(HttpAnalysisResult { http_request: None, http_response: None });
            }
        }

        use crate::packet_parser::{parse_packet, IpPacket};
        use crate::process;

        match parse_packet(packet) {
            IpPacket::Ipv4(ipv4) => {
                process::process_ipv4_packet(&ipv4, http_flows, http_processors, matcher)
            }
            IpPacket::Ipv6(ipv6) => {
                process::process_ipv6_packet(&ipv6, http_flows, http_processors, matcher)
            }
            IpPacket::None => Ok(HttpAnalysisResult { http_request: None, http_response: None }),
        }
    }

    /// Routes a packet to a worker chosen by hashing its flow, without blocking.
    ///
    /// Returns [`DispatchResult::Dropped`] when the pool is shutting down, the
    /// target worker's queue is full, or the computed worker index has no
    /// sender; otherwise the packet is queued and `Queued` is returned.
    ///
    /// Note: `dispatched_count` is incremented before the `try_send` attempt,
    /// so a packet dropped on a full queue is counted in both the dispatched
    /// and dropped totals reported by [`WorkerPool::stats`].
    pub fn dispatch(&self, packet: Vec<u8>) -> DispatchResult {
        // Don't accept new packets if shutting down
        if self.shutdown_flag.load(Ordering::Relaxed) {
            self.dropped_count.fetch_add(1, Ordering::Relaxed);
            return DispatchResult::Dropped;
        }

        // Flow hash keeps both directions of a connection on the same worker.
        let worker_id = packet_hash::hash_flow(&packet, self.num_workers);

        self.dispatched_count.fetch_add(1, Ordering::Relaxed);

        if let Some(sender) = self.packet_senders.get(worker_id) {
            match sender.try_send(packet) {
                Ok(()) => DispatchResult::Queued,
                Err(_) => {
                    // Queue full (or worker gone): record the drop at both
                    // pool and worker granularity.
                    self.dropped_count.fetch_add(1, Ordering::Relaxed);
                    self.worker_dropped[worker_id].fetch_add(1, Ordering::Relaxed);
                    DispatchResult::Dropped
                }
            }
        } else {
            self.dropped_count.fetch_add(1, Ordering::Relaxed);
            DispatchResult::Dropped
        }
    }

    /// Returns a point-in-time snapshot of pool and per-worker statistics.
    ///
    /// Counters are read with relaxed ordering, so the snapshot is not
    /// guaranteed to be mutually consistent under concurrent dispatch.
    pub fn stats(&self) -> PoolStats {
        let workers = self
            .packet_senders
            .iter()
            .enumerate()
            .map(|(id, sender)| WorkerStats {
                id,
                queue_size: sender.len(),
                dropped: self.worker_dropped[id].load(Ordering::Relaxed),
            })
            .collect();

        PoolStats {
            total_dispatched: self.dispatched_count.load(Ordering::Relaxed),
            total_dropped: self.dropped_count.load(Ordering::Relaxed),
            workers,
        }
    }

    /// Initiates graceful shutdown of the worker pool.
    ///
    /// Sets the shutdown flag — workers observe it at the top of their loop or
    /// on their next receive timeout, so shutdown latency is bounded by
    /// `timeout_ms` — and drops the pool's clone of the result sender. Each
    /// worker holds its own sender clone, so the result channel fully closes
    /// only after every worker has exited. This method does not join the
    /// worker threads.
    pub fn shutdown(&self) {
        // Set shutdown flag to stop workers on next timeout
        self.shutdown_flag.store(true, Ordering::Relaxed);

        // Drop result sender to signal workers
        if let Ok(mut sender) = self.result_sender.lock() {
            *sender = None;
        }
    }
}
317
318impl fmt::Display for PoolStats {
319    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320        writeln!(f, "HTTP Worker Pool Statistics:")?;
321        writeln!(f, "  Total dispatched: {}", self.total_dispatched)?;
322        writeln!(f, "  Total dropped: {}", self.total_dropped)?;
323        writeln!(f, "  Workers: {}", self.workers.len())?;
324        for worker in &self.workers {
325            writeln!(f, "    {worker}")?;
326        }
327        Ok(())
328    }
329}
330
331impl fmt::Display for WorkerStats {
332    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333        write!(
334            f,
335            "Worker {}: queue_size={}, dropped={}",
336            self.id, self.queue_size, self.dropped
337        )
338    }
339}