stackforge_core/
parallel.rs1use bytes::Bytes;
7use rayon::prelude::*;
8
9use crate::error::Result;
10use crate::packet::Packet;
11
12pub fn parse_batch(raw_packets: &[Vec<u8>]) -> Vec<Packet> {
42 raw_packets
43 .par_iter()
44 .map(|raw| {
45 let mut pkt = Packet::from_bytes(raw.clone());
46 let _ = pkt.parse();
47 pkt
48 })
49 .collect()
50}
51
52pub fn parse_batch_bytes(raw_packets: &[Bytes]) -> Vec<Packet> {
54 raw_packets
55 .par_iter()
56 .map(|raw| {
57 let mut pkt = Packet::from_bytes(raw.clone());
58 let _ = pkt.parse();
59 pkt
60 })
61 .collect()
62}
63
64pub fn parse_batch_ok(raw_packets: &[Vec<u8>]) -> Vec<Packet> {
66 raw_packets
67 .par_iter()
68 .filter_map(|raw| {
69 let mut pkt = Packet::from_bytes(raw.clone());
70 pkt.parse().ok().map(|()| pkt)
71 })
72 .collect()
73}
74
75pub fn par_map<F, R>(packets: &[Packet], f: F) -> Vec<R>
79where
80 F: Fn(&Packet) -> R + Sync + Send,
81 R: Send,
82{
83 packets.par_iter().map(f).collect()
84}
85
86pub fn par_filter<F>(packets: &[Packet], predicate: F) -> Vec<&Packet>
88where
89 F: Fn(&Packet) -> bool + Sync + Send,
90{
91 packets.par_iter().filter(|p| predicate(p)).collect()
92}
93
94pub fn parse_and_map<F, R>(raw_packets: &[Vec<u8>], f: F) -> Vec<R>
99where
100 F: Fn(&Packet) -> R + Sync + Send,
101 R: Send,
102{
103 raw_packets
104 .par_iter()
105 .map(|raw| {
106 let mut pkt = Packet::from_bytes(raw.clone());
107 let _ = pkt.parse();
108 f(&pkt)
109 })
110 .collect()
111}
112
113pub fn par_count<F>(packets: &[Packet], predicate: F) -> usize
115where
116 F: Fn(&Packet) -> bool + Sync + Send,
117{
118 packets.par_iter().filter(|p| predicate(p)).count()
119}
120
121pub fn summarize_batch(raw_packets: &[Vec<u8>]) -> Vec<String> {
123 parse_and_map(raw_packets, |pkt| {
124 let buf = pkt.as_bytes();
125 pkt.layer_enums()
126 .iter()
127 .map(|le| le.summary(buf))
128 .collect::<Vec<_>>()
129 .join(" / ")
130 })
131}
132
133pub fn configure_thread_pool(num_threads: usize) -> Result<()> {
142 rayon::ThreadPoolBuilder::new()
143 .num_threads(num_threads)
144 .build_global()
145 .map_err(|e| crate::error::PacketError::ParseError {
146 offset: 0,
147 message: format!("Failed to configure thread pool: {e}"),
148 })
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154 use crate::layer::LayerKind;
155
156 fn arp_packet() -> Vec<u8> {
157 vec![
158 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x08, 0x06,
159 0x00, 0x01, 0x08, 0x00, 0x06, 0x04, 0x00, 0x01, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55,
160 0xc0, 0xa8, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xc0, 0xa8, 0x01, 0x02,
161 ]
162 }
163
164 #[test]
165 fn test_parse_batch() {
166 let packets: Vec<Vec<u8>> = (0..100).map(|_| arp_packet()).collect();
167 let parsed = parse_batch(&packets);
168 assert_eq!(parsed.len(), 100);
169 for pkt in &parsed {
170 assert!(pkt.is_parsed());
171 assert_eq!(pkt.layer_count(), 2);
172 }
173 }
174
175 #[test]
176 fn test_parse_batch_bytes() {
177 let packets: Vec<Bytes> = (0..50).map(|_| Bytes::from(arp_packet())).collect();
178 let parsed = parse_batch_bytes(&packets);
179 assert_eq!(parsed.len(), 50);
180 for pkt in &parsed {
181 assert!(pkt.is_parsed());
182 }
183 }
184
185 #[test]
186 fn test_par_map() {
187 let packets: Vec<Vec<u8>> = (0..10).map(|_| arp_packet()).collect();
188 let parsed = parse_batch(&packets);
189 let has_arp: Vec<bool> = par_map(&parsed, |pkt| pkt.get_layer(LayerKind::Arp).is_some());
190 assert!(has_arp.iter().all(|&v| v));
191 }
192
193 #[test]
194 fn test_par_filter() {
195 let packets: Vec<Vec<u8>> = (0..10).map(|_| arp_packet()).collect();
196 let parsed = parse_batch(&packets);
197 let arp_packets = par_filter(&parsed, |pkt| pkt.get_layer(LayerKind::Arp).is_some());
198 assert_eq!(arp_packets.len(), 10);
199 }
200
201 #[test]
202 fn test_parse_and_map() {
203 let packets: Vec<Vec<u8>> = (0..10).map(|_| arp_packet()).collect();
204 let layer_counts: Vec<usize> = parse_and_map(&packets, |pkt| pkt.layer_count());
205 assert!(layer_counts.iter().all(|&c| c == 2));
206 }
207
208 #[test]
209 fn test_par_count() {
210 let packets: Vec<Vec<u8>> = (0..20).map(|_| arp_packet()).collect();
211 let parsed = parse_batch(&packets);
212 let count = par_count(&parsed, |pkt| pkt.get_layer(LayerKind::Arp).is_some());
213 assert_eq!(count, 20);
214 }
215
216 #[test]
217 fn test_empty_batch() {
218 let empty: Vec<Vec<u8>> = vec![];
219 let parsed = parse_batch(&empty);
220 assert!(parsed.is_empty());
221 }
222
223 #[test]
224 fn test_parse_batch_with_errors() {
225 let packets = vec![
226 arp_packet(), vec![0x01, 0x02], arp_packet(), ];
230 let parsed = parse_batch(&packets);
231 assert_eq!(parsed.len(), 3);
232 assert!(parsed[0].is_parsed());
233 assert!(parsed[2].is_parsed());
234 }
235}