sflow_parser/parser/
mod.rs

1//! sFlow v5 parser
2//!
3//! This module provides parsing functionality for sFlow v5 datagrams.
4//! All data is in network byte order (big-endian) as per XDR specification.
5
6mod datagram;
7mod error;
8mod parser_counters;
9mod parser_flows;
10
11// Re-export public types
12pub use datagram::{parse_datagram, parse_datagrams};
13pub use error::{ParseError, Result};
14
15use crate::models::*;
16use std::io::Read;
17use std::net::{Ipv4Addr, Ipv6Addr};
18
/// Decoder for sFlow v5 datagrams.
///
/// Owns a byte source implementing [`Read`] and pulls big-endian (XDR)
/// encoded fields from it on demand.
pub struct Parser<R>
where
    R: Read,
{
    reader: R,
}
23
24impl<R: Read> Parser<R> {
25    /// Create a new parser from a reader
26    pub fn new(reader: R) -> Self {
27        Self { reader }
28    }
29
30    /// Read a u32 in network byte order (big-endian)
31    pub(crate) fn read_u32(&mut self) -> Result<u32> {
32        let mut buf = [0u8; 4];
33        self.reader.read_exact(&mut buf)?;
34        Ok(u32::from_be_bytes(buf))
35    }
36
37    /// Read a u64 in network byte order (big-endian)
38    pub(crate) fn read_u64(&mut self) -> Result<u64> {
39        let mut buf = [0u8; 8];
40        self.reader.read_exact(&mut buf)?;
41        Ok(u64::from_be_bytes(buf))
42    }
43
44    /// Read a u8
45    #[allow(dead_code)]
46    pub(crate) fn read_u8(&mut self) -> Result<u8> {
47        let mut buf = [0u8; 1];
48        self.reader.read_exact(&mut buf)?;
49        Ok(buf[0])
50    }
51
52    /// Read a string (length-prefixed opaque data converted to UTF-8)
53    pub(crate) fn read_string(&mut self) -> Result<String> {
54        let bytes = self.read_opaque()?;
55        Ok(String::from_utf8(bytes)?)
56    }
57
58    /// Read an opaque byte array (length-prefixed)
59    pub(crate) fn read_opaque(&mut self) -> Result<Vec<u8>> {
60        let length = self.read_u32()? as usize;
61
62        // Sanity check: reject unreasonably large allocations (> 100MB)
63        // Valid sFlow packets are typically much smaller
64        const MAX_OPAQUE_SIZE: usize = 100 * 1024 * 1024; // 100MB
65        if length > MAX_OPAQUE_SIZE {
66            return Err(ParseError::InvalidData(format!(
67                "Opaque data length {} exceeds maximum {}",
68                length, MAX_OPAQUE_SIZE
69            )));
70        }
71
72        let mut data = vec![0u8; length];
73        self.reader.read_exact(&mut data)?;
74
75        // XDR requires padding to 4-byte boundary
76        let padding = (4 - (length % 4)) % 4;
77        if padding > 0 {
78            let mut pad = vec![0u8; padding];
79            self.reader.read_exact(&mut pad)?;
80        }
81
82        Ok(data)
83    }
84
85    /// Read a fixed-size byte array
86    pub(crate) fn read_fixed(&mut self, size: usize) -> Result<Vec<u8>> {
87        let mut data = vec![0u8; size];
88        self.reader.read_exact(&mut data)?;
89        Ok(data)
90    }
91
92    /// Parse an address
93    pub(crate) fn parse_address(&mut self) -> Result<Address> {
94        let addr_type = self.read_u32()?;
95
96        match addr_type {
97            0 => Ok(Address::Unknown),
98            1 => {
99                let bytes = self.read_fixed(4)?;
100                let addr = Ipv4Addr::new(bytes[0], bytes[1], bytes[2], bytes[3]);
101                Ok(Address::IPv4(addr))
102            }
103            2 => {
104                let bytes = self.read_fixed(16)?;
105                let addr = Ipv6Addr::from(<[u8; 16]>::try_from(bytes).unwrap());
106                Ok(Address::IPv6(addr))
107            }
108            _ => Err(ParseError::InvalidData(format!(
109                "Invalid address type: {}",
110                addr_type
111            ))),
112        }
113    }
114
115    /// Parse a data format
116    pub(crate) fn parse_data_format(&mut self) -> Result<DataFormat> {
117        let value = self.read_u32()?;
118        Ok(DataFormat(value))
119    }
120
121    /// Parse a data source
122    pub(crate) fn parse_data_source(&mut self) -> Result<DataSource> {
123        let value = self.read_u32()?;
124        Ok(DataSource(value))
125    }
126
127    /// Parse an expanded data source
128    pub(crate) fn parse_data_source_expanded(&mut self) -> Result<DataSourceExpanded> {
129        let source_id_type = self.read_u32()?;
130        let source_id_index = self.read_u32()?;
131        Ok(DataSourceExpanded {
132            source_id_type,
133            source_id_index,
134        })
135    }
136
137    /// Parse an interface
138    pub(crate) fn parse_interface(&mut self) -> Result<Interface> {
139        let value = self.read_u32()?;
140        Ok(Interface(value))
141    }
142
143    /// Parse an expanded interface
144    pub(crate) fn parse_interface_expanded(&mut self) -> Result<InterfaceExpanded> {
145        let format = self.read_u32()?;
146        let value = self.read_u32()?;
147        Ok(InterfaceExpanded { format, value })
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds a parser that reads from an in-memory copy of `bytes`.
    fn parser_for(bytes: Vec<u8>) -> Parser<Cursor<Vec<u8>>> {
        Parser::new(Cursor::new(bytes))
    }

    /// The enterprise and format passed to `DataFormat::new` are returned by
    /// the matching accessors.
    #[test]
    fn test_data_format() {
        let first = DataFormat::new(0, 1);
        assert_eq!(first.enterprise(), 0);
        assert_eq!(first.format(), 1);

        let second = DataFormat::new(4413, 5);
        assert_eq!(second.enterprise(), 4413);
        assert_eq!(second.format(), 5);
    }

    /// The type and index passed to `DataSource::new` are returned by the
    /// matching accessors.
    #[test]
    fn test_data_source() {
        let first = DataSource::new(0, 42);
        assert_eq!(first.source_type(), 0);
        assert_eq!(first.index(), 42);

        let second = DataSource::new(1, 100);
        assert_eq!(second.source_type(), 1);
        assert_eq!(second.index(), 100);
    }

    /// Each of the three raw encodings is classified as expected and yields
    /// the expected value.
    #[test]
    fn test_interface() {
        // 42 -> single interface, value 42
        let single = Interface(42);
        assert!(single.is_single());
        assert_eq!(single.value(), 42);

        // 0x40000001 -> discarded packet, value 1
        let discarded = Interface(0x40000001);
        assert!(discarded.is_discarded());
        assert_eq!(discarded.value(), 1);

        // 0x80000007 -> multiple interfaces, value 7
        let multiple = Interface(0x80000007);
        assert!(multiple.is_multiple());
        assert_eq!(multiple.value(), 7);
    }

    /// Four big-endian bytes decode to the expected `u32`.
    #[test]
    fn test_parse_u32() {
        let mut parser = parser_for(vec![0x00, 0x00, 0x00, 0x05]);
        assert_eq!(parser.read_u32().unwrap(), 5);
    }

    /// Type tag 1 followed by four octets decodes to an IPv4 address.
    #[test]
    fn test_parse_address_ipv4() {
        let wire = vec![
            0x00, 0x00, 0x00, 0x01, // type = IPv4
            0xC0, 0xA8, 0x01, 0x01, // 192.168.1.1
        ];
        let mut parser = parser_for(wire);
        let parsed = parser.parse_address().unwrap();
        assert_eq!(parsed, Address::IPv4(Ipv4Addr::new(192, 168, 1, 1)));
    }

    /// Type tag 0 decodes to `Address::Unknown`.
    #[test]
    fn test_parse_address_unknown() {
        let wire = vec![0x00, 0x00, 0x00, 0x00]; // type = Unknown
        let mut parser = parser_for(wire);
        let parsed = parser.parse_address().unwrap();
        assert_eq!(parsed, Address::Unknown);
    }
}