Skip to main content

pcap_toolkit/export/
avro.rs

1//! Apache Avro writer for packet export.
2//!
3//! Each packet becomes one Avro record. The Avro schema is written to a
4//! companion `.avsc` file alongside the data file so the dataset is
5//! self-describing.
6//!
7//! When `compress_payload` is `true` the Avro file uses the Zstd codec,
8//! which compresses all data including payload bytes.
9
10use std::path::Path;
11use std::sync::LazyLock;
12
13use apache_avro::{Codec, Schema, Writer, ZstandardSettings};
14use serde::Serialize;
15
16use crate::error::ExportError;
17
18use super::{PacketRecord, PacketSink};
19
20// ── Avro schema ───────────────────────────────────────────────────────────────
21
/// Avro schema definition — kept in sync with [`AvroPacket`].
///
/// This JSON text is used in two places in this module: it is parsed once
/// into the static `PARSED_SCHEMA` that the writer is constructed with, and
/// [`AvroSink::create`] writes it (with surrounding whitespace trimmed) to
/// the companion `.avsc` file.
///
/// Every optional field is declared as a `["null", T]` union with a `null`
/// default; `timestamp_ns`, `caplen` and `origlen` are the only required
/// fields. When a field is added, removed, or retyped here, the matching
/// field of [`AvroPacket`] must be changed as well.
pub const AVRO_SCHEMA: &str = r#"
{
  "type": "record",
  "name": "Packet",
  "namespace": "pcap_toolkit",
  "doc": "A single captured network packet with parsed layer fields.",
  "fields": [
    {"name": "timestamp_ns", "type": "long",               "doc": "Capture timestamp, nanoseconds since Unix epoch."},
    {"name": "src_ip",       "type": ["null", "string"],   "default": null, "doc": "Source IP address (IPv4 or IPv6 string)."},
    {"name": "dst_ip",       "type": ["null", "string"],   "default": null, "doc": "Destination IP address."},
    {"name": "src_port",     "type": ["null", "int"],      "default": null, "doc": "Source port (TCP/UDP only)."},
    {"name": "dst_port",     "type": ["null", "int"],      "default": null, "doc": "Destination port (TCP/UDP only)."},
    {"name": "protocol",     "type": ["null", "int"],      "default": null, "doc": "IP protocol number (6=TCP, 17=UDP, …)."},
    {"name": "flow_id",      "type": ["null", "long"],     "default": null, "doc": "Deterministic 64-bit flow identifier."},
    {"name": "caplen",       "type": "int",                                 "doc": "Captured bytes (wire length may differ)."},
    {"name": "origlen",      "type": "int",                                 "doc": "Original wire length."},
    {"name": "tcp_flags",    "type": ["null", "int"],      "default": null, "doc": "TCP control flags bitmask."},
    {"name": "payload",      "type": ["null", "bytes"],    "default": null, "doc": "Raw captured payload bytes."}
  ]
}
"#;
44
45// ── Serialisable record ───────────────────────────────────────────────────────
46
/// Serde-serialisable representation of one packet for `apache-avro`.
///
/// Field types match the Avro schema above.  `apache-avro` maps `Option<T>`
/// to `["null", T]` unions automatically.
///
/// The field names and their order mirror the `fields` array of
/// [`AVRO_SCHEMA`]; keep the two definitions in step when editing either.
#[derive(Serialize)]
struct AvroPacket {
    /// Capture timestamp, nanoseconds since Unix epoch (Avro `long`).
    timestamp_ns: i64,
    /// Source IP address rendered as a string; `None` becomes Avro `null`.
    src_ip: Option<String>,
    /// Destination IP address rendered as a string; `None` becomes Avro `null`.
    dst_ip: Option<String>,
    /// Source port (TCP/UDP only), widened to Avro `int`.
    src_port: Option<i32>,
    /// Destination port (TCP/UDP only), widened to Avro `int`.
    dst_port: Option<i32>,
    /// IP protocol number (for example 6 = TCP, 17 = UDP).
    protocol: Option<i32>,
    /// Deterministic 64-bit flow identifier, stored as Avro `long`.
    flow_id: Option<i64>,
    /// Number of bytes actually captured for this packet.
    caplen: i32,
    /// Original length of the packet on the wire.
    origlen: i32,
    /// TCP control flags bitmask.
    tcp_flags: Option<i32>,
    /// Raw captured payload; `ByteBuf` makes serde emit Avro `bytes` rather
    /// than an array of integers.
    payload: Option<serde_bytes::ByteBuf>,
}
65
66// ── Parsed schema (static, parsed once) ──────────────────────────────────────
67
68static PARSED_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
69    Schema::parse_str(AVRO_SCHEMA).expect("built-in Avro schema must parse without error")
70});
71
72// ── Streaming sink ────────────────────────────────────────────────────────────
73
/// Streaming Avro writer that implements [`PacketSink`].
///
/// Each [`PacketSink::write`] call appends one record to the Avro block
/// buffer managed by `apache-avro`. The companion `.avsc` schema file is
/// written during [`AvroSink::create`].
///
/// [`PacketSink::close`] flushes the underlying writer and reports how many
/// records were appended through this sink.
pub struct AvroSink {
    /// `apache-avro` writer over the output data file. The `'static` lifetime
    /// is the borrow of the module-level parsed schema it was created with.
    writer: Writer<'static, std::fs::File>,
    /// Number of records appended successfully so far; returned by `close`.
    count: u64,
}
83
84impl AvroSink {
85    /// Create a new `AvroSink` writing to `path`.
86    ///
87    /// A companion `<path>.avsc` schema file is written immediately.
88    pub fn create(path: &Path, compress_payload: bool) -> Result<Self, ExportError> {
89        let schema_path = path.with_extension("avsc");
90        std::fs::write(&schema_path, AVRO_SCHEMA.trim())?;
91
92        let codec = if compress_payload {
93            Codec::Zstandard(ZstandardSettings::new(3))
94        } else {
95            Codec::Null
96        };
97
98        let file = std::fs::File::create(path)?;
99        let writer = Writer::with_codec(&PARSED_SCHEMA, file, codec);
100        Ok(Self { writer, count: 0 })
101    }
102}
103
104impl PacketSink for AvroSink {
105    fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError> {
106        let avro_rec = AvroPacket {
107            timestamp_ns: record.timestamp_ns as i64,
108            src_ip: record.src_ip.map(|ip| ip.to_string()),
109            dst_ip: record.dst_ip.map(|ip| ip.to_string()),
110            src_port: record.src_port.map(|p| p as i32),
111            dst_port: record.dst_port.map(|p| p as i32),
112            protocol: record.protocol.map(|p| p as i32),
113            flow_id: record.flow_id.map(|id| id as i64),
114            caplen: record.caplen as i32,
115            origlen: record.origlen as i32,
116            tcp_flags: record.tcp_flags.map(|f| f as i32),
117            payload: if record.payload.is_empty() {
118                None
119            } else {
120                Some(serde_bytes::ByteBuf::from(record.payload.clone()))
121            },
122        };
123        self.writer
124            .append_ser(avro_rec)
125            .map_err(|e| ExportError::Avro(e.to_string()))?;
126        self.count += 1;
127        Ok(())
128    }
129
130    fn close(&mut self) -> Result<u64, ExportError> {
131        self.writer
132            .flush()
133            .map_err(|e| ExportError::Avro(e.to_string()))?;
134        Ok(self.count)
135    }
136}
137
138/// Return a pretty-printed version of the Avro schema (for diagnostics).
139pub fn schema_json() -> Result<String, ExportError> {
140    let schema = Schema::parse_str(AVRO_SCHEMA).map_err(|e| ExportError::Avro(e.to_string()))?;
141    serde_json::to_string_pretty(&schema).map_err(|e| ExportError::Avro(e.to_string()))
142}