1use std::io::{BufReader, Read};
8use std::net::IpAddr;
9use std::path::Path;
10
11use etherparse::SlicedPacket;
12use pcap_parser::traits::PcapReaderIterator;
13use pcap_parser::{LegacyPcapReader, PcapBlockOwned, PcapError, PcapNGReader};
14
15use crate::error::PcapError as ToolkitPcapError;
16use crate::flow::FlowKey;
17
/// A packet's metadata together with an owned copy of its bytes.
///
/// Produced by the payload-carrying iterators, which copy the block's data
/// slice into `data` before the reader's buffer is advanced.
#[derive(Debug)]
pub struct PacketData {
    /// Timestamp, lengths and flow key describing this packet.
    pub info: PacketInfo,
    /// Owned copy of the packet bytes taken from the capture block.
    pub data: Vec<u8>,
}
26
/// Per-packet metadata extracted while streaming a capture file.
#[derive(Debug)]
pub struct PacketInfo {
    /// Capture timestamp in nanoseconds (seconds * 1e9 plus the sub-second
    /// part). Set to 0 for pcapng Simple Packet Blocks.
    pub timestamp_ns: u64,
    /// Number of packet bytes stored in the capture for this packet.
    pub captured_len: u32,
    /// Original length of the packet as recorded by the capture block.
    pub original_len: u32,
    /// Flow key derived from the packet headers by `parse_flow_key`, or
    /// `None` when the bytes do not parse as Ethernet carrying IPv4/IPv6.
    pub flow_key: Option<FlowKey>,
}
39
/// Buffer capacity, in bytes, handed to `LegacyPcapReader::new` and
/// `PcapNGReader::new` when a capture reader is created.
const BUF_SIZE: usize = 65536;
42
43fn map_pcap_err<I: std::fmt::Debug>(e: PcapError<I>) -> ToolkitPcapError {
45 ToolkitPcapError::Parse(format!("{e:?}"))
46}
47
48pub fn iter_legacy<R: Read>(
52 reader: R,
53) -> Result<impl Iterator<Item = Result<PacketInfo, ToolkitPcapError>>, ToolkitPcapError> {
54 let mut pcap = LegacyPcapReader::new(BUF_SIZE, reader).map_err(map_pcap_err)?;
55 let mut ns_precision = false;
56
57 loop {
59 match pcap.next() {
60 Ok((offset, block)) => {
61 if let PcapBlockOwned::LegacyHeader(hdr) = block {
62 ns_precision = hdr.magic_number == 0xa1b2_3c4d;
64 pcap.consume(offset);
65 break;
66 }
67 pcap.consume(offset);
68 }
69 Err(PcapError::Eof) => break,
70 Err(PcapError::Incomplete(_)) => {
71 pcap.refill().map_err(map_pcap_err)?;
72 }
73 Err(e) => return Err(map_pcap_err(e)),
74 }
75 }
76
77 Ok(LegacyIter {
78 reader: pcap,
79 ns_precision,
80 done: false,
81 })
82}
83
/// Iterator state for streaming [`PacketInfo`] values out of a legacy pcap.
struct LegacyIter<R: Read> {
    /// Underlying block reader; advanced one block per step.
    reader: LegacyPcapReader<R>,
    /// `true` when the sub-second timestamp field is already in nanoseconds;
    /// otherwise it is scaled by 1000 when building the timestamp.
    ns_precision: bool,
    /// Set once end-of-file or an error has been reported; later calls to
    /// `next` return `None`.
    done: bool,
}
89
90impl<R: Read> Iterator for LegacyIter<R> {
91 type Item = Result<PacketInfo, ToolkitPcapError>;
92
93 fn next(&mut self) -> Option<Self::Item> {
94 if self.done {
95 return None;
96 }
97 loop {
98 match self.reader.next() {
99 Ok((offset, block)) => {
100 let result = if let PcapBlockOwned::Legacy(pkt) = block {
101 let ts_ns = if self.ns_precision {
102 u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec)
103 } else {
104 u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec) * 1_000
105 };
106 let flow_key = parse_flow_key(pkt.data);
107 Some(Ok(PacketInfo {
108 timestamp_ns: ts_ns,
109 captured_len: pkt.caplen,
110 original_len: pkt.origlen,
111 flow_key,
112 }))
113 } else {
114 None
115 };
116 self.reader.consume(offset);
117 if let Some(item) = result {
118 return Some(item);
119 }
120 }
121 Err(PcapError::Eof) => {
122 self.done = true;
123 return None;
124 }
125 Err(PcapError::Incomplete(_)) => {
126 if let Err(e) = self.reader.refill() {
127 self.done = true;
128 return Some(Err(map_pcap_err(e)));
129 }
130 }
131 Err(e) => {
132 self.done = true;
133 return Some(Err(map_pcap_err(e)));
134 }
135 }
136 }
137 }
138}
139
140pub fn iter_pcapng<R: Read>(
145 reader: R,
146) -> Result<impl Iterator<Item = Result<PacketInfo, ToolkitPcapError>>, ToolkitPcapError> {
147 let pcap = PcapNGReader::new(BUF_SIZE, reader).map_err(map_pcap_err)?;
148 Ok(PcapNGIter {
149 reader: pcap,
150 done: false,
151 })
152}
153
/// Iterator state for streaming [`PacketInfo`] values out of a pcapng file.
struct PcapNGIter<R: Read> {
    /// Underlying block reader; advanced one block per step.
    reader: PcapNGReader<R>,
    /// Set once end-of-file or an error has been reported; later calls to
    /// `next` return `None`.
    done: bool,
}
158
159impl<R: Read> Iterator for PcapNGIter<R> {
160 type Item = Result<PacketInfo, ToolkitPcapError>;
161
162 fn next(&mut self) -> Option<Self::Item> {
163 if self.done {
164 return None;
165 }
166 loop {
167 match self.reader.next() {
168 Ok((offset, block)) => {
169 const RESOLUTION: u64 = 1_000_000;
171 const TS_OFFSET: u64 = 0;
172
173 let result = match block {
174 PcapBlockOwned::NG(ng_block) => {
175 use pcap_parser::Block;
176 match ng_block {
177 Block::EnhancedPacket(epb) => {
178 let (ts_sec, ts_frac) = epb.decode_ts(TS_OFFSET, RESOLUTION);
179 let ts_ns = u64::from(ts_sec) * 1_000_000_000
180 + u64::from(ts_frac) * (1_000_000_000 / RESOLUTION);
181 let flow_key = parse_flow_key(epb.data);
182 Some(Ok(PacketInfo {
183 timestamp_ns: ts_ns,
184 captured_len: epb.caplen,
185 original_len: epb.origlen,
186 flow_key,
187 }))
188 }
189 Block::SimplePacket(spb) => {
190 let flow_key = parse_flow_key(spb.data);
191 Some(Ok(PacketInfo {
192 timestamp_ns: 0,
193 captured_len: spb.data.len() as u32,
194 original_len: spb.origlen,
195 flow_key,
196 }))
197 }
198 _ => None,
199 }
200 }
201 _ => None,
202 };
203 self.reader.consume(offset);
204 if let Some(item) = result {
205 return Some(item);
206 }
207 }
208 Err(PcapError::Eof) => {
209 self.done = true;
210 return None;
211 }
212 Err(PcapError::Incomplete(_)) => {
213 if let Err(e) = self.reader.refill() {
214 self.done = true;
215 return Some(Err(map_pcap_err(e)));
216 }
217 }
218 Err(e) => {
219 self.done = true;
220 return Some(Err(map_pcap_err(e)));
221 }
222 }
223 }
224 }
225}
226
227pub fn open(
231 path: &Path,
232) -> Result<Box<dyn Iterator<Item = Result<PacketInfo, ToolkitPcapError>>>, ToolkitPcapError> {
233 let file = std::fs::File::open(path)?;
234 let mut buf = BufReader::new(file);
235
236 let mut magic = [0u8; 4];
238 buf.read_exact(&mut magic)?;
239
240 let file2 = std::fs::File::open(path)?;
242
243 let u32_magic = u32::from_le_bytes(magic);
244 match u32_magic {
245 0xa1b2_c3d4 | 0xd4c3_b2a1 | 0xa1b2_3c4d | 0x4d3c_b2a1 => Ok(Box::new(iter_legacy(file2)?)),
247 0x0a0d_0d0a => Ok(Box::new(iter_pcapng(file2)?)),
249 _ => Err(ToolkitPcapError::Parse(format!(
250 "unrecognised file magic: {u32_magic:#010x}"
251 ))),
252 }
253}
254
255pub fn open_with_payload(
261 path: &Path,
262) -> Result<Box<dyn Iterator<Item = Result<PacketData, ToolkitPcapError>>>, ToolkitPcapError> {
263 let file = std::fs::File::open(path)?;
264 let mut buf = BufReader::new(file);
265
266 let mut magic = [0u8; 4];
267 buf.read_exact(&mut magic)?;
268
269 let file2 = std::fs::File::open(path)?;
270
271 let u32_magic = u32::from_le_bytes(magic);
272 match u32_magic {
273 0xa1b2_c3d4 | 0xd4c3_b2a1 | 0xa1b2_3c4d | 0x4d3c_b2a1 => {
274 Ok(Box::new(iter_legacy_with_payload(file2)?))
275 }
276 0x0a0d_0d0a => Ok(Box::new(iter_pcapng_with_payload(file2)?)),
277 _ => Err(ToolkitPcapError::Parse(format!(
278 "unrecognised file magic: {u32_magic:#010x}"
279 ))),
280 }
281}
282
283fn iter_legacy_with_payload<R: Read + 'static>(
284 reader: R,
285) -> Result<impl Iterator<Item = Result<PacketData, ToolkitPcapError>>, ToolkitPcapError> {
286 let mut pcap = LegacyPcapReader::new(BUF_SIZE, reader).map_err(map_pcap_err)?;
287 let mut ns_precision = false;
288
289 loop {
290 match pcap.next() {
291 Ok((offset, block)) => {
292 if let PcapBlockOwned::LegacyHeader(hdr) = block {
293 ns_precision = hdr.magic_number == 0xa1b2_3c4d;
294 pcap.consume(offset);
295 break;
296 }
297 pcap.consume(offset);
298 }
299 Err(PcapError::Eof) => break,
300 Err(PcapError::Incomplete(_)) => {
301 pcap.refill().map_err(map_pcap_err)?;
302 }
303 Err(e) => return Err(map_pcap_err(e)),
304 }
305 }
306
307 Ok(LegacyWithPayloadIter {
308 reader: pcap,
309 ns_precision,
310 done: false,
311 })
312}
313
/// Iterator state for streaming [`PacketData`] values out of a legacy pcap.
struct LegacyWithPayloadIter<R: Read> {
    /// Underlying block reader; advanced one block per step.
    reader: LegacyPcapReader<R>,
    /// `true` when the sub-second timestamp field is already in nanoseconds;
    /// otherwise it is scaled by 1000 when building the timestamp.
    ns_precision: bool,
    /// Set once end-of-file or an error has been reported; later calls to
    /// `next` return `None`.
    done: bool,
}
319
320impl<R: Read> Iterator for LegacyWithPayloadIter<R> {
321 type Item = Result<PacketData, ToolkitPcapError>;
322
323 fn next(&mut self) -> Option<Self::Item> {
324 if self.done {
325 return None;
326 }
327 loop {
328 match self.reader.next() {
329 Ok((offset, block)) => {
330 let result = if let PcapBlockOwned::Legacy(pkt) = block {
331 let ts_ns = if self.ns_precision {
332 u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec)
333 } else {
334 u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec) * 1_000
335 };
336 let data = pkt.data.to_vec();
337 let flow_key = parse_flow_key(&data);
338 Some(Ok(PacketData {
339 info: PacketInfo {
340 timestamp_ns: ts_ns,
341 captured_len: pkt.caplen,
342 original_len: pkt.origlen,
343 flow_key,
344 },
345 data,
346 }))
347 } else {
348 None
349 };
350 self.reader.consume(offset);
351 if let Some(item) = result {
352 return Some(item);
353 }
354 }
355 Err(PcapError::Eof) => {
356 self.done = true;
357 return None;
358 }
359 Err(PcapError::Incomplete(_)) => {
360 if let Err(e) = self.reader.refill() {
361 self.done = true;
362 return Some(Err(map_pcap_err(e)));
363 }
364 }
365 Err(e) => {
366 self.done = true;
367 return Some(Err(map_pcap_err(e)));
368 }
369 }
370 }
371 }
372}
373
374fn iter_pcapng_with_payload<R: Read + 'static>(
375 reader: R,
376) -> Result<impl Iterator<Item = Result<PacketData, ToolkitPcapError>>, ToolkitPcapError> {
377 let pcap = PcapNGReader::new(BUF_SIZE, reader).map_err(map_pcap_err)?;
378 Ok(PcapNGWithPayloadIter {
379 reader: pcap,
380 done: false,
381 })
382}
383
/// Iterator state for streaming [`PacketData`] values out of a pcapng file.
struct PcapNGWithPayloadIter<R: Read> {
    /// Underlying block reader; advanced one block per step.
    reader: PcapNGReader<R>,
    /// Set once end-of-file or an error has been reported; later calls to
    /// `next` return `None`.
    done: bool,
}
388
389impl<R: Read> Iterator for PcapNGWithPayloadIter<R> {
390 type Item = Result<PacketData, ToolkitPcapError>;
391
392 fn next(&mut self) -> Option<Self::Item> {
393 if self.done {
394 return None;
395 }
396 loop {
397 match self.reader.next() {
398 Ok((offset, block)) => {
399 const RESOLUTION: u64 = 1_000_000;
400 const TS_OFFSET: u64 = 0;
401
402 let result = match block {
403 PcapBlockOwned::NG(ng_block) => {
404 use pcap_parser::Block;
405 match ng_block {
406 Block::EnhancedPacket(epb) => {
407 let (ts_sec, ts_frac) = epb.decode_ts(TS_OFFSET, RESOLUTION);
408 let ts_ns = u64::from(ts_sec) * 1_000_000_000
409 + u64::from(ts_frac) * (1_000_000_000 / RESOLUTION);
410 let data = epb.data.to_vec();
411 let flow_key = parse_flow_key(&data);
412 Some(Ok(PacketData {
413 info: PacketInfo {
414 timestamp_ns: ts_ns,
415 captured_len: epb.caplen,
416 original_len: epb.origlen,
417 flow_key,
418 },
419 data,
420 }))
421 }
422 Block::SimplePacket(spb) => {
423 let data = spb.data.to_vec();
424 let flow_key = parse_flow_key(&data);
425 Some(Ok(PacketData {
426 info: PacketInfo {
427 timestamp_ns: 0,
428 captured_len: spb.data.len() as u32,
429 original_len: spb.origlen,
430 flow_key,
431 },
432 data,
433 }))
434 }
435 _ => None,
436 }
437 }
438 _ => None,
439 };
440 self.reader.consume(offset);
441 if let Some(item) = result {
442 return Some(item);
443 }
444 }
445 Err(PcapError::Eof) => {
446 self.done = true;
447 return None;
448 }
449 Err(PcapError::Incomplete(_)) => {
450 if let Err(e) = self.reader.refill() {
451 self.done = true;
452 return Some(Err(map_pcap_err(e)));
453 }
454 }
455 Err(e) => {
456 self.done = true;
457 return Some(Err(map_pcap_err(e)));
458 }
459 }
460 }
461 }
462}
463
464pub fn count_flows_in_file(
475 path: &Path,
476 filter: &crate::filter::Filter,
477 bpf: Option<&crate::bpf::BpfExpr>,
478 unidirectional: bool,
479) -> Result<std::collections::HashMap<u64, u64>, ToolkitPcapError> {
480 use crate::filter::PacketMeta;
481 use std::collections::HashMap;
482
483 let mut counts: HashMap<u64, u64> = HashMap::new();
484 let has_filter = !filter.is_empty() || bpf.is_some();
485
486 for item in open_with_payload(path)? {
487 let packet = item?;
488 let meta = PacketMeta::from_packet(
489 packet.info.timestamp_ns,
490 packet.info.captured_len,
491 &packet.data,
492 );
493
494 if has_filter {
495 let ok =
496 (filter.is_empty() || filter.matches(&meta)) && bpf.is_none_or(|b| b.eval(&meta));
497 if !ok {
498 continue;
499 }
500 }
501
502 if let Some(ref key) = meta.flow_key {
503 let id = key.flow_id(unidirectional);
504 *counts.entry(id).or_default() += 1;
505 }
506 }
507
508 Ok(counts)
509}
510
511fn parse_flow_key(data: &[u8]) -> Option<FlowKey> {
515 let sliced = SlicedPacket::from_ethernet(data).ok()?;
516
517 let (src_ip, dst_ip, protocol) = match &sliced.net {
518 Some(etherparse::NetSlice::Ipv4(v4)) => {
519 let h = v4.header();
520 (
521 IpAddr::V4(h.source_addr()),
522 IpAddr::V4(h.destination_addr()),
523 h.protocol().0,
524 )
525 }
526 Some(etherparse::NetSlice::Ipv6(v6)) => {
527 let h = v6.header();
528 (
529 IpAddr::V6(h.source_addr()),
530 IpAddr::V6(h.destination_addr()),
531 h.next_header().0,
532 )
533 }
534 _ => return None,
536 };
537
538 let (src_port, dst_port) = match &sliced.transport {
539 Some(etherparse::TransportSlice::Tcp(tcp)) => (tcp.source_port(), tcp.destination_port()),
540 Some(etherparse::TransportSlice::Udp(udp)) => (udp.source_port(), udp.destination_port()),
541 _ => (0, 0),
542 };
543
544 Some(FlowKey::new(src_ip, dst_ip, src_port, dst_port, protocol))
545}