Skip to main content

anomalyx_normalize/parsers/
pcap.rs

1//! PCAP / PCAPNG packet-capture parser — the ground truth of network analysis.
2//!
3//! A capture is decoded to **one row per packet** with the columns the detectors
4//! need: `timestamp` (epoch seconds, `Float`) — the marquee input for
5//! **beaconing/C2 detection via `cadence`** on inter-arrival times — plus
6//! `length` (original) and `caplen` for volume `point` spikes, and, when the
7//! link layer is Ethernet or raw IP, `src_ip` / `dst_ip` / `ip_proto` for `mv`
8//! over per-packet features.
9//!
10//! The container (both legacy PCAP and PCAPNG, either byte order, µs or ns
11//! resolution) is decoded by `pcap-parser`; the L2/L3 headers by `etherparse`.
12//! Binary magic (confidence `MAGIC`); extensions `.pcap` / `.pcapng` / `.cap`.
13//! Behind the default-on `pcap` feature.
14
15use crate::parser::{Confidence, FormatParser, MAGIC};
16use crate::table::TableBuilder;
17use ax_core::{AxError, Column, Value};
18use pcap_parser::pcapng::Block;
19use pcap_parser::{create_reader, Linktype, PcapBlockOwned, PcapError};
20use std::collections::BTreeMap;
21
/// Stateless parser for legacy PCAP and PCAPNG packet captures.
///
/// The type carries no data; all behaviour lives in its `FormatParser` impl.
#[derive(Clone, Debug, Default)]
pub struct PcapParser;
24
/// The four-byte file signatures recognised by `sniff`.
///
/// Legacy PCAP stores the magic number `0xa1b2c3d4` (microsecond timestamps)
/// or `0xa1b23c4d` (nanosecond timestamps) in the writer's byte order, so each
/// appears in both little- and big-endian form. PCAPNG files start with the
/// section-header block type `0x0a0d0d0a`, which reads the same either way.
const MAGICS: [[u8; 4]; 5] = [
    0xa1b2_c3d4_u32.to_le_bytes(), // legacy µs, little-endian
    0xa1b2_c3d4_u32.to_be_bytes(), // legacy µs, big-endian
    0xa1b2_3c4d_u32.to_le_bytes(), // legacy ns, little-endian
    0xa1b2_3c4d_u32.to_be_bytes(), // legacy ns, big-endian
    0x0a0d_0d0a_u32.to_be_bytes(), // PCAPNG section header block
];
33
/// Timestamp units per second assumed for a PCAPNG interface that declares no
/// resolution of its own: 10^6, i.e. microseconds.
const DEFAULT_TS_RESOLUTION: u64 = 10_u64.pow(6);
37
38/// Builds the per-packet row shared by every block type. `timestamp` is omitted
39/// (left `Null`) when a block carries none (e.g. a PCAPNG simple packet).
40fn packet_row(timestamp: Option<f64>, orig_len: u32, cap_len: u32) -> BTreeMap<String, Value> {
41    let mut row = BTreeMap::new();
42    if let Some(ts) = timestamp.filter(|t| t.is_finite()) {
43        row.insert("timestamp".to_string(), Value::Float(ts));
44    }
45    row.insert("length".to_string(), Value::Int(i64::from(orig_len)));
46    row.insert("caplen".to_string(), Value::Int(i64::from(cap_len)));
47    row
48}
49
50/// Decodes the L3 addresses from a packet, given its link type. Best-effort: an
51/// unsupported link type or an undecodable packet simply contributes no L3
52/// columns (the packet still has its timestamp/length).
53fn add_l3(linktype: Linktype, data: &[u8], row: &mut BTreeMap<String, Value>) {
54    use etherparse::{NetSlice, SlicedPacket};
55    let sliced = match linktype.0 {
56        1 => SlicedPacket::from_ethernet(data),         // ETHERNET
57        101 | 228 | 229 => SlicedPacket::from_ip(data), // RAW / IPV4 / IPV6
58        _ => return,
59    };
60    let Ok(sliced) = sliced else { return };
61    match sliced.net {
62        Some(NetSlice::Ipv4(ip)) => {
63            let h = ip.header();
64            row.insert("src_ip".into(), Value::Str(h.source_addr().to_string()));
65            row.insert(
66                "dst_ip".into(),
67                Value::Str(h.destination_addr().to_string()),
68            );
69            row.insert("ip_proto".into(), Value::Int(i64::from(h.protocol().0)));
70        }
71        Some(NetSlice::Ipv6(ip)) => {
72            let h = ip.header();
73            row.insert("src_ip".into(), Value::Str(h.source_addr().to_string()));
74            row.insert(
75                "dst_ip".into(),
76                Value::Str(h.destination_addr().to_string()),
77            );
78            row.insert("ip_proto".into(), Value::Int(i64::from(h.next_header().0)));
79        }
80        _ => {}
81    }
82}
83
84impl PcapParser {
85    fn err(&self, msg: impl std::fmt::Display) -> AxError {
86        AxError::Parse {
87            format: self.id().to_string(),
88            message: msg.to_string(),
89        }
90    }
91}
92
93impl FormatParser for PcapParser {
94    fn id(&self) -> &'static str {
95        "pcap"
96    }
97    fn extensions(&self) -> &'static [&'static str] {
98        &["pcap", "pcapng", "cap"]
99    }
100    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
101        let head = bytes.get(..4)?;
102        MAGICS.iter().any(|m| m == head).then_some(MAGIC)
103    }
104    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
105        let mut reader = create_reader(65536, bytes).map_err(|e| self.err(format!("{e:?}")))?;
106        let mut builder = TableBuilder::new();
107        let mut linktype = Linktype::ETHERNET; // legacy header / NG interface sets this
108        let mut nanosecond = false; // legacy timestamp precision
109        let mut resolution = DEFAULT_TS_RESOLUTION; // NG timestamp resolution
110
111        loop {
112            match reader.next() {
113                Ok((offset, block)) => {
114                    match block {
115                        PcapBlockOwned::LegacyHeader(hdr) => {
116                            linktype = hdr.network;
117                            nanosecond = hdr.is_nanosecond_precision();
118                        }
119                        PcapBlockOwned::Legacy(b) => {
120                            let scale = if nanosecond { 1e-9 } else { 1e-6 };
121                            let ts = f64::from(b.ts_sec) + f64::from(b.ts_usec) * scale;
122                            let mut row = packet_row(Some(ts), b.origlen, b.caplen);
123                            add_l3(linktype, b.data, &mut row);
124                            builder.push_row(row);
125                        }
126                        PcapBlockOwned::NG(Block::InterfaceDescription(idb)) => {
127                            linktype = idb.linktype;
128                            resolution = idb.ts_resolution().unwrap_or(DEFAULT_TS_RESOLUTION);
129                        }
130                        PcapBlockOwned::NG(Block::EnhancedPacket(epb)) => {
131                            let ts = epb.decode_ts_f64(0, resolution);
132                            let mut row = packet_row(Some(ts), epb.origlen, epb.caplen);
133                            add_l3(linktype, epb.data, &mut row);
134                            builder.push_row(row);
135                        }
136                        PcapBlockOwned::NG(Block::SimplePacket(spb)) => {
137                            let caplen = spb.data.len() as u32;
138                            let mut row = packet_row(None, spb.origlen, caplen);
139                            add_l3(linktype, spb.data, &mut row);
140                            builder.push_row(row);
141                        }
142                        PcapBlockOwned::NG(_) => {} // section header, stats, name resolution …
143                    }
144                    reader.consume(offset);
145                }
146                Err(PcapError::Eof) => break,
147                Err(PcapError::Incomplete(_)) => {
148                    // Grow the buffer; if no more data can be read, stop.
149                    if reader.refill().is_err() {
150                        break;
151                    }
152                }
153                Err(e) => return Err(self.err(format!("{e:?}"))),
154            }
155        }
156        Ok(builder.finish())
157    }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    // ---- fixture plumbing --------------------------------------------------

    /// Appends `v` little-endian (the byte order of every container fixture).
    fn push_u16(b: &mut Vec<u8>, v: u16) {
        b.extend(v.to_le_bytes());
    }
    /// Appends `v` little-endian.
    fn push_u32(b: &mut Vec<u8>, v: u32) {
        b.extend(v.to_le_bytes());
    }
    /// Appends `v` big-endian (network order, used inside packet headers).
    fn push_u16_be(b: &mut Vec<u8>, v: u16) {
        b.extend(v.to_be_bytes());
    }

    /// The 24-byte legacy global header: version 2.4, zero thiszone/sigfigs,
    /// snaplen 65535, network = Ethernet.
    fn legacy_header(magic: u32) -> Vec<u8> {
        let mut out = Vec::with_capacity(24);
        push_u32(&mut out, magic);
        push_u16(&mut out, 2); // version major
        push_u16(&mut out, 4); // version minor
        for word in [0_u32, 0, 65535, 1] {
            // thiszone, sigfigs, snaplen, network
            push_u32(&mut out, word);
        }
        out
    }

    /// Appends one legacy record; the captured length is `data.len()`.
    fn push_legacy_packet(
        out: &mut Vec<u8>,
        ts_sec: u32,
        ts_frac: u32,
        orig_len: u32,
        data: &[u8],
    ) {
        push_u32(out, ts_sec);
        push_u32(out, ts_frac);
        push_u32(out, data.len() as u32);
        push_u32(out, orig_len);
        out.extend_from_slice(data);
    }

    /// A legacy little-endian PCAP holding two 4-byte packets: one at 1000.0 s
    /// captured in full, one at 1001.5 s whose original length (60) exceeds
    /// what was captured.
    fn build_legacy_pcap(nanosecond: bool) -> Vec<u8> {
        let (magic, half_second) = if nanosecond {
            (0xa1b2_3c4d_u32, 500_000_000_u32)
        } else {
            (0xa1b2_c3d4_u32, 500_000_u32)
        };
        let mut capture = legacy_header(magic);
        push_legacy_packet(&mut capture, 1000, 0, 4, &[0; 4]);
        push_legacy_packet(&mut capture, 1001, half_second, 60, &[0; 4]);
        capture
    }

    /// Frames `body` as a PCAPNG block: type, total length, body, total length.
    fn push_ng_block(out: &mut Vec<u8>, block_type: u32, body: &[u8]) {
        let total_len = (body.len() + 12) as u32;
        push_u32(out, block_type);
        push_u32(out, total_len);
        out.extend_from_slice(body);
        push_u32(out, total_len);
    }

    /// A minimal PCAPNG: section header, one Ethernet interface without
    /// options, and one enhanced packet stamped 1_500_000 µs (1.5 s).
    fn build_pcapng() -> Vec<u8> {
        let mut section = Vec::new();
        push_u32(&mut section, 0x1a2b_3c4d); // byte-order magic
        push_u16(&mut section, 1); // major
        push_u16(&mut section, 0); // minor
        section.extend((-1_i64).to_le_bytes()); // section length = -1

        let mut interface = Vec::new();
        push_u16(&mut interface, 1); // linktype Ethernet
        push_u16(&mut interface, 0); // reserved
        push_u32(&mut interface, 65535); // snaplen

        let mut packet = Vec::new();
        for word in [0_u32, 0, 1_500_000, 4, 4] {
            // interface id, ts high, ts low, caplen, origlen
            push_u32(&mut packet, word);
        }
        packet.extend_from_slice(&[0; 4]); // data (4 bytes, already aligned)

        let mut capture = Vec::new();
        push_ng_block(&mut capture, 0x0a0d_0d0a, &section); // 28 bytes
        push_ng_block(&mut capture, 0x0000_0001, &interface); // 20 bytes
        push_ng_block(&mut capture, 0x0000_0006, &packet); // 36 bytes
        capture
    }

    /// An Ethernet/IPv4/UDP frame: 1.2.3.4 → 5.6.7.8, proto 17, payload "hi".
    fn build_eth_ipv4_udp() -> Vec<u8> {
        let mut frame = Vec::with_capacity(44);
        // Ethernet II: dst MAC, src MAC, ethertype IPv4.
        frame.extend_from_slice(&[0xff; 6]);
        frame.extend_from_slice(&[0x11; 6]);
        push_u16_be(&mut frame, 0x0800);
        // IPv4 header (20 bytes); total length 30 = 20 + 8 UDP + 2 payload.
        frame.extend_from_slice(&[0x45, 0x00]); // version 4 / ihl 5, dscp/ecn
        for field in [30_u16, 0, 0] {
            // total length, id, flags/frag
            push_u16_be(&mut frame, field);
        }
        frame.extend_from_slice(&[64, 17]); // ttl, protocol UDP
        push_u16_be(&mut frame, 0); // header checksum left zero
        frame.extend_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]); // src ip, dst ip
        // UDP header (8 bytes) + 2 payload.
        for field in [1234_u16, 53, 10, 0] {
            // src port, dst port, length, checksum
            push_u16_be(&mut frame, field);
        }
        frame.extend_from_slice(b"hi");
        frame
    }

    /// An IPv6/UDP packet (no Ethernet): ::1 → ::2, next-header 17.
    fn build_ipv6_udp() -> Vec<u8> {
        let mut packet = vec![0x60, 0, 0, 0]; // version 6, traffic class, flow label
        push_u16_be(&mut packet, 10); // payload length (8 UDP + 2)
        packet.extend_from_slice(&[17, 64]); // next header UDP, hop limit
        packet.extend(std::net::Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).octets()); // src ::1
        packet.extend(std::net::Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2).octets()); // dst ::2
        for field in [1234_u16, 53, 10, 0] {
            push_u16_be(&mut packet, field);
        }
        packet.extend_from_slice(b"hi");
        packet
    }

    /// Finds a column by name, panicking with the name when it is absent.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    // ---- tests -------------------------------------------------------------

    #[test]
    fn legacy_pcap_timestamps_and_lengths() {
        let capture = build_legacy_pcap(false);
        let table = PcapParser.parse("c.pcap", &capture).unwrap();

        let stamps = col(&table, "timestamp");
        assert_eq!(stamps.ty, ColType::Float);
        assert_eq!(
            stamps.cells,
            vec![Value::Float(1000.0), Value::Float(1001.5)]
        );
        // `length` is the on-the-wire size, `caplen` what was actually stored.
        let lengths = &col(&table, "length").cells;
        assert_eq!(*lengths, vec![Value::Int(4), Value::Int(60)]);
        let captured = &col(&table, "caplen").cells;
        assert_eq!(*captured, vec![Value::Int(4), Value::Int(4)]);
    }

    #[test]
    fn nanosecond_precision_scales_the_fraction() {
        let capture = build_legacy_pcap(true);
        let table = PcapParser.parse("c.pcap", &capture).unwrap();
        // 1001 s + 500_000_000 ns must read as 1001.5 s, not as microseconds.
        let second_stamp = &col(&table, "timestamp").cells[1];
        assert_eq!(*second_stamp, Value::Float(1001.5));
    }

    #[test]
    fn pcapng_enhanced_packet_decodes() {
        let capture = build_pcapng();
        let table = PcapParser.parse("c.pcapng", &capture).unwrap();
        assert_eq!(col(&table, "timestamp").cells, vec![Value::Float(1.5)]);
        assert_eq!(col(&table, "length").cells, vec![Value::Int(4)]);
    }

    #[test]
    fn add_l3_decodes_ethernet_ipv4() {
        let frame = build_eth_ipv4_udp();
        let mut cells = BTreeMap::new();
        add_l3(Linktype::ETHERNET, &frame, &mut cells);
        assert_eq!(cells.get("src_ip"), Some(&Value::Str("1.2.3.4".into())));
        assert_eq!(cells.get("dst_ip"), Some(&Value::Str("5.6.7.8".into())));
        assert_eq!(cells.get("ip_proto"), Some(&Value::Int(17))); // UDP
    }

    #[test]
    fn add_l3_decodes_raw_ipv4_via_from_ip() {
        // Link type 228 (LINKTYPE_IPV4) goes down the from_ip path, so the
        // 14-byte Ethernet header has to be stripped first.
        let frame = build_eth_ipv4_udp();
        let mut cells = BTreeMap::new();
        add_l3(Linktype(228), &frame[14..], &mut cells);
        assert_eq!(cells.get("src_ip"), Some(&Value::Str("1.2.3.4".into())));
        assert_eq!(cells.get("ip_proto"), Some(&Value::Int(17)));
    }

    #[test]
    fn add_l3_decodes_ipv6() {
        let packet = build_ipv6_udp();
        let mut cells = BTreeMap::new();
        // Link type 101 (RAW): the version nibble selects IPv6.
        add_l3(Linktype(101), &packet, &mut cells);
        assert_eq!(cells.get("src_ip"), Some(&Value::Str("::1".into())));
        assert_eq!(cells.get("dst_ip"), Some(&Value::Str("::2".into())));
        assert_eq!(cells.get("ip_proto"), Some(&Value::Int(17))); // next header
    }

    #[test]
    fn add_l3_skips_unsupported_and_undecodable() {
        // An unknown link type contributes nothing.
        let frame = build_eth_ipv4_udp();
        let mut unknown_link = BTreeMap::new();
        add_l3(Linktype(999), &frame, &mut unknown_link);
        assert!(unknown_link.is_empty());

        // Neither does an Ethernet capture whose data is too short to slice.
        let mut too_short = BTreeMap::new();
        add_l3(Linktype::ETHERNET, &[0, 1, 2], &mut too_short);
        assert!(too_short.is_empty());
    }

    #[test]
    fn end_to_end_l3_columns_present_for_a_real_frame() {
        // A legacy pcap whose only packet is a complete Ethernet/IPv4/UDP frame.
        let frame = build_eth_ipv4_udp();
        let mut capture = legacy_header(0xa1b2_c3d4);
        push_legacy_packet(&mut capture, 7, 0, frame.len() as u32, &frame);

        let table = PcapParser.parse("c.pcap", &capture).unwrap();
        assert_eq!(
            col(&table, "src_ip").cells[0],
            Value::Str("1.2.3.4".into())
        );
        assert_eq!(col(&table, "ip_proto").cells[0], Value::Int(17));
        assert_eq!(col(&table, "timestamp").cells[0], Value::Float(7.0));
    }

    #[test]
    fn malformed_input_errors() {
        let outcome = PcapParser.parse("c.pcap", b"this is not a capture");
        assert!(matches!(outcome, Err(AxError::Parse { .. })));
    }

    #[test]
    fn sniff_keys_on_each_magic() {
        let parser = PcapParser;
        assert_eq!(parser.sniff(&build_legacy_pcap(false)), Some(MAGIC));
        assert_eq!(parser.sniff(&build_legacy_pcap(true)), Some(MAGIC));
        assert_eq!(parser.sniff(&build_pcapng()), Some(MAGIC));
        // Big-endian µs magic followed by two stray bytes.
        assert_eq!(
            parser.sniff(&[0xa1, 0xb2, 0xc3, 0xd4, 0, 0]),
            Some(MAGIC)
        );
        assert_eq!(parser.sniff(b"PAR1...."), None); // parquet
        assert_eq!(parser.sniff(b"\x00\x01\x02"), None); // too short
        assert_eq!(parser.sniff(b"{\"a\":1}"), None);
    }

    #[test]
    fn claims_pcap_extensions() {
        assert_eq!(PcapParser.extensions(), &["pcap", "pcapng", "cap"]);
    }

    #[test]
    fn resolves_by_extension_and_magic() {
        let registry = crate::parser::ParserRegistry::default();
        // By extension alone, whatever the content.
        for path in ["dump.pcap", "dump.pcapng"] {
            assert_eq!(registry.resolve(path, b"zz").unwrap().id(), "pcap");
        }
        // By magic alone, when the name says nothing.
        let capture = build_legacy_pcap(false);
        assert_eq!(registry.resolve("-", &capture).unwrap().id(), "pcap");
    }
}