Skip to main content

pcap_toolkit/export/
mod.rs

1//! Phase 5: Structured data export (JSON, Parquet, Avro).
2//!
3//! Reads a PCAP file, applies optional filters, and writes each packet as a
4//! typed record to the requested format.
5//!
6//! ## Pipeline (7.3 channel-based)
7//!
8//! 1. A **producer thread** streams packets from the PCAP file, applies
9//!    structured + BPF filters, and sends [`PacketRecord`]s through a bounded
10//!    [`std::sync::mpsc::sync_channel`].
11//! 2. The **main thread** receives records and writes them to a
12//!    format-specific [`PacketSink`] (JSON, Parquet, or Avro).
13//!
14//! Decoupling I/O (producer) from serialisation (consumer) lets both stages
15//! run concurrently, improving throughput for CPU-bound formats.
16
17pub mod avro;
18pub mod json;
19pub mod parquet;
20
21use std::net::IpAddr;
22use std::path::Path;
23
24use crate::bpf::BpfExpr;
25use crate::error::ExportError;
26use crate::filter::{Filter, PacketMeta};
27use crate::pcap;
28
29// ── PacketSink trait ──────────────────────────────────────────────────────────
30
/// Common interface for streaming packet writers.
///
/// Each format (JSON, Parquet, Avro) provides its own implementation.
/// Records are pushed one at a time via [`write`](PacketSink::write);
/// [`close`](PacketSink::close) flushes any internal buffer and returns the
/// total packet count.
///
/// [`export_multi`] holds implementations as `Box<dyn PacketSink>`, so the
/// trait is used through dynamic dispatch there.
pub trait PacketSink {
    /// Write a single packet record to the output.
    ///
    /// # Errors
    /// Returns [`ExportError`] when the record cannot be written; the exact
    /// variants depend on the implementing sink.
    fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError>;

    /// Flush all buffered data and finalise the output file.
    ///
    /// Returns the total number of records written.
    ///
    /// Takes `&mut self`, so the type system does not prevent further calls
    /// after `close`; what a sink does in that case is implementation-defined.
    ///
    /// # Errors
    /// Returns [`ExportError`] when flushing or finalising the output fails.
    fn close(&mut self) -> Result<u64, ExportError>;
}
45
/// Channel capacity for the producer → consumer bounded channel.
///
/// Passed to [`std::sync::mpsc::sync_channel`] in [`export_multi`], so it is
/// the maximum number of [`PacketRecord`]s buffered between the two stages;
/// once that many are queued the producer's `send` blocks until the consumer
/// catches up.
const CHANNEL_CAPACITY: usize = 4096;
48
49// ── Public types ─────────────────────────────────────────────────────────────
50
/// Output format for packet export.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    /// Newline-delimited JSON (`.jsonl`).
    Json,
    /// Apache Parquet columnar format (`.parquet`).
    Parquet,
    /// Apache Avro record format (`.avro`).
    Avro,
}

impl ExportFormat {
    /// Infer the format from a file extension, returning `None` if unknown.
    ///
    /// Recognises the same names as [`ExportFormat::parse`] and, like it, is
    /// ASCII case-insensitive, so `capture.JSON` and `capture.Parquet` are
    /// detected just like their lowercase spellings.
    ///
    /// Returns `None` when the path has no extension, when the extension is
    /// not valid UTF-8, or when it is not a recognised format name.
    pub fn from_extension(path: &Path) -> Option<Self> {
        // Delegate to `parse` so both entry points accept exactly the same
        // set of names and cannot drift apart.
        Self::parse(path.extension()?.to_str()?)
    }

    /// Parse from a user-supplied string (case-insensitive).
    ///
    /// Accepts `json`, `jsonl` and `ndjson` for [`ExportFormat::Json`],
    /// `parquet` for [`ExportFormat::Parquet`] and `avro` for
    /// [`ExportFormat::Avro`]. Any other input yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        match s.to_ascii_lowercase().as_str() {
            "json" | "jsonl" | "ndjson" => Some(Self::Json),
            "parquet" => Some(Self::Parquet),
            "avro" => Some(Self::Avro),
            _ => None,
        }
    }
}
83
/// A single output target for fan-out export.
///
/// One sink is built per target by `build_sink` before any packet is read.
#[derive(Debug)]
pub struct OutputTarget {
    /// Destination file path.
    pub path: std::path::PathBuf,
    /// Output format. When `None`, inferred from the file extension of
    /// `path` via [`ExportFormat::from_extension`]; if that also fails the
    /// export is rejected with `ExportError::UnknownFormat`.
    pub format: Option<ExportFormat>,
    /// Apply Zstd compression to payload bytes.
    ///
    /// Forwarded unchanged to the format-specific sink's `create` function.
    pub compress_payload: bool,
}
94
/// Options for [`export_multi`] (fan-out to multiple simultaneous outputs).
#[derive(Debug)]
pub struct MultiExportOptions {
    /// One or more output targets written in a single streaming pass.
    ///
    /// An empty list is accepted: [`export_multi`] then returns an empty
    /// report without opening the input.
    pub targets: Vec<OutputTarget>,
    /// Structured packet filter applied to each packet.
    ///
    /// Skipped entirely when `Filter::is_empty` reports it as empty.
    pub filter: Filter,
    /// BPF expression filter AND-ed with `filter`.
    ///
    /// `None` means no BPF filtering.
    pub bpf_filter: Option<BpfExpr>,
    /// Compute flow IDs unidirectionally (default: bidirectional).
    ///
    /// Passed as the argument to the flow key's `flow_id` method when each
    /// record is built.
    pub unidirectional: bool,
}
107
/// Summary returned by [`export_multi`] on success.
#[derive(Debug)]
pub struct MultiExportReport {
    /// Per-target reports, in the same order as [`MultiExportOptions::targets`].
    ///
    /// Empty when no targets were requested.
    pub outputs: Vec<ExportReport>,
}
114
/// Options for [`export_file`] (single-output convenience wrapper).
///
/// [`export_file`] converts these into a one-target [`MultiExportOptions`]
/// and delegates to [`export_multi`].
#[derive(Debug, Default)]
pub struct ExportOptions {
    /// Destination file path (extension used for format auto-detection).
    pub output: std::path::PathBuf,
    /// Output format. When `None`, inferred from the output file extension.
    pub format: Option<ExportFormat>,
    /// Apply Zstd compression to payload bytes (JSON: per-payload field;
    /// Parquet: column-level compression; Avro: file-level codec).
    pub compress_payload: bool,
    /// Structured packet filter applied to each packet.
    pub filter: Filter,
    /// BPF expression filter AND-ed with `filter`.
    pub bpf_filter: Option<BpfExpr>,
    /// Compute flow IDs unidirectionally (default: bidirectional).
    pub unidirectional: bool,
}
132
/// Summary returned by [`export_file`] on success.
///
/// Also used as the per-target entry of [`MultiExportReport::outputs`].
#[derive(Debug)]
pub struct ExportReport {
    /// Total packets written to the output file, as reported by the sink's
    /// [`PacketSink::close`].
    pub packets_written: u64,
    /// Path of the output file.
    pub output_path: std::path::PathBuf,
}
141
/// A single packet record ready for serialisation.
///
/// Built by `build_record` from a parsed packet and its `PacketMeta`. The
/// address, port, protocol and flow-ID fields are all `Some` or all `None`
/// together, depending on whether the metadata carries a flow key.
#[derive(Debug, Clone)]
pub struct PacketRecord {
    /// Timestamp in nanoseconds since the Unix epoch.
    pub timestamp_ns: u64,
    /// Source IP address (IPv4 or IPv6), if available.
    pub src_ip: Option<IpAddr>,
    /// Destination IP address, if available.
    pub dst_ip: Option<IpAddr>,
    /// Source port (TCP/UDP only).
    pub src_port: Option<u16>,
    /// Destination port (TCP/UDP only).
    pub dst_port: Option<u16>,
    /// IP protocol number (6=TCP, 17=UDP, …).
    pub protocol: Option<u8>,
    /// Deterministic 64-bit flow ID.
    pub flow_id: Option<u64>,
    /// Captured packet length (bytes stored in the file).
    pub caplen: u32,
    /// Original wire length.
    pub origlen: u32,
    /// TCP control-flags bitmask; `None` for non-TCP traffic
    /// (i.e. whenever `protocol` is not `Some(6)`).
    pub tcp_flags: Option<u8>,
    /// Raw captured bytes (Ethernet frame payload up to snap length).
    pub payload: Vec<u8>,
}
168
169// ── Public entry points ──────────────────────────────────────────────────────
170
171/// Export packets from `input` to one or more simultaneous outputs in a single
172/// streaming pass.
173///
174/// A bounded channel decouples the PCAP producer thread from the consumer
175/// thread that fans each record out to all sinks.  Memory usage is
176/// O(channel capacity + per-sink buffer) regardless of capture size.
177///
178/// # Errors
179/// Returns [`ExportError`] on I/O failure, format detection failure, or
180/// serialisation error.  If any sink fails mid-stream the producer is signalled
181/// to stop and the first error is returned.
182pub fn export_multi(
183    input: &Path,
184    opts: &MultiExportOptions,
185) -> Result<MultiExportReport, ExportError> {
186    if opts.targets.is_empty() {
187        return Ok(MultiExportReport {
188            outputs: Vec::new(),
189        });
190    }
191
192    // Build and validate all sinks up front so we fail fast on bad paths/formats.
193    struct SinkState {
194        sink: Box<dyn PacketSink>,
195        path: std::path::PathBuf,
196    }
197    let mut sinks: Vec<SinkState> = opts
198        .targets
199        .iter()
200        .map(|t| {
201            Ok(SinkState {
202                sink: build_sink(t)?,
203                path: t.path.clone(),
204            })
205        })
206        .collect::<Result<Vec<_>, ExportError>>()?;
207
208    let (tx, rx) = std::sync::mpsc::sync_channel::<PacketRecord>(CHANNEL_CAPACITY);
209
210    // Producer: reads PCAP, applies filters, sends PacketRecords.
211    let input_owned = input.to_owned();
212    let filter = opts.filter.clone();
213    let bpf_filter = opts.bpf_filter.clone();
214    let unidirectional = opts.unidirectional;
215
216    let producer = std::thread::spawn(move || -> Result<(), ExportError> {
217        let has_filter = !filter.is_empty() || bpf_filter.is_some();
218        for item in
219            pcap::open_with_payload(&input_owned).map_err(|e| ExportError::Parse(e.to_string()))?
220        {
221            let packet = item.map_err(|e| ExportError::Parse(e.to_string()))?;
222            let meta = PacketMeta::from_packet(
223                packet.info.timestamp_ns,
224                packet.info.captured_len,
225                &packet.data,
226            );
227            if has_filter {
228                let ok = (filter.is_empty() || filter.matches(&meta))
229                    && bpf_filter.as_ref().is_none_or(|b| b.eval(&meta));
230                if !ok {
231                    continue;
232                }
233            }
234            let record = build_record(&packet, &meta, unidirectional);
235            if tx.send(record).is_err() {
236                // Consumer dropped the receiver due to a write error; stop producing.
237                break;
238            }
239        }
240        Ok(())
241    });
242
243    // Consumer: fan each record out to all sinks.
244    for record in rx {
245        for state in &mut sinks {
246            state.sink.write(&record)?;
247        }
248    }
249
250    producer.join().map_err(|_| ExportError::ThreadPanic)??;
251
252    // Close all sinks and collect per-output reports.
253    let outputs = sinks
254        .into_iter()
255        .map(|mut state| {
256            let packets_written = state.sink.close()?;
257            Ok(ExportReport {
258                packets_written,
259                output_path: state.path,
260            })
261        })
262        .collect::<Result<Vec<_>, ExportError>>()?;
263
264    Ok(MultiExportReport { outputs })
265}
266
267/// Export all packets from `input` to a single output (convenience wrapper
268/// around [`export_multi`] for backward compatibility).
269///
270/// # Errors
271/// Returns [`ExportError`] on I/O failure, format detection failure, or
272/// serialisation error.
273pub fn export_file(input: &Path, opts: &ExportOptions) -> Result<ExportReport, ExportError> {
274    // Resolve format up front so we can return a descriptive error before
275    // touching the file system.
276    let format = opts
277        .format
278        .or_else(|| ExportFormat::from_extension(&opts.output))
279        .ok_or_else(|| {
280            ExportError::UnknownFormat(
281                opts.output
282                    .extension()
283                    .and_then(|e| e.to_str())
284                    .unwrap_or("<none>")
285                    .to_owned(),
286            )
287        })?;
288
289    let multi_opts = MultiExportOptions {
290        targets: vec![OutputTarget {
291            path: opts.output.clone(),
292            format: Some(format),
293            compress_payload: opts.compress_payload,
294        }],
295        filter: opts.filter.clone(),
296        bpf_filter: opts.bpf_filter.clone(),
297        unidirectional: opts.unidirectional,
298    };
299    let mut report = export_multi(input, &multi_opts)?;
300    Ok(report.outputs.remove(0))
301}
302
303// ── Sink construction ────────────────────────────────────────────────────────
304
305fn build_sink(target: &OutputTarget) -> Result<Box<dyn PacketSink>, ExportError> {
306    let format = target
307        .format
308        .or_else(|| ExportFormat::from_extension(&target.path))
309        .ok_or_else(|| {
310            ExportError::UnknownFormat(
311                target
312                    .path
313                    .extension()
314                    .and_then(|e| e.to_str())
315                    .unwrap_or("<none>")
316                    .to_owned(),
317            )
318        })?;
319    let sink: Box<dyn PacketSink> = match format {
320        ExportFormat::Json => Box::new(json::JsonSink::create(
321            &target.path,
322            target.compress_payload,
323        )?),
324        ExportFormat::Parquet => Box::new(parquet::ParquetSink::create(
325            &target.path,
326            target.compress_payload,
327        )?),
328        ExportFormat::Avro => Box::new(avro::AvroSink::create(
329            &target.path,
330            target.compress_payload,
331        )?),
332    };
333    Ok(sink)
334}
335
336// ── Record construction ───────────────────────────────────────────────────────
337
338fn build_record(
339    packet: &pcap::PacketData,
340    meta: &PacketMeta,
341    unidirectional: bool,
342) -> PacketRecord {
343    let (src_ip, dst_ip, src_port, dst_port, protocol, flow_id) =
344        if let Some(ref key) = meta.flow_key {
345            (
346                Some(key.src_ip),
347                Some(key.dst_ip),
348                Some(key.src_port),
349                Some(key.dst_port),
350                Some(key.protocol),
351                Some(key.flow_id(unidirectional)),
352            )
353        } else {
354            (None, None, None, None, None, None)
355        };
356
357    // Report tcp_flags only for TCP packets.
358    let tcp_flags = if meta.flow_key.as_ref().is_some_and(|k| k.protocol == 6) {
359        Some(meta.tcp_flags)
360    } else {
361        None
362    };
363
364    PacketRecord {
365        timestamp_ns: meta.timestamp_ns,
366        src_ip,
367        dst_ip,
368        src_port,
369        dst_port,
370        protocol,
371        flow_id,
372        caplen: packet.info.captured_len,
373        origlen: packet.info.original_len,
374        tcp_flags,
375        payload: packet.data.clone(),
376    }
377}