use crate::parser::{Confidence, FormatParser, MAGIC};
use crate::table::TableBuilder;
use ax_core::{AxError, Column, Value};
use pcap_parser::pcapng::Block;
use pcap_parser::{create_reader, Linktype, PcapBlockOwned, PcapError};
use std::collections::BTreeMap;
/// Stateless parser for packet-capture files (legacy pcap and pcapng).
///
/// The type carries no data; all behaviour lives in its `FormatParser`
/// implementation.
#[derive(Clone, Debug, Default)]
pub struct PcapParser;
/// Leading four-byte signatures that `sniff` accepts as a capture file.
///
/// The legacy pcap magic numbers appear in both byte orders because the
/// header is written in the byte order of the capturing host.
const MAGICS: [[u8; 4]; 5] = [
    // 0xa1b2c3d4 (legacy pcap, microsecond timestamps), little-endian on disk
    [0xd4, 0xc3, 0xb2, 0xa1],
    // 0xa1b2c3d4, big-endian on disk
    [0xa1, 0xb2, 0xc3, 0xd4],
    // 0xa1b23c4d (legacy pcap, nanosecond timestamps), little-endian on disk
    [0x4d, 0x3c, 0xb2, 0xa1],
    // 0xa1b23c4d, big-endian on disk
    [0xa1, 0xb2, 0x3c, 0x4d],
    // 0x0a0d0d0a: pcapng section header block type (reads the same both ways)
    [0x0a, 0x0d, 0x0d, 0x0a],
];
/// Timestamp resolution, in units per second, used for pcapng packets when the
/// interface description does not yield one (1_000_000 = microseconds).
const DEFAULT_TS_RESOLUTION: u64 = 1_000_000;
/// Builds the base row for a single captured packet.
///
/// The row always contains `length` (the original on-wire length) and
/// `caplen` (the number of bytes actually captured). `timestamp` is added
/// only when a value is supplied and it is finite, so NaN and infinities
/// never reach the table.
fn packet_row(timestamp: Option<f64>, orig_len: u32, cap_len: u32) -> BTreeMap<String, Value> {
    let mut fields = BTreeMap::from([
        (String::from("length"), Value::Int(i64::from(orig_len))),
        (String::from("caplen"), Value::Int(i64::from(cap_len))),
    ]);
    match timestamp {
        Some(seconds) if seconds.is_finite() => {
            fields.insert(String::from("timestamp"), Value::Float(seconds));
        }
        _ => {}
    }
    fields
}
/// Adds network-layer columns (`src_ip`, `dst_ip`, `ip_proto`) to `row`.
///
/// Link type 1 is decoded starting at an Ethernet header; link types 101,
/// 228 and 229 are decoded starting directly at the IP header. Any other
/// link type, a frame that fails to decode, or a frame without an IPv4/IPv6
/// layer leaves `row` untouched.
fn add_l3(linktype: Linktype, data: &[u8], row: &mut BTreeMap<String, Value>) {
    use etherparse::{NetSlice, SlicedPacket};

    let parsed = match linktype.0 {
        1 => SlicedPacket::from_ethernet(data),
        101 | 228 | 229 => SlicedPacket::from_ip(data),
        _ => return,
    };
    let packet = match parsed {
        Ok(packet) => packet,
        // Best effort: an undecodable frame still gets its base columns.
        Err(_) => return,
    };

    // Reduce both IP versions to one (source, destination, protocol) triple
    // so the column names are written in a single place.
    let (source, destination, protocol) = match packet.net {
        Some(NetSlice::Ipv4(v4)) => {
            let header = v4.header();
            (
                header.source_addr().to_string(),
                header.destination_addr().to_string(),
                header.protocol().0,
            )
        }
        Some(NetSlice::Ipv6(v6)) => {
            let header = v6.header();
            (
                header.source_addr().to_string(),
                header.destination_addr().to_string(),
                header.next_header().0,
            )
        }
        _ => return,
    };

    row.insert(String::from("src_ip"), Value::Str(source));
    row.insert(String::from("dst_ip"), Value::Str(destination));
    row.insert(String::from("ip_proto"), Value::Int(i64::from(protocol)));
}
impl PcapParser {
    /// Wraps `msg` in an `AxError::Parse` tagged with this parser's id.
    fn err(&self, msg: impl std::fmt::Display) -> AxError {
        let format = self.id().to_string();
        let message = msg.to_string();
        AxError::Parse { format, message }
    }
}
impl FormatParser for PcapParser {
fn id(&self) -> &'static str {
"pcap"
}
fn extensions(&self) -> &'static [&'static str] {
&["pcap", "pcapng", "cap"]
}
fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
let head = bytes.get(..4)?;
MAGICS.iter().any(|m| m == head).then_some(MAGIC)
}
fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
let mut reader = create_reader(65536, bytes).map_err(|e| self.err(format!("{e:?}")))?;
let mut builder = TableBuilder::new();
let mut linktype = Linktype::ETHERNET; let mut nanosecond = false; let mut resolution = DEFAULT_TS_RESOLUTION;
loop {
match reader.next() {
Ok((offset, block)) => {
match block {
PcapBlockOwned::LegacyHeader(hdr) => {
linktype = hdr.network;
nanosecond = hdr.is_nanosecond_precision();
}
PcapBlockOwned::Legacy(b) => {
let scale = if nanosecond { 1e-9 } else { 1e-6 };
let ts = f64::from(b.ts_sec) + f64::from(b.ts_usec) * scale;
let mut row = packet_row(Some(ts), b.origlen, b.caplen);
add_l3(linktype, b.data, &mut row);
builder.push_row(row);
}
PcapBlockOwned::NG(Block::InterfaceDescription(idb)) => {
linktype = idb.linktype;
resolution = idb.ts_resolution().unwrap_or(DEFAULT_TS_RESOLUTION);
}
PcapBlockOwned::NG(Block::EnhancedPacket(epb)) => {
let ts = epb.decode_ts_f64(0, resolution);
let mut row = packet_row(Some(ts), epb.origlen, epb.caplen);
add_l3(linktype, epb.data, &mut row);
builder.push_row(row);
}
PcapBlockOwned::NG(Block::SimplePacket(spb)) => {
let caplen = spb.data.len() as u32;
let mut row = packet_row(None, spb.origlen, caplen);
add_l3(linktype, spb.data, &mut row);
builder.push_row(row);
}
PcapBlockOwned::NG(_) => {} }
reader.consume(offset);
}
Err(PcapError::Eof) => break,
Err(PcapError::Incomplete(_)) => {
if reader.refill().is_err() {
break;
}
}
Err(e) => return Err(self.err(format!("{e:?}"))),
}
}
Ok(builder.finish())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    // ---- byte-level helpers -------------------------------------------

    /// Appends `value` in little-endian byte order.
    fn push_u16(buf: &mut Vec<u8>, value: u16) {
        buf.extend_from_slice(&value.to_le_bytes());
    }

    /// Appends `value` in little-endian byte order.
    fn push_u32(buf: &mut Vec<u8>, value: u32) {
        buf.extend_from_slice(&value.to_le_bytes());
    }

    /// Appends `value` in big-endian (network) byte order.
    fn push_u16_be(buf: &mut Vec<u8>, value: u16) {
        buf.extend_from_slice(&value.to_be_bytes());
    }

    /// Appends a little-endian legacy pcap file header with the given magic.
    fn push_legacy_header(buf: &mut Vec<u8>, magic: u32) {
        push_u32(buf, magic);
        push_u16(buf, 2); // version major
        push_u16(buf, 4); // version minor
        push_u32(buf, 0); // thiszone
        push_u32(buf, 0); // sigfigs
        push_u32(buf, 65535); // snaplen
        push_u32(buf, 1); // link type: Ethernet
    }

    /// Appends one legacy pcap packet record followed by its payload.
    fn push_legacy_record(
        buf: &mut Vec<u8>,
        ts_sec: u32,
        ts_frac: u32,
        cap_len: u32,
        orig_len: u32,
        payload: &[u8],
    ) {
        push_u32(buf, ts_sec);
        push_u32(buf, ts_frac);
        push_u32(buf, cap_len);
        push_u32(buf, orig_len);
        buf.extend_from_slice(payload);
    }

    // ---- capture builders ---------------------------------------------

    /// Two-packet legacy capture: t = 1000.0 and t = 1001.5 seconds.
    fn build_legacy_pcap(nanosecond: bool) -> Vec<u8> {
        let mut out = Vec::new();
        let magic: u32 = if nanosecond { 0xa1b2_3c4d } else { 0xa1b2_c3d4 };
        push_legacy_header(&mut out, magic);

        push_legacy_record(&mut out, 1000, 0, 4, 4, &[0, 0, 0, 0]);

        // Half a second, expressed in the unit the magic selects.
        let half_second: u32 = if nanosecond { 500_000_000 } else { 500_000 };
        push_legacy_record(&mut out, 1001, half_second, 4, 60, &[0, 0, 0, 0]);
        out
    }

    /// Minimal pcapng capture: section header, one interface, one packet.
    fn build_pcapng() -> Vec<u8> {
        let mut out = Vec::new();

        // Section Header Block
        push_u32(&mut out, 0x0a0d_0d0a); // block type
        push_u32(&mut out, 28); // block total length
        push_u32(&mut out, 0x1a2b_3c4d); // byte-order magic
        push_u16(&mut out, 1); // version major
        push_u16(&mut out, 0); // version minor
        push_u32(&mut out, 0xffff_ffff); // section length (unspecified), low half
        push_u32(&mut out, 0xffff_ffff); // section length (unspecified), high half
        push_u32(&mut out, 28); // block total length (trailer)

        // Interface Description Block
        push_u32(&mut out, 0x0000_0001); // block type
        push_u32(&mut out, 20); // block total length
        push_u16(&mut out, 1); // link type: Ethernet
        push_u16(&mut out, 0); // reserved
        push_u32(&mut out, 65535); // snaplen
        push_u32(&mut out, 20); // block total length (trailer)

        // Enhanced Packet Block
        push_u32(&mut out, 0x0000_0006); // block type
        push_u32(&mut out, 36); // block total length
        push_u32(&mut out, 0); // interface id
        push_u32(&mut out, 0); // timestamp, high half
        push_u32(&mut out, 1_500_000); // timestamp, low half
        push_u32(&mut out, 4); // captured length
        push_u32(&mut out, 4); // original length
        out.extend_from_slice(&[0, 0, 0, 0]); // packet data
        push_u32(&mut out, 36); // block total length (trailer)
        out
    }

    /// Ethernet frame carrying IPv4 1.2.3.4 -> 5.6.7.8, UDP 1234 -> 53, "hi".
    fn build_eth_ipv4_udp() -> Vec<u8> {
        let mut frame = Vec::new();

        // Ethernet II header
        frame.extend_from_slice(&[0xff; 6]); // destination MAC
        frame.extend_from_slice(&[0x11; 6]); // source MAC
        push_u16_be(&mut frame, 0x0800); // EtherType: IPv4

        // IPv4 header
        frame.push(0x45); // version 4, header length 5 words
        frame.push(0x00); // DSCP / ECN
        push_u16_be(&mut frame, 30); // total length
        push_u16_be(&mut frame, 0); // identification
        push_u16_be(&mut frame, 0); // flags / fragment offset
        frame.push(64); // TTL
        frame.push(17); // protocol: UDP
        push_u16_be(&mut frame, 0); // header checksum
        frame.extend_from_slice(&[1, 2, 3, 4]); // source address
        frame.extend_from_slice(&[5, 6, 7, 8]); // destination address

        // UDP header + payload
        push_u16_be(&mut frame, 1234); // source port
        push_u16_be(&mut frame, 53); // destination port
        push_u16_be(&mut frame, 10); // length
        push_u16_be(&mut frame, 0); // checksum
        frame.extend_from_slice(b"hi");
        frame
    }

    /// Raw IPv6 packet ::1 -> ::2 carrying UDP 1234 -> 53, "hi".
    fn build_ipv6_udp() -> Vec<u8> {
        let mut packet = Vec::new();

        // IPv6 header
        packet.extend_from_slice(&[0x60, 0, 0, 0]); // version 6, class, flow label
        push_u16_be(&mut packet, 10); // payload length
        packet.push(17); // next header: UDP
        packet.push(64); // hop limit
        packet.extend_from_slice(&[0; 15]); // source address ::1
        packet.push(1);
        packet.extend_from_slice(&[0; 15]); // destination address ::2
        packet.push(2);

        // UDP header + payload
        push_u16_be(&mut packet, 1234); // source port
        push_u16_be(&mut packet, 53); // destination port
        push_u16_be(&mut packet, 10); // length
        push_u16_be(&mut packet, 0); // checksum
        packet.extend_from_slice(b"hi");
        packet
    }

    /// Looks up a column by name, panicking with the name when it is absent.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(column) => column,
            None => panic!("missing column {name}"),
        }
    }

    // ---- parse() ------------------------------------------------------

    #[test]
    fn legacy_pcap_timestamps_and_lengths() {
        let capture = build_legacy_pcap(false);
        let cols = PcapParser.parse("c.pcap", &capture).unwrap();

        let timestamps = col(&cols, "timestamp");
        assert_eq!(timestamps.ty, ColType::Float);
        assert_eq!(
            timestamps.cells,
            vec![Value::Float(1000.0), Value::Float(1001.5)]
        );
        assert_eq!(
            col(&cols, "length").cells,
            vec![Value::Int(4), Value::Int(60)]
        );
        assert_eq!(
            col(&cols, "caplen").cells,
            vec![Value::Int(4), Value::Int(4)]
        );
    }

    #[test]
    fn nanosecond_precision_scales_the_fraction() {
        let capture = build_legacy_pcap(true);
        let cols = PcapParser.parse("c.pcap", &capture).unwrap();
        assert_eq!(col(&cols, "timestamp").cells[1], Value::Float(1001.5));
    }

    #[test]
    fn pcapng_enhanced_packet_decodes() {
        let capture = build_pcapng();
        let cols = PcapParser.parse("c.pcapng", &capture).unwrap();
        assert_eq!(col(&cols, "timestamp").cells, vec![Value::Float(1.5)]);
        assert_eq!(col(&cols, "length").cells, vec![Value::Int(4)]);
    }

    #[test]
    fn end_to_end_l3_columns_present_for_a_real_frame() {
        let frame = build_eth_ipv4_udp();
        let mut capture = Vec::new();
        push_legacy_header(&mut capture, 0xa1b2_c3d4);
        push_legacy_record(
            &mut capture,
            7,
            0,
            frame.len() as u32,
            frame.len() as u32,
            &frame,
        );

        let cols = PcapParser.parse("c.pcap", &capture).unwrap();
        assert_eq!(col(&cols, "src_ip").cells[0], Value::Str("1.2.3.4".into()));
        assert_eq!(col(&cols, "ip_proto").cells[0], Value::Int(17));
        assert_eq!(col(&cols, "timestamp").cells[0], Value::Float(7.0));
    }

    #[test]
    fn malformed_input_errors() {
        let outcome = PcapParser.parse("c.pcap", b"this is not a capture");
        assert!(matches!(outcome, Err(AxError::Parse { .. })));
    }

    // ---- add_l3() -----------------------------------------------------

    #[test]
    fn add_l3_decodes_ethernet_ipv4() {
        let mut row = BTreeMap::new();
        add_l3(Linktype::ETHERNET, &build_eth_ipv4_udp(), &mut row);
        assert_eq!(row.get("src_ip"), Some(&Value::Str("1.2.3.4".into())));
        assert_eq!(row.get("dst_ip"), Some(&Value::Str("5.6.7.8".into())));
        assert_eq!(row.get("ip_proto"), Some(&Value::Int(17)));
    }

    #[test]
    fn add_l3_decodes_raw_ipv4_via_from_ip() {
        let frame = build_eth_ipv4_udp();
        // Drop the 14-byte Ethernet header so the data starts at the IP header.
        let ip_only = &frame[14..];
        let mut row = BTreeMap::new();
        add_l3(Linktype(228), ip_only, &mut row);
        assert_eq!(row.get("src_ip"), Some(&Value::Str("1.2.3.4".into())));
        assert_eq!(row.get("ip_proto"), Some(&Value::Int(17)));
    }

    #[test]
    fn add_l3_decodes_ipv6() {
        let mut row = BTreeMap::new();
        add_l3(Linktype(101), &build_ipv6_udp(), &mut row);
        assert_eq!(row.get("src_ip"), Some(&Value::Str("::1".into())));
        assert_eq!(row.get("dst_ip"), Some(&Value::Str("::2".into())));
        assert_eq!(row.get("ip_proto"), Some(&Value::Int(17)));
    }

    #[test]
    fn add_l3_skips_unsupported_and_undecodable() {
        // Unknown link type: nothing is decoded.
        let mut unsupported = BTreeMap::new();
        add_l3(Linktype(999), &build_eth_ipv4_udp(), &mut unsupported);
        assert!(unsupported.is_empty());

        // Too short to be an Ethernet frame: nothing is decoded.
        let mut undecodable = BTreeMap::new();
        add_l3(Linktype::ETHERNET, &[0, 1, 2], &mut undecodable);
        assert!(undecodable.is_empty());
    }

    // ---- sniffing and registration ------------------------------------

    #[test]
    fn sniff_keys_on_each_magic() {
        assert_eq!(PcapParser.sniff(&build_legacy_pcap(false)), Some(MAGIC));
        assert_eq!(PcapParser.sniff(&build_legacy_pcap(true)), Some(MAGIC));
        assert_eq!(PcapParser.sniff(&build_pcapng()), Some(MAGIC));
        assert_eq!(
            PcapParser.sniff(&[0xa1, 0xb2, 0xc3, 0xd4, 0, 0]),
            Some(MAGIC)
        );

        // Other formats and too-short input are rejected.
        assert_eq!(PcapParser.sniff(b"PAR1...."), None);
        assert_eq!(PcapParser.sniff(b"\x00\x01\x02"), None);
        assert_eq!(PcapParser.sniff(b"{\"a\":1}"), None);
    }

    #[test]
    fn claims_pcap_extensions() {
        assert_eq!(PcapParser.extensions(), &["pcap", "pcapng", "cap"]);
    }

    #[test]
    fn resolves_by_extension_and_magic() {
        let registry = crate::parser::ParserRegistry::default();
        assert_eq!(registry.resolve("dump.pcap", b"zz").unwrap().id(), "pcap");
        assert_eq!(registry.resolve("dump.pcapng", b"zz").unwrap().id(), "pcap");
        assert_eq!(
            registry.resolve("-", &build_legacy_pcap(false)).unwrap().id(),
            "pcap"
        );
    }
}