sflow_parser/parsers/
mod.rs

1//! sFlow v5 parser
2//!
3//! This module provides parsing functionality for sFlow v5 datagrams.
4//! All data is in network byte order (big-endian) as per XDR specification.
5
6mod datagram;
7mod error;
8mod parser_counters;
9mod parser_flows;
10
11// Re-export public types
12pub use datagram::{parse_datagram, parse_datagrams};
13pub use error::{ParseError, Result};
14
15use crate::models::*;
16use std::io::Read;
17use std::net::{Ipv4Addr, Ipv6Addr};
18
/// Streaming reader for sFlow v5 datagrams.
///
/// Owns a byte source implementing [`Read`]; every primitive and composite
/// value is decoded by pulling bytes from that source in order.
pub struct Parser<R>
where
    R: Read,
{
    /// Underlying byte source the parser consumes from.
    reader: R,
}
23
24impl<R: Read> Parser<R> {
25    /// Create a new parser from a reader
26    pub fn new(reader: R) -> Self {
27        Self { reader }
28    }
29
30    /// Read a u32 in network byte order (big-endian)
31    pub(crate) fn read_u32(&mut self) -> Result<u32> {
32        let mut buf = [0u8; 4];
33        self.reader.read_exact(&mut buf)?;
34        Ok(u32::from_be_bytes(buf))
35    }
36
37    /// Read a u64 in network byte order (big-endian)
38    pub(crate) fn read_u64(&mut self) -> Result<u64> {
39        let mut buf = [0u8; 8];
40        self.reader.read_exact(&mut buf)?;
41        Ok(u64::from_be_bytes(buf))
42    }
43
44    /// Read an i32 in network byte order (big-endian)
45    pub(crate) fn read_i32(&mut self) -> Result<i32> {
46        let mut buf = [0u8; 4];
47        self.reader.read_exact(&mut buf)?;
48        Ok(i32::from_be_bytes(buf))
49    }
50
51    /// Read a u8
52    #[allow(dead_code)]
53    pub(crate) fn read_u8(&mut self) -> Result<u8> {
54        let mut buf = [0u8; 1];
55        self.reader.read_exact(&mut buf)?;
56        Ok(buf[0])
57    }
58
59    /// Read a string (length-prefixed opaque data converted to UTF-8)
60    pub(crate) fn read_string(&mut self) -> Result<String> {
61        let bytes = self.read_opaque()?;
62        Ok(String::from_utf8(bytes)?)
63    }
64
65    /// Read an opaque byte array (length-prefixed)
66    pub(crate) fn read_opaque(&mut self) -> Result<Vec<u8>> {
67        let length = self.read_u32()? as usize;
68
69        // Sanity check: reject unreasonably large allocations (> 100MB)
70        // Valid sFlow packets are typically much smaller
71        const MAX_OPAQUE_SIZE: usize = 100 * 1024 * 1024; // 100MB
72        if length > MAX_OPAQUE_SIZE {
73            return Err(ParseError::InvalidData(format!(
74                "Opaque data length {} exceeds maximum {}",
75                length, MAX_OPAQUE_SIZE
76            )));
77        }
78
79        let mut data = vec![0u8; length];
80        self.reader.read_exact(&mut data)?;
81
82        // XDR requires padding to 4-byte boundary
83        let padding = (4 - (length % 4)) % 4;
84        if padding > 0 {
85            let mut pad = vec![0u8; padding];
86            self.reader.read_exact(&mut pad)?;
87        }
88
89        Ok(data)
90    }
91
92    /// Read a fixed-size byte array
93    pub(crate) fn read_fixed(&mut self, size: usize) -> Result<Vec<u8>> {
94        let mut data = vec![0u8; size];
95        self.reader.read_exact(&mut data)?;
96        Ok(data)
97    }
98
99    /// Parse an address
100    pub(crate) fn parse_address(&mut self) -> Result<Address> {
101        let addr_type = self.read_u32()?;
102
103        match addr_type {
104            0 => Ok(Address::Unknown),
105            1 => {
106                let bytes = self.read_fixed(4)?;
107                let addr = Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3]);
108                Ok(Address::IPv4(addr))
109            }
110            2 => {
111                let bytes = self.read_fixed(16)?;
112                let addr = Ipv6Addr::from(<[u8; 16]>::try_from(bytes).unwrap());
113                Ok(Address::IPv6(addr))
114            }
115            _ => Err(ParseError::InvalidData(format!(
116                "Invalid address type: {}",
117                addr_type
118            ))),
119        }
120    }
121
122    /// Parse a data format
123    pub(crate) fn parse_data_format(&mut self) -> Result<DataFormat> {
124        let value = self.read_u32()?;
125        Ok(DataFormat(value))
126    }
127
128    /// Parse a data source
129    pub(crate) fn parse_data_source(&mut self) -> Result<DataSource> {
130        let value = self.read_u32()?;
131        Ok(DataSource(value))
132    }
133
134    /// Parse an expanded data source
135    pub(crate) fn parse_data_source_expanded(&mut self) -> Result<DataSourceExpanded> {
136        let source_id_type = self.read_u32()?;
137        let source_id_index = self.read_u32()?;
138        Ok(DataSourceExpanded {
139            source_id_type,
140            source_id_index,
141        })
142    }
143
144    /// Parse an interface
145    pub(crate) fn parse_interface(&mut self) -> Result<Interface> {
146        let value = self.read_u32()?;
147        Ok(Interface(value))
148    }
149
150    /// Parse an expanded interface
151    pub(crate) fn parse_interface_expanded(&mut self) -> Result<InterfaceExpanded> {
152        let format = self.read_u32()?;
153        let value = self.read_u32()?;
154        Ok(InterfaceExpanded { format, value })
155    }
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161    use std::io::Cursor;
162
163    #[test]
164    fn test_data_format() {
165        let format = DataFormat::new(0, 1);
166        assert_eq!(format.enterprise(), 0);
167        assert_eq!(format.format(), 1);
168
169        let format = DataFormat::new(4413, 5);
170        assert_eq!(format.enterprise(), 4413);
171        assert_eq!(format.format(), 5);
172    }
173
174    #[test]
175    fn test_data_source() {
176        let source = DataSource::new(0, 42);
177        assert_eq!(source.source_type(), 0);
178        assert_eq!(source.index(), 42);
179
180        let source = DataSource::new(1, 100);
181        assert_eq!(source.source_type(), 1);
182        assert_eq!(source.index(), 100);
183    }
184
185    #[test]
186    fn test_interface() {
187        // Single interface
188        let iface = Interface(42);
189        assert!(iface.is_single());
190        assert_eq!(iface.value(), 42);
191
192        // Discarded packet - ACL drop
193        // ERRATUM: Spec page 28 example corrected from 0x40000001 to 0x40000102
194        let iface = Interface(0x40000102);
195        assert!(iface.is_discarded());
196        assert_eq!(iface.value(), 0x102);
197
198        // Discarded packet - generic example
199        let iface = Interface(0x40000001);
200        assert!(iface.is_discarded());
201        assert_eq!(iface.value(), 1);
202
203        // Multiple interfaces
204        let iface = Interface(0x80000007);
205        assert!(iface.is_multiple());
206        assert_eq!(iface.value(), 7);
207    }
208
209    #[test]
210    fn test_parse_u32() {
211        let data = vec![0x00, 0x00, 0x00, 0x05];
212        let mut parser = Parser::new(Cursor::new(data));
213        assert_eq!(parser.read_u32().unwrap(), 5);
214    }
215
216    #[test]
217    fn test_parse_address_ipv4() {
218        let data = vec![
219            0x00, 0x00, 0x00, 0x01, // type = IPv4
220            0xC0, 0xA8, 0x01, 0x01, // 192.168.1.1
221        ];
222        let mut parser = Parser::new(Cursor::new(data));
223        let addr = parser.parse_address().unwrap();
224        assert_eq!(addr, Address::IPv4(Ipv4Addr::new(192, 168, 1, 1)));
225    }
226
227    #[test]
228    fn test_parse_address_unknown() {
229        let data = vec![0x00, 0x00, 0x00, 0x00]; // type = Unknown
230        let mut parser = Parser::new(Cursor::new(data));
231        let addr = parser.parse_address().unwrap();
232        assert_eq!(addr, Address::Unknown);
233    }
234}