use crate::config::schema::{V9Config, V9FlowSet as ConfigV9FlowSet};
use crate::error::{NetflowError, Result};
use crate::generator::field_serializer::{
get_field_value, serialize_field_value, v9_field_id_to_name,
};
use std::time::{SystemTime, UNIX_EPOCH};
/// Builds the NetFlow v9 export packets described by `config`.
///
/// All template flowsets in the configuration are emitted together in one
/// leading packet (if any exist), followed by one packet per data flowset in
/// configuration order. Every packet shares the same `sys_up_time`,
/// `unix_secs` and `source_id`; the sequence number starts at the configured
/// value and advances by one per packet, wrapping around at `u32::MAX`.
///
/// # Errors
///
/// Returns [`NetflowError::Generation`] when:
/// - the header values cannot be resolved,
/// - a data flowset references a template ID that no template flowset defines,
/// - a template or data packet cannot be encoded, or
/// - the configuration yields no packets at all.
pub fn build_v9_packets(config: V9Config) -> Result<Vec<Vec<u8>>> {
    let (sys_up_time, unix_secs, mut sequence_number, source_id) = get_header_values(&config)?;

    // `config` is owned, so the flowset payloads are moved out rather than
    // cloned (records can be arbitrarily large YAML documents).
    let mut templates = Vec::new();
    let mut data_flowsets = Vec::new();
    for flowset in config.flowsets {
        match flowset {
            ConfigV9FlowSet::Template {
                template_id,
                fields,
            } => templates.push((template_id, fields)),
            ConfigV9FlowSet::Data {
                template_id,
                records,
            } => data_flowsets.push((template_id, records)),
        }
    }

    let mut packets = Vec::with_capacity(data_flowsets.len().saturating_add(1));

    if !templates.is_empty() {
        packets.push(build_template_packet(
            sys_up_time,
            unix_secs,
            sequence_number,
            source_id,
            &templates,
        )?);
        // The sequence number is a free-running wire counter: wrap instead of
        // panicking when the configured start value is close to `u32::MAX`.
        sequence_number = sequence_number.wrapping_add(1);
    }

    for (template_id, records) in data_flowsets {
        // Templates are collected up front, so a data flowset may reference a
        // template that appears later in the configuration. When several
        // templates share an ID, the first one wins.
        let template_fields = templates
            .iter()
            .find(|(id, _)| *id == template_id)
            .map(|(_, fields)| fields)
            .ok_or_else(|| {
                NetflowError::Generation(format!(
                    "Data flowset references undefined template ID: {}",
                    template_id
                ))
            })?;

        packets.push(build_data_packet(
            sys_up_time,
            unix_secs,
            sequence_number,
            source_id,
            template_id,
            template_fields,
            &records,
        )?);
        sequence_number = sequence_number.wrapping_add(1);
    }

    if packets.is_empty() {
        return Err(NetflowError::Generation(
            "V9 configuration must contain at least one template or data flowset".to_string(),
        ));
    }
    Ok(packets)
}
/// Resolves the packet header values from the optional `header` section of
/// `config`, falling back to defaults for anything that is not set.
///
/// Returns `(sys_up_time, unix_secs, sequence_number, source_id)`.
///
/// Defaults: `sys_up_time` = 360000, `sequence_number` = 0, `source_id` = 1,
/// and `unix_secs` = the current wall-clock time in seconds since the Unix
/// epoch, saturated to `u32::MAX` if it does not fit in 32 bits.
///
/// # Errors
///
/// Returns [`NetflowError::Generation`] if `unix_secs` is not configured and
/// the system clock reports a time before the Unix epoch. The clock is only
/// consulted when the timestamp is actually missing, so a fully specified
/// header never fails here.
fn get_header_values(config: &V9Config) -> Result<(u32, u32, u32, u32)> {
    const DEFAULT_SYS_UP_TIME: u32 = 360000;
    const DEFAULT_SEQUENCE_NUMBER: u32 = 0;
    const DEFAULT_SOURCE_ID: u32 = 1;

    let header = config.header.as_ref();

    let unix_secs = match header.and_then(|h| h.unix_secs) {
        Some(secs) => secs,
        None => {
            let now = SystemTime::now().duration_since(UNIX_EPOCH).map_err(|e| {
                NetflowError::Generation(format!("Failed to get system time: {}", e))
            })?;
            u32::try_from(now.as_secs()).unwrap_or(u32::MAX)
        }
    };
    let sys_up_time = header
        .and_then(|h| h.sys_up_time)
        .unwrap_or(DEFAULT_SYS_UP_TIME);
    let sequence_number = header
        .and_then(|h| h.sequence_number)
        .unwrap_or(DEFAULT_SEQUENCE_NUMBER);
    let source_id = header
        .and_then(|h| h.source_id)
        .unwrap_or(DEFAULT_SOURCE_ID);

    Ok((sys_up_time, unix_secs, sequence_number, source_id))
}
/// Encodes one NetFlow v9 export packet containing every template in
/// `templates`, each wrapped in its own template flowset (flowset ID 0).
///
/// The packet starts with the 20-byte header (version 9, record count,
/// `sys_up_time`, `unix_secs`, `sequence_number`, `source_id`, all
/// big-endian). The header count is the number of templates. Each flowset
/// then carries the template ID, the field count and one
/// `(field type, field length)` pair per field.
///
/// # Errors
///
/// Returns [`NetflowError::Generation`] if there are more than 65535
/// templates, a template has more than 65535 fields, a field type name is
/// not recognised, or an encoded flowset is longer than 65535 bytes.
fn build_template_packet(
    sys_up_time: u32,
    unix_secs: u32,
    sequence_number: u32,
    source_id: u32,
    templates: &[(u16, Vec<crate::config::schema::V9TemplateField>)],
) -> Result<Vec<u8>> {
    const VERSION: u16 = 9;
    const TEMPLATE_FLOWSET_ID: u16 = 0;
    // Flowset ID (2 bytes) + flowset length (2 bytes) preceding the body.
    const FLOWSET_HEADER_LEN: usize = 4;

    let record_count = u16::try_from(templates.len())
        .map_err(|_| NetflowError::Generation("Too many templates (max 65535)".to_string()))?;

    let mut out = Vec::new();
    out.extend_from_slice(&VERSION.to_be_bytes());
    out.extend_from_slice(&record_count.to_be_bytes());
    out.extend_from_slice(&sys_up_time.to_be_bytes());
    out.extend_from_slice(&unix_secs.to_be_bytes());
    out.extend_from_slice(&sequence_number.to_be_bytes());
    out.extend_from_slice(&source_id.to_be_bytes());

    for (template_id, fields) in templates {
        let num_fields = u16::try_from(fields.len()).map_err(|_| {
            NetflowError::Generation("Too many fields in template (max 65535)".to_string())
        })?;

        // Encode the template record first so the flowset length can be
        // derived from it instead of being patched in afterwards.
        let mut body = Vec::new();
        body.extend_from_slice(&template_id.to_be_bytes());
        body.extend_from_slice(&num_fields.to_be_bytes());
        for field in fields {
            let type_id = match field_name_to_id(&field.field_type) {
                Some(id) => id,
                None => {
                    return Err(NetflowError::Generation(format!(
                        "Unknown field type: {}",
                        field.field_type
                    )));
                }
            };
            body.extend_from_slice(&type_id.to_be_bytes());
            body.extend_from_slice(&field.field_length.to_be_bytes());
        }

        // The length field covers the whole flowset, header included.
        let flowset_len = body
            .len()
            .checked_add(FLOWSET_HEADER_LEN)
            .and_then(|total| u16::try_from(total).ok())
            .ok_or_else(|| NetflowError::Generation("Flowset length overflow".to_string()))?;

        out.extend_from_slice(&TEMPLATE_FLOWSET_ID.to_be_bytes());
        out.extend_from_slice(&flowset_len.to_be_bytes());
        out.extend_from_slice(&body);
    }

    Ok(out)
}
/// Encodes one NetFlow v9 export packet containing a single data flowset.
///
/// The packet starts with the 20-byte header (version 9, record count,
/// `sys_up_time`, `unix_secs`, `sequence_number`, `source_id`, all
/// big-endian), followed by a flowset whose ID is `template_id`. Each entry
/// of `records` is serialised field by field in the order given by
/// `template_fields`; a field that is missing from a record is encoded as the
/// number 0. The flowset is zero-padded to a multiple of four bytes.
///
/// The header count is the number of data records in the packet, as defined
/// by RFC 3954 section 5.1, not the number of flowsets.
///
/// # Errors
///
/// Returns [`NetflowError::Generation`] if there are more than 65535 records,
/// a template field type name is not recognised (checked even when `records`
/// is empty), or the encoded flowset is longer than 65535 bytes.
fn build_data_packet(
    sys_up_time: u32,
    unix_secs: u32,
    sequence_number: u32,
    source_id: u32,
    template_id: u16,
    template_fields: &[crate::config::schema::V9TemplateField],
    records: &[serde_yaml::Value],
) -> Result<Vec<u8>> {
    const VERSION: u16 = 9;
    const ALIGNMENT: usize = 4;

    let record_count = u16::try_from(records.len()).map_err(|_| {
        NetflowError::Generation("Too many records in data flowset (max 65535)".to_string())
    })?;

    // Resolve every field type once up front: the result is the same for all
    // records, and an invalid template is rejected before any encoding work.
    let field_ids = template_fields
        .iter()
        .map(|field| {
            field_name_to_id(&field.field_type).ok_or_else(|| {
                NetflowError::Generation(format!("Unknown field type: {}", field.field_type))
            })
        })
        .collect::<Result<Vec<u16>>>()?;

    let mut packet = Vec::new();
    packet.extend_from_slice(&VERSION.to_be_bytes());
    packet.extend_from_slice(&record_count.to_be_bytes());
    packet.extend_from_slice(&sys_up_time.to_be_bytes());
    packet.extend_from_slice(&unix_secs.to_be_bytes());
    packet.extend_from_slice(&sequence_number.to_be_bytes());
    packet.extend_from_slice(&source_id.to_be_bytes());

    // Flowset header: the flowset ID is the template ID; the length is
    // written as a placeholder and patched once the final size is known.
    let flowset_start = packet.len();
    packet.extend_from_slice(&template_id.to_be_bytes());
    packet.extend_from_slice(&0u16.to_be_bytes());

    for record in records {
        for (field, &field_id) in template_fields.iter().zip(&field_ids) {
            let field_name = v9_field_id_to_name(field_id);
            let value = get_field_value(record, field_name)
                .unwrap_or_else(|| serde_yaml::Value::Number(0.into()));
            let bytes = serialize_field_value(&value, field.field_length);
            packet.extend_from_slice(&bytes);
        }
    }

    // Pad the flowset (header included) to a 4-byte boundary.
    let unpadded_len = packet.len() - flowset_start;
    let padding = (ALIGNMENT - unpadded_len % ALIGNMENT) % ALIGNMENT;
    packet.resize(packet.len() + padding, 0);

    let flowset_length = u16::try_from(packet.len() - flowset_start)
        .map_err(|_| NetflowError::Generation("Flowset length overflow".to_string()))?;
    // The length field sits right after the 2-byte flowset ID.
    packet[flowset_start + 2..flowset_start + 4].copy_from_slice(&flowset_length.to_be_bytes());

    Ok(packet)
}
/// Maps a NetFlow v9 field type name (for example `"IN_BYTES"`) to its
/// numeric field type ID.
///
/// Only the 24 names listed in the table below are recognised, with IDs 1
/// through 24. Matching is exact and case-sensitive; any other name yields
/// `None`.
fn field_name_to_id(name: &str) -> Option<u16> {
    // Ordered by field type ID: the entry at index `i` has ID `i + 1`.
    const NAMES_BY_ID: [&str; 24] = [
        "IN_BYTES",
        "IN_PKTS",
        "FLOWS",
        "PROTOCOL",
        "SRC_TOS",
        "TCP_FLAGS",
        "L4_SRC_PORT",
        "IPV4_SRC_ADDR",
        "SRC_MASK",
        "INPUT_SNMP",
        "L4_DST_PORT",
        "IPV4_DST_ADDR",
        "DST_MASK",
        "OUTPUT_SNMP",
        "IPV4_NEXT_HOP",
        "SRC_AS",
        "DST_AS",
        "BGP_IPV4_NEXT_HOP",
        "MUL_DST_PKTS",
        "MUL_DST_BYTES",
        "LAST_SWITCHED",
        "FIRST_SWITCHED",
        "OUT_BYTES",
        "OUT_PKTS",
    ];

    NAMES_BY_ID
        .iter()
        .zip(1u16..)
        .find(|&(candidate, _)| *candidate == name)
        .map(|(_, id)| id)
}