1use std::path::Path;
11use std::sync::LazyLock;
12
13use apache_avro::{Codec, Schema, Writer, ZstandardSettings};
14use serde::Serialize;
15
16use crate::error::ExportError;
17
18use super::{PacketRecord, PacketSink};
19
/// Avro schema, as JSON text, for the `pcap_toolkit.Packet` record.
///
/// `timestamp_ns`, `caplen` and `origlen` are required; every other field is
/// a `["null", T]` union that defaults to `null`.
pub const AVRO_SCHEMA: &str = r#"
{
 "type": "record",
 "name": "Packet",
 "namespace": "pcap_toolkit",
 "doc": "A single captured network packet with parsed layer fields.",
 "fields": [
 {"name": "timestamp_ns", "type": "long", "doc": "Capture timestamp, nanoseconds since Unix epoch."},
 {"name": "src_ip", "type": ["null", "string"], "default": null, "doc": "Source IP address (IPv4 or IPv6 string)."},
 {"name": "dst_ip", "type": ["null", "string"], "default": null, "doc": "Destination IP address."},
 {"name": "src_port", "type": ["null", "int"], "default": null, "doc": "Source port (TCP/UDP only)."},
 {"name": "dst_port", "type": ["null", "int"], "default": null, "doc": "Destination port (TCP/UDP only)."},
 {"name": "protocol", "type": ["null", "int"], "default": null, "doc": "IP protocol number (6=TCP, 17=UDP, …)."},
 {"name": "flow_id", "type": ["null", "long"], "default": null, "doc": "Deterministic 64-bit flow identifier."},
 {"name": "caplen", "type": "int", "doc": "Captured bytes (wire length may differ)."},
 {"name": "origlen", "type": "int", "doc": "Original wire length."},
 {"name": "tcp_flags", "type": ["null", "int"], "default": null, "doc": "TCP control flags bitmask."},
 {"name": "payload", "type": ["null", "bytes"], "default": null, "doc": "Raw captured payload bytes."}
 ]
}
"#;
44
45#[derive(Serialize)]
52struct AvroPacket {
53 timestamp_ns: i64,
54 src_ip: Option<String>,
55 dst_ip: Option<String>,
56 src_port: Option<i32>,
57 dst_port: Option<i32>,
58 protocol: Option<i32>,
59 flow_id: Option<i64>,
60 caplen: i32,
61 origlen: i32,
62 tcp_flags: Option<i32>,
63 payload: Option<serde_bytes::ByteBuf>,
64}
65
66static PARSED_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
69 Schema::parse_str(AVRO_SCHEMA).expect("built-in Avro schema must parse without error")
70});
71
72pub struct AvroSink {
80 writer: Writer<'static, std::fs::File>,
81 count: u64,
82}
83
84impl AvroSink {
85 pub fn create(path: &Path, compress_payload: bool) -> Result<Self, ExportError> {
89 let schema_path = path.with_extension("avsc");
90 std::fs::write(&schema_path, AVRO_SCHEMA.trim())?;
91
92 let codec = if compress_payload {
93 Codec::Zstandard(ZstandardSettings::new(3))
94 } else {
95 Codec::Null
96 };
97
98 let file = std::fs::File::create(path)?;
99 let writer = Writer::with_codec(&PARSED_SCHEMA, file, codec);
100 Ok(Self { writer, count: 0 })
101 }
102}
103
104impl PacketSink for AvroSink {
105 fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError> {
106 let avro_rec = AvroPacket {
107 timestamp_ns: record.timestamp_ns as i64,
108 src_ip: record.src_ip.map(|ip| ip.to_string()),
109 dst_ip: record.dst_ip.map(|ip| ip.to_string()),
110 src_port: record.src_port.map(|p| p as i32),
111 dst_port: record.dst_port.map(|p| p as i32),
112 protocol: record.protocol.map(|p| p as i32),
113 flow_id: record.flow_id.map(|id| id as i64),
114 caplen: record.caplen as i32,
115 origlen: record.origlen as i32,
116 tcp_flags: record.tcp_flags.map(|f| f as i32),
117 payload: if record.payload.is_empty() {
118 None
119 } else {
120 Some(serde_bytes::ByteBuf::from(record.payload.clone()))
121 },
122 };
123 self.writer
124 .append_ser(avro_rec)
125 .map_err(|e| ExportError::Avro(e.to_string()))?;
126 self.count += 1;
127 Ok(())
128 }
129
130 fn close(&mut self) -> Result<u64, ExportError> {
131 self.writer
132 .flush()
133 .map_err(|e| ExportError::Avro(e.to_string()))?;
134 Ok(self.count)
135 }
136}
137
138pub fn schema_json() -> Result<String, ExportError> {
140 let schema = Schema::parse_str(AVRO_SCHEMA).map_err(|e| ExportError::Avro(e.to_string()))?;
141 serde_json::to_string_pretty(&schema).map_err(|e| ExportError::Avro(e.to_string()))
142}