pub mod avro;
pub mod json;
pub mod parquet;
use std::net::IpAddr;
use std::path::Path;
use crate::bpf::BpfExpr;
use crate::error::ExportError;
use crate::filter::{Filter, PacketMeta};
use crate::pcap;
/// Destination for exported [`PacketRecord`]s.
///
/// Implemented by the JSON, Parquet and Avro sinks that `build_sink` constructs and
/// boxes as `Box<dyn PacketSink>`.
pub trait PacketSink {
    /// Writes one record to the output.
    fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError>;
    /// Finishes the output and returns the number of packets written.
    ///
    /// `export_multi` calls this once per sink, after the last `write`, and reports the
    /// returned count as `ExportReport::packets_written`.
    fn close(&mut self) -> Result<u64, ExportError>;
}
/// Capacity, in records, of the bounded channel between the pcap-reading thread and the
/// sink-writing thread in `export_multi`.
///
/// The reader blocks once this many records are queued, which bounds memory use when
/// the sinks are slower than the reader.
const CHANNEL_CAPACITY: usize = 4096;
/// Serialization format of an export target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    /// JSON output, written by `json::JsonSink` (extensions `.json`, `.jsonl`, `.ndjson`).
    Json,
    /// Parquet output, written by `parquet::ParquetSink` (extension `.parquet`).
    Parquet,
    /// Avro output, written by `avro::AvroSink` (extension `.avro`).
    Avro,
}

impl ExportFormat {
    /// Infers the format from the extension of `path`, ignoring ASCII case.
    ///
    /// Accepts exactly the names recognised by [`ExportFormat::parse`], so `out.jsonl`
    /// and `OUT.JSONL` both map to [`ExportFormat::Json`].
    ///
    /// Returns `None` when `path` has no extension, the extension is not valid UTF-8,
    /// or the extension is not a recognised format name.
    pub fn from_extension(path: &Path) -> Option<Self> {
        // Delegating to `parse` keeps extension inference and explicit format names
        // in lock-step, including case-insensitivity.
        path.extension()?.to_str().and_then(Self::parse)
    }

    /// Parses a user-supplied format name, ignoring ASCII case.
    ///
    /// Recognises `json` / `jsonl` / `ndjson`, `parquet` and `avro`. Any other name
    /// yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        match s.to_ascii_lowercase().as_str() {
            "json" | "jsonl" | "ndjson" => Some(Self::Json),
            "parquet" => Some(Self::Parquet),
            "avro" => Some(Self::Avro),
            _ => None,
        }
    }
}
/// One output file for [`export_multi`].
#[derive(Debug)]
pub struct OutputTarget {
    /// File the sink writes to.
    pub path: std::path::PathBuf,
    /// Output format; when `None` it is inferred from the extension of `path` via
    /// [`ExportFormat::from_extension`].
    pub format: Option<ExportFormat>,
    /// Forwarded unchanged to the sink's `create` constructor.
    /// NOTE(review): presumably enables compression of the payload column — confirm in
    /// the sink modules.
    pub compress_payload: bool,
}
/// Options for [`export_multi`]: one input read once, fanned out to several outputs.
#[derive(Debug)]
pub struct MultiExportOptions {
    /// Outputs to write; every exported record is written to each target, in order.
    pub targets: Vec<OutputTarget>,
    /// Packet filter; a packet is exported only if it matches. An empty filter
    /// matches every packet.
    pub filter: Filter,
    /// Optional BPF expression; when set, a packet must also satisfy it to be exported.
    pub bpf_filter: Option<BpfExpr>,
    /// Passed to the flow key's `flow_id` when computing `PacketRecord::flow_id`.
    /// NOTE(review): presumably `true` gives each direction of a flow its own id —
    /// confirm against the flow-key implementation.
    pub unidirectional: bool,
}
/// Result of [`export_multi`].
#[derive(Debug)]
pub struct MultiExportReport {
    /// One report per target, in the same order as `MultiExportOptions::targets`;
    /// empty when no targets were given.
    pub outputs: Vec<ExportReport>,
}
/// Options for [`export_file`]: a single-output export.
///
/// The fields mirror [`OutputTarget`] plus the shared filtering options of
/// [`MultiExportOptions`].
#[derive(Debug, Default)]
pub struct ExportOptions {
    /// File to write.
    pub output: std::path::PathBuf,
    /// Output format; when `None` it is inferred from the extension of `output`.
    pub format: Option<ExportFormat>,
    /// Forwarded to the sink's `create` constructor (see `OutputTarget::compress_payload`).
    pub compress_payload: bool,
    /// Packet filter; an empty filter matches every packet.
    pub filter: Filter,
    /// Optional BPF expression a packet must also satisfy.
    pub bpf_filter: Option<BpfExpr>,
    /// Passed to the flow key's `flow_id` when computing `PacketRecord::flow_id`.
    pub unidirectional: bool,
}
/// Outcome of writing a single output file.
#[derive(Debug)]
pub struct ExportReport {
    /// Packet count returned by the sink's [`PacketSink::close`].
    pub packets_written: u64,
    /// Path of the file that was written.
    pub output_path: std::path::PathBuf,
}
/// One exported packet, as handed to every [`PacketSink`].
///
/// Built by `build_record`. All flow-derived fields (`src_ip` through `flow_id`, and
/// `tcp_flags`) are `None` when the packet's metadata carries no flow key.
#[derive(Debug, Clone)]
pub struct PacketRecord {
    /// Packet timestamp in nanoseconds, copied from the parsed packet metadata.
    /// NOTE(review): the time base (e.g. Unix epoch) is defined by the pcap reader —
    /// confirm.
    pub timestamp_ns: u64,
    /// Source address from the flow key.
    pub src_ip: Option<IpAddr>,
    /// Destination address from the flow key.
    pub dst_ip: Option<IpAddr>,
    /// Source port from the flow key.
    pub src_port: Option<u16>,
    /// Destination port from the flow key.
    pub dst_port: Option<u16>,
    /// IP protocol number from the flow key (6 is treated as TCP).
    pub protocol: Option<u8>,
    /// Flow identifier from the flow key, computed with the `unidirectional` option.
    pub flow_id: Option<u64>,
    /// Number of bytes captured for this packet (`packet.info.captured_len`).
    pub caplen: u32,
    /// Original packet length as recorded by the capture (`packet.info.original_len`).
    pub origlen: u32,
    /// TCP flags; `Some` only when `protocol` is 6 (TCP).
    pub tcp_flags: Option<u8>,
    /// Copy of the packet's captured bytes (`packet.data`).
    /// NOTE(review): this appears to be the full captured data rather than only the
    /// transport-layer payload — confirm against `pcap::PacketData`.
    pub payload: Vec<u8>,
}
/// Reads the pcap file `input` once and writes every packet that passes the configured
/// filters to each of `opts.targets`.
///
/// Packets are parsed and filtered on a background thread and handed to the calling
/// thread through a bounded channel of [`CHANNEL_CAPACITY`] records. The calling thread
/// writes each record to every sink, in target order, then closes all sinks.
///
/// Returns one [`ExportReport`] per target, in target order. With no targets the input
/// is not opened and an empty report is returned.
///
/// # Errors
///
/// - any error from constructing a sink (e.g. [`ExportError::UnknownFormat`]);
/// - [`ExportError::Parse`] if the input cannot be opened or a packet fails to parse;
/// - any error from a sink's `write` or `close`;
/// - [`ExportError::ThreadPanic`] if the reader thread panics.
///
/// A sink write error takes precedence over a reader error. The reader thread is always
/// joined before this function returns. On error, sinks are dropped without `close`.
/// NOTE(review): the output files may then be left partially written.
pub fn export_multi(
    input: &Path,
    opts: &MultiExportOptions,
) -> Result<MultiExportReport, ExportError> {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    if opts.targets.is_empty() {
        return Ok(MultiExportReport {
            outputs: Vec::new(),
        });
    }

    /// A constructed sink paired with the path it writes to, for the final report.
    struct SinkState {
        sink: Box<dyn PacketSink>,
        path: std::path::PathBuf,
    }

    // Build every sink up front so a bad target fails before any input is read.
    let mut sinks: Vec<SinkState> = opts
        .targets
        .iter()
        .map(|t| {
            Ok(SinkState {
                sink: build_sink(t)?,
                path: t.path.clone(),
            })
        })
        .collect::<Result<Vec<_>, ExportError>>()?;

    let (tx, rx) = std::sync::mpsc::sync_channel::<PacketRecord>(CHANNEL_CAPACITY);

    // Set when a sink write fails. It lets the reader stop promptly even while it is
    // skipping filtered-out packets, where it would never call `send` and so never
    // notice that the receiver is gone.
    let cancelled = Arc::new(AtomicBool::new(false));

    // The reader thread needs owned copies of everything it uses.
    let input_owned = input.to_owned();
    let filter = opts.filter.clone();
    let bpf_filter = opts.bpf_filter.clone();
    let unidirectional = opts.unidirectional;
    let producer_cancelled = Arc::clone(&cancelled);
    let producer = std::thread::spawn(move || -> Result<(), ExportError> {
        let has_filter = !filter.is_empty() || bpf_filter.is_some();
        for item in
            pcap::open_with_payload(&input_owned).map_err(|e| ExportError::Parse(e.to_string()))?
        {
            // Relaxed suffices: the flag guards no other data, it only requests an early stop.
            if producer_cancelled.load(Ordering::Relaxed) {
                break;
            }
            let packet = item.map_err(|e| ExportError::Parse(e.to_string()))?;
            let meta = PacketMeta::from_packet(
                packet.info.timestamp_ns,
                packet.info.captured_len,
                &packet.data,
            );
            // Skip filter evaluation entirely when no filter is configured.
            if has_filter {
                let ok = (filter.is_empty() || filter.matches(&meta))
                    && bpf_filter.as_ref().is_none_or(|b| b.eval(&meta));
                if !ok {
                    continue;
                }
            }
            let record = build_record(&packet, &meta, unidirectional);
            if tx.send(record).is_err() {
                // The receiver was dropped: the consumer has stopped.
                break;
            }
        }
        Ok(())
    });

    // Fan each record out to every sink, stopping at the first write error. Do not
    // return early here: the reader thread must be stopped and joined first.
    let write_result = rx.iter().try_for_each(|record| {
        sinks
            .iter_mut()
            .try_for_each(|state| state.sink.write(&record))
    });
    if write_result.is_err() {
        cancelled.store(true, Ordering::Relaxed);
    }
    // Dropping the receiver makes a reader blocked in `send` on a full channel return
    // an error immediately, so the join below cannot hang.
    drop(rx);
    let joined = producer.join();
    write_result?;
    joined.map_err(|_| ExportError::ThreadPanic)??;

    let outputs = sinks
        .into_iter()
        .map(|mut state| {
            let packets_written = state.sink.close()?;
            Ok(ExportReport {
                packets_written,
                output_path: state.path,
            })
        })
        .collect::<Result<Vec<_>, ExportError>>()?;
    Ok(MultiExportReport { outputs })
}
/// Exports `input` to the single output file described by `opts`.
///
/// This is a thin wrapper over [`export_multi`] with exactly one [`OutputTarget`].
///
/// # Errors
///
/// Returns [`ExportError::UnknownFormat`] when `opts.format` is `None` and the extension
/// of `opts.output` is not recognised; the error carries that extension, or `"<none>"`.
/// Also returns any other error from [`export_multi`].
pub fn export_file(input: &Path, opts: &ExportOptions) -> Result<ExportReport, ExportError> {
    // Format resolution (explicit format first, else the output's extension) and the
    // resulting `UnknownFormat` error are handled by `build_sink`. This happens before
    // any input is read, so it is not duplicated here.
    let multi_opts = MultiExportOptions {
        targets: vec![OutputTarget {
            path: opts.output.clone(),
            format: opts.format,
            compress_payload: opts.compress_payload,
        }],
        filter: opts.filter.clone(),
        bpf_filter: opts.bpf_filter.clone(),
        unidirectional: opts.unidirectional,
    };
    let report = export_multi(input, &multi_opts)?;
    Ok(report
        .outputs
        .into_iter()
        .next()
        .expect("export_multi returns one report per target and exactly one target was given"))
}
/// Constructs the sink that writes `target`.
///
/// The format is `target.format` when set; otherwise it is inferred from the extension
/// of `target.path` via [`ExportFormat::from_extension`]. `target.path` and
/// `target.compress_payload` are forwarded unchanged to the chosen sink's `create`.
///
/// # Errors
///
/// Returns [`ExportError::UnknownFormat`] when no format can be determined; the error
/// carries the path's extension, or `"<none>"`. Otherwise returns whatever the selected
/// sink's `create` returns.
fn build_sink(target: &OutputTarget) -> Result<Box<dyn PacketSink>, ExportError> {
    let inferred = target
        .format
        .or_else(|| ExportFormat::from_extension(&target.path));
    let Some(format) = inferred else {
        let ext = target
            .path
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("<none>");
        return Err(ExportError::UnknownFormat(ext.to_owned()));
    };

    let path = &target.path;
    let compress = target.compress_payload;
    let sink: Box<dyn PacketSink> = match format {
        ExportFormat::Json => Box::new(json::JsonSink::create(path, compress)?),
        ExportFormat::Parquet => Box::new(parquet::ParquetSink::create(path, compress)?),
        ExportFormat::Avro => Box::new(avro::AvroSink::create(path, compress)?),
    };
    Ok(sink)
}
/// Converts a parsed packet and its metadata into a [`PacketRecord`].
///
/// Address, port, protocol and flow-id fields come from `meta.flow_key` and are all
/// `None` when there is no flow key. `unidirectional` is forwarded to the key's
/// `flow_id`. TCP flags are reported only when the flow's protocol is 6 (TCP). The
/// record's `payload` is a copy of `packet.data`.
fn build_record(
    packet: &pcap::PacketData,
    meta: &PacketMeta,
    unidirectional: bool,
) -> PacketRecord {
    // `Option<&_>` is `Copy`, so the same borrowed key can feed every field below.
    let flow = meta.flow_key.as_ref();
    PacketRecord {
        timestamp_ns: meta.timestamp_ns,
        src_ip: flow.map(|k| k.src_ip),
        dst_ip: flow.map(|k| k.dst_ip),
        src_port: flow.map(|k| k.src_port),
        dst_port: flow.map(|k| k.dst_port),
        protocol: flow.map(|k| k.protocol),
        flow_id: flow.map(|k| k.flow_id(unidirectional)),
        caplen: packet.info.captured_len,
        origlen: packet.info.original_len,
        // Flags are only meaningful for TCP (IP protocol number 6).
        tcp_flags: flow.filter(|k| k.protocol == 6).map(|_| meta.tcp_flags),
        payload: packet.data.clone(),
    }
}