sflow_parser/parsers/
datagram.rs

1//! Datagram and sample parsing
2//!
3//! This module contains top-level parsing functions for sFlow datagrams and samples.
4
5use super::error::{ParseError, Result};
6use super::Parser;
7use crate::models::*;
8use std::io::{self, Cursor, Read};
9
10impl<R: Read> Parser<R> {
11    /// Parse a compact flow sample
12    pub(super) fn parse_flow_sample(&mut self) -> Result<FlowSample> {
13        let sequence_number = self.read_u32()?;
14        let source_id = self.parse_data_source()?;
15        let sampling_rate = self.read_u32()?;
16        let sample_pool = self.read_u32()?;
17        let drops = self.read_u32()?;
18        let input = self.parse_interface()?;
19        let output = self.parse_interface()?;
20
21        // Parse flow records array
22        let num_records = self.read_u32()?;
23        // Limit capacity to prevent OOM attacks - allocate conservatively
24        let capacity = num_records.min(1024) as usize;
25        let mut flow_records = Vec::with_capacity(capacity);
26        for _ in 0..num_records {
27            flow_records.push(self.parse_flow_record()?);
28        }
29
30        Ok(FlowSample {
31            sequence_number,
32            source_id,
33            sampling_rate,
34            sample_pool,
35            drops,
36            input,
37            output,
38            flow_records,
39        })
40    }
41
42    /// Parse a compact counter sample
43    pub(super) fn parse_counters_sample(&mut self) -> Result<CountersSample> {
44        let sequence_number = self.read_u32()?;
45        let source_id = self.parse_data_source()?;
46
47        // Parse counter records array
48        let num_records = self.read_u32()?;
49        // Limit capacity to prevent OOM attacks - allocate conservatively
50        let capacity = num_records.min(1024) as usize;
51        let mut counters = Vec::with_capacity(capacity);
52        for _ in 0..num_records {
53            counters.push(self.parse_counter_record()?);
54        }
55
56        Ok(CountersSample {
57            sequence_number,
58            source_id,
59            counters,
60        })
61    }
62
63    /// Parse an expanded flow sample
64    pub(super) fn parse_flow_sample_expanded(&mut self) -> Result<FlowSampleExpanded> {
65        let sequence_number = self.read_u32()?;
66        let source_id = self.parse_data_source_expanded()?;
67        let sampling_rate = self.read_u32()?;
68        let sample_pool = self.read_u32()?;
69        let drops = self.read_u32()?;
70        let input = self.parse_interface_expanded()?;
71        let output = self.parse_interface_expanded()?;
72
73        // Parse flow records array
74        let num_records = self.read_u32()?;
75        // Limit capacity to prevent OOM attacks - allocate conservatively
76        let capacity = num_records.min(1024) as usize;
77        let mut flow_records = Vec::with_capacity(capacity);
78        for _ in 0..num_records {
79            flow_records.push(self.parse_flow_record()?);
80        }
81
82        Ok(FlowSampleExpanded {
83            sequence_number,
84            source_id,
85            sampling_rate,
86            sample_pool,
87            drops,
88            input,
89            output,
90            flow_records,
91        })
92    }
93
94    /// Parse an expanded counter sample
95    pub(super) fn parse_counters_sample_expanded(&mut self) -> Result<CountersSampleExpanded> {
96        let sequence_number = self.read_u32()?;
97        let source_id = self.parse_data_source_expanded()?;
98
99        // Parse counter records array
100        let num_records = self.read_u32()?;
101        // Limit capacity to prevent OOM attacks - allocate conservatively
102        let capacity = num_records.min(1024) as usize;
103        let mut counters = Vec::with_capacity(capacity);
104        for _ in 0..num_records {
105            counters.push(self.parse_counter_record()?);
106        }
107
108        Ok(CountersSampleExpanded {
109            sequence_number,
110            source_id,
111            counters,
112        })
113    }
114
115    /// Parse a discarded packet sample
116    pub(super) fn parse_discarded_packet(&mut self) -> Result<DiscardedPacket> {
117        let sequence_number = self.read_u32()?;
118        let source_id = self.parse_data_source_expanded()?;
119        let drops = self.read_u32()?;
120        let input_ifindex = self.read_u32()?;
121        let output_ifindex = self.read_u32()?;
122
123        // Parse drop reason
124        let reason_value = self.read_u32()?;
125        let reason = crate::models::DropReason::from_u32(reason_value)
126            .unwrap_or(crate::models::DropReason::Unknown);
127
128        // Parse flow records array
129        let num_records = self.read_u32()?;
130        // Limit capacity to prevent OOM attacks - allocate conservatively
131        let capacity = num_records.min(1024) as usize;
132        let mut flow_records = Vec::with_capacity(capacity);
133        for _ in 0..num_records {
134            flow_records.push(self.parse_flow_record()?);
135        }
136
137        Ok(DiscardedPacket {
138            sequence_number,
139            source_id,
140            drops,
141            input_ifindex,
142            output_ifindex,
143            reason,
144            flow_records,
145        })
146    }
147
148    /// Parse sample data based on format
149    fn parse_sample_data(&mut self, format: DataFormat, data: Vec<u8>) -> Result<SampleData> {
150        let mut cursor = Cursor::new(data.clone());
151        let mut parser = Parser::new(&mut cursor);
152
153        // Standard sFlow formats (enterprise = 0)
154        if format.enterprise() == 0 {
155            match format.format() {
156                1 => {
157                    let sample = parser.parse_flow_sample()?;
158                    Ok(SampleData::FlowSample(sample))
159                }
160                2 => {
161                    let sample = parser.parse_counters_sample()?;
162                    Ok(SampleData::CountersSample(sample))
163                }
164                3 => {
165                    let sample = parser.parse_flow_sample_expanded()?;
166                    Ok(SampleData::FlowSampleExpanded(sample))
167                }
168                4 => {
169                    let sample = parser.parse_counters_sample_expanded()?;
170                    Ok(SampleData::CountersSampleExpanded(sample))
171                }
172                5 => {
173                    let sample = parser.parse_discarded_packet()?;
174                    Ok(SampleData::DiscardedPacket(sample))
175                }
176                _ => Ok(SampleData::Unknown { format, data }),
177            }
178        } else if format.enterprise() == 4300 {
179            // sFlow-RT (InMon Corp) formats
180            match format.format() {
181                1002 => Ok(SampleData::RtMetric { format, data }),
182                1003 => Ok(SampleData::RtFlow { format, data }),
183                _ => Ok(SampleData::Unknown { format, data }),
184            }
185        } else {
186            // Other vendor-specific formats
187            Ok(SampleData::Unknown { format, data })
188        }
189    }
190
191    /// Parse a sample record
192    fn parse_sample_record(&mut self) -> Result<SampleRecord> {
193        let sample_type = self.parse_data_format()?;
194        let sample_data_raw = self.read_opaque()?;
195        let sample_data = self.parse_sample_data(sample_type, sample_data_raw)?;
196
197        Ok(SampleRecord {
198            sample_type,
199            sample_data,
200        })
201    }
202
203    /// Parse an sFlow v5 datagram
204    pub fn parse_datagram(&mut self) -> Result<SFlowDatagram> {
205        // Parse version
206        let version = self.read_u32()?;
207        if version != 5 {
208            return Err(ParseError::InvalidData(format!(
209                "Invalid version: expected 5, got {}",
210                version
211            )));
212        }
213
214        // Parse agent address
215        let agent_address = self.parse_address()?;
216
217        // Parse sub-agent ID
218        let sub_agent_id = self.read_u32()?;
219
220        // Parse sequence number
221        let sequence_number = self.read_u32()?;
222
223        // Parse uptime
224        let uptime = self.read_u32()?;
225
226        // Parse samples array
227        let num_samples = self.read_u32()?;
228        // Limit capacity to prevent OOM attacks - allocate conservatively
229        let capacity = num_samples.min(1024) as usize;
230        let mut samples = Vec::with_capacity(capacity);
231        for _ in 0..num_samples {
232            samples.push(self.parse_sample_record()?);
233        }
234
235        Ok(SFlowDatagram {
236            version: DatagramVersion::Version5,
237            agent_address,
238            sub_agent_id,
239            sequence_number,
240            uptime,
241            samples,
242        })
243    }
244}
245
246/// Parse an sFlow v5 datagram from a byte slice
247pub fn parse_datagram(data: &[u8]) -> Result<SFlowDatagram> {
248    let mut parser = Parser::new(Cursor::new(data));
249    parser.parse_datagram()
250}
251
252/// Parse multiple sFlow v5 datagrams from a byte slice
253/// This is useful when multiple datagrams are concatenated (like in our test file)
254pub fn parse_datagrams(data: &[u8]) -> Result<Vec<SFlowDatagram>> {
255    let mut datagrams = Vec::new();
256    let mut cursor = Cursor::new(data);
257
258    loop {
259        let pos = cursor.position();
260        if pos >= data.len() as u64 {
261            break;
262        }
263
264        match Parser::new(&mut cursor).parse_datagram() {
265            Ok(datagram) => datagrams.push(datagram),
266            Err(ParseError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
267                // End of data
268                break;
269            }
270            Err(e) => return Err(e),
271        }
272    }
273
274    Ok(datagrams)
275}