use crate::error::HuginnNetHttpError;
use crate::filter::FilterConfig;
use crate::http_process::{FlowKey, HttpProcessors, TcpFlow};
use crate::packet_hash;
use crate::raw_filter;
use crate::{HttpAnalysisResult, SignatureMatcher};
use crossbeam_channel::{bounded, Sender};
use huginn_net_db as db;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use tracing::debug;
use ttl_cache::TtlCache;

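/// Per-worker settings handed to each worker thread at spawn time.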
struct WorkerConfig {
    batch_size: usize,
    timeout_ms: u64,
    max_connections: usize,
}

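/// Pool of worker threads that analyze raw packets off the capture path.
///
/// `dispatch` selects a worker via `packet_hash::hash_flow`, so per-flow TCP
/// state (`TcpFlow`) stays local to a single worker thread.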
pub struct WorkerPool {
    packet_senders: Arc<Vec<Sender<Vec<u8>>>>,
    result_sender: Arc<Mutex<Option<std::sync::mpsc::Sender<HttpAnalysisResult>>>>,
    shutdown_flag: Arc<AtomicBool>,
    dispatched_count: Arc<AtomicU64>,
    dropped_count: Arc<AtomicU64>,
    worker_dropped: Vec<Arc<AtomicU64>>,
    num_workers: usize,
    pub batch_size: usize,
    pub timeout_ms: u64,
}

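/// Queue depth and drop counter for a single worker.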
#[derive(Debug, Clone)]
pub struct WorkerStats {
    pub id: usize,
    pub queue_size: usize,
    pub dropped: u64,
}

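/// Aggregate dispatch/drop counters plus per-worker statistics.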
#[derive(Debug, Clone)]
pub struct PoolStats {
    pub total_dispatched: u64,
    pub total_dropped: u64,
    pub workers: Vec<WorkerStats>,
}

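/// Outcome of [`WorkerPool::dispatch`]: `Queued` if the packet was accepted
/// into a worker queue, `Dropped` otherwise.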
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchResult {
    Queued,
    Dropped,
}

impl WorkerPool {
    #[allow(clippy::too_many_arguments)]
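    /// Creates the pool and spawns `num_workers` detached worker threads,
    /// each with a bounded packet queue of `queue_size` entries.
    ///
    /// Returns an error if `num_workers` is zero or a worker thread cannot be
    /// spawned. A minimal usage sketch with placeholder values (marked
    /// `ignore`, so it is not compiled as a doc test):
    ///
    /// ```ignore
    /// let (result_tx, result_rx) = std::sync::mpsc::channel();
    /// let pool = WorkerPool::new(4, 1024, 32, 100, result_tx, None, 10_000, None)
    ///     .expect("failed to start HTTP worker pool");
    /// ```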
    pub fn new(
        num_workers: usize,
        queue_size: usize,
        batch_size: usize,
        timeout_ms: u64,
        result_sender: std::sync::mpsc::Sender<HttpAnalysisResult>,
        database: Option<Arc<db::Database>>,
        max_connections: usize,
        filter_config: Option<FilterConfig>,
    ) -> Result<Arc<Self>, HuginnNetHttpError> {
        if num_workers == 0 {
            return Err(HuginnNetHttpError::Misconfiguration(
                "Worker count must be at least 1".to_string(),
            ));
        }

        let mut packet_senders = Vec::with_capacity(num_workers);
        let mut worker_dropped = Vec::with_capacity(num_workers);
        let shutdown_flag = Arc::new(AtomicBool::new(false));

        for worker_id in 0..num_workers {
            let (tx, rx) = bounded::<Vec<u8>>(queue_size);
            packet_senders.push(tx);

            let result_sender_clone = result_sender.clone();
            let db_clone = database.clone();
            let dropped = Arc::new(AtomicU64::new(0));
            worker_dropped.push(Arc::clone(&dropped));
            let shutdown_flag_clone = Arc::clone(&shutdown_flag);
            let worker_filter = filter_config.clone();

            thread::Builder::new()
                .name(format!("http-worker-{worker_id}"))
                .spawn(move || {
                    Self::worker_loop(
                        worker_id,
                        rx,
                        result_sender_clone,
                        db_clone,
                        dropped,
                        shutdown_flag_clone,
                        WorkerConfig { batch_size, timeout_ms, max_connections },
                        worker_filter,
                    )
                })
                .map_err(|e| {
                    HuginnNetHttpError::Misconfiguration(format!(
                        "Failed to spawn worker thread {worker_id}: {e}"
                    ))
                })?;
        }

        Ok(Arc::new(Self {
            packet_senders: Arc::new(packet_senders),
            result_sender: Arc::new(Mutex::new(Some(result_sender))),
            shutdown_flag,
            dispatched_count: Arc::new(AtomicU64::new(0)),
            dropped_count: Arc::new(AtomicU64::new(0)),
            worker_dropped,
            num_workers,
            batch_size,
            timeout_ms,
        }))
    }

    #[allow(clippy::too_many_arguments)]
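    /// Worker thread body: receives packets in batches, analyzes them, and
    /// forwards results until shutdown is signaled or a channel closes.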
    fn worker_loop(
        worker_id: usize,
        rx: crossbeam_channel::Receiver<Vec<u8>>,
        result_sender: std::sync::mpsc::Sender<HttpAnalysisResult>,
        database: Option<Arc<db::Database>>,
        dropped: Arc<AtomicU64>,
        shutdown_flag: Arc<AtomicBool>,
        config: WorkerConfig,
        filter_config: Option<FilterConfig>,
    ) {
        use crossbeam_channel::RecvTimeoutError;
        use std::time::Duration;

        debug!("HTTP worker {} started", worker_id);

        let matcher = database
            .as_ref()
            .map(|db| SignatureMatcher::new(db.as_ref()));
        let mut http_flows = TtlCache::new(config.max_connections);
        let http_processors = HttpProcessors::new();
        let timeout = Duration::from_millis(config.timeout_ms);
        let mut batch = Vec::with_capacity(config.batch_size);

        loop {
            if shutdown_flag.load(Ordering::Relaxed) {
                debug!("HTTP worker {} received shutdown signal", worker_id);
                break;
            }

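            // Block for at most `timeout_ms` so the shutdown flag is
            // re-checked even when the capture is idle.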
            match rx.recv_timeout(timeout) {
                Ok(packet) => {
                    batch.push(packet);

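                    // Drain additional packets without blocking, up to
                    // `batch_size` per iteration.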
                    while batch.len() < config.batch_size {
                        match rx.try_recv() {
                            Ok(packet) => batch.push(packet),
                            Err(_) => break,
                        }
                    }

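                    // Analyze the batch; stop this worker if the result
                    // receiver has been dropped.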
                    for packet in batch.drain(..) {
                        match Self::process_packet(
                            &packet,
                            &mut http_flows,
                            &http_processors,
                            matcher.as_ref(),
                            filter_config.as_ref(),
                        ) {
                            Ok(result) => {
                                if result_sender.send(result).is_err() {
                                    debug!("HTTP worker {} result channel closed", worker_id);
                                    return;
                                }
                            }
                            Err(_) => {
                                dropped.fetch_add(1, Ordering::Relaxed);
                            }
                        }
                    }
                }
                Err(RecvTimeoutError::Timeout) => {
                    if shutdown_flag.load(Ordering::Relaxed) {
                        debug!("HTTP worker {} received shutdown signal", worker_id);
                        break;
                    }
                    continue;
                }
                Err(RecvTimeoutError::Disconnected) => {
                    debug!("HTTP worker {} channel disconnected", worker_id);
                    break;
                }
            }
        }

        debug!("HTTP worker {} stopped", worker_id);
    }

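    /// Applies the optional pre-parse filter, then parses the packet and runs
    /// the IPv4/IPv6 HTTP analysis. Filtered-out and non-IP packets yield an
    /// empty `HttpAnalysisResult`.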
    fn process_packet(
        packet: &[u8],
        http_flows: &mut TtlCache<FlowKey, TcpFlow>,
        http_processors: &HttpProcessors,
        matcher: Option<&SignatureMatcher>,
        filter: Option<&FilterConfig>,
    ) -> Result<HttpAnalysisResult, HuginnNetHttpError> {
        if let Some(filter_cfg) = filter {
            if !raw_filter::apply(packet, filter_cfg) {
                debug!("Filtered out packet before parsing");
                return Ok(HttpAnalysisResult { http_request: None, http_response: None });
            }
        }

        use crate::packet_parser::{parse_packet, IpPacket};
        use crate::process;

        match parse_packet(packet) {
            IpPacket::Ipv4(ipv4) => {
                process::process_ipv4_packet(&ipv4, http_flows, http_processors, matcher)
            }
            IpPacket::Ipv6(ipv6) => {
                process::process_ipv6_packet(&ipv6, http_flows, http_processors, matcher)
            }
            IpPacket::None => Ok(HttpAnalysisResult { http_request: None, http_response: None }),
        }
    }

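    /// Hashes the packet's flow to pick a worker and enqueues it without
    /// blocking. Returns [`DispatchResult::Dropped`] if the pool is shutting
    /// down, the target queue is full, or no worker matches the computed index.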
    pub fn dispatch(&self, packet: Vec<u8>) -> DispatchResult {
        if self.shutdown_flag.load(Ordering::Relaxed) {
            self.dropped_count.fetch_add(1, Ordering::Relaxed);
            return DispatchResult::Dropped;
        }

        let worker_id = packet_hash::hash_flow(&packet, self.num_workers);

        self.dispatched_count.fetch_add(1, Ordering::Relaxed);

        if let Some(sender) = self.packet_senders.get(worker_id) {
            match sender.try_send(packet) {
                Ok(()) => DispatchResult::Queued,
                Err(_) => {
                    self.dropped_count.fetch_add(1, Ordering::Relaxed);
                    self.worker_dropped[worker_id].fetch_add(1, Ordering::Relaxed);
                    DispatchResult::Dropped
                }
            }
        } else {
            self.dropped_count.fetch_add(1, Ordering::Relaxed);
            DispatchResult::Dropped
        }
    }

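    /// Returns a snapshot of pool-wide and per-worker counters.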
    pub fn stats(&self) -> PoolStats {
        let workers = self
            .packet_senders
            .iter()
            .enumerate()
            .map(|(id, sender)| WorkerStats {
                id,
                queue_size: sender.len(),
                dropped: self.worker_dropped[id].load(Ordering::Relaxed),
            })
            .collect();

        PoolStats {
            total_dispatched: self.dispatched_count.load(Ordering::Relaxed),
            total_dropped: self.dropped_count.load(Ordering::Relaxed),
            workers,
        }
    }

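    /// Signals all workers to stop and drops the pool's copy of the result
    /// sender, letting the result channel disconnect once workers exit.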
    pub fn shutdown(&self) {
        self.shutdown_flag.store(true, Ordering::Relaxed);

        if let Ok(mut sender) = self.result_sender.lock() {
            *sender = None;
        }
    }
}

impl fmt::Display for PoolStats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "HTTP Worker Pool Statistics:")?;
        writeln!(f, " Total dispatched: {}", self.total_dispatched)?;
        writeln!(f, " Total dropped: {}", self.total_dropped)?;
        writeln!(f, " Workers: {}", self.workers.len())?;
        for worker in &self.workers {
            writeln!(f, " {worker}")?;
        }
        Ok(())
    }
}

impl fmt::Display for WorkerStats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Worker {}: queue_size={}, dropped={}",
            self.id, self.queue_size, self.dropped
        )
    }
}