use std::path::Path;
use std::sync::LazyLock;
use apache_avro::{Codec, Schema, Writer, ZstandardSettings};
use serde::Serialize;
use crate::error::ExportError;
use super::{PacketRecord, PacketSink};
/// Avro schema, as JSON text, for the `Packet` record in the `pcap_toolkit` namespace.
///
/// This text is the single source for the schema in this file: it is parsed into
/// `PARSED_SCHEMA`, written (trimmed) to the `.avsc` sidecar file by `AvroSink::create`,
/// and parsed again by `schema_json`.
///
/// Only `timestamp_ns`, `caplen` and `origlen` are required; every other field is a
/// `["null", T]` union defaulting to `null`. The field names must stay in sync with the
/// `AvroPacket` struct, which is what actually gets serialized.
pub const AVRO_SCHEMA: &str = r#"
{
"type": "record",
"name": "Packet",
"namespace": "pcap_toolkit",
"doc": "A single captured network packet with parsed layer fields.",
"fields": [
{"name": "timestamp_ns", "type": "long", "doc": "Capture timestamp, nanoseconds since Unix epoch."},
{"name": "src_ip", "type": ["null", "string"], "default": null, "doc": "Source IP address (IPv4 or IPv6 string)."},
{"name": "dst_ip", "type": ["null", "string"], "default": null, "doc": "Destination IP address."},
{"name": "src_port", "type": ["null", "int"], "default": null, "doc": "Source port (TCP/UDP only)."},
{"name": "dst_port", "type": ["null", "int"], "default": null, "doc": "Destination port (TCP/UDP only)."},
{"name": "protocol", "type": ["null", "int"], "default": null, "doc": "IP protocol number (6=TCP, 17=UDP, …)."},
{"name": "flow_id", "type": ["null", "long"], "default": null, "doc": "Deterministic 64-bit flow identifier."},
{"name": "caplen", "type": "int", "doc": "Captured bytes (wire length may differ)."},
{"name": "origlen", "type": "int", "doc": "Original wire length."},
{"name": "tcp_flags", "type": ["null", "int"], "default": null, "doc": "TCP control flags bitmask."},
{"name": "payload", "type": ["null", "bytes"], "default": null, "doc": "Raw captured payload bytes."}
]
}
"#;
/// Serde-serializable row that mirrors the `Packet` record declared in `AVRO_SCHEMA`.
///
/// Field names and order follow the schema's `fields` list. Schema `long` maps to `i64`,
/// `int` to `i32`, and each `["null", T]` union to `Option<T>`. Instances are built
/// per record in `AvroSink::write` and handed to the Avro writer.
#[derive(Serialize)]
struct AvroPacket {
// Required by the schema (no `null` branch).
timestamp_ns: i64,
// Addresses are stored as their string form (the schema type is `string`).
src_ip: Option<String>,
dst_ip: Option<String>,
src_port: Option<i32>,
dst_port: Option<i32>,
protocol: Option<i32>,
flow_id: Option<i64>,
// Required by the schema (no `null` branch).
caplen: i32,
origlen: i32,
tcp_flags: Option<i32>,
// Wrapped in `serde_bytes::ByteBuf` to line up with the schema's `bytes` type;
// `AvroSink::write` stores `None` here when the source payload is empty.
payload: Option<serde_bytes::ByteBuf>,
}
/// Parsed form of `AVRO_SCHEMA`, built lazily on first access and then reused.
///
/// Being a `static`, it can be borrowed for `'static`, which is what lets `AvroSink`
/// hold a `Writer<'static, _>`.
///
/// # Panics
///
/// The first access panics if `AVRO_SCHEMA` does not parse; that would be a bug in the
/// built-in schema text rather than a runtime condition.
static PARSED_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
Schema::parse_str(AVRO_SCHEMA).expect("built-in Avro schema must parse without error")
});
/// Packet sink that writes records to a file through an Avro `Writer`.
///
/// Construct it with `AvroSink::create`; records are appended through its
/// `PacketSink` implementation.
pub struct AvroSink {
// Avro writer over the output file, borrowing the `'static` parsed schema.
writer: Writer<'static, std::fs::File>,
// Number of records appended successfully so far; reported by `close`.
count: u64,
}
impl AvroSink {
    /// Opens `path` for writing and returns a sink with a record count of zero.
    ///
    /// Two files are touched, in this order:
    ///
    /// 1. A sidecar schema file at `path` with its extension replaced by `avsc`,
    ///    containing the trimmed text of [`AVRO_SCHEMA`].
    /// 2. The data file at `path` itself, created (or truncated) via `File::create`.
    ///
    /// `compress_payload` selects the codec handed to the Avro writer:
    /// `Codec::Zstandard` built from `ZstandardSettings::new(3)` (presumably compression
    /// level 3 — confirm against the `apache_avro` docs) when `true`, `Codec::Null`
    /// otherwise.
    ///
    /// # Errors
    ///
    /// Propagates, via `?`, a failure to write the schema file or to create the data file.
    pub fn create(path: &Path, compress_payload: bool) -> Result<Self, ExportError> {
        // The sidecar is written first so the failure order matches the documented steps.
        std::fs::write(path.with_extension("avsc"), AVRO_SCHEMA.trim())?;

        let codec = if compress_payload {
            Codec::Zstandard(ZstandardSettings::new(3))
        } else {
            Codec::Null
        };

        let output = std::fs::File::create(path)?;

        Ok(Self {
            writer: Writer::with_codec(&PARSED_SCHEMA, output, codec),
            count: 0,
        })
    }
}
impl PacketSink for AvroSink {
    /// Converts `record` into an [`AvroPacket`] row and appends it to the Avro writer.
    ///
    /// IP addresses are stored via `to_string()`, numeric fields are cast to the
    /// signed widths used by the schema, and an empty payload is stored as `None`
    /// rather than as an empty byte string. The record counter is bumped only after
    /// the append succeeds.
    ///
    /// NOTE(review): the numeric conversions are plain `as` casts, which wrap silently
    /// if a source field is wider than its target — confirm against the field types of
    /// `PacketRecord`, which are not visible from this file.
    ///
    /// # Errors
    ///
    /// Returns `ExportError::Avro` carrying the writer's error text if the append fails.
    fn write(&mut self, record: &PacketRecord) -> Result<(), ExportError> {
        // The payload bytes are copied because the row owns its buffer.
        let payload = (!record.payload.is_empty())
            .then(|| serde_bytes::ByteBuf::from(record.payload.clone()));

        let row = AvroPacket {
            timestamp_ns: record.timestamp_ns as i64,
            src_ip: record.src_ip.map(|addr| addr.to_string()),
            dst_ip: record.dst_ip.map(|addr| addr.to_string()),
            src_port: record.src_port.map(|port| port as i32),
            dst_port: record.dst_port.map(|port| port as i32),
            protocol: record.protocol.map(|proto| proto as i32),
            flow_id: record.flow_id.map(|flow| flow as i64),
            caplen: record.caplen as i32,
            origlen: record.origlen as i32,
            tcp_flags: record.tcp_flags.map(|flags| flags as i32),
            payload,
        };

        if let Err(err) = self.writer.append_ser(row) {
            return Err(ExportError::Avro(err.to_string()));
        }

        self.count += 1;
        Ok(())
    }

    /// Flushes the Avro writer and reports how many records were appended.
    ///
    /// # Errors
    ///
    /// Returns `ExportError::Avro` carrying the writer's error text if the flush fails.
    fn close(&mut self) -> Result<u64, ExportError> {
        match self.writer.flush() {
            Ok(_) => Ok(self.count),
            Err(err) => Err(ExportError::Avro(err.to_string())),
        }
    }
}
pub fn schema_json() -> Result<String, ExportError> {
let schema = Schema::parse_str(AVRO_SCHEMA).map_err(|e| ExportError::Avro(e.to_string()))?;
serde_json::to_string_pretty(&schema).map_err(|e| ExportError::Avro(e.to_string()))
}