pcap_toolkit/export/
json.rs1use std::io::{BufWriter, Write};
10use std::path::Path;
11
12use base64::Engine as _;
13use serde::Serialize;
14
15use crate::error::ExportError;
16
17use super::{PacketRecord, PacketSink};
18
19#[derive(Serialize)]
22struct JsonPacket<'a> {
23 timestamp_ns: u64,
24 #[serde(skip_serializing_if = "Option::is_none")]
25 src_ip: Option<&'a str>,
26 #[serde(skip_serializing_if = "Option::is_none")]
27 dst_ip: Option<&'a str>,
28 #[serde(skip_serializing_if = "Option::is_none")]
29 src_port: Option<u16>,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 dst_port: Option<u16>,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 protocol: Option<u8>,
34 #[serde(skip_serializing_if = "Option::is_none")]
35 flow_id: Option<String>,
36 caplen: u32,
37 origlen: u32,
38 #[serde(skip_serializing_if = "Option::is_none")]
39 tcp_flags: Option<u8>,
40 #[serde(skip_serializing_if = "Option::is_none")]
42 payload: Option<String>,
43 #[serde(skip_serializing_if = "Option::is_none")]
45 payload_encoding: Option<&'static str>,
46}
47
48pub struct JsonSink {
55 out: BufWriter<std::fs::File>,
56 compress_payload: bool,
57 count: u64,
58 b64: base64::engine::general_purpose::GeneralPurpose,
59}
60
61impl JsonSink {
62 pub fn create(path: &Path, compress_payload: bool) -> Result<Self, ExportError> {
66 let file = std::fs::File::create(path)?;
67 Ok(Self {
68 out: BufWriter::with_capacity(64 * 1024, file),
69 compress_payload,
70 count: 0,
71 b64: base64::engine::general_purpose::STANDARD,
72 })
73 }
74}
75
76impl PacketSink for JsonSink {
77 fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError> {
78 let src_ip_str = record.src_ip.map(|ip| ip.to_string());
79 let dst_ip_str = record.dst_ip.map(|ip| ip.to_string());
80
81 let payload_encoded = if record.payload.is_empty() {
82 None
83 } else if self.compress_payload {
84 let compressed =
85 zstd::encode_all(record.payload.as_slice(), 3).map_err(ExportError::Io)?;
86 Some(self.b64.encode(&compressed))
87 } else {
88 Some(self.b64.encode(&record.payload))
89 };
90
91 let encoding = if self.compress_payload && payload_encoded.is_some() {
92 Some("zstd+base64")
93 } else if payload_encoded.is_some() {
94 Some("base64")
95 } else {
96 None
97 };
98
99 let json_rec = JsonPacket {
100 timestamp_ns: record.timestamp_ns,
101 src_ip: src_ip_str.as_deref(),
102 dst_ip: dst_ip_str.as_deref(),
103 src_port: record.src_port,
104 dst_port: record.dst_port,
105 protocol: record.protocol,
106 flow_id: record.flow_id.map(|id| format!("{id:016x}")),
107 caplen: record.caplen,
108 origlen: record.origlen,
109 tcp_flags: record.tcp_flags,
110 payload: payload_encoded,
111 payload_encoding: encoding,
112 };
113
114 serde_json::to_writer(&mut self.out, &json_rec)
115 .map_err(|e| ExportError::Json(e.to_string()))?;
116 self.out.write_all(b"\n")?;
117 self.count += 1;
118 Ok(())
119 }
120
121 fn close(&mut self) -> Result<u64, ExportError> {
122 self.out.flush()?;
123 Ok(self.count)
124 }
125}