use std::collections::HashMap;
use std::sync::Arc;
use arrow::array::{ArrayRef, Float64Array, Int32Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use serde_json::Value;
/// Raw text of the netflow v1 schema description.
///
/// The file `schemas/netflow/v1/schema.json`, located relative to the crate's
/// manifest directory, is embedded into the binary at compile time, so no file
/// I/O happens at runtime.
const SCHEMA_JSON: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/schemas/netflow/v1/schema.json"
));
/// Names of the canonical columns considered required.
///
/// Every entry is also declared as a non-nullable field in `canonical_schema`.
/// How this list is enforced is not visible in this part of the file.
///
/// NOTE(review): `bwd_bytes` is non-nullable in `canonical_schema` but is not
/// listed here — confirm whether that omission is intentional.
pub const REQUIRED_FIELDS: &[&str] = &[
"timestamp",
"src_ip",
"dest_ip",
"src_port",
"dest_port",
"fwd_bytes",
"flow_dur",
];
/// IP protocol number for TCP.
pub const PROTOCOL_TCP: u8 = 6;
/// IP protocol number for UDP.
pub const PROTOCOL_UDP: u8 = 17;
/// IP protocol number for ICMP.
pub const PROTOCOL_ICMP: u8 = 1;
pub fn canonical_schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("timestamp", DataType::Int64, false), Field::new("src_ip", DataType::Utf8, false),
Field::new("dest_ip", DataType::Utf8, false),
Field::new("src_port", DataType::Int32, false),
Field::new("dest_port", DataType::Int32, false),
Field::new("fwd_bytes", DataType::Int64, false),
Field::new("bwd_bytes", DataType::Int64, false),
Field::new("fwd_pkts", DataType::Int64, true),
Field::new("bwd_pkts", DataType::Int64, true),
Field::new("flow_dur", DataType::Float64, false), Field::new("protocol", DataType::Int32, true),
]))
}
/// One flow record in canonical form.
///
/// The field names and types match the columns declared by `canonical_schema`;
/// the `Option` fields correspond to that schema's nullable columns.
pub struct CanonicalFlow {
/// Flow timestamp as a 64-bit integer. Epoch and unit are not established in
/// this part of the file — TODO confirm.
pub timestamp: i64,
/// Source IP address in textual form.
pub src_ip: String,
/// Destination IP address in textual form.
pub dest_ip: String,
/// Source port, stored as `i32` to match the schema's `Int32` column.
pub src_port: i32,
/// Destination port, stored as `i32` to match the schema's `Int32` column.
pub dest_port: i32,
/// Byte count in the forward direction.
pub fwd_bytes: i64,
/// Byte count in the backward direction.
pub bwd_bytes: i64,
/// Packet count in the forward direction, if known.
pub fwd_pkts: Option<i64>,
/// Packet count in the backward direction, if known.
pub bwd_pkts: Option<i64>,
/// Flow duration. Presumably in seconds, given that `load_schema_spec` maps
/// the "seconds" unit to a divisor of 1.0 — TODO confirm.
pub flow_dur: f64,
/// IP protocol number, if known.
pub protocol: Option<i32>,
}
/// Converts a slice of flows into a single Arrow [`RecordBatch`] laid out
/// according to `canonical_schema`.
///
/// Each struct field becomes one column, with row `i` of every column taken from
/// `flows[i]`. The optional fields (`fwd_pkts`, `bwd_pkts`, `protocol`) become
/// nullable columns in which `None` is stored as a null.
///
/// # Errors
///
/// Returns whatever [`ArrowError`] `RecordBatch::try_new` reports when it
/// rejects the assembled columns.
pub fn flows_to_batch(flows: &[CanonicalFlow]) -> std::result::Result<RecordBatch, ArrowError> {
    // Non-nullable columns are built directly from the plain values.
    let timestamp: ArrayRef = Arc::new(Int64Array::from_iter_values(
        flows.iter().map(|flow| flow.timestamp),
    ));
    let src_ip: ArrayRef = Arc::new(StringArray::from_iter_values(
        flows.iter().map(|flow| flow.src_ip.as_str()),
    ));
    let dest_ip: ArrayRef = Arc::new(StringArray::from_iter_values(
        flows.iter().map(|flow| flow.dest_ip.as_str()),
    ));
    let src_port: ArrayRef = Arc::new(Int32Array::from_iter_values(
        flows.iter().map(|flow| flow.src_port),
    ));
    let dest_port: ArrayRef = Arc::new(Int32Array::from_iter_values(
        flows.iter().map(|flow| flow.dest_port),
    ));
    let fwd_bytes: ArrayRef = Arc::new(Int64Array::from_iter_values(
        flows.iter().map(|flow| flow.fwd_bytes),
    ));
    let bwd_bytes: ArrayRef = Arc::new(Int64Array::from_iter_values(
        flows.iter().map(|flow| flow.bwd_bytes),
    ));
    let flow_dur: ArrayRef = Arc::new(Float64Array::from_iter_values(
        flows.iter().map(|flow| flow.flow_dur),
    ));

    // Nullable columns go through a `Vec<Option<_>>` so that `None` becomes a null slot.
    let fwd_pkts: ArrayRef = Arc::new(Int64Array::from(
        flows.iter().map(|flow| flow.fwd_pkts).collect::<Vec<_>>(),
    ));
    let bwd_pkts: ArrayRef = Arc::new(Int64Array::from(
        flows.iter().map(|flow| flow.bwd_pkts).collect::<Vec<_>>(),
    ));
    let protocol: ArrayRef = Arc::new(Int32Array::from(
        flows.iter().map(|flow| flow.protocol).collect::<Vec<_>>(),
    ));

    // The column order must match the field order of `canonical_schema`.
    let columns: Vec<ArrayRef> = vec![
        timestamp, src_ip, dest_ip, src_port, dest_port, fwd_bytes, bwd_bytes, fwd_pkts, bwd_pkts,
        flow_dur, protocol,
    ];
    RecordBatch::try_new(canonical_schema(), columns)
}
/// Column-mapping rules extracted from the embedded schema JSON by
/// `load_schema_spec`.
pub struct SchemaSpec {
/// Pairs of (canonical field name, alias names). As built by
/// `load_schema_spec`, the alias names have been passed through
/// `normalize_name`; `resolve_columns` tries them in list order.
pub aliases: Vec<(String, Vec<String>)>,
/// Maps a normalized source column name to a duration divisor. As built by
/// `load_schema_spec` the divisor is 1.0, 1e3, 1e6 or 1e9 for seconds,
/// milliseconds, microseconds or nanoseconds. Presumably a source duration
/// is divided by it to obtain seconds — TODO confirm against the caller.
pub duration_divisors: HashMap<String, f64>,
/// Column names taken from the schema's `label_fields` list, followed by
/// `"label"`. These names are not normalized.
pub passthrough: Vec<String>,
}
pub fn load_schema_spec() -> SchemaSpec {
let root: Value = serde_json::from_str(SCHEMA_JSON).expect("embedded schema JSON is valid");
let fields = &root["canonical_fields"];
let mut aliases = Vec::new();
for section in ["required", "optional"] {
if let Some(map) = fields[section].as_object() {
for (canonical, spec) in map {
let names: Vec<String> = spec["aliases"]
.as_array()
.map(|a| {
a.iter()
.filter_map(|v| v.as_str())
.map(normalize_name)
.collect()
})
.unwrap_or_default();
aliases.push((canonical.clone(), names));
}
}
}
let mut duration_divisors = HashMap::new();
if let Some(units) = fields["required"]["flow_dur"]["unit_detection"].as_object() {
for (unit, names) in units {
let divisor = match unit.as_str() {
"seconds" => 1.0,
"milliseconds" => 1e3,
"microseconds" => 1e6,
"nanoseconds" => 1e9,
_ => continue,
};
if let Some(arr) = names.as_array() {
for name in arr.iter().filter_map(|v| v.as_str()) {
duration_divisors.insert(normalize_name(name), divisor);
}
}
}
}
let mut passthrough: Vec<String> = fields["label_fields"]
.as_array()
.map(|a| {
a.iter()
.filter_map(|v| v.as_str())
.map(String::from)
.collect()
})
.unwrap_or_default();
passthrough.push("label".to_string());
SchemaSpec {
aliases,
duration_divisors,
passthrough,
}
}
/// Normalizes a column name for alias matching.
///
/// Leading and trailing whitespace is trimmed, the text is lowercased, and
/// every space or hyphen is replaced by an underscore. Runs of separators are
/// not collapsed, so `"a  b"` becomes `"a__b"`.
pub fn normalize_name(name: &str) -> String {
    name.trim()
        .to_lowercase()
        .chars()
        .map(|ch| match ch {
            ' ' | '-' => '_',
            other => other,
        })
        .collect()
}
/// Translates a protocol name or numeric string into its IP protocol number.
///
/// The input is trimmed and matched case-insensitively against the known names
/// `tcp`, `udp`, `icmp`, `icmpv6`, `gre` and `esp`. Any other input is treated
/// as a decimal protocol number.
///
/// Returns `None` when the input is neither a known name nor a decimal number
/// in `0..=255`. The IP protocol field is 8 bits wide, so negative or larger
/// values cannot be protocol numbers and are rejected rather than passed on.
pub fn protocol_number(name: &str) -> Option<i32> {
    match name.trim().to_lowercase().as_str() {
        "tcp" => Some(6),
        "udp" => Some(17),
        "icmp" => Some(1),
        "icmpv6" => Some(58),
        "gre" => Some(47),
        "esp" => Some(50),
        // Parsing as `u8` enforces the 0..=255 range before widening to `i32`.
        other => other.parse::<u8>().ok().map(i32::from),
    }
}
impl SchemaSpec {
    /// Maps canonical field names to the source column that supplies each one.
    ///
    /// Every name in `source_names` is normalized with `normalize_name`. For
    /// each canonical field, its aliases are tried in list order and the first
    /// alias equal to a normalized source name wins. The returned map goes from
    /// canonical name to the original, un-normalized source column name.
    /// Canonical fields with no matching alias are absent from the result.
    ///
    /// If several source names normalize to the same string, the one appearing
    /// last in `source_names` is used.
    pub fn resolve_columns(&self, source_names: &[String]) -> HashMap<String, String> {
        // Index the source columns by normalized name; later duplicates overwrite earlier ones.
        let mut by_normalized: HashMap<String, &String> = HashMap::new();
        for original in source_names {
            by_normalized.insert(normalize_name(original), original);
        }

        self.aliases
            .iter()
            .filter_map(|(canonical, candidates)| {
                candidates
                    .iter()
                    .find_map(|alias| by_normalized.get(alias))
                    .map(|source| (canonical.clone(), (*source).clone()))
            })
            .collect()
    }
}