Skip to main content

pcap_toolkit/pcap/
mod.rs

1//! Streaming PCAP / PCAPng reader.
2//!
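//! Yields one [`PacketInfo`] per packet in file order. [`open`] never copies
//! packet payloads; [`open_with_payload`] additionally hands back an owned copy
//! of each packet's captured bytes as a [`PacketData`]. Uses `pcap-parser` for
//! file framing and `etherparse` for zero-copy layer parsing.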
6
7use std::io::{BufReader, Read};
8use std::net::IpAddr;
9use std::path::Path;
10
11use etherparse::SlicedPacket;
12use pcap_parser::traits::PcapReaderIterator;
13use pcap_parser::{LegacyPcapReader, PcapBlockOwned, PcapError, PcapNGReader};
14
15use crate::error::PcapError as ToolkitPcapError;
16use crate::flow::FlowKey;
17
18/// Parsed metadata for a single packet, together with the raw captured bytes.
19#[derive(Debug)]
20pub struct PacketData {
21    /// Parsed metadata (same as [`PacketInfo`]).
22    pub info: PacketInfo,
23    /// Raw bytes as captured (Ethernet frame or link-layer PDU).
24    pub data: Vec<u8>,
25}
26
27/// Parsed metadata for a single packet.
28#[derive(Debug)]
29pub struct PacketInfo {
30    /// Timestamp in nanoseconds since the Unix epoch.
31    pub timestamp_ns: u64,
32    /// Number of bytes captured (on-wire may be larger if truncated by snap length).
33    pub captured_len: u32,
34    /// On-wire packet length.
35    pub original_len: u32,
36    /// Parsed 5-tuple, if the packet contains an IP layer with a transport header.
37    pub flow_key: Option<FlowKey>,
38}
39
/// Read buffer size passed to `pcap-parser` (1 MiB).
///
/// `pcap-parser` parses records out of this buffer, so an entire record
/// (record/block header plus captured bytes) must fit in it; a larger record
/// cannot be parsed at all. Common capture tools default to a 262 144-byte
/// snapshot length, and captures on large-MTU or offloading interfaces (e.g.
/// Linux loopback) routinely contain frames above 64 KiB, so the buffer is
/// sized comfortably beyond both.
const BUF_SIZE: usize = 1 << 20;
42
43/// Convert a pcap-parser error (with any slice type) to our error type.
44fn map_pcap_err<I: std::fmt::Debug>(e: PcapError<I>) -> ToolkitPcapError {
45    ToolkitPcapError::Parse(format!("{e:?}"))
46}
47
48/// Iterate over packets in a legacy PCAP file.
49///
50/// Automatically detects nanosecond-precision magic and adjusts timestamps.
51pub fn iter_legacy<R: Read>(
52    reader: R,
53) -> Result<impl Iterator<Item = Result<PacketInfo, ToolkitPcapError>>, ToolkitPcapError> {
54    let mut pcap = LegacyPcapReader::new(BUF_SIZE, reader).map_err(map_pcap_err)?;
55    let mut ns_precision = false;
56
57    // Read the global header to detect ns-resolution magic.
58    loop {
59        match pcap.next() {
60            Ok((offset, block)) => {
61                if let PcapBlockOwned::LegacyHeader(hdr) = block {
62                    // Magic 0xa1b23c4d = nanosecond resolution.
63                    ns_precision = hdr.magic_number == 0xa1b2_3c4d;
64                    pcap.consume(offset);
65                    break;
66                }
67                pcap.consume(offset);
68            }
69            Err(PcapError::Eof) => break,
70            Err(PcapError::Incomplete(_)) => {
71                pcap.refill().map_err(map_pcap_err)?;
72            }
73            Err(e) => return Err(map_pcap_err(e)),
74        }
75    }
76
77    Ok(LegacyIter {
78        reader: pcap,
79        ns_precision,
80        done: false,
81    })
82}
83
84struct LegacyIter<R: Read> {
85    reader: LegacyPcapReader<R>,
86    ns_precision: bool,
87    done: bool,
88}
89
90impl<R: Read> Iterator for LegacyIter<R> {
91    type Item = Result<PacketInfo, ToolkitPcapError>;
92
93    fn next(&mut self) -> Option<Self::Item> {
94        if self.done {
95            return None;
96        }
97        loop {
98            match self.reader.next() {
99                Ok((offset, block)) => {
100                    let result = if let PcapBlockOwned::Legacy(pkt) = block {
101                        let ts_ns = if self.ns_precision {
102                            u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec)
103                        } else {
104                            u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec) * 1_000
105                        };
106                        let flow_key = parse_flow_key(pkt.data);
107                        Some(Ok(PacketInfo {
108                            timestamp_ns: ts_ns,
109                            captured_len: pkt.caplen,
110                            original_len: pkt.origlen,
111                            flow_key,
112                        }))
113                    } else {
114                        None
115                    };
116                    self.reader.consume(offset);
117                    if let Some(item) = result {
118                        return Some(item);
119                    }
120                }
121                Err(PcapError::Eof) => {
122                    self.done = true;
123                    return None;
124                }
125                Err(PcapError::Incomplete(_)) => {
126                    if let Err(e) = self.reader.refill() {
127                        self.done = true;
128                        return Some(Err(map_pcap_err(e)));
129                    }
130                }
131                Err(e) => {
132                    self.done = true;
133                    return Some(Err(map_pcap_err(e)));
134                }
135            }
136        }
137    }
138}
139
140/// Iterate over packets in a PCAPng file.
141///
142/// Uses default timestamp resolution (microseconds) and zero offset.
143/// Full IDB-aware timestamp decoding is deferred to a later phase.
144pub fn iter_pcapng<R: Read>(
145    reader: R,
146) -> Result<impl Iterator<Item = Result<PacketInfo, ToolkitPcapError>>, ToolkitPcapError> {
147    let pcap = PcapNGReader::new(BUF_SIZE, reader).map_err(map_pcap_err)?;
148    Ok(PcapNGIter {
149        reader: pcap,
150        done: false,
151    })
152}
153
154struct PcapNGIter<R: Read> {
155    reader: PcapNGReader<R>,
156    done: bool,
157}
158
159impl<R: Read> Iterator for PcapNGIter<R> {
160    type Item = Result<PacketInfo, ToolkitPcapError>;
161
162    fn next(&mut self) -> Option<Self::Item> {
163        if self.done {
164            return None;
165        }
166        loop {
167            match self.reader.next() {
168                Ok((offset, block)) => {
169                    // Default: usec resolution, zero ts_offset.
170                    const RESOLUTION: u64 = 1_000_000;
171                    const TS_OFFSET: u64 = 0;
172
173                    let result = match block {
174                        PcapBlockOwned::NG(ng_block) => {
175                            use pcap_parser::Block;
176                            match ng_block {
177                                Block::EnhancedPacket(epb) => {
178                                    let (ts_sec, ts_frac) = epb.decode_ts(TS_OFFSET, RESOLUTION);
179                                    let ts_ns = u64::from(ts_sec) * 1_000_000_000
180                                        + u64::from(ts_frac) * (1_000_000_000 / RESOLUTION);
181                                    let flow_key = parse_flow_key(epb.data);
182                                    Some(Ok(PacketInfo {
183                                        timestamp_ns: ts_ns,
184                                        captured_len: epb.caplen,
185                                        original_len: epb.origlen,
186                                        flow_key,
187                                    }))
188                                }
189                                Block::SimplePacket(spb) => {
190                                    let flow_key = parse_flow_key(spb.data);
191                                    Some(Ok(PacketInfo {
192                                        timestamp_ns: 0,
193                                        captured_len: spb.data.len() as u32,
194                                        original_len: spb.origlen,
195                                        flow_key,
196                                    }))
197                                }
198                                _ => None,
199                            }
200                        }
201                        _ => None,
202                    };
203                    self.reader.consume(offset);
204                    if let Some(item) = result {
205                        return Some(item);
206                    }
207                }
208                Err(PcapError::Eof) => {
209                    self.done = true;
210                    return None;
211                }
212                Err(PcapError::Incomplete(_)) => {
213                    if let Err(e) = self.reader.refill() {
214                        self.done = true;
215                        return Some(Err(map_pcap_err(e)));
216                    }
217                }
218                Err(e) => {
219                    self.done = true;
220                    return Some(Err(map_pcap_err(e)));
221                }
222            }
223        }
224    }
225}
226
227/// Open a file, auto-detect PCAP vs PCAPng, and return a boxed packet iterator.
228///
229/// Detection is based on the first 4 bytes (magic number).
230pub fn open(
231    path: &Path,
232) -> Result<Box<dyn Iterator<Item = Result<PacketInfo, ToolkitPcapError>>>, ToolkitPcapError> {
233    let file = std::fs::File::open(path)?;
234    let mut buf = BufReader::new(file);
235
236    // Peek at the magic number (4 bytes).
237    let mut magic = [0u8; 4];
238    buf.read_exact(&mut magic)?;
239
240    // Re-open so the reader sees the full file including the magic.
241    let file2 = std::fs::File::open(path)?;
242
243    let u32_magic = u32::from_le_bytes(magic);
244    match u32_magic {
245        // Legacy PCAP: native or swapped byte order, usec or nsec precision.
246        0xa1b2_c3d4 | 0xd4c3_b2a1 | 0xa1b2_3c4d | 0x4d3c_b2a1 => Ok(Box::new(iter_legacy(file2)?)),
247        // PCAPng: SHB byte order magic (LE).
248        0x0a0d_0d0a => Ok(Box::new(iter_pcapng(file2)?)),
249        _ => Err(ToolkitPcapError::Parse(format!(
250            "unrecognised file magic: {u32_magic:#010x}"
251        ))),
252    }
253}
254
255/// Open a file and return an iterator that yields both [`PacketInfo`] and the
256/// raw captured bytes for each packet.
257///
258/// Useful for the export pipeline where the payload must be written out.
259/// Format auto-detection works the same as [`open`].
260pub fn open_with_payload(
261    path: &Path,
262) -> Result<Box<dyn Iterator<Item = Result<PacketData, ToolkitPcapError>>>, ToolkitPcapError> {
263    let file = std::fs::File::open(path)?;
264    let mut buf = BufReader::new(file);
265
266    let mut magic = [0u8; 4];
267    buf.read_exact(&mut magic)?;
268
269    let file2 = std::fs::File::open(path)?;
270
271    let u32_magic = u32::from_le_bytes(magic);
272    match u32_magic {
273        0xa1b2_c3d4 | 0xd4c3_b2a1 | 0xa1b2_3c4d | 0x4d3c_b2a1 => {
274            Ok(Box::new(iter_legacy_with_payload(file2)?))
275        }
276        0x0a0d_0d0a => Ok(Box::new(iter_pcapng_with_payload(file2)?)),
277        _ => Err(ToolkitPcapError::Parse(format!(
278            "unrecognised file magic: {u32_magic:#010x}"
279        ))),
280    }
281}
282
283fn iter_legacy_with_payload<R: Read + 'static>(
284    reader: R,
285) -> Result<impl Iterator<Item = Result<PacketData, ToolkitPcapError>>, ToolkitPcapError> {
286    let mut pcap = LegacyPcapReader::new(BUF_SIZE, reader).map_err(map_pcap_err)?;
287    let mut ns_precision = false;
288
289    loop {
290        match pcap.next() {
291            Ok((offset, block)) => {
292                if let PcapBlockOwned::LegacyHeader(hdr) = block {
293                    ns_precision = hdr.magic_number == 0xa1b2_3c4d;
294                    pcap.consume(offset);
295                    break;
296                }
297                pcap.consume(offset);
298            }
299            Err(PcapError::Eof) => break,
300            Err(PcapError::Incomplete(_)) => {
301                pcap.refill().map_err(map_pcap_err)?;
302            }
303            Err(e) => return Err(map_pcap_err(e)),
304        }
305    }
306
307    Ok(LegacyWithPayloadIter {
308        reader: pcap,
309        ns_precision,
310        done: false,
311    })
312}
313
314struct LegacyWithPayloadIter<R: Read> {
315    reader: LegacyPcapReader<R>,
316    ns_precision: bool,
317    done: bool,
318}
319
320impl<R: Read> Iterator for LegacyWithPayloadIter<R> {
321    type Item = Result<PacketData, ToolkitPcapError>;
322
323    fn next(&mut self) -> Option<Self::Item> {
324        if self.done {
325            return None;
326        }
327        loop {
328            match self.reader.next() {
329                Ok((offset, block)) => {
330                    let result = if let PcapBlockOwned::Legacy(pkt) = block {
331                        let ts_ns = if self.ns_precision {
332                            u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec)
333                        } else {
334                            u64::from(pkt.ts_sec) * 1_000_000_000 + u64::from(pkt.ts_usec) * 1_000
335                        };
336                        let data = pkt.data.to_vec();
337                        let flow_key = parse_flow_key(&data);
338                        Some(Ok(PacketData {
339                            info: PacketInfo {
340                                timestamp_ns: ts_ns,
341                                captured_len: pkt.caplen,
342                                original_len: pkt.origlen,
343                                flow_key,
344                            },
345                            data,
346                        }))
347                    } else {
348                        None
349                    };
350                    self.reader.consume(offset);
351                    if let Some(item) = result {
352                        return Some(item);
353                    }
354                }
355                Err(PcapError::Eof) => {
356                    self.done = true;
357                    return None;
358                }
359                Err(PcapError::Incomplete(_)) => {
360                    if let Err(e) = self.reader.refill() {
361                        self.done = true;
362                        return Some(Err(map_pcap_err(e)));
363                    }
364                }
365                Err(e) => {
366                    self.done = true;
367                    return Some(Err(map_pcap_err(e)));
368                }
369            }
370        }
371    }
372}
373
374fn iter_pcapng_with_payload<R: Read + 'static>(
375    reader: R,
376) -> Result<impl Iterator<Item = Result<PacketData, ToolkitPcapError>>, ToolkitPcapError> {
377    let pcap = PcapNGReader::new(BUF_SIZE, reader).map_err(map_pcap_err)?;
378    Ok(PcapNGWithPayloadIter {
379        reader: pcap,
380        done: false,
381    })
382}
383
384struct PcapNGWithPayloadIter<R: Read> {
385    reader: PcapNGReader<R>,
386    done: bool,
387}
388
389impl<R: Read> Iterator for PcapNGWithPayloadIter<R> {
390    type Item = Result<PacketData, ToolkitPcapError>;
391
392    fn next(&mut self) -> Option<Self::Item> {
393        if self.done {
394            return None;
395        }
396        loop {
397            match self.reader.next() {
398                Ok((offset, block)) => {
399                    const RESOLUTION: u64 = 1_000_000;
400                    const TS_OFFSET: u64 = 0;
401
402                    let result = match block {
403                        PcapBlockOwned::NG(ng_block) => {
404                            use pcap_parser::Block;
405                            match ng_block {
406                                Block::EnhancedPacket(epb) => {
407                                    let (ts_sec, ts_frac) = epb.decode_ts(TS_OFFSET, RESOLUTION);
408                                    let ts_ns = u64::from(ts_sec) * 1_000_000_000
409                                        + u64::from(ts_frac) * (1_000_000_000 / RESOLUTION);
410                                    let data = epb.data.to_vec();
411                                    let flow_key = parse_flow_key(&data);
412                                    Some(Ok(PacketData {
413                                        info: PacketInfo {
414                                            timestamp_ns: ts_ns,
415                                            captured_len: epb.caplen,
416                                            original_len: epb.origlen,
417                                            flow_key,
418                                        },
419                                        data,
420                                    }))
421                                }
422                                Block::SimplePacket(spb) => {
423                                    let data = spb.data.to_vec();
424                                    let flow_key = parse_flow_key(&data);
425                                    Some(Ok(PacketData {
426                                        info: PacketInfo {
427                                            timestamp_ns: 0,
428                                            captured_len: spb.data.len() as u32,
429                                            original_len: spb.origlen,
430                                            flow_key,
431                                        },
432                                        data,
433                                    }))
434                                }
435                                _ => None,
436                            }
437                        }
438                        _ => None,
439                    };
440                    self.reader.consume(offset);
441                    if let Some(item) = result {
442                        return Some(item);
443                    }
444                }
445                Err(PcapError::Eof) => {
446                    self.done = true;
447                    return None;
448                }
449                Err(PcapError::Incomplete(_)) => {
450                    if let Err(e) = self.reader.refill() {
451                        self.done = true;
452                        return Some(Err(map_pcap_err(e)));
453                    }
454                }
455                Err(e) => {
456                    self.done = true;
457                    return Some(Err(map_pcap_err(e)));
458                }
459            }
460        }
461    }
462}
463
464/// Count packets per flow ID in a single streaming pass over `path`.
465///
466/// Applies `filter` and `bpf` to each packet (caller must clear
467/// `filter.flow_ids` to avoid circular exclusion). Non-IP packets
468/// (no flow key) are not counted.
469///
470/// Returns a map from flow ID → packet count.
471///
472/// # Errors
473/// Returns [`ToolkitPcapError`] on I/O or parse failure.
474pub fn count_flows_in_file(
475    path: &Path,
476    filter: &crate::filter::Filter,
477    bpf: Option<&crate::bpf::BpfExpr>,
478    unidirectional: bool,
479) -> Result<std::collections::HashMap<u64, u64>, ToolkitPcapError> {
480    use crate::filter::PacketMeta;
481    use std::collections::HashMap;
482
483    let mut counts: HashMap<u64, u64> = HashMap::new();
484    let has_filter = !filter.is_empty() || bpf.is_some();
485
486    for item in open_with_payload(path)? {
487        let packet = item?;
488        let meta = PacketMeta::from_packet(
489            packet.info.timestamp_ns,
490            packet.info.captured_len,
491            &packet.data,
492        );
493
494        if has_filter {
495            let ok =
496                (filter.is_empty() || filter.matches(&meta)) && bpf.is_none_or(|b| b.eval(&meta));
497            if !ok {
498                continue;
499            }
500        }
501
502        if let Some(ref key) = meta.flow_key {
503            let id = key.flow_id(unidirectional);
504            *counts.entry(id).or_default() += 1;
505        }
506    }
507
508    Ok(counts)
509}
510
511/// Attempt to extract the 5-tuple from an Ethernet frame's payload.
512///
513/// Returns `None` for non-IP or unsupported protocols.
514fn parse_flow_key(data: &[u8]) -> Option<FlowKey> {
515    let sliced = SlicedPacket::from_ethernet(data).ok()?;
516
517    let (src_ip, dst_ip, protocol) = match &sliced.net {
518        Some(etherparse::NetSlice::Ipv4(v4)) => {
519            let h = v4.header();
520            (
521                IpAddr::V4(h.source_addr()),
522                IpAddr::V4(h.destination_addr()),
523                h.protocol().0,
524            )
525        }
526        Some(etherparse::NetSlice::Ipv6(v6)) => {
527            let h = v6.header();
528            (
529                IpAddr::V6(h.source_addr()),
530                IpAddr::V6(h.destination_addr()),
531                h.next_header().0,
532            )
533        }
534        // ARP, unknown, or missing network layer — not a flow.
535        _ => return None,
536    };
537
538    let (src_port, dst_port) = match &sliced.transport {
539        Some(etherparse::TransportSlice::Tcp(tcp)) => (tcp.source_port(), tcp.destination_port()),
540        Some(etherparse::TransportSlice::Udp(udp)) => (udp.source_port(), udp.destination_port()),
541        _ => (0, 0),
542    };
543
544    Some(FlowKey::new(src_ip, dst_ip, src_port, dst_port, protocol))
545}