Skip to main content

pcap_toolkit/export/
json.rs

//! JSONL (newline-delimited JSON) writer for packet export.
//!
//! Each packet is serialised as a single JSON object on its own line.
//! The output file uses the `.jsonl` extension by convention.
//!
//! When payload compression is enabled, the payload bytes are Zstd-compressed
//! before Base64 encoding, reducing storage for large payloads.

9use std::io::{BufWriter, Write};
10use std::path::Path;
11
12use base64::Engine as _;
13use serde::Serialize;
14
15use crate::error::ExportError;
16
17use super::{PacketRecord, PacketSink};
18
19// ── Serialisable record ───────────────────────────────────────────────────────
20
21#[derive(Serialize)]
22struct JsonPacket<'a> {
23    timestamp_ns: u64,
24    #[serde(skip_serializing_if = "Option::is_none")]
25    src_ip: Option<&'a str>,
26    #[serde(skip_serializing_if = "Option::is_none")]
27    dst_ip: Option<&'a str>,
28    #[serde(skip_serializing_if = "Option::is_none")]
29    src_port: Option<u16>,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    dst_port: Option<u16>,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    protocol: Option<u8>,
34    #[serde(skip_serializing_if = "Option::is_none")]
35    flow_id: Option<String>,
36    caplen: u32,
37    origlen: u32,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    tcp_flags: Option<u8>,
40    /// Raw payload encoded as Base64 (or Base64 of Zstd-compressed bytes).
41    #[serde(skip_serializing_if = "Option::is_none")]
42    payload: Option<String>,
43    /// Present when payload compression is active, to distinguish the encoding.
44    #[serde(skip_serializing_if = "Option::is_none")]
45    payload_encoding: Option<&'static str>,
46}
47
48// ── Streaming sink ────────────────────────────────────────────────────────────
49
50/// Streaming JSONL writer that implements [`PacketSink`].
51///
52/// Each [`PacketSink::write`] call serialises one packet record immediately
53/// without buffering all records in memory first.
54pub struct JsonSink {
55    out: BufWriter<std::fs::File>,
56    compress_payload: bool,
57    count: u64,
58    b64: base64::engine::general_purpose::GeneralPurpose,
59}
60
61impl JsonSink {
62    /// Create a new `JsonSink` writing to `path`.
63    ///
64    /// Uses a 64 KiB write buffer to reduce syscall overhead.
65    pub fn create(path: &Path, compress_payload: bool) -> Result<Self, ExportError> {
66        let file = std::fs::File::create(path)?;
67        Ok(Self {
68            out: BufWriter::with_capacity(64 * 1024, file),
69            compress_payload,
70            count: 0,
71            b64: base64::engine::general_purpose::STANDARD,
72        })
73    }
74}
75
76impl PacketSink for JsonSink {
77    fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError> {
78        let src_ip_str = record.src_ip.map(|ip| ip.to_string());
79        let dst_ip_str = record.dst_ip.map(|ip| ip.to_string());
80
81        let payload_encoded = if record.payload.is_empty() {
82            None
83        } else if self.compress_payload {
84            let compressed =
85                zstd::encode_all(record.payload.as_slice(), 3).map_err(ExportError::Io)?;
86            Some(self.b64.encode(&compressed))
87        } else {
88            Some(self.b64.encode(&record.payload))
89        };
90
91        let encoding = if self.compress_payload && payload_encoded.is_some() {
92            Some("zstd+base64")
93        } else if payload_encoded.is_some() {
94            Some("base64")
95        } else {
96            None
97        };
98
99        let json_rec = JsonPacket {
100            timestamp_ns: record.timestamp_ns,
101            src_ip: src_ip_str.as_deref(),
102            dst_ip: dst_ip_str.as_deref(),
103            src_port: record.src_port,
104            dst_port: record.dst_port,
105            protocol: record.protocol,
106            flow_id: record.flow_id.map(|id| format!("{id:016x}")),
107            caplen: record.caplen,
108            origlen: record.origlen,
109            tcp_flags: record.tcp_flags,
110            payload: payload_encoded,
111            payload_encoding: encoding,
112        };
113
114        serde_json::to_writer(&mut self.out, &json_rec)
115            .map_err(|e| ExportError::Json(e.to_string()))?;
116        self.out.write_all(b"\n")?;
117        self.count += 1;
118        Ok(())
119    }
120
121    fn close(&mut self) -> Result<u64, ExportError> {
122        self.out.flush()?;
123        Ok(self.count)
124    }
125}