use super::error::{ParseError, Result};
use super::Parser;
use crate::models::*;
use std::io::{self, Cursor, Read};
impl<R: Read> Parser<R> {
/// Parses the body of a (compact) flow sample.
///
/// Fields are consumed in this order: sequence number, data source,
/// sampling rate, sample pool, drop count, input interface, output
/// interface, then a `u32` record count followed by that many flow records.
///
/// # Errors
///
/// Returns the first error produced by any of the underlying reads or
/// nested record parses.
pub(super) fn parse_flow_sample(&mut self) -> Result<FlowSample> {
    let seq = self.read_u32()?;
    let source = self.parse_data_source()?;
    let rate = self.read_u32()?;
    let pool = self.read_u32()?;
    let dropped = self.read_u32()?;
    let in_if = self.parse_interface()?;
    let out_if = self.parse_interface()?;

    let record_count = self.read_u32()?;
    // The declared count only seeds the initial capacity, capped at 1024
    // entries; the vector still grows if more records are actually parsed.
    let mut records = Vec::with_capacity(record_count.min(1024) as usize);
    for _ in 0..record_count {
        let record = self.parse_flow_record()?;
        records.push(record);
    }

    Ok(FlowSample {
        sequence_number: seq,
        source_id: source,
        sampling_rate: rate,
        sample_pool: pool,
        drops: dropped,
        input: in_if,
        output: out_if,
        flow_records: records,
    })
}
/// Parses the body of a (compact) counters sample.
///
/// Reads the sequence number and data source, then a `u32` record count
/// followed by that many counter records.
///
/// # Errors
///
/// Returns the first error produced by any underlying read or by
/// `parse_counter_record`.
pub(super) fn parse_counters_sample(&mut self) -> Result<CountersSample> {
    let seq = self.read_u32()?;
    let source = self.parse_data_source()?;

    let record_count = self.read_u32()?;
    // Initial capacity is capped at 1024 regardless of the declared count.
    let mut records = Vec::with_capacity(record_count.min(1024) as usize);
    for _ in 0..record_count {
        let record = self.parse_counter_record()?;
        records.push(record);
    }

    Ok(CountersSample {
        sequence_number: seq,
        source_id: source,
        counters: records,
    })
}
/// Parses the body of an expanded flow sample.
///
/// Same field order as `parse_flow_sample`, except that the data source
/// and both interfaces are read with their `*_expanded` parsers.
///
/// # Errors
///
/// Returns the first error produced by any underlying read or nested
/// record parse.
pub(super) fn parse_flow_sample_expanded(&mut self) -> Result<FlowSampleExpanded> {
    let seq = self.read_u32()?;
    let source = self.parse_data_source_expanded()?;
    let rate = self.read_u32()?;
    let pool = self.read_u32()?;
    let dropped = self.read_u32()?;
    let in_if = self.parse_interface_expanded()?;
    let out_if = self.parse_interface_expanded()?;

    let record_count = self.read_u32()?;
    // Pre-size for at most 1024 records; larger counts grow on demand.
    let initial_capacity = record_count.min(1024) as usize;
    let mut records = Vec::with_capacity(initial_capacity);
    for _ in 0..record_count {
        let record = self.parse_flow_record()?;
        records.push(record);
    }

    Ok(FlowSampleExpanded {
        sequence_number: seq,
        source_id: source,
        sampling_rate: rate,
        sample_pool: pool,
        drops: dropped,
        input: in_if,
        output: out_if,
        flow_records: records,
    })
}
/// Parses the body of an expanded counters sample.
///
/// Reads the sequence number, the expanded data source, then a `u32`
/// record count followed by that many counter records.
///
/// # Errors
///
/// Returns the first error produced by any underlying read or by
/// `parse_counter_record`.
pub(super) fn parse_counters_sample_expanded(&mut self) -> Result<CountersSampleExpanded> {
    let seq = self.read_u32()?;
    let source = self.parse_data_source_expanded()?;

    let record_count = self.read_u32()?;
    // Pre-size for at most 1024 records; larger counts grow on demand.
    let initial_capacity = record_count.min(1024) as usize;
    let mut records = Vec::with_capacity(initial_capacity);
    for _ in 0..record_count {
        let record = self.parse_counter_record()?;
        records.push(record);
    }

    Ok(CountersSampleExpanded {
        sequence_number: seq,
        source_id: source,
        counters: records,
    })
}
/// Parses the body of a discarded-packet sample.
///
/// Reads, in order: sequence number, expanded data source, drop count,
/// input and output interface indices, the numeric drop reason, then a
/// `u32` record count followed by that many flow records.
///
/// A reason value that `DropReason::from_u32` does not map to a variant is
/// reported as `DropReason::Unknown` rather than as an error.
///
/// # Errors
///
/// Returns the first error produced by any underlying read or nested
/// record parse.
pub(super) fn parse_discarded_packet(&mut self) -> Result<DiscardedPacket> {
    let seq = self.read_u32()?;
    let source = self.parse_data_source_expanded()?;
    let dropped = self.read_u32()?;
    let in_ifindex = self.read_u32()?;
    let out_ifindex = self.read_u32()?;

    let raw_reason = self.read_u32()?;
    let drop_reason = crate::models::DropReason::from_u32(raw_reason)
        .unwrap_or(crate::models::DropReason::Unknown);

    let record_count = self.read_u32()?;
    // Initial capacity is capped at 1024 regardless of the declared count.
    let mut records = Vec::with_capacity(record_count.min(1024) as usize);
    for _ in 0..record_count {
        let record = self.parse_flow_record()?;
        records.push(record);
    }

    Ok(DiscardedPacket {
        sequence_number: seq,
        source_id: source,
        drops: dropped,
        input_ifindex: in_ifindex,
        output_ifindex: out_ifindex,
        reason: drop_reason,
        flow_records: records,
    })
}
/// Decodes the opaque payload of one sample record according to its
/// format tag.
///
/// * Enterprise `0`, formats `1`–`5`: the payload is parsed with a
///   sub-parser into the matching structured `SampleData` variant (flow
///   sample, counters sample, their expanded forms, or discarded packet).
/// * Enterprise `4300`, formats `1002` / `1003`: the raw bytes are returned
///   untouched as `RtMetric` / `RtFlow`.
/// * Anything else: the raw bytes are returned as `SampleData::Unknown`.
///
/// The sub-parser is not checked for leftover bytes after a structured
/// sample has been decoded.
///
/// # Errors
///
/// Returns the error of the structured parser when a recognised
/// enterprise-0 payload fails to decode.
fn parse_sample_data(&mut self, format: DataFormat, data: Vec<u8>) -> Result<SampleData> {
    if format.enterprise() == 0 {
        // Parse straight out of a borrowed slice instead of cloning the
        // whole payload. The sub-parser lives in its own scope so the
        // borrow has ended before `data` is moved into the fallback below.
        let decoded = {
            let mut parser = Parser::new(Cursor::new(data.as_slice()));
            match format.format() {
                1 => Some(SampleData::FlowSample(parser.parse_flow_sample()?)),
                2 => Some(SampleData::CountersSample(
                    parser.parse_counters_sample()?,
                )),
                3 => Some(SampleData::FlowSampleExpanded(
                    parser.parse_flow_sample_expanded()?,
                )),
                4 => Some(SampleData::CountersSampleExpanded(
                    parser.parse_counters_sample_expanded()?,
                )),
                5 => Some(SampleData::DiscardedPacket(
                    parser.parse_discarded_packet()?,
                )),
                _ => None,
            }
        };
        // Unrecognised standard formats keep their raw bytes.
        Ok(decoded.unwrap_or_else(|| SampleData::Unknown { format, data }))
    } else if format.enterprise() == 4300 {
        // These formats are passed through undecoded.
        match format.format() {
            1002 => Ok(SampleData::RtMetric { format, data }),
            1003 => Ok(SampleData::RtFlow { format, data }),
            _ => Ok(SampleData::Unknown { format, data }),
        }
    } else {
        Ok(SampleData::Unknown { format, data })
    }
}
/// Parses one sample record: a data-format tag followed by an opaque
/// payload, which is then decoded by `parse_sample_data`.
///
/// # Errors
///
/// Returns any error from reading the tag or payload, or from decoding
/// the payload.
fn parse_sample_record(&mut self) -> Result<SampleRecord> {
    let sample_type = self.parse_data_format()?;
    let payload = self.read_opaque()?;
    self.parse_sample_data(sample_type, payload)
        .map(|sample_data| SampleRecord {
            sample_type,
            sample_data,
        })
}
/// Parses one sFlow datagram from the underlying reader.
///
/// Reads the version word first and rejects anything other than `5`,
/// then reads the agent address, sub-agent id, sequence number, uptime,
/// and a `u32` sample count followed by that many sample records.
///
/// # Errors
///
/// * `ParseError::InvalidData` when the version word is not `5`.
/// * Otherwise the first error produced by any underlying read or sample
///   parse.
pub fn parse_datagram(&mut self) -> Result<SFlowDatagram> {
    // Guard: only version 5 is understood.
    match self.read_u32()? {
        5 => {}
        version => {
            return Err(ParseError::InvalidData(format!(
                "Invalid version: expected 5, got {}",
                version
            )));
        }
    }

    let agent = self.parse_address()?;
    let sub_agent = self.read_u32()?;
    let seq = self.read_u32()?;
    let agent_uptime = self.read_u32()?;

    let sample_count = self.read_u32()?;
    // Initial capacity is capped at 1024 regardless of the declared count.
    let mut records = Vec::with_capacity(sample_count.min(1024) as usize);
    for _ in 0..sample_count {
        let record = self.parse_sample_record()?;
        records.push(record);
    }

    Ok(SFlowDatagram {
        version: DatagramVersion::Version5,
        agent_address: agent,
        sub_agent_id: sub_agent,
        sequence_number: seq,
        uptime: agent_uptime,
        samples: records,
    })
}
}
/// Parses a single sFlow datagram from the start of `data`.
///
/// Convenience wrapper that builds a [`Parser`] over an in-memory cursor
/// and calls [`Parser::parse_datagram`] once. Bytes after the first
/// datagram are not examined.
///
/// # Errors
///
/// Returns whatever error [`Parser::parse_datagram`] reports.
pub fn parse_datagram(data: &[u8]) -> Result<SFlowDatagram> {
    Parser::new(Cursor::new(data)).parse_datagram()
}
/// Parses as many back-to-back sFlow datagrams as `data` contains.
///
/// Datagrams are parsed one after another from a shared cursor until the
/// cursor reaches the end of `data`. If a datagram is cut short (an I/O
/// error of kind `UnexpectedEof`), parsing stops and the datagrams decoded
/// so far are returned; the truncated tail is discarded without an error.
///
/// # Errors
///
/// Any parse error other than an `UnexpectedEof` I/O error is returned
/// immediately, and datagrams parsed before it are dropped.
pub fn parse_datagrams(data: &[u8]) -> Result<Vec<SFlowDatagram>> {
    let total_len = data.len() as u64;
    let mut cursor = Cursor::new(data);
    let mut parsed = Vec::new();

    while cursor.position() < total_len {
        // A fresh parser per datagram, all sharing the same cursor position.
        match Parser::new(&mut cursor).parse_datagram() {
            Ok(datagram) => parsed.push(datagram),
            Err(ParseError::Io(err)) if err.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(other) => return Err(other),
        }
    }

    Ok(parsed)
}