sflow_parser/parsers/
datagram.rs1use super::error::{ParseError, Result};
6use super::Parser;
7use crate::models::*;
8use std::io::{self, Cursor, Read};
9
10impl<R: Read> Parser<R> {
11 pub(super) fn parse_flow_sample(&mut self) -> Result<FlowSample> {
13 let sequence_number = self.read_u32()?;
14 let source_id = self.parse_data_source()?;
15 let sampling_rate = self.read_u32()?;
16 let sample_pool = self.read_u32()?;
17 let drops = self.read_u32()?;
18 let input = self.parse_interface()?;
19 let output = self.parse_interface()?;
20
21 let num_records = self.read_u32()?;
23 let capacity = num_records.min(1024) as usize;
25 let mut flow_records = Vec::with_capacity(capacity);
26 for _ in 0..num_records {
27 flow_records.push(self.parse_flow_record()?);
28 }
29
30 Ok(FlowSample {
31 sequence_number,
32 source_id,
33 sampling_rate,
34 sample_pool,
35 drops,
36 input,
37 output,
38 flow_records,
39 })
40 }
41
42 pub(super) fn parse_counters_sample(&mut self) -> Result<CountersSample> {
44 let sequence_number = self.read_u32()?;
45 let source_id = self.parse_data_source()?;
46
47 let num_records = self.read_u32()?;
49 let capacity = num_records.min(1024) as usize;
51 let mut counters = Vec::with_capacity(capacity);
52 for _ in 0..num_records {
53 counters.push(self.parse_counter_record()?);
54 }
55
56 Ok(CountersSample {
57 sequence_number,
58 source_id,
59 counters,
60 })
61 }
62
63 pub(super) fn parse_flow_sample_expanded(&mut self) -> Result<FlowSampleExpanded> {
65 let sequence_number = self.read_u32()?;
66 let source_id = self.parse_data_source_expanded()?;
67 let sampling_rate = self.read_u32()?;
68 let sample_pool = self.read_u32()?;
69 let drops = self.read_u32()?;
70 let input = self.parse_interface_expanded()?;
71 let output = self.parse_interface_expanded()?;
72
73 let num_records = self.read_u32()?;
75 let capacity = num_records.min(1024) as usize;
77 let mut flow_records = Vec::with_capacity(capacity);
78 for _ in 0..num_records {
79 flow_records.push(self.parse_flow_record()?);
80 }
81
82 Ok(FlowSampleExpanded {
83 sequence_number,
84 source_id,
85 sampling_rate,
86 sample_pool,
87 drops,
88 input,
89 output,
90 flow_records,
91 })
92 }
93
94 pub(super) fn parse_counters_sample_expanded(&mut self) -> Result<CountersSampleExpanded> {
96 let sequence_number = self.read_u32()?;
97 let source_id = self.parse_data_source_expanded()?;
98
99 let num_records = self.read_u32()?;
101 let capacity = num_records.min(1024) as usize;
103 let mut counters = Vec::with_capacity(capacity);
104 for _ in 0..num_records {
105 counters.push(self.parse_counter_record()?);
106 }
107
108 Ok(CountersSampleExpanded {
109 sequence_number,
110 source_id,
111 counters,
112 })
113 }
114
115 pub(super) fn parse_discarded_packet(&mut self) -> Result<DiscardedPacket> {
117 let sequence_number = self.read_u32()?;
118 let source_id = self.parse_data_source_expanded()?;
119 let drops = self.read_u32()?;
120 let input_ifindex = self.read_u32()?;
121 let output_ifindex = self.read_u32()?;
122
123 let reason_value = self.read_u32()?;
125 let reason = crate::models::DropReason::from_u32(reason_value)
126 .unwrap_or(crate::models::DropReason::Unknown);
127
128 let num_records = self.read_u32()?;
130 let capacity = num_records.min(1024) as usize;
132 let mut flow_records = Vec::with_capacity(capacity);
133 for _ in 0..num_records {
134 flow_records.push(self.parse_flow_record()?);
135 }
136
137 Ok(DiscardedPacket {
138 sequence_number,
139 source_id,
140 drops,
141 input_ifindex,
142 output_ifindex,
143 reason,
144 flow_records,
145 })
146 }
147
148 fn parse_sample_data(&mut self, format: DataFormat, data: Vec<u8>) -> Result<SampleData> {
150 let mut cursor = Cursor::new(data.clone());
151 let mut parser = Parser::new(&mut cursor);
152
153 if format.enterprise() == 0 {
155 match format.format() {
156 1 => {
157 let sample = parser.parse_flow_sample()?;
158 Ok(SampleData::FlowSample(sample))
159 }
160 2 => {
161 let sample = parser.parse_counters_sample()?;
162 Ok(SampleData::CountersSample(sample))
163 }
164 3 => {
165 let sample = parser.parse_flow_sample_expanded()?;
166 Ok(SampleData::FlowSampleExpanded(sample))
167 }
168 4 => {
169 let sample = parser.parse_counters_sample_expanded()?;
170 Ok(SampleData::CountersSampleExpanded(sample))
171 }
172 5 => {
173 let sample = parser.parse_discarded_packet()?;
174 Ok(SampleData::DiscardedPacket(sample))
175 }
176 _ => Ok(SampleData::Unknown { format, data }),
177 }
178 } else if format.enterprise() == 4300 {
179 match format.format() {
181 1002 => Ok(SampleData::RtMetric { format, data }),
182 1003 => Ok(SampleData::RtFlow { format, data }),
183 _ => Ok(SampleData::Unknown { format, data }),
184 }
185 } else {
186 Ok(SampleData::Unknown { format, data })
188 }
189 }
190
191 fn parse_sample_record(&mut self) -> Result<SampleRecord> {
193 let sample_type = self.parse_data_format()?;
194 let sample_data_raw = self.read_opaque()?;
195 let sample_data = self.parse_sample_data(sample_type, sample_data_raw)?;
196
197 Ok(SampleRecord {
198 sample_type,
199 sample_data,
200 })
201 }
202
203 pub fn parse_datagram(&mut self) -> Result<SFlowDatagram> {
205 let version = self.read_u32()?;
207 if version != 5 {
208 return Err(ParseError::InvalidData(format!(
209 "Invalid version: expected 5, got {}",
210 version
211 )));
212 }
213
214 let agent_address = self.parse_address()?;
216
217 let sub_agent_id = self.read_u32()?;
219
220 let sequence_number = self.read_u32()?;
222
223 let uptime = self.read_u32()?;
225
226 let num_samples = self.read_u32()?;
228 let capacity = num_samples.min(1024) as usize;
230 let mut samples = Vec::with_capacity(capacity);
231 for _ in 0..num_samples {
232 samples.push(self.parse_sample_record()?);
233 }
234
235 Ok(SFlowDatagram {
236 version: DatagramVersion::Version5,
237 agent_address,
238 sub_agent_id,
239 sequence_number,
240 uptime,
241 samples,
242 })
243 }
244}
245
246pub fn parse_datagram(data: &[u8]) -> Result<SFlowDatagram> {
248 let mut parser = Parser::new(Cursor::new(data));
249 parser.parse_datagram()
250}
251
252pub fn parse_datagrams(data: &[u8]) -> Result<Vec<SFlowDatagram>> {
255 let mut datagrams = Vec::new();
256 let mut cursor = Cursor::new(data);
257
258 loop {
259 let pos = cursor.position();
260 if pos >= data.len() as u64 {
261 break;
262 }
263
264 match Parser::new(&mut cursor).parse_datagram() {
265 Ok(datagram) => datagrams.push(datagram),
266 Err(ParseError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
267 break;
269 }
270 Err(e) => return Err(e),
271 }
272 }
273
274 Ok(datagrams)
275}