use crate::protocol::ProtocolTypes;
use crate::{NetflowError, NetflowPacket, ParsedNetflow};
use nom::error::{Error, ErrorKind};
use serde::Serialize;
use std::net::Ipv4Addr;
/// Stateless unit type whose associated `parse` function is the crate-internal
/// entry point for decoding a NetFlow V7 packet body into a `ParsedNetflow`.
pub struct V7Parser;
impl V7Parser {
pub(crate) fn parse(packet: &[u8]) -> ParsedNetflow<'_> {
match V7::parse_direct(packet) {
Ok((remaining, v7)) => ParsedNetflow::Success {
packet: NetflowPacket::V7(v7),
remaining,
},
Err(e) => ParsedNetflow::Error {
error: NetflowError::Partial {
message: format!("V7 parse error: {}", e),
},
},
}
}
}
/// A decoded NetFlow V7 packet: one [`Header`] followed by its flow records.
#[derive(Debug, PartialEq, Eq, Clone, Serialize)]
pub struct V7 {
/// Packet header. `V7::parse_direct` sets `version` to the constant 7 and
/// reads every other field from the input bytes.
pub header: Header,
/// Flow records in the order they appeared in the input. `V7::parse_direct`
/// produces exactly `header.count` entries; `V7::to_be_bytes` writes at most
/// `MAX_FLOWS` of them.
pub flowsets: Vec<FlowSet>,
}
/// NetFlow V7 packet header.
///
/// Byte offsets below are relative to the slice handed to `V7::parse_direct`,
/// which starts at the `count` field (the 2-byte version is not part of that
/// input). All multi-byte fields are big-endian.
// NOTE(review): field meanings presumably follow Cisco's NetFlow V7 export
// format; only the byte layout is established by this file.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize)]
pub struct Header {
/// Always 7 when produced by `V7::parse_direct`; written as the first two
/// bytes by `V7::to_be_bytes`.
pub version: u16,
/// Number of flow records, read from bytes 0..2. `V7::parse_direct` only
/// accepts values in `1..=MAX_FLOWS`. `V7::to_be_bytes` does not write this
/// field; it writes the (clamped) length of `V7::flowsets` instead.
pub count: u16,
/// Bytes 2..6.
pub sys_up_time: u32,
/// Bytes 6..10.
pub unix_secs: u32,
/// Bytes 10..14.
pub unix_nsecs: u32,
/// Bytes 14..18.
pub flow_sequence: u32,
/// Bytes 18..22.
pub reserved: u32,
}
/// One 52-byte NetFlow V7 flow record.
///
/// Byte offsets below are relative to the start of the record. All multi-byte
/// integers are big-endian.
// NOTE(review): field meanings presumably follow Cisco's NetFlow V7 flow
// record layout; only the byte layout is established by this file.
#[derive(Debug, PartialEq, Eq, Clone, Serialize)]
pub struct FlowSet {
/// Bytes 0..4, decoded as an IPv4 address.
pub src_addr: Ipv4Addr,
/// Bytes 4..8, decoded as an IPv4 address.
pub dst_addr: Ipv4Addr,
/// Bytes 8..12, decoded as an IPv4 address.
pub next_hop: Ipv4Addr,
/// Bytes 12..14.
pub input: u16,
/// Bytes 14..16.
pub output: u16,
/// Bytes 16..20.
pub d_pkts: u32,
/// Bytes 20..24.
pub d_octets: u32,
/// Bytes 24..28.
pub first: u32,
/// Bytes 28..32.
pub last: u32,
/// Bytes 32..34.
pub src_port: u16,
/// Bytes 34..36.
pub dst_port: u16,
/// Byte 36.
pub flags_fields_valid: u8,
/// Byte 37.
pub tcp_flags: u8,
/// Byte 38: the raw protocol number as it appeared in the record.
pub protocol_number: u8,
/// Derived from `protocol_number` via `ProtocolTypes::from` during parsing.
/// It has no bytes of its own in the record and is not written by
/// `V7::to_be_bytes`.
pub protocol_type: ProtocolTypes,
/// Byte 39.
pub tos: u8,
/// Bytes 40..42.
pub src_as: u16,
/// Bytes 42..44.
pub dst_as: u16,
/// Byte 44.
pub src_mask: u8,
/// Byte 45.
pub dst_mask: u8,
/// Bytes 46..48.
pub flags_fields_invalid: u16,
/// Bytes 48..52, decoded as an IPv4 address.
pub router_src: Ipv4Addr,
}
/// Bytes of header consumed by `V7::parse_direct`: the 2-byte count plus five
/// 32-bit fields. The 2-byte version is NOT included, which is why
/// `V7::to_be_bytes` reserves `2 + HEADER_SIZE` for the header it writes.
const HEADER_SIZE: usize = 22;
/// Size in bytes of one serialized `FlowSet` record.
const FLOW_SIZE: usize = 52;
/// Largest record count `V7::parse_direct` accepts, and the cap on how many
/// records `V7::to_be_bytes` emits.
const MAX_FLOWS: u16 = 28;
impl V7 {
#[inline]
pub fn parse_direct(input: &[u8]) -> nom::IResult<&[u8], V7> {
if input.len() < HEADER_SIZE {
return Err(nom::Err::Error(Error::new(input, ErrorKind::Eof)));
}
let count = u16::from_be_bytes([input[0], input[1]]);
if count == 0 || count > MAX_FLOWS {
return Err(nom::Err::Error(Error::new(input, ErrorKind::TooLarge)));
}
let header = Header {
version: 7,
count,
sys_up_time: u32::from_be_bytes([input[2], input[3], input[4], input[5]]),
unix_secs: u32::from_be_bytes([input[6], input[7], input[8], input[9]]),
unix_nsecs: u32::from_be_bytes([input[10], input[11], input[12], input[13]]),
flow_sequence: u32::from_be_bytes([input[14], input[15], input[16], input[17]]),
reserved: u32::from_be_bytes([input[18], input[19], input[20], input[21]]),
};
let total = (count as usize)
.checked_mul(FLOW_SIZE)
.and_then(|flows_len| flows_len.checked_add(HEADER_SIZE))
.ok_or_else(|| nom::Err::Error(Error::new(input, ErrorKind::TooLarge)))?;
if input.len() < total {
return Err(nom::Err::Error(Error::new(input, ErrorKind::Eof)));
}
let mut flowsets = Vec::with_capacity(count as usize);
let mut offset = HEADER_SIZE;
for _ in 0..count {
let b = &input[offset..offset + FLOW_SIZE];
let protocol_number = b[38];
flowsets.push(FlowSet {
src_addr: Ipv4Addr::new(b[0], b[1], b[2], b[3]),
dst_addr: Ipv4Addr::new(b[4], b[5], b[6], b[7]),
next_hop: Ipv4Addr::new(b[8], b[9], b[10], b[11]),
input: u16::from_be_bytes([b[12], b[13]]),
output: u16::from_be_bytes([b[14], b[15]]),
d_pkts: u32::from_be_bytes([b[16], b[17], b[18], b[19]]),
d_octets: u32::from_be_bytes([b[20], b[21], b[22], b[23]]),
first: u32::from_be_bytes([b[24], b[25], b[26], b[27]]),
last: u32::from_be_bytes([b[28], b[29], b[30], b[31]]),
src_port: u16::from_be_bytes([b[32], b[33]]),
dst_port: u16::from_be_bytes([b[34], b[35]]),
flags_fields_valid: b[36],
tcp_flags: b[37],
protocol_number,
protocol_type: ProtocolTypes::from(protocol_number),
tos: b[39],
src_as: u16::from_be_bytes([b[40], b[41]]),
dst_as: u16::from_be_bytes([b[42], b[43]]),
src_mask: b[44],
dst_mask: b[45],
flags_fields_invalid: u16::from_be_bytes([b[46], b[47]]),
router_src: Ipv4Addr::new(b[48], b[49], b[50], b[51]),
});
offset += FLOW_SIZE;
}
Ok((&input[total..], V7 { header, flowsets }))
}
pub fn to_be_bytes(&self) -> Vec<u8> {
let to_emit = self.flowsets.len().min(MAX_FLOWS as usize);
let mut result = Vec::with_capacity(2 + HEADER_SIZE + to_emit * FLOW_SIZE);
let count = to_emit as u16;
result.extend_from_slice(&self.header.version.to_be_bytes());
result.extend_from_slice(&count.to_be_bytes());
result.extend_from_slice(&self.header.sys_up_time.to_be_bytes());
result.extend_from_slice(&self.header.unix_secs.to_be_bytes());
result.extend_from_slice(&self.header.unix_nsecs.to_be_bytes());
result.extend_from_slice(&self.header.flow_sequence.to_be_bytes());
result.extend_from_slice(&self.header.reserved.to_be_bytes());
for set in self.flowsets.iter().take(to_emit) {
result.extend_from_slice(&set.src_addr.octets());
result.extend_from_slice(&set.dst_addr.octets());
result.extend_from_slice(&set.next_hop.octets());
result.extend_from_slice(&set.input.to_be_bytes());
result.extend_from_slice(&set.output.to_be_bytes());
result.extend_from_slice(&set.d_pkts.to_be_bytes());
result.extend_from_slice(&set.d_octets.to_be_bytes());
result.extend_from_slice(&set.first.to_be_bytes());
result.extend_from_slice(&set.last.to_be_bytes());
result.extend_from_slice(&set.src_port.to_be_bytes());
result.extend_from_slice(&set.dst_port.to_be_bytes());
result.extend_from_slice(&set.flags_fields_valid.to_be_bytes());
result.extend_from_slice(&set.tcp_flags.to_be_bytes());
result.extend_from_slice(&set.protocol_number.to_be_bytes());
result.extend_from_slice(&set.tos.to_be_bytes());
result.extend_from_slice(&set.src_as.to_be_bytes());
result.extend_from_slice(&set.dst_as.to_be_bytes());
result.extend_from_slice(&set.src_mask.to_be_bytes());
result.extend_from_slice(&set.dst_mask.to_be_bytes());
result.extend_from_slice(&set.flags_fields_invalid.to_be_bytes());
result.extend_from_slice(&set.router_src.octets());
}
result
}
}