use crate::config::schema::{IPFixConfig, IPFixFlowSet as ConfigIPFixFlowSet};
use crate::error::{NetflowError, Result};
use crate::generator::field_serializer::{
get_field_value, ipfix_field_id_to_name, serialize_field_value,
};
use std::time::{SystemTime, UNIX_EPOCH};
pub fn build_ipfix_packets(
config: IPFixConfig,
override_sequence_number: Option<u32>,
send_templates: bool,
) -> Result<(Vec<Vec<u8>>, u32)> {
let mut packets = Vec::new();
let (export_time, mut sequence_number, observation_domain_id) =
get_header_values(&config, override_sequence_number)?;
let mut templates = Vec::new();
let mut data_flowsets = Vec::new();
for flowset in &config.flowsets {
match flowset {
ConfigIPFixFlowSet::Template {
template_id,
fields,
} => {
templates.push((*template_id, fields.clone()));
}
ConfigIPFixFlowSet::Data {
template_id,
records,
} => {
data_flowsets.push((*template_id, records.clone()));
}
}
}
if !templates.is_empty() && send_templates {
let template_packet = build_template_packet(
export_time,
sequence_number,
observation_domain_id,
&templates,
)?;
packets.push(template_packet);
}
for (template_id, records) in data_flowsets {
let template_fields = templates
.iter()
.find(|(id, _)| *id == template_id)
.map(|(_, fields)| fields)
.ok_or_else(|| {
NetflowError::Generation(format!(
"Data flowset references undefined template ID: {}",
template_id
))
})?;
let data_packet = build_data_packet(
export_time,
sequence_number,
observation_domain_id,
template_id,
template_fields,
&records,
)?;
packets.push(data_packet);
let num_records = u32::try_from(records.len()).map_err(|_| {
NetflowError::Generation("Too many records (max 4294967295)".to_string())
})?;
sequence_number = sequence_number
.checked_add(num_records)
.ok_or_else(|| NetflowError::Generation("Sequence number overflow".to_string()))?;
}
if packets.is_empty() {
return Err(NetflowError::Generation(
"IPFIX configuration must contain at least one template or data flowset".to_string(),
));
}
Ok((packets, sequence_number))
}
fn get_header_values(
config: &IPFixConfig,
override_sequence_number: Option<u32>,
) -> Result<(u32, u32, u32)> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| NetflowError::Generation(format!("Failed to get system time: {}", e)))?;
let export_time = if let Some(ref h) = config.header {
h.export_time
.unwrap_or_else(|| u32::try_from(now.as_secs()).unwrap_or(u32::MAX))
} else {
u32::try_from(now.as_secs()).unwrap_or(u32::MAX)
};
let sequence_number = if let Some(override_seq) = override_sequence_number {
override_seq
} else if let Some(ref h) = config.header {
h.sequence_number.unwrap_or(0)
} else {
0
};
let observation_domain_id = if let Some(ref h) = config.header {
h.observation_domain_id.unwrap_or(1)
} else {
1
};
Ok((export_time, sequence_number, observation_domain_id))
}
fn build_template_packet(
export_time: u32,
sequence_number: u32,
observation_domain_id: u32,
templates: &[(u16, Vec<crate::config::schema::IPFixTemplateField>)],
) -> Result<Vec<u8>> {
let mut packet = Vec::new();
packet.extend_from_slice(&10u16.to_be_bytes());
let length_pos = packet.len();
packet.extend_from_slice(&0u16.to_be_bytes());
packet.extend_from_slice(&export_time.to_be_bytes());
packet.extend_from_slice(&sequence_number.to_be_bytes());
packet.extend_from_slice(&observation_domain_id.to_be_bytes());
for (template_id, fields) in templates {
let set_id = 2u16; packet.extend_from_slice(&set_id.to_be_bytes());
let set_length_pos = packet.len();
packet.extend_from_slice(&0u16.to_be_bytes());
packet.extend_from_slice(&template_id.to_be_bytes());
let field_count = u16::try_from(fields.len()).map_err(|_| {
NetflowError::Generation("Too many fields in template (max 65535)".to_string())
})?;
packet.extend_from_slice(&field_count.to_be_bytes());
for field in fields {
let field_type = field_name_to_id(&field.field_type).ok_or_else(|| {
NetflowError::Generation(format!("Unknown field type: {}", field.field_type))
})?;
packet.extend_from_slice(&field_type.to_be_bytes());
packet.extend_from_slice(&field.field_length.to_be_bytes());
}
while packet
.len()
.checked_sub(set_length_pos)
.and_then(|v| v.checked_add(2))
.map(|v| v % 4 != 0)
.unwrap_or(false)
{
packet.push(0);
}
let set_length = packet
.len()
.checked_sub(set_length_pos)
.and_then(|v| v.checked_add(2))
.and_then(|v| u16::try_from(v).ok())
.ok_or_else(|| NetflowError::Generation("Set length overflow".to_string()))?;
let end_pos = set_length_pos
.checked_add(2)
.ok_or_else(|| NetflowError::Generation("Array index overflow".to_string()))?;
packet[set_length_pos..end_pos].copy_from_slice(&set_length.to_be_bytes());
}
let total_length = u16::try_from(packet.len())
.map_err(|_| NetflowError::Generation("Packet length exceeds u16::MAX".to_string()))?;
let end_pos = length_pos
.checked_add(2)
.ok_or_else(|| NetflowError::Generation("Array index overflow".to_string()))?;
packet[length_pos..end_pos].copy_from_slice(&total_length.to_be_bytes());
Ok(packet)
}
fn build_data_packet(
export_time: u32,
sequence_number: u32,
observation_domain_id: u32,
template_id: u16,
template_fields: &[crate::config::schema::IPFixTemplateField],
records: &[serde_yaml::Value],
) -> Result<Vec<u8>> {
let mut packet = Vec::new();
packet.extend_from_slice(&10u16.to_be_bytes());
let length_pos = packet.len();
packet.extend_from_slice(&0u16.to_be_bytes());
packet.extend_from_slice(&export_time.to_be_bytes());
packet.extend_from_slice(&sequence_number.to_be_bytes());
packet.extend_from_slice(&observation_domain_id.to_be_bytes());
packet.extend_from_slice(&template_id.to_be_bytes());
let set_length_pos = packet.len();
packet.extend_from_slice(&0u16.to_be_bytes());
for record in records {
for field in template_fields {
let field_type = field_name_to_id(&field.field_type).ok_or_else(|| {
NetflowError::Generation(format!("Unknown field type: {}", field.field_type))
})?;
let field_name = ipfix_field_id_to_name(field_type);
let value =
get_field_value(record, field_name).unwrap_or(serde_yaml::Value::Number(0.into()));
let bytes = serialize_field_value(&value, field.field_length);
packet.extend_from_slice(&bytes);
}
}
while packet
.len()
.checked_sub(set_length_pos)
.and_then(|v| v.checked_add(2))
.map(|v| v % 4 != 0)
.unwrap_or(false)
{
packet.push(0);
}
let set_length = packet
.len()
.checked_sub(set_length_pos)
.and_then(|v| v.checked_add(2))
.and_then(|v| u16::try_from(v).ok())
.ok_or_else(|| NetflowError::Generation("Set length overflow".to_string()))?;
let set_end_pos = set_length_pos
.checked_add(2)
.ok_or_else(|| NetflowError::Generation("Array index overflow".to_string()))?;
packet[set_length_pos..set_end_pos].copy_from_slice(&set_length.to_be_bytes());
let total_length = u16::try_from(packet.len())
.map_err(|_| NetflowError::Generation("Packet length exceeds u16::MAX".to_string()))?;
let length_end_pos = length_pos
.checked_add(2)
.ok_or_else(|| NetflowError::Generation("Array index overflow".to_string()))?;
packet[length_pos..length_end_pos].copy_from_slice(&total_length.to_be_bytes());
Ok(packet)
}
fn field_name_to_id(name: &str) -> Option<u16> {
match name {
"octetDeltaCount" => Some(1),
"packetDeltaCount" => Some(2),
"deltaFlowCount" => Some(3),
"protocolIdentifier" => Some(4),
"ipClassOfService" => Some(5),
"tcpControlBits" => Some(6),
"sourceTransportPort" => Some(7),
"sourceIPv4Address" => Some(8),
"sourceIPv4PrefixLength" => Some(9),
"ingressInterface" => Some(10),
"destinationTransportPort" => Some(11),
"destinationIPv4Address" => Some(12),
"destinationIPv4PrefixLength" => Some(13),
"egressInterface" => Some(14),
"ipNextHopIPv4Address" => Some(15),
"bgpSourceAsNumber" => Some(16),
"bgpDestinationAsNumber" => Some(17),
"bgpNextHopIPv4Address" => Some(18),
"flowEndSysUpTime" => Some(21),
"flowStartSysUpTime" => Some(22),
_ => None,
}
}