1pub mod avro;
18pub mod json;
19pub mod parquet;
20
21use std::net::IpAddr;
22use std::path::Path;
23
24use crate::bpf::BpfExpr;
25use crate::error::ExportError;
26use crate::filter::{Filter, PacketMeta};
27use crate::pcap;
28
/// Destination that exported packet records are written to.
///
/// `export_multi` holds one boxed `PacketSink` per output target, calls
/// [`write`](PacketSink::write) on each of them for every record that passes
/// the filters, and finally calls [`close`](PacketSink::close) once per sink.
pub trait PacketSink {
    /// Writes a single record to this sink.
    ///
    /// # Errors
    ///
    /// Returns an [`ExportError`] if the record could not be written;
    /// `export_multi` aborts the export on the first such error.
    fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError>;

    /// Finishes the sink and returns a count that `export_multi` reports as
    /// `ExportReport::packets_written`.
    ///
    /// # Errors
    ///
    /// Returns an [`ExportError`] if the sink could not be finished.
    fn close(&mut self) -> Result<u64, ExportError>;
}
45
/// Bound of the `sync_channel` that `export_multi` uses to hand records from
/// the reader thread to the sink-writing loop; the reader blocks in `send`
/// once this many records are queued.
const CHANNEL_CAPACITY: usize = 4096;
48
/// Output encodings supported by the exporter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    /// Selected by the names `json`, `jsonl` and `ndjson`.
    Json,
    /// Selected by the name `parquet`.
    Parquet,
    /// Selected by the name `avro`.
    Avro,
}

impl ExportFormat {
    /// Infers the format from the extension of `path`.
    ///
    /// The extension is matched ASCII case-insensitively, using the same name
    /// table as [`ExportFormat::parse`], so `capture.JSON` and `capture.json`
    /// resolve to the same format.
    ///
    /// Returns `None` when the path has no extension, the extension is not
    /// valid UTF-8, or it does not name a known format.
    pub fn from_extension(path: &Path) -> Option<Self> {
        // Delegate to `parse` so the two lookups can never disagree.
        path.extension()?.to_str().and_then(Self::parse)
    }

    /// Parses a format name, ignoring ASCII case.
    ///
    /// Accepts `json`, `jsonl`, `ndjson`, `parquet` and `avro`; any other
    /// input yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        match s.to_ascii_lowercase().as_str() {
            "json" | "jsonl" | "ndjson" => Some(Self::Json),
            "parquet" => Some(Self::Parquet),
            "avro" => Some(Self::Avro),
            _ => None,
        }
    }
}
83
/// One output file of a multi-target export.
#[derive(Debug)]
pub struct OutputTarget {
    /// Path handed to the sink constructor for this target.
    pub path: std::path::PathBuf,
    /// Explicit format; when `None`, `build_sink` infers it from the
    /// extension of `path`.
    pub format: Option<ExportFormat>,
    /// Forwarded unchanged to the sink constructor; its exact effect is
    /// defined by each sink implementation.
    pub compress_payload: bool,
}
94
/// Options for [`export_multi`]: the output targets plus the packet
/// selection settings shared by all of them.
#[derive(Debug)]
pub struct MultiExportOptions {
    /// Outputs to write; every exported record is written to each of them.
    pub targets: Vec<OutputTarget>,
    /// Packet filter; when it is not empty, only packets it matches are
    /// exported.
    pub filter: Filter,
    /// Optional BPF expression; when present, only packets for which it
    /// evaluates to `true` are exported.
    pub bpf_filter: Option<BpfExpr>,
    /// Passed to `FlowKey::flow_id` when the record's `flow_id` is computed.
    pub unidirectional: bool,
}
107
/// Result of [`export_multi`].
#[derive(Debug)]
pub struct MultiExportReport {
    /// One report per output target, in the same order as
    /// `MultiExportOptions::targets`; empty when no targets were given.
    pub outputs: Vec<ExportReport>,
}
114
/// Options for the single-output [`export_file`] entry point.
#[derive(Debug, Default)]
pub struct ExportOptions {
    /// Path of the output file.
    pub output: std::path::PathBuf,
    /// Explicit format; when `None`, it is inferred from the extension of
    /// `output`.
    pub format: Option<ExportFormat>,
    /// Forwarded unchanged to the sink constructor; its exact effect is
    /// defined by each sink implementation.
    pub compress_payload: bool,
    /// Packet filter; when it is not empty, only packets it matches are
    /// exported.
    pub filter: Filter,
    /// Optional BPF expression; when present, only packets for which it
    /// evaluates to `true` are exported.
    pub bpf_filter: Option<BpfExpr>,
    /// Passed to `FlowKey::flow_id` when the record's `flow_id` is computed.
    pub unidirectional: bool,
}
132
/// Outcome of writing one output file.
#[derive(Debug)]
pub struct ExportReport {
    /// Value returned by the sink's `close` call.
    pub packets_written: u64,
    /// Path of the output target this report belongs to.
    pub output_path: std::path::PathBuf,
}
141
/// Flattened, per-packet row handed to every [`PacketSink`].
///
/// Produced by `build_record`. All flow-derived fields are `None` when the
/// packet metadata carries no flow key.
#[derive(Debug, Clone)]
pub struct PacketRecord {
    /// Timestamp copied from the packet metadata.
    pub timestamp_ns: u64,
    /// Source address from the flow key, if any.
    pub src_ip: Option<IpAddr>,
    /// Destination address from the flow key, if any.
    pub dst_ip: Option<IpAddr>,
    /// Source port; only set when the flow protocol is 6 (TCP) or 17 (UDP).
    pub src_port: Option<u16>,
    /// Destination port; only set when the flow protocol is 6 (TCP) or 17 (UDP).
    pub dst_port: Option<u16>,
    /// Protocol number from the flow key, if any.
    pub protocol: Option<u8>,
    /// Flow identifier computed by `FlowKey::flow_id`, if there is a flow key.
    pub flow_id: Option<u64>,
    /// Captured length as reported by the packet info.
    pub caplen: u32,
    /// Original length as reported by the packet info.
    pub origlen: u32,
    /// TCP flags from the packet metadata; only set when the flow protocol is 6.
    pub tcp_flags: Option<u8>,
    /// Copy of the packet's data bytes.
    pub payload: Vec<u8>,
}
168
169pub fn export_multi(
183 input: &Path,
184 opts: &MultiExportOptions,
185) -> Result<MultiExportReport, ExportError> {
186 if opts.targets.is_empty() {
187 return Ok(MultiExportReport {
188 outputs: Vec::new(),
189 });
190 }
191
192 struct SinkState {
194 sink: Box<dyn PacketSink>,
195 path: std::path::PathBuf,
196 }
197 let mut sinks: Vec<SinkState> = opts
198 .targets
199 .iter()
200 .map(|t| {
201 Ok(SinkState {
202 sink: build_sink(t)?,
203 path: t.path.clone(),
204 })
205 })
206 .collect::<Result<Vec<_>, ExportError>>()?;
207
208 let (tx, rx) = std::sync::mpsc::sync_channel::<PacketRecord>(CHANNEL_CAPACITY);
209
210 let input_owned = input.to_owned();
212 let filter = opts.filter.clone();
213 let bpf_filter = opts.bpf_filter.clone();
214 let unidirectional = opts.unidirectional;
215
216 let producer = std::thread::spawn(move || -> Result<(), ExportError> {
217 let has_filter = !filter.is_empty() || bpf_filter.is_some();
218 for item in
219 pcap::open_with_payload(&input_owned).map_err(|e| ExportError::Parse(e.to_string()))?
220 {
221 let packet = item.map_err(|e| ExportError::Parse(e.to_string()))?;
222 let meta = PacketMeta::from_packet(
223 packet.info.timestamp_ns,
224 packet.info.captured_len,
225 &packet.data,
226 );
227 if has_filter {
228 let ok = (filter.is_empty() || filter.matches(&meta))
229 && bpf_filter.as_ref().is_none_or(|b| b.eval(&meta));
230 if !ok {
231 continue;
232 }
233 }
234 let record = build_record(&packet, &meta, unidirectional);
235 if tx.send(record).is_err() {
236 break;
238 }
239 }
240 Ok(())
241 });
242
243 for record in rx {
245 for state in &mut sinks {
246 state.sink.write(&record)?;
247 }
248 }
249
250 producer.join().map_err(|_| ExportError::ThreadPanic)??;
251
252 let outputs = sinks
254 .into_iter()
255 .map(|mut state| {
256 let packets_written = state.sink.close()?;
257 Ok(ExportReport {
258 packets_written,
259 output_path: state.path,
260 })
261 })
262 .collect::<Result<Vec<_>, ExportError>>()?;
263
264 Ok(MultiExportReport { outputs })
265}
266
267pub fn export_file(input: &Path, opts: &ExportOptions) -> Result<ExportReport, ExportError> {
274 let format = opts
277 .format
278 .or_else(|| ExportFormat::from_extension(&opts.output))
279 .ok_or_else(|| {
280 ExportError::UnknownFormat(
281 opts.output
282 .extension()
283 .and_then(|e| e.to_str())
284 .unwrap_or("<none>")
285 .to_owned(),
286 )
287 })?;
288
289 let multi_opts = MultiExportOptions {
290 targets: vec![OutputTarget {
291 path: opts.output.clone(),
292 format: Some(format),
293 compress_payload: opts.compress_payload,
294 }],
295 filter: opts.filter.clone(),
296 bpf_filter: opts.bpf_filter.clone(),
297 unidirectional: opts.unidirectional,
298 };
299 let mut report = export_multi(input, &multi_opts)?;
300 Ok(report.outputs.remove(0))
301}
302
303fn build_sink(target: &OutputTarget) -> Result<Box<dyn PacketSink>, ExportError> {
306 let format = target
307 .format
308 .or_else(|| ExportFormat::from_extension(&target.path))
309 .ok_or_else(|| {
310 ExportError::UnknownFormat(
311 target
312 .path
313 .extension()
314 .and_then(|e| e.to_str())
315 .unwrap_or("<none>")
316 .to_owned(),
317 )
318 })?;
319 let sink: Box<dyn PacketSink> = match format {
320 ExportFormat::Json => Box::new(json::JsonSink::create(
321 &target.path,
322 target.compress_payload,
323 )?),
324 ExportFormat::Parquet => Box::new(parquet::ParquetSink::create(
325 &target.path,
326 target.compress_payload,
327 )?),
328 ExportFormat::Avro => Box::new(avro::AvroSink::create(
329 &target.path,
330 target.compress_payload,
331 )?),
332 };
333 Ok(sink)
334}
335
336fn build_record(
339 packet: &pcap::PacketData,
340 meta: &PacketMeta,
341 unidirectional: bool,
342) -> PacketRecord {
343 let (src_ip, dst_ip, src_port, dst_port, protocol, flow_id) =
344 if let Some(ref key) = meta.flow_key {
345 let port = matches!(key.protocol, 6 | 17);
348 (
349 Some(key.src_ip),
350 Some(key.dst_ip),
351 port.then_some(key.src_port),
352 port.then_some(key.dst_port),
353 Some(key.protocol),
354 Some(key.flow_id(unidirectional)),
355 )
356 } else {
357 (None, None, None, None, None, None)
358 };
359
360 let tcp_flags = if meta.flow_key.as_ref().is_some_and(|k| k.protocol == 6) {
362 Some(meta.tcp_flags)
363 } else {
364 None
365 };
366
367 PacketRecord {
368 timestamp_ns: meta.timestamp_ns,
369 src_ip,
370 dst_ip,
371 src_port,
372 dst_port,
373 protocol,
374 flow_id,
375 caplen: packet.info.captured_len,
376 origlen: packet.info.original_len,
377 tcp_flags,
378 payload: packet.data.clone(),
379 }
380}
381
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};

    use crate::filter::{PacketMeta, TcpFlagsFilter};
    use crate::flow::FlowKey;
    use crate::pcap::{PacketData, PacketInfo};

    use super::*;

    /// Zero-filled packet whose captured and original lengths are both `caplen`.
    fn make_packet(caplen: u32) -> PacketData {
        let info = PacketInfo {
            timestamp_ns: 0,
            captured_len: caplen,
            original_len: caplen,
            flow_key: None,
        };
        PacketData {
            info,
            data: vec![0u8; caplen as usize],
        }
    }

    /// Metadata carrying a flow key built from the given 5-tuple and no TCP flags.
    fn make_meta(src: IpAddr, dst: IpAddr, sport: u16, dport: u16, proto: u8) -> PacketMeta {
        let key = FlowKey::new(src, dst, sport, dport, proto);
        PacketMeta {
            timestamp_ns: 0,
            captured_len: 60,
            flow_key: Some(key),
            tcp_flags: 0,
        }
    }

    fn v4(a: u8, b: u8, c: u8, d: u8) -> IpAddr {
        Ipv4Addr::new(a, b, c, d).into()
    }

    /// Record for a 60-byte packet between the two fixed test hosts.
    fn record_for(sport: u16, dport: u16, proto: u8) -> PacketRecord {
        let packet = make_packet(60);
        let meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), sport, dport, proto);
        build_record(&packet, &meta, false)
    }

    #[test]
    fn test_build_record_tcp_has_ports() {
        let record = record_for(1234, 443, 6);
        assert_eq!(record.src_port, Some(1234));
        assert_eq!(record.dst_port, Some(443));
    }

    #[test]
    fn test_build_record_udp_has_ports() {
        let record = record_for(5000, 53, 17);
        assert_eq!(record.src_port, Some(5000));
        assert_eq!(record.dst_port, Some(53));
    }

    #[test]
    fn test_build_record_icmp_ports_are_none() {
        let record = record_for(0, 0, 1);
        assert_eq!(record.src_port, None, "ICMP src_port must be None");
        assert_eq!(record.dst_port, None, "ICMP dst_port must be None");
    }

    #[test]
    fn test_build_record_icmpv6_ports_are_none() {
        let record = record_for(0, 0, 58);
        assert_eq!(record.src_port, None);
        assert_eq!(record.dst_port, None);
    }

    #[test]
    fn test_build_record_non_ip_all_flow_fields_none() {
        let packet = make_packet(60);
        let meta = PacketMeta {
            timestamp_ns: 1_000,
            captured_len: 60,
            flow_key: None,
            tcp_flags: 0,
        };
        let record = build_record(&packet, &meta, false);
        assert_eq!(record.src_ip, None);
        assert_eq!(record.dst_ip, None);
        assert_eq!(record.src_port, None);
        assert_eq!(record.dst_port, None);
        assert_eq!(record.protocol, None);
        assert_eq!(record.flow_id, None);
        assert_eq!(record.tcp_flags, None);
    }

    #[test]
    fn test_build_record_tcp_flags_only_for_tcp() {
        let packet = make_packet(60);

        // UDP: flags present in the metadata must not reach the record.
        {
            let mut udp_meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 100, 200, 17);
            udp_meta.tcp_flags = 0x02;
            let record = build_record(&packet, &udp_meta, false);
            assert_eq!(record.tcp_flags, None);
        }

        // TCP: the flags are copied through.
        {
            let mut tcp_meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 100, 80, 6);
            tcp_meta.tcp_flags = TcpFlagsFilter::parse("SYN+ACK").unwrap().mask;
            let record = build_record(&packet, &tcp_meta, false);
            assert_eq!(record.tcp_flags, Some(0x12));
        }
    }
}