Skip to main content

pcap_toolkit/export/
mod.rs

1//! Phase 5: Structured data export (JSON, Parquet, Avro).
2//!
3//! Reads a PCAP file, applies optional filters, and writes each packet as a
4//! typed record to the requested format.
5//!
6//! ## Pipeline (7.3 channel-based)
7//!
8//! 1. A **producer thread** streams packets from the PCAP file, applies
9//!    structured + BPF filters, and sends [`PacketRecord`]s through a bounded
10//!    [`std::sync::mpsc::sync_channel`].
//! 2. The **calling thread** receives records and fans each one out to every
//!    configured format-specific [`PacketSink`] (JSON, Parquet, or Avro).
13//!
14//! Decoupling I/O (producer) from serialisation (consumer) lets both stages
15//! run concurrently, improving throughput for CPU-bound formats.
16
17pub mod avro;
18pub mod json;
19pub mod parquet;
20
21use std::net::IpAddr;
22use std::path::Path;
23
24use crate::bpf::BpfExpr;
25use crate::error::ExportError;
26use crate::filter::{Filter, PacketMeta};
27use crate::pcap;
28
29// ── PacketSink trait ──────────────────────────────────────────────────────────
30
31/// Common interface for streaming packet writers.
32///
33/// Each format (JSON, Parquet, Avro) provides its own implementation.
34/// Records are pushed one at a time via [`write`]; [`close`] flushes any
35/// internal buffer and returns the total packet count.
36pub trait PacketSink {
37    /// Write a single packet record to the output.
38    fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError>;
39
40    /// Flush all buffered data and finalise the output file.
41    ///
42    /// Returns the total number of records written.
43    fn close(&mut self) -> Result<u64, ExportError>;
44}
45
/// Number of [`PacketRecord`]s the producer → consumer `sync_channel` can
/// buffer; once full, the producer blocks in `send` until the consumer drains it.
const CHANNEL_CAPACITY: usize = 4_096;
48
49// ── Public types ─────────────────────────────────────────────────────────────
50
/// Output format for packet export.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    /// Newline-delimited JSON (`.jsonl`; `.json` and `.ndjson` are also accepted).
    Json,
    /// Apache Parquet columnar format (`.parquet`).
    Parquet,
    /// Apache Avro record format (`.avro`).
    Avro,
}

impl ExportFormat {
    /// Infer the format from a file extension, returning `None` if unknown.
    ///
    /// Matching accepts exactly the names understood by
    /// [`ExportFormat::parse`] and is ASCII case-insensitive, so
    /// `capture.PARQUET` and `capture.parquet` are treated alike. A path with
    /// no extension, or whose extension is not valid UTF-8, yields `None`.
    pub fn from_extension(path: &Path) -> Option<Self> {
        // Delegate to `parse` so extension detection and explicit `--format`
        // strings can never disagree about which names (or cases) are valid.
        path.extension()?.to_str().and_then(Self::parse)
    }

    /// Parse from a user-supplied string (ASCII case-insensitive).
    ///
    /// Accepts `json`, `jsonl`, `ndjson`, `parquet` and `avro`; anything else
    /// returns `None`.
    pub fn parse(s: &str) -> Option<Self> {
        match s.to_ascii_lowercase().as_str() {
            "json" | "jsonl" | "ndjson" => Some(Self::Json),
            "parquet" => Some(Self::Parquet),
            "avro" => Some(Self::Avro),
            _ => None,
        }
    }
}
83
84/// A single output target for fan-out export.
85#[derive(Debug)]
86pub struct OutputTarget {
87    /// Destination file path.
88    pub path: std::path::PathBuf,
89    /// Output format. When `None`, inferred from the file extension.
90    pub format: Option<ExportFormat>,
91    /// Apply Zstd compression to payload bytes.
92    pub compress_payload: bool,
93}
94
95/// Options for [`export_multi`] (fan-out to multiple simultaneous outputs).
96#[derive(Debug)]
97pub struct MultiExportOptions {
98    /// One or more output targets written in a single streaming pass.
99    pub targets: Vec<OutputTarget>,
100    /// Structured packet filter applied to each packet.
101    pub filter: Filter,
102    /// BPF expression filter AND-ed with `filter`.
103    pub bpf_filter: Option<BpfExpr>,
104    /// Compute flow IDs unidirectionally (default: bidirectional).
105    pub unidirectional: bool,
106}
107
108/// Summary returned by [`export_multi`] on success.
109#[derive(Debug)]
110pub struct MultiExportReport {
111    /// Per-target reports, in the same order as [`MultiExportOptions::targets`].
112    pub outputs: Vec<ExportReport>,
113}
114
115/// Options for [`export_file`] (single-output convenience wrapper).
116#[derive(Debug, Default)]
117pub struct ExportOptions {
118    /// Destination file path (extension used for format auto-detection).
119    pub output: std::path::PathBuf,
120    /// Output format. When `None`, inferred from the output file extension.
121    pub format: Option<ExportFormat>,
122    /// Apply Zstd compression to payload bytes (JSON: per-payload field;
123    /// Parquet: column-level compression; Avro: file-level codec).
124    pub compress_payload: bool,
125    /// Structured packet filter applied to each packet.
126    pub filter: Filter,
127    /// BPF expression filter AND-ed with `filter`.
128    pub bpf_filter: Option<BpfExpr>,
129    /// Compute flow IDs unidirectionally (default: bidirectional).
130    pub unidirectional: bool,
131}
132
/// Per-output outcome of a successful export ([`export_file`] returns one;
/// [`export_multi`] returns one per target).
#[derive(Debug)]
pub struct ExportReport {
    /// Number of packets the sink reported writing when it was closed.
    pub packets_written: u64,
    /// Where that output was written.
    pub output_path: std::path::PathBuf,
}
141
/// A single packet record ready for serialisation.
///
/// Derives `PartialEq`/`Eq` (all fields are comparable) so records can be
/// compared directly, e.g. in tests or de-duplication.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PacketRecord {
    /// Timestamp in nanoseconds since the Unix epoch.
    pub timestamp_ns: u64,
    /// Source IP address (IPv4 or IPv6), if available.
    pub src_ip: Option<IpAddr>,
    /// Destination IP address, if available.
    pub dst_ip: Option<IpAddr>,
    /// Source port (TCP/UDP only).
    pub src_port: Option<u16>,
    /// Destination port (TCP/UDP only).
    pub dst_port: Option<u16>,
    /// IP protocol number (6=TCP, 17=UDP, …).
    pub protocol: Option<u8>,
    /// Deterministic 64-bit flow ID.
    pub flow_id: Option<u64>,
    /// Captured packet length (bytes stored in the file).
    pub caplen: u32,
    /// Original wire length.
    pub origlen: u32,
    /// TCP control-flags bitmask; `None` for non-TCP traffic.
    pub tcp_flags: Option<u8>,
    /// Raw captured bytes, copied verbatim from the capture record (the same
    /// bytes the filters parsed), limited to the snap length.
    /// NOTE(review): previously documented as the "Ethernet frame payload";
    /// presumably this includes the link-layer header. Confirm against
    /// `pcap::PacketData::data`.
    pub payload: Vec<u8>,
}
168
169// ── Public entry points ──────────────────────────────────────────────────────
170
171/// Export packets from `input` to one or more simultaneous outputs in a single
172/// streaming pass.
173///
174/// A bounded channel decouples the PCAP producer thread from the consumer
175/// thread that fans each record out to all sinks.  Memory usage is
176/// O(channel capacity + per-sink buffer) regardless of capture size.
177///
178/// # Errors
179/// Returns [`ExportError`] on I/O failure, format detection failure, or
180/// serialisation error.  If any sink fails mid-stream the producer is signalled
181/// to stop and the first error is returned.
182pub fn export_multi(
183    input: &Path,
184    opts: &MultiExportOptions,
185) -> Result<MultiExportReport, ExportError> {
186    if opts.targets.is_empty() {
187        return Ok(MultiExportReport {
188            outputs: Vec::new(),
189        });
190    }
191
192    // Build and validate all sinks up front so we fail fast on bad paths/formats.
193    struct SinkState {
194        sink: Box<dyn PacketSink>,
195        path: std::path::PathBuf,
196    }
197    let mut sinks: Vec<SinkState> = opts
198        .targets
199        .iter()
200        .map(|t| {
201            Ok(SinkState {
202                sink: build_sink(t)?,
203                path: t.path.clone(),
204            })
205        })
206        .collect::<Result<Vec<_>, ExportError>>()?;
207
208    let (tx, rx) = std::sync::mpsc::sync_channel::<PacketRecord>(CHANNEL_CAPACITY);
209
210    // Producer: reads PCAP, applies filters, sends PacketRecords.
211    let input_owned = input.to_owned();
212    let filter = opts.filter.clone();
213    let bpf_filter = opts.bpf_filter.clone();
214    let unidirectional = opts.unidirectional;
215
216    let producer = std::thread::spawn(move || -> Result<(), ExportError> {
217        let has_filter = !filter.is_empty() || bpf_filter.is_some();
218        for item in
219            pcap::open_with_payload(&input_owned).map_err(|e| ExportError::Parse(e.to_string()))?
220        {
221            let packet = item.map_err(|e| ExportError::Parse(e.to_string()))?;
222            let meta = PacketMeta::from_packet(
223                packet.info.timestamp_ns,
224                packet.info.captured_len,
225                &packet.data,
226            );
227            if has_filter {
228                let ok = (filter.is_empty() || filter.matches(&meta))
229                    && bpf_filter.as_ref().is_none_or(|b| b.eval(&meta));
230                if !ok {
231                    continue;
232                }
233            }
234            let record = build_record(&packet, &meta, unidirectional);
235            if tx.send(record).is_err() {
236                // Consumer dropped the receiver due to a write error; stop producing.
237                break;
238            }
239        }
240        Ok(())
241    });
242
243    // Consumer: fan each record out to all sinks.
244    for record in rx {
245        for state in &mut sinks {
246            state.sink.write(&record)?;
247        }
248    }
249
250    producer.join().map_err(|_| ExportError::ThreadPanic)??;
251
252    // Close all sinks and collect per-output reports.
253    let outputs = sinks
254        .into_iter()
255        .map(|mut state| {
256            let packets_written = state.sink.close()?;
257            Ok(ExportReport {
258                packets_written,
259                output_path: state.path,
260            })
261        })
262        .collect::<Result<Vec<_>, ExportError>>()?;
263
264    Ok(MultiExportReport { outputs })
265}
266
267/// Export all packets from `input` to a single output (convenience wrapper
268/// around [`export_multi`] for backward compatibility).
269///
270/// # Errors
271/// Returns [`ExportError`] on I/O failure, format detection failure, or
272/// serialisation error.
273pub fn export_file(input: &Path, opts: &ExportOptions) -> Result<ExportReport, ExportError> {
274    // Resolve format up front so we can return a descriptive error before
275    // touching the file system.
276    let format = opts
277        .format
278        .or_else(|| ExportFormat::from_extension(&opts.output))
279        .ok_or_else(|| {
280            ExportError::UnknownFormat(
281                opts.output
282                    .extension()
283                    .and_then(|e| e.to_str())
284                    .unwrap_or("<none>")
285                    .to_owned(),
286            )
287        })?;
288
289    let multi_opts = MultiExportOptions {
290        targets: vec![OutputTarget {
291            path: opts.output.clone(),
292            format: Some(format),
293            compress_payload: opts.compress_payload,
294        }],
295        filter: opts.filter.clone(),
296        bpf_filter: opts.bpf_filter.clone(),
297        unidirectional: opts.unidirectional,
298    };
299    let mut report = export_multi(input, &multi_opts)?;
300    Ok(report.outputs.remove(0))
301}
302
303// ── Sink construction ────────────────────────────────────────────────────────
304
305fn build_sink(target: &OutputTarget) -> Result<Box<dyn PacketSink>, ExportError> {
306    let format = target
307        .format
308        .or_else(|| ExportFormat::from_extension(&target.path))
309        .ok_or_else(|| {
310            ExportError::UnknownFormat(
311                target
312                    .path
313                    .extension()
314                    .and_then(|e| e.to_str())
315                    .unwrap_or("<none>")
316                    .to_owned(),
317            )
318        })?;
319    let sink: Box<dyn PacketSink> = match format {
320        ExportFormat::Json => Box::new(json::JsonSink::create(
321            &target.path,
322            target.compress_payload,
323        )?),
324        ExportFormat::Parquet => Box::new(parquet::ParquetSink::create(
325            &target.path,
326            target.compress_payload,
327        )?),
328        ExportFormat::Avro => Box::new(avro::AvroSink::create(
329            &target.path,
330            target.compress_payload,
331        )?),
332    };
333    Ok(sink)
334}
335
336// ── Record construction ───────────────────────────────────────────────────────
337
338fn build_record(
339    packet: &pcap::PacketData,
340    meta: &PacketMeta,
341    unidirectional: bool,
342) -> PacketRecord {
343    let (src_ip, dst_ip, src_port, dst_port, protocol, flow_id) =
344        if let Some(ref key) = meta.flow_key {
345            // Ports are only meaningful for TCP (6) and UDP (17); emit None for
346            // all other protocols (e.g. ICMP) to avoid misleading zero values.
347            let port = matches!(key.protocol, 6 | 17);
348            (
349                Some(key.src_ip),
350                Some(key.dst_ip),
351                port.then_some(key.src_port),
352                port.then_some(key.dst_port),
353                Some(key.protocol),
354                Some(key.flow_id(unidirectional)),
355            )
356        } else {
357            (None, None, None, None, None, None)
358        };
359
360    // Report tcp_flags only for TCP packets.
361    let tcp_flags = if meta.flow_key.as_ref().is_some_and(|k| k.protocol == 6) {
362        Some(meta.tcp_flags)
363    } else {
364        None
365    };
366
367    PacketRecord {
368        timestamp_ns: meta.timestamp_ns,
369        src_ip,
370        dst_ip,
371        src_port,
372        dst_port,
373        protocol,
374        flow_id,
375        caplen: packet.info.captured_len,
376        origlen: packet.info.original_len,
377        tcp_flags,
378        payload: packet.data.clone(),
379    }
380}
381
382// ── Tests ─────────────────────────────────────────────────────────────────────
383
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

    use crate::filter::{PacketMeta, TcpFlagsFilter};
    use crate::flow::FlowKey;
    use crate::pcap::{PacketData, PacketInfo};

    use super::*;

    /// Zero-filled packet whose captured and original lengths are both `caplen`.
    fn make_packet(caplen: u32) -> PacketData {
        PacketData {
            info: PacketInfo {
                timestamp_ns: 0,
                captured_len: caplen,
                original_len: caplen,
                flow_key: None,
            },
            data: vec![0u8; caplen as usize],
        }
    }

    /// Metadata carrying a flow key built from the given 5-tuple.
    fn make_meta(src: IpAddr, dst: IpAddr, sport: u16, dport: u16, proto: u8) -> PacketMeta {
        PacketMeta {
            timestamp_ns: 0,
            captured_len: 60,
            flow_key: Some(FlowKey::new(src, dst, sport, dport, proto)),
            tcp_flags: 0,
        }
    }

    fn v4(a: u8, b: u8, c: u8, d: u8) -> IpAddr {
        IpAddr::V4(Ipv4Addr::new(a, b, c, d))
    }

    #[test]
    fn test_build_record_tcp_has_ports() {
        let pkt = make_packet(60);
        let meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 1234, 443, 6);
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.src_port, Some(1234));
        assert_eq!(rec.dst_port, Some(443));
    }

    #[test]
    fn test_build_record_udp_has_ports() {
        let pkt = make_packet(60);
        let meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 5000, 53, 17);
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.src_port, Some(5000));
        assert_eq!(rec.dst_port, Some(53));
    }

    #[test]
    fn test_build_record_icmp_ports_are_none() {
        let pkt = make_packet(60);
        // ICMP (protocol 1): FlowKey stores src_port=0/dst_port=0 by convention,
        // but the export record must emit None rather than Some(0).
        let meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 0, 0, 1);
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.src_port, None, "ICMP src_port must be None");
        assert_eq!(rec.dst_port, None, "ICMP dst_port must be None");
    }

    #[test]
    fn test_build_record_icmpv6_ports_are_none() {
        let pkt = make_packet(60);
        // ICMPv6 (protocol 58) realistically travels over IPv6.
        let src = IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 1));
        let dst = IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 2));
        let meta = make_meta(src, dst, 0, 0, 58);
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.src_port, None);
        assert_eq!(rec.dst_port, None);
        assert_eq!(rec.src_ip, Some(src));
        assert_eq!(rec.protocol, Some(58));
    }

    #[test]
    fn test_build_record_non_ip_all_flow_fields_none() {
        let pkt = make_packet(60);
        let meta = PacketMeta {
            timestamp_ns: 1_000,
            captured_len: 60,
            flow_key: None,
            tcp_flags: 0,
        };
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.src_ip, None);
        assert_eq!(rec.dst_ip, None);
        assert_eq!(rec.src_port, None);
        assert_eq!(rec.dst_port, None);
        assert_eq!(rec.protocol, None);
        assert_eq!(rec.flow_id, None);
        assert_eq!(rec.tcp_flags, None);
    }

    #[test]
    fn test_build_record_tcp_flags_only_for_tcp() {
        let pkt = make_packet(60);

        // UDP: tcp_flags must be None even when the meta field is non-zero.
        let mut meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 100, 200, 17);
        meta.tcp_flags = 0x02; // SYN bit set, but protocol is UDP
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.tcp_flags, None);

        // TCP: tcp_flags must be Some.
        let mut meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 100, 80, 6);
        meta.tcp_flags = TcpFlagsFilter::parse("SYN+ACK").unwrap().mask;
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.tcp_flags, Some(0x12));
    }

    #[test]
    fn test_build_record_copies_capture_fields() {
        let mut pkt = make_packet(60);
        pkt.info.original_len = 1500;
        pkt.data[0] = 0xAB;
        let mut meta = make_meta(v4(10, 0, 0, 1), v4(10, 0, 0, 2), 1234, 80, 6);
        meta.timestamp_ns = 42;
        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.timestamp_ns, 42);
        assert_eq!(rec.caplen, 60);
        assert_eq!(rec.origlen, 1500);
        assert_eq!(rec.payload, pkt.data);
    }

    #[test]
    fn test_build_record_ip_fields_and_flow_id_from_flow_key() {
        let pkt = make_packet(60);
        let meta = make_meta(v4(1, 2, 3, 4), v4(5, 6, 7, 8), 1234, 443, 6);
        let key = meta.flow_key.as_ref().unwrap();

        let rec = build_record(&pkt, &meta, false);
        assert_eq!(rec.src_ip, Some(v4(1, 2, 3, 4)));
        assert_eq!(rec.dst_ip, Some(v4(5, 6, 7, 8)));
        assert_eq!(rec.protocol, Some(6));
        assert_eq!(rec.flow_id, Some(key.flow_id(false)));

        let rec = build_record(&pkt, &meta, true);
        assert_eq!(rec.flow_id, Some(key.flow_id(true)));
    }

    #[test]
    fn test_export_format_parse_is_case_insensitive() {
        assert_eq!(ExportFormat::parse("JSON"), Some(ExportFormat::Json));
        assert_eq!(ExportFormat::parse("NdJson"), Some(ExportFormat::Json));
        assert_eq!(ExportFormat::parse("jsonl"), Some(ExportFormat::Json));
        assert_eq!(ExportFormat::parse("Parquet"), Some(ExportFormat::Parquet));
        assert_eq!(ExportFormat::parse("avro"), Some(ExportFormat::Avro));
        assert_eq!(ExportFormat::parse("csv"), None);
        assert_eq!(ExportFormat::parse(""), None);
    }

    #[test]
    fn test_export_format_from_extension() {
        let fmt = |p: &str| ExportFormat::from_extension(std::path::Path::new(p));
        assert_eq!(fmt("out.jsonl"), Some(ExportFormat::Json));
        assert_eq!(fmt("out.json"), Some(ExportFormat::Json));
        assert_eq!(fmt("out.ndjson"), Some(ExportFormat::Json));
        assert_eq!(fmt("out.parquet"), Some(ExportFormat::Parquet));
        assert_eq!(fmt("out.avro"), Some(ExportFormat::Avro));
        assert_eq!(fmt("out.csv"), None);
        assert_eq!(fmt("out"), None);
    }

    #[test]
    fn test_export_multi_without_targets_is_empty_and_skips_input() {
        // The input does not exist: with no targets it must never be opened.
        let opts = MultiExportOptions {
            targets: Vec::new(),
            filter: Filter::default(),
            bpf_filter: None,
            unidirectional: false,
        };
        let Ok(report) =
            export_multi(std::path::Path::new("does-not-exist.pcap"), &opts)
        else {
            panic!("export_multi with no targets must succeed");
        };
        assert!(report.outputs.is_empty());
    }

    #[test]
    fn test_export_file_unknown_format_is_rejected() {
        let input = std::path::Path::new("does-not-exist.pcap");

        let opts = ExportOptions {
            output: "out.xyz".into(),
            ..Default::default()
        };
        let res = export_file(input, &opts);
        assert!(matches!(res, Err(ExportError::UnknownFormat(ref ext)) if ext == "xyz"));

        let opts = ExportOptions {
            output: "out".into(),
            ..Default::default()
        };
        let res = export_file(input, &opts);
        assert!(matches!(res, Err(ExportError::UnknownFormat(ref ext)) if ext == "<none>"));
    }
}