Skip to main content

stackforge_core/
parallel.rs

1//! Multithreaded packet processing using Rayon.
2//!
3//! This module provides parallel variants of common packet operations
4//! for high-throughput scenarios like PCAP batch processing and flow extraction.
5
6use bytes::Bytes;
7use rayon::prelude::*;
8
9use crate::error::Result;
10use crate::packet::Packet;
11
12/// Parse a batch of raw byte buffers into packets in parallel.
13///
14/// Each buffer is wrapped in a `Packet`, parsed, and returned.
15/// Parse errors are collected; packets that fail to parse are returned unparsed.
16///
17/// # Example
18///
19/// ```rust
20/// use stackforge_core::parallel::parse_batch;
21///
22/// let raw_packets: Vec<Vec<u8>> = vec![
23///     // Ethernet + ARP
24///     vec![
25///         0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
26///         0x00, 0x11, 0x22, 0x33, 0x44, 0x55,
27///         0x08, 0x06,
28///         0x00, 0x01, 0x08, 0x00, 0x06, 0x04,
29///         0x00, 0x01,
30///         0x00, 0x11, 0x22, 0x33, 0x44, 0x55,
31///         0xc0, 0xa8, 0x01, 0x01,
32///         0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
33///         0xc0, 0xa8, 0x01, 0x02,
34///     ],
35/// ];
36///
37/// let parsed = parse_batch(&raw_packets);
38/// assert_eq!(parsed.len(), 1);
39/// assert!(parsed[0].is_parsed());
40/// ```
41pub fn parse_batch(raw_packets: &[Vec<u8>]) -> Vec<Packet> {
42    raw_packets
43        .par_iter()
44        .map(|raw| {
45            let mut pkt = Packet::from_bytes(raw.clone());
46            let _ = pkt.parse();
47            pkt
48        })
49        .collect()
50}
51
52/// Parse a batch of `Bytes` buffers into packets in parallel (zero-copy).
53pub fn parse_batch_bytes(raw_packets: &[Bytes]) -> Vec<Packet> {
54    raw_packets
55        .par_iter()
56        .map(|raw| {
57            let mut pkt = Packet::from_bytes(raw.clone());
58            let _ = pkt.parse();
59            pkt
60        })
61        .collect()
62}
63
64/// Parse a batch and return only successfully parsed packets.
65pub fn parse_batch_ok(raw_packets: &[Vec<u8>]) -> Vec<Packet> {
66    raw_packets
67        .par_iter()
68        .filter_map(|raw| {
69            let mut pkt = Packet::from_bytes(raw.clone());
70            pkt.parse().ok().map(|()| pkt)
71        })
72        .collect()
73}
74
75/// Apply a function to each packet in parallel, collecting results.
76///
77/// Useful for extracting fields, computing summaries, or filtering.
78pub fn par_map<F, R>(packets: &[Packet], f: F) -> Vec<R>
79where
80    F: Fn(&Packet) -> R + Sync + Send,
81    R: Send,
82{
83    packets.par_iter().map(f).collect()
84}
85
86/// Filter packets in parallel using a predicate.
87pub fn par_filter<F>(packets: &[Packet], predicate: F) -> Vec<&Packet>
88where
89    F: Fn(&Packet) -> bool + Sync + Send,
90{
91    packets.par_iter().filter(|p| predicate(p)).collect()
92}
93
94/// Parse and immediately apply a transform in parallel.
95///
96/// This is more efficient than `parse_batch` + `par_map` because it avoids
97/// materializing the intermediate `Vec<Packet>`.
98pub fn parse_and_map<F, R>(raw_packets: &[Vec<u8>], f: F) -> Vec<R>
99where
100    F: Fn(&Packet) -> R + Sync + Send,
101    R: Send,
102{
103    raw_packets
104        .par_iter()
105        .map(|raw| {
106            let mut pkt = Packet::from_bytes(raw.clone());
107            let _ = pkt.parse();
108            f(&pkt)
109        })
110        .collect()
111}
112
113/// Count packets matching a predicate in parallel.
114pub fn par_count<F>(packets: &[Packet], predicate: F) -> usize
115where
116    F: Fn(&Packet) -> bool + Sync + Send,
117{
118    packets.par_iter().filter(|p| predicate(p)).count()
119}
120
121/// Parallel summary extraction: parse and summarize each packet.
122pub fn summarize_batch(raw_packets: &[Vec<u8>]) -> Vec<String> {
123    parse_and_map(raw_packets, |pkt| {
124        let buf = pkt.as_bytes();
125        pkt.layer_enums()
126            .iter()
127            .map(|le| le.summary(buf))
128            .collect::<Vec<_>>()
129            .join(" / ")
130    })
131}
132
133/// Configure the global Rayon thread pool.
134///
135/// Call this once at startup to control the number of worker threads.
136/// If not called, Rayon defaults to the number of logical CPUs.
137///
138/// # Errors
139///
140/// Returns an error if the thread pool has already been initialized.
141pub fn configure_thread_pool(num_threads: usize) -> Result<()> {
142    rayon::ThreadPoolBuilder::new()
143        .num_threads(num_threads)
144        .build_global()
145        .map_err(|e| crate::error::PacketError::ParseError {
146            offset: 0,
147            message: format!("Failed to configure thread pool: {e}"),
148        })
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::layer::LayerKind;

    /// A 42-byte Ethernet + ARP request frame used as a known-good input.
    fn arp_packet() -> Vec<u8> {
        vec![
            0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x08, 0x06,
            0x00, 0x01, 0x08, 0x00, 0x06, 0x04, 0x00, 0x01, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55,
            0xc0, 0xa8, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xc0, 0xa8, 0x01, 0x02,
        ]
    }

    #[test]
    fn test_parse_batch() {
        let packets: Vec<Vec<u8>> = (0..100).map(|_| arp_packet()).collect();
        let parsed = parse_batch(&packets);
        assert_eq!(parsed.len(), 100);
        for pkt in &parsed {
            assert!(pkt.is_parsed());
            assert_eq!(pkt.layer_count(), 2);
        }
    }

    #[test]
    fn test_parse_batch_bytes() {
        let packets: Vec<Bytes> = (0..50).map(|_| Bytes::from(arp_packet())).collect();
        let parsed = parse_batch_bytes(&packets);
        assert_eq!(parsed.len(), 50);
        for pkt in &parsed {
            assert!(pkt.is_parsed());
        }
    }

    #[test]
    fn test_parse_batch_ok() {
        let packets: Vec<Vec<u8>> = (0..10).map(|_| arp_packet()).collect();
        let parsed = parse_batch_ok(&packets);
        assert_eq!(parsed.len(), 10);
        assert!(parsed.iter().all(|pkt| pkt.is_parsed()));
    }

    #[test]
    fn test_par_map() {
        let packets: Vec<Vec<u8>> = (0..10).map(|_| arp_packet()).collect();
        let parsed = parse_batch(&packets);
        let has_arp: Vec<bool> = par_map(&parsed, |pkt| pkt.get_layer(LayerKind::Arp).is_some());
        assert!(has_arp.iter().all(|&v| v));
    }

    #[test]
    fn test_par_filter() {
        let packets: Vec<Vec<u8>> = (0..10).map(|_| arp_packet()).collect();
        let parsed = parse_batch(&packets);
        let arp_packets = par_filter(&parsed, |pkt| pkt.get_layer(LayerKind::Arp).is_some());
        assert_eq!(arp_packets.len(), 10);
        let none = par_filter(&parsed, |_| false);
        assert!(none.is_empty());
    }

    #[test]
    fn test_parse_and_map() {
        let packets: Vec<Vec<u8>> = (0..10).map(|_| arp_packet()).collect();
        let layer_counts: Vec<usize> = parse_and_map(&packets, |pkt| pkt.layer_count());
        assert!(layer_counts.iter().all(|&c| c == 2));
    }

    #[test]
    fn test_par_count() {
        let packets: Vec<Vec<u8>> = (0..20).map(|_| arp_packet()).collect();
        let parsed = parse_batch(&packets);
        let count = par_count(&parsed, |pkt| pkt.get_layer(LayerKind::Arp).is_some());
        assert_eq!(count, 20);
        assert_eq!(par_count(&parsed, |_| false), 0);
    }

    #[test]
    fn test_summarize_batch() {
        let packets: Vec<Vec<u8>> = (0..5).map(|_| arp_packet()).collect();
        let summaries = summarize_batch(&packets);
        assert_eq!(summaries.len(), 5);
        assert!(summaries.iter().all(|s| !s.is_empty()));
        // Identical inputs must summarize identically.
        assert!(summaries.windows(2).all(|w| w[0] == w[1]));
    }

    #[test]
    fn test_empty_batch() {
        let empty: Vec<Vec<u8>> = vec![];
        let parsed = parse_batch(&empty);
        assert!(parsed.is_empty());
    }

    #[test]
    fn test_empty_inputs_for_all_entry_points() {
        let empty: Vec<Vec<u8>> = vec![];
        let empty_bytes: Vec<Bytes> = vec![];
        assert!(parse_batch_bytes(&empty_bytes).is_empty());
        assert!(parse_batch_ok(&empty).is_empty());
        assert!(parse_and_map(&empty, |pkt| pkt.layer_count()).is_empty());
        assert!(summarize_batch(&empty).is_empty());
        let no_packets: Vec<Packet> = vec![];
        assert!(par_map(&no_packets, |pkt| pkt.layer_count()).is_empty());
        assert!(par_filter(&no_packets, |_| true).is_empty());
        assert_eq!(par_count(&no_packets, |_| true), 0);
    }

    #[test]
    fn test_parse_batch_with_errors() {
        let packets = vec![
            arp_packet(),     // valid
            vec![0x01, 0x02], // too short, but won't error (just empty layers)
            arp_packet(),     // valid
        ];
        let parsed = parse_batch(&packets);
        assert_eq!(parsed.len(), 3);
        assert!(parsed[0].is_parsed());
        assert!(parsed[2].is_parsed());
        // The two valid frames must survive the error-filtering variant.
        assert!(parse_batch_ok(&packets).len() >= 2);
    }
}