pub mod avro;
pub mod json;
pub mod parquet;
use std::net::IpAddr;
use std::path::Path;
use crate::bpf::BpfExpr;
use crate::error::ExportError;
use crate::filter::{Filter, PacketMeta};
use crate::pcap;
/// Destination for exported [`PacketRecord`]s.
///
/// `build_sink` selects one implementation per [`ExportFormat`] (JSON, Parquet, Avro) and
/// `export_multi` drives it through a `Box<dyn PacketSink>`.
pub trait PacketSink {
/// Writes a single record.
///
/// `export_multi` calls this once per exported packet, in the order packets are read from
/// the input capture.
fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError>;
/// Finishes the output after the last `write`.
///
/// The returned count is reported to callers as `ExportReport::packets_written`.
fn close(&mut self) -> Result<u64, ExportError>;
}
/// Capacity, in records, of the bounded channel between the reader thread and the sink
/// writer in `export_multi`. When the channel is full, the reader blocks in `send` until
/// records are consumed, which bounds memory use.
const CHANNEL_CAPACITY: usize = 4096;
/// Serialization format of an export output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    /// JSON output (names/extensions `json`, `jsonl`, `ndjson`).
    Json,
    /// Parquet output (name/extension `parquet`).
    Parquet,
    /// Avro output (name/extension `avro`).
    Avro,
}

impl ExportFormat {
    /// Infers the format from the file extension of `path`.
    ///
    /// Uses the same names as [`ExportFormat::parse`]. Matching is case-insensitive, so
    /// `capture.JSONL` and `capture.jsonl` are both JSON.
    ///
    /// Returns `None` in three cases: the path has no extension, the extension is not valid
    /// UTF-8, or it names no known format.
    pub fn from_extension(path: &Path) -> Option<Self> {
        // Delegate instead of keeping a second, case-sensitive copy of the name table.
        Self::parse(path.extension()?.to_str()?)
    }

    /// Parses a format name, ignoring ASCII case.
    ///
    /// Accepts `json`, `jsonl`, `ndjson`, `parquet` and `avro`; anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        match s.to_ascii_lowercase().as_str() {
            "json" | "jsonl" | "ndjson" => Some(Self::Json),
            "parquet" => Some(Self::Parquet),
            "avro" => Some(Self::Avro),
            _ => None,
        }
    }
}
/// One output destination for `export_multi`.
#[derive(Debug)]
pub struct OutputTarget {
/// Path handed to the sink's `create`; also reported back as `ExportReport::output_path`.
pub path: std::path::PathBuf,
/// Explicit format. When `None`, the format is inferred from the extension of `path`.
pub format: Option<ExportFormat>,
/// Forwarded to the sink's `create`. NOTE(review): presumably enables payload
/// compression in the sink — confirm in the json/parquet/avro modules.
pub compress_payload: bool,
}
/// Options for `export_multi`: one input pass fanned out to several outputs.
#[derive(Debug)]
pub struct MultiExportOptions {
/// Outputs to write. Every exported record goes to each target, in this order.
/// If empty, `export_multi` returns an empty report without reading the input.
pub targets: Vec<OutputTarget>,
/// Packet filter. It is applied only when non-empty.
pub filter: Filter,
/// Optional BPF expression. A packet must satisfy both this and `filter` to be exported.
pub bpf_filter: Option<BpfExpr>,
/// Passed to `FlowKey::flow_id` when computing `PacketRecord::flow_id`.
/// NOTE(review): presumably selects direction-sensitive flow ids — confirm in `crate::flow`.
pub unidirectional: bool,
}
/// Result of `export_multi`.
#[derive(Debug)]
pub struct MultiExportReport {
/// One report per target, in the same order as `MultiExportOptions::targets`.
pub outputs: Vec<ExportReport>,
}
/// Options for `export_file`: the single-output counterpart of `MultiExportOptions`.
#[derive(Debug, Default)]
pub struct ExportOptions {
/// Output file path. When `format` is `None`, its extension selects the format.
pub output: std::path::PathBuf,
/// Explicit output format, overriding extension-based detection.
pub format: Option<ExportFormat>,
/// Forwarded to the sink's `create` (see `OutputTarget::compress_payload`).
pub compress_payload: bool,
/// Packet filter. It is applied only when non-empty.
pub filter: Filter,
/// Optional BPF expression. A packet must satisfy both filters to be exported.
pub bpf_filter: Option<BpfExpr>,
/// Passed to `FlowKey::flow_id` when computing `PacketRecord::flow_id`.
pub unidirectional: bool,
}
/// Outcome of writing a single output.
#[derive(Debug)]
pub struct ExportReport {
/// Count returned by the sink's `close`.
pub packets_written: u64,
/// Path of the output this report describes.
pub output_path: std::path::PathBuf,
}
/// Format-independent row handed to every `PacketSink`, built by `build_record`.
///
/// The flow fields (`src_ip`, `dst_ip`, `protocol`, `flow_id`) are all `None` when no flow
/// key could be extracted from the packet.
#[derive(Debug, Clone)]
pub struct PacketRecord {
/// Capture timestamp in nanoseconds, taken from the packet metadata.
pub timestamp_ns: u64,
/// Source address from the flow key.
pub src_ip: Option<IpAddr>,
/// Destination address from the flow key.
pub dst_ip: Option<IpAddr>,
/// Source port. Set only for TCP (protocol 6) and UDP (protocol 17).
pub src_port: Option<u16>,
/// Destination port. Set only for TCP (protocol 6) and UDP (protocol 17).
pub dst_port: Option<u16>,
/// IP protocol number from the flow key (e.g. 6 = TCP, 17 = UDP).
pub protocol: Option<u8>,
/// Flow identifier computed by `FlowKey::flow_id`.
pub flow_id: Option<u64>,
/// Number of bytes captured for this packet.
pub caplen: u32,
/// Original on-the-wire length of the packet.
pub origlen: u32,
/// TCP flags byte. Set only for TCP packets.
pub tcp_flags: Option<u8>,
/// Captured packet bytes (`PacketData::data`, the same bytes the flow key is parsed from).
pub payload: Vec<u8>,
}
pub fn export_multi(
input: &Path,
opts: &MultiExportOptions,
) -> Result<MultiExportReport, ExportError> {
if opts.targets.is_empty() {
return Ok(MultiExportReport {
outputs: Vec::new(),
});
}
struct SinkState {
sink: Box<dyn PacketSink>,
path: std::path::PathBuf,
}
let mut sinks: Vec<SinkState> = opts
.targets
.iter()
.map(|t| {
Ok(SinkState {
sink: build_sink(t)?,
path: t.path.clone(),
})
})
.collect::<Result<Vec<_>, ExportError>>()?;
let (tx, rx) = std::sync::mpsc::sync_channel::<PacketRecord>(CHANNEL_CAPACITY);
let input_owned = input.to_owned();
let filter = opts.filter.clone();
let bpf_filter = opts.bpf_filter.clone();
let unidirectional = opts.unidirectional;
let producer = std::thread::spawn(move || -> Result<(), ExportError> {
let has_filter = !filter.is_empty() || bpf_filter.is_some();
for item in
pcap::open_with_payload(&input_owned).map_err(|e| ExportError::Parse(e.to_string()))?
{
let packet = item.map_err(|e| ExportError::Parse(e.to_string()))?;
let meta = PacketMeta::from_packet(
packet.info.timestamp_ns,
packet.info.captured_len,
&packet.data,
);
if has_filter {
let ok = (filter.is_empty() || filter.matches(&meta))
&& bpf_filter.as_ref().is_none_or(|b| b.eval(&meta));
if !ok {
continue;
}
}
let record = build_record(&packet, &meta, unidirectional);
if tx.send(record).is_err() {
break;
}
}
Ok(())
});
for record in rx {
for state in &mut sinks {
state.sink.write(&record)?;
}
}
producer.join().map_err(|_| ExportError::ThreadPanic)??;
let outputs = sinks
.into_iter()
.map(|mut state| {
let packets_written = state.sink.close()?;
Ok(ExportReport {
packets_written,
output_path: state.path,
})
})
.collect::<Result<Vec<_>, ExportError>>()?;
Ok(MultiExportReport { outputs })
}
/// Exports the capture at `input` to the single output described by `opts`.
///
/// This is a thin wrapper over `export_multi` with one [`OutputTarget`]. The output format
/// is `opts.format` if set, otherwise it is inferred from the extension of `opts.output`.
///
/// That resolution, including the `ExportError::UnknownFormat` error that names the
/// offending extension (or `"<none>"`), is done once, by `build_sink`. It is not duplicated
/// here.
///
/// # Errors
///
/// Any error from `export_multi`, including `ExportError::UnknownFormat` when no format can
/// be determined.
pub fn export_file(input: &Path, opts: &ExportOptions) -> Result<ExportReport, ExportError> {
    let multi_opts = MultiExportOptions {
        targets: vec![OutputTarget {
            path: opts.output.clone(),
            format: opts.format,
            compress_payload: opts.compress_payload,
        }],
        filter: opts.filter.clone(),
        bpf_filter: opts.bpf_filter.clone(),
        unidirectional: opts.unidirectional,
    };
    let report = export_multi(input, &multi_opts)?;
    Ok(report
        .outputs
        .into_iter()
        .next()
        .expect("export_multi returns exactly one report per target"))
}
/// Creates the writer for a single output target.
///
/// An explicit `target.format` wins. Otherwise the format is inferred from the extension of
/// `target.path` via `ExportFormat::from_extension`.
///
/// # Errors
///
/// * `ExportError::UnknownFormat` when no format can be determined. It carries the path's
///   extension, or `"<none>"` when there is no UTF-8 extension.
/// * Any error returned by the chosen sink's `create`.
fn build_sink(target: &OutputTarget) -> Result<Box<dyn PacketSink>, ExportError> {
    let path = &target.path;
    let compress = target.compress_payload;

    let Some(format) = target
        .format
        .or_else(|| ExportFormat::from_extension(path))
    else {
        let ext = path
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("<none>");
        return Err(ExportError::UnknownFormat(ext.to_owned()));
    };

    let writer: Box<dyn PacketSink> = match format {
        ExportFormat::Json => Box::new(json::JsonSink::create(path, compress)?),
        ExportFormat::Parquet => Box::new(parquet::ParquetSink::create(path, compress)?),
        ExportFormat::Avro => Box::new(avro::AvroSink::create(path, compress)?),
    };
    Ok(writer)
}
/// Flattens one parsed packet and its metadata into a [`PacketRecord`].
///
/// Addresses, protocol and flow id come from `meta.flow_key`; all of them are `None` when
/// there is no flow key.
///
/// * Ports are kept only for TCP and UDP.
/// * The TCP flags byte is kept only for TCP.
/// * Lengths and payload bytes are copied from `packet`.
fn build_record(
    packet: &pcap::PacketData,
    meta: &PacketMeta,
    unidirectional: bool,
) -> PacketRecord {
    const PROTO_TCP: u8 = 6;
    const PROTO_UDP: u8 = 17;

    // `Option<&FlowKey>` is `Copy`, so each view below can be reused freely.
    let key = meta.flow_key.as_ref();
    let ported = key.filter(|k| k.protocol == PROTO_TCP || k.protocol == PROTO_UDP);
    let tcp = key.filter(|k| k.protocol == PROTO_TCP);

    PacketRecord {
        timestamp_ns: meta.timestamp_ns,
        src_ip: key.map(|k| k.src_ip),
        dst_ip: key.map(|k| k.dst_ip),
        src_port: ported.map(|k| k.src_port),
        dst_port: ported.map(|k| k.dst_port),
        protocol: key.map(|k| k.protocol),
        flow_id: key.map(|k| k.flow_id(unidirectional)),
        caplen: packet.info.captured_len,
        origlen: packet.info.original_len,
        tcp_flags: tcp.map(|_| meta.tcp_flags),
        payload: packet.data.clone(),
    }
}
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
    use std::path::Path;

    use crate::filter::{PacketMeta, TcpFlagsFilter};
    use crate::flow::FlowKey;
    use crate::pcap::{PacketData, PacketInfo};

    use super::*;

    /// Zero-filled packet whose captured and original lengths are both `caplen`.
    fn make_packet(caplen: u32) -> PacketData {
        PacketData {
            info: PacketInfo {
                timestamp_ns: 0,
                captured_len: caplen,
                original_len: caplen,
                flow_key: None,
            },
            data: vec![0u8; caplen as usize],
        }
    }

    /// Metadata with a flow key for the given 5-tuple and no TCP flags set.
    fn make_meta(src: IpAddr, dst: IpAddr, sport: u16, dport: u16, proto: u8) -> PacketMeta {
        PacketMeta {
            timestamp_ns: 0,
            captured_len: 60,
            flow_key: Some(FlowKey::new(src, dst, sport, dport, proto)),
            tcp_flags: 0,
        }
    }

    fn v4(a: u8, b: u8, c: u8, d: u8) -> IpAddr {
        IpAddr::V4(Ipv4Addr::new(a, b, c, d))
    }

    /// Address in the IPv6 documentation prefix 2001:db8::/32.
    fn v6(last: u16) -> IpAddr {
        IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, last))
    }

    #[test]
    fn test_build_record_tcp_has_ports() {
        let pkt = make_packet(60);
        let meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 1234, 443, 6);
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.src_port, Some(1234));
        assert_eq!(rec.dst_port, Some(443));
    }

    #[test]
    fn test_build_record_udp_has_ports() {
        let pkt = make_packet(60);
        let meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 5000, 53, 17);
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.src_port, Some(5000));
        assert_eq!(rec.dst_port, Some(53));
    }

    #[test]
    fn test_build_record_icmp_ports_are_none() {
        let pkt = make_packet(60);
        let meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 0, 0, 1);
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.src_port, None, "ICMP src_port must be None");
        assert_eq!(rec.dst_port, None, "ICMP dst_port must be None");
    }

    #[test]
    fn test_build_record_icmpv6_ports_are_none() {
        // ICMPv6 (protocol 58) only occurs over IPv6, so use IPv6 endpoints.
        let pkt = make_packet(60);
        let meta = make_meta(v6(1), v6(2), 0, 0, 58);
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.src_ip, Some(v6(1)));
        assert_eq!(rec.dst_ip, Some(v6(2)));
        assert_eq!(rec.protocol, Some(58));
        assert_eq!(rec.src_port, None);
        assert_eq!(rec.dst_port, None);
        assert_eq!(rec.tcp_flags, None);
    }

    #[test]
    fn test_build_record_non_ip_all_flow_fields_none() {
        let pkt = make_packet(60);
        let meta = PacketMeta {
            timestamp_ns: 1_000,
            captured_len: 60,
            flow_key: None,
            tcp_flags: 0,
        };
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.timestamp_ns, 1_000);
        assert_eq!(rec.src_ip, None);
        assert_eq!(rec.dst_ip, None);
        assert_eq!(rec.src_port, None);
        assert_eq!(rec.dst_port, None);
        assert_eq!(rec.protocol, None);
        assert_eq!(rec.flow_id, None);
        assert_eq!(rec.tcp_flags, None);
    }

    #[test]
    fn test_build_record_tcp_flags_only_for_tcp() {
        let pkt = make_packet(60);

        let mut meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 100, 200, 17);
        meta.tcp_flags = 0x02;
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.tcp_flags, None);

        let mut meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 100, 80, 6);
        meta.tcp_flags = TcpFlagsFilter::parse("SYN+ACK").unwrap().mask;
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.tcp_flags, Some(0x12));
    }

    #[test]
    fn test_build_record_copies_lengths_and_payload() {
        // Truncated capture: captured length differs from the on-the-wire length.
        let mut pkt = make_packet(4);
        pkt.info.original_len = 1500;
        pkt.data = vec![1, 2, 3, 4];
        let meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 1234, 443, 6);
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.caplen, 4);
        assert_eq!(rec.origlen, 1500);
        assert_eq!(rec.payload, vec![1, 2, 3, 4]);
        assert_eq!(rec.src_ip, Some(v4(1, 2, 3, 4)));
        assert_eq!(rec.dst_ip, Some(v4(5, 6, 7, 8)));
        assert_eq!(rec.protocol, Some(6));
        assert!(rec.flow_id.is_some());
    }

    #[test]
    fn test_export_format_parse_and_extension() {
        assert_eq!(ExportFormat::parse("PARQUET"), Some(ExportFormat::Parquet));
        assert_eq!(ExportFormat::parse("Json"), Some(ExportFormat::Json));
        assert_eq!(ExportFormat::parse("csv"), None);
        assert_eq!(
            ExportFormat::from_extension(Path::new("out.ndjson")),
            Some(ExportFormat::Json)
        );
        assert_eq!(
            ExportFormat::from_extension(Path::new("out.avro")),
            Some(ExportFormat::Avro)
        );
        assert_eq!(ExportFormat::from_extension(Path::new("out")), None);
    }
}