pcap_toolkit/export/mod.rs
1//! Phase 5: Structured data export (JSON, Parquet, Avro).
2//!
3//! Reads a PCAP file, applies optional filters, and writes each packet as a
4//! typed record to the requested format.
5//!
6//! ## Pipeline (7.3 channel-based)
7//!
8//! 1. A **producer thread** streams packets from the PCAP file, applies
9//! structured + BPF filters, and sends [`PacketRecord`]s through a bounded
10//! [`std::sync::mpsc::sync_channel`].
//! 2. The **calling thread** receives records and writes each one to every
//!    configured format-specific [`PacketSink`] (JSON, Parquet, or Avro).
13//!
14//! Decoupling I/O (producer) from serialisation (consumer) lets both stages
15//! run concurrently, improving throughput for CPU-bound formats.
16
17pub mod avro;
18pub mod json;
19pub mod parquet;
20
21use std::net::IpAddr;
22use std::path::Path;
23
24use crate::bpf::BpfExpr;
25use crate::error::ExportError;
26use crate::filter::{Filter, PacketMeta};
27use crate::pcap;
28
29// ── PacketSink trait ──────────────────────────────────────────────────────────
30
31/// Common interface for streaming packet writers.
32///
33/// Each format (JSON, Parquet, Avro) provides its own implementation.
34/// Records are pushed one at a time via [`write`]; [`close`] flushes any
35/// internal buffer and returns the total packet count.
36pub trait PacketSink {
37 /// Write a single packet record to the output.
38 fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError>;
39
40 /// Flush all buffered data and finalise the output file.
41 ///
42 /// Returns the total number of records written.
43 fn close(&mut self) -> Result<u64, ExportError>;
44}
45
/// Number of [`PacketRecord`]s the bounded producer → consumer channel can
/// hold (4096).
const CHANNEL_CAPACITY: usize = 1 << 12;
48
49// ── Public types ─────────────────────────────────────────────────────────────
50
/// Output format for packet export.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    /// Newline-delimited JSON (`.jsonl`).
    Json,
    /// Apache Parquet columnar format (`.parquet`).
    Parquet,
    /// Apache Avro record format (`.avro`).
    Avro,
}

impl ExportFormat {
    /// Infer the format from a file extension, returning `None` if unknown.
    ///
    /// Recognises the same names as [`ExportFormat::parse`] (`json`, `jsonl`,
    /// `ndjson`, `parquet`, `avro`), compared ASCII case-insensitively so that
    /// `capture.PARQUET` and `capture.parquet` are treated alike. Paths with
    /// no extension, or with a non-UTF-8 extension, yield `None`.
    pub fn from_extension(path: &Path) -> Option<Self> {
        // Delegate to `parse` so both entry points share one name table.
        path.extension()?.to_str().and_then(Self::parse)
    }

    /// Parse from a user-supplied string (case-insensitive).
    ///
    /// Returns `None` when `s` is not one of `json`, `jsonl`, `ndjson`,
    /// `parquet`, or `avro`.
    pub fn parse(s: &str) -> Option<Self> {
        match s.to_ascii_lowercase().as_str() {
            "json" | "jsonl" | "ndjson" => Some(Self::Json),
            "parquet" => Some(Self::Parquet),
            "avro" => Some(Self::Avro),
            _ => None,
        }
    }
}
83
84/// A single output target for fan-out export.
85#[derive(Debug)]
86pub struct OutputTarget {
87 /// Destination file path.
88 pub path: std::path::PathBuf,
89 /// Output format. When `None`, inferred from the file extension.
90 pub format: Option<ExportFormat>,
91 /// Apply Zstd compression to payload bytes.
92 pub compress_payload: bool,
93}
94
95/// Options for [`export_multi`] (fan-out to multiple simultaneous outputs).
96#[derive(Debug)]
97pub struct MultiExportOptions {
98 /// One or more output targets written in a single streaming pass.
99 pub targets: Vec<OutputTarget>,
100 /// Structured packet filter applied to each packet.
101 pub filter: Filter,
102 /// BPF expression filter AND-ed with `filter`.
103 pub bpf_filter: Option<BpfExpr>,
104 /// Compute flow IDs unidirectionally (default: bidirectional).
105 pub unidirectional: bool,
106}
107
108/// Summary returned by [`export_multi`] on success.
109#[derive(Debug)]
110pub struct MultiExportReport {
111 /// Per-target reports, in the same order as [`MultiExportOptions::targets`].
112 pub outputs: Vec<ExportReport>,
113}
114
115/// Options for [`export_file`] (single-output convenience wrapper).
116#[derive(Debug, Default)]
117pub struct ExportOptions {
118 /// Destination file path (extension used for format auto-detection).
119 pub output: std::path::PathBuf,
120 /// Output format. When `None`, inferred from the output file extension.
121 pub format: Option<ExportFormat>,
122 /// Apply Zstd compression to payload bytes (JSON: per-payload field;
123 /// Parquet: column-level compression; Avro: file-level codec).
124 pub compress_payload: bool,
125 /// Structured packet filter applied to each packet.
126 pub filter: Filter,
127 /// BPF expression filter AND-ed with `filter`.
128 pub bpf_filter: Option<BpfExpr>,
129 /// Compute flow IDs unidirectionally (default: bidirectional).
130 pub unidirectional: bool,
131}
132
/// Result of a successful [`export_file`] call (also used per output by
/// [`export_multi`]).
#[derive(Debug)]
pub struct ExportReport {
    /// How many packets were written to the output file.
    pub packets_written: u64,
    /// Location of the output file.
    pub output_path: std::path::PathBuf,
}
141
/// One packet, flattened into the fields every export format serialises.
#[derive(Debug, Clone)]
pub struct PacketRecord {
    /// Capture timestamp, in nanoseconds since the Unix epoch.
    pub timestamp_ns: u64,
    /// Source IP address (IPv4 or IPv6); `None` when unavailable.
    pub src_ip: Option<IpAddr>,
    /// Destination IP address; `None` when unavailable.
    pub dst_ip: Option<IpAddr>,
    /// Source port (TCP/UDP only).
    pub src_port: Option<u16>,
    /// Destination port (TCP/UDP only).
    pub dst_port: Option<u16>,
    /// IP protocol number (6=TCP, 17=UDP, …).
    pub protocol: Option<u8>,
    /// Deterministic 64-bit flow identifier.
    pub flow_id: Option<u64>,
    /// Number of bytes of this packet stored in the capture file.
    pub caplen: u32,
    /// Length of the packet as it appeared on the wire.
    pub origlen: u32,
    /// TCP control-flags bitmask; `None` for non-TCP traffic.
    pub tcp_flags: Option<u8>,
    /// Raw captured bytes (Ethernet frame payload up to snap length).
    pub payload: Vec<u8>,
}
168
169// ── Public entry points ──────────────────────────────────────────────────────
170
171/// Export packets from `input` to one or more simultaneous outputs in a single
172/// streaming pass.
173///
174/// A bounded channel decouples the PCAP producer thread from the consumer
175/// thread that fans each record out to all sinks. Memory usage is
176/// O(channel capacity + per-sink buffer) regardless of capture size.
177///
178/// # Errors
179/// Returns [`ExportError`] on I/O failure, format detection failure, or
180/// serialisation error. If any sink fails mid-stream the producer is signalled
181/// to stop and the first error is returned.
182pub fn export_multi(
183 input: &Path,
184 opts: &MultiExportOptions,
185) -> Result<MultiExportReport, ExportError> {
186 if opts.targets.is_empty() {
187 return Ok(MultiExportReport {
188 outputs: Vec::new(),
189 });
190 }
191
192 // Build and validate all sinks up front so we fail fast on bad paths/formats.
193 struct SinkState {
194 sink: Box<dyn PacketSink>,
195 path: std::path::PathBuf,
196 }
197 let mut sinks: Vec<SinkState> = opts
198 .targets
199 .iter()
200 .map(|t| {
201 Ok(SinkState {
202 sink: build_sink(t)?,
203 path: t.path.clone(),
204 })
205 })
206 .collect::<Result<Vec<_>, ExportError>>()?;
207
208 let (tx, rx) = std::sync::mpsc::sync_channel::<PacketRecord>(CHANNEL_CAPACITY);
209
210 // Producer: reads PCAP, applies filters, sends PacketRecords.
211 let input_owned = input.to_owned();
212 let filter = opts.filter.clone();
213 let bpf_filter = opts.bpf_filter.clone();
214 let unidirectional = opts.unidirectional;
215
216 let producer = std::thread::spawn(move || -> Result<(), ExportError> {
217 let has_filter = !filter.is_empty() || bpf_filter.is_some();
218 for item in
219 pcap::open_with_payload(&input_owned).map_err(|e| ExportError::Parse(e.to_string()))?
220 {
221 let packet = item.map_err(|e| ExportError::Parse(e.to_string()))?;
222 let meta = PacketMeta::from_packet(
223 packet.info.timestamp_ns,
224 packet.info.captured_len,
225 &packet.data,
226 );
227 if has_filter {
228 let ok = (filter.is_empty() || filter.matches(&meta))
229 && bpf_filter.as_ref().is_none_or(|b| b.eval(&meta));
230 if !ok {
231 continue;
232 }
233 }
234 let record = build_record(&packet, &meta, unidirectional);
235 if tx.send(record).is_err() {
236 // Consumer dropped the receiver due to a write error; stop producing.
237 break;
238 }
239 }
240 Ok(())
241 });
242
243 // Consumer: fan each record out to all sinks.
244 for record in rx {
245 for state in &mut sinks {
246 state.sink.write(&record)?;
247 }
248 }
249
250 producer.join().map_err(|_| ExportError::ThreadPanic)??;
251
252 // Close all sinks and collect per-output reports.
253 let outputs = sinks
254 .into_iter()
255 .map(|mut state| {
256 let packets_written = state.sink.close()?;
257 Ok(ExportReport {
258 packets_written,
259 output_path: state.path,
260 })
261 })
262 .collect::<Result<Vec<_>, ExportError>>()?;
263
264 Ok(MultiExportReport { outputs })
265}
266
267/// Export all packets from `input` to a single output (convenience wrapper
268/// around [`export_multi`] for backward compatibility).
269///
270/// # Errors
271/// Returns [`ExportError`] on I/O failure, format detection failure, or
272/// serialisation error.
273pub fn export_file(input: &Path, opts: &ExportOptions) -> Result<ExportReport, ExportError> {
274 // Resolve format up front so we can return a descriptive error before
275 // touching the file system.
276 let format = opts
277 .format
278 .or_else(|| ExportFormat::from_extension(&opts.output))
279 .ok_or_else(|| {
280 ExportError::UnknownFormat(
281 opts.output
282 .extension()
283 .and_then(|e| e.to_str())
284 .unwrap_or("<none>")
285 .to_owned(),
286 )
287 })?;
288
289 let multi_opts = MultiExportOptions {
290 targets: vec![OutputTarget {
291 path: opts.output.clone(),
292 format: Some(format),
293 compress_payload: opts.compress_payload,
294 }],
295 filter: opts.filter.clone(),
296 bpf_filter: opts.bpf_filter.clone(),
297 unidirectional: opts.unidirectional,
298 };
299 let mut report = export_multi(input, &multi_opts)?;
300 Ok(report.outputs.remove(0))
301}
302
303// ── Sink construction ────────────────────────────────────────────────────────
304
305fn build_sink(target: &OutputTarget) -> Result<Box<dyn PacketSink>, ExportError> {
306 let format = target
307 .format
308 .or_else(|| ExportFormat::from_extension(&target.path))
309 .ok_or_else(|| {
310 ExportError::UnknownFormat(
311 target
312 .path
313 .extension()
314 .and_then(|e| e.to_str())
315 .unwrap_or("<none>")
316 .to_owned(),
317 )
318 })?;
319 let sink: Box<dyn PacketSink> = match format {
320 ExportFormat::Json => Box::new(json::JsonSink::create(
321 &target.path,
322 target.compress_payload,
323 )?),
324 ExportFormat::Parquet => Box::new(parquet::ParquetSink::create(
325 &target.path,
326 target.compress_payload,
327 )?),
328 ExportFormat::Avro => Box::new(avro::AvroSink::create(
329 &target.path,
330 target.compress_payload,
331 )?),
332 };
333 Ok(sink)
334}
335
336// ── Record construction ───────────────────────────────────────────────────────
337
338fn build_record(
339 packet: &pcap::PacketData,
340 meta: &PacketMeta,
341 unidirectional: bool,
342) -> PacketRecord {
343 let (src_ip, dst_ip, src_port, dst_port, protocol, flow_id) =
344 if let Some(ref key) = meta.flow_key {
345 (
346 Some(key.src_ip),
347 Some(key.dst_ip),
348 Some(key.src_port),
349 Some(key.dst_port),
350 Some(key.protocol),
351 Some(key.flow_id(unidirectional)),
352 )
353 } else {
354 (None, None, None, None, None, None)
355 };
356
357 // Report tcp_flags only for TCP packets.
358 let tcp_flags = if meta.flow_key.as_ref().is_some_and(|k| k.protocol == 6) {
359 Some(meta.tcp_flags)
360 } else {
361 None
362 };
363
364 PacketRecord {
365 timestamp_ns: meta.timestamp_ns,
366 src_ip,
367 dst_ip,
368 src_port,
369 dst_port,
370 protocol,
371 flow_id,
372 caplen: packet.info.captured_len,
373 origlen: packet.info.original_len,
374 tcp_flags,
375 payload: packet.data.clone(),
376 }
377}