Skip to main content

flowparser_sflow/
datagram.rs

1use nom::IResult;
2use nom::number::complete::{be_u32, be_u64};
3use serde::{Deserialize, Serialize};
4use std::net::{Ipv4Addr, Ipv6Addr};
5
6use crate::error::{ParseContext, ParseErrorKind, SflowError};
7use crate::samples::{SflowSample, parse_samples};
8
9/// An sFlow address, either IPv4 or IPv6.
10///
11/// Used for both agent addresses in the datagram header and
12/// next-hop addresses in extended router/gateway records.
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub enum AddressType {
15    /// IPv4 address.
16    IPv4(Ipv4Addr),
17    /// IPv6 address.
18    IPv6(Ipv6Addr),
19}
20
21/// A parsed sFlow v5 datagram containing header fields and samples.
22///
23/// Each datagram is sent by an sFlow agent and contains a header
24/// identifying the agent, plus zero or more flow or counter samples.
25#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
26pub struct SflowDatagram {
27    /// sFlow version (always 5).
28    pub version: u32,
29    /// IP address of the sFlow agent.
30    pub agent_address: AddressType,
31    /// Sub-agent identifier (disambiguates multiple data sources).
32    pub sub_agent_id: u32,
33    /// Sequence number incremented per datagram from this agent.
34    pub sequence_number: u32,
35    /// Agent uptime in milliseconds since boot.
36    pub uptime: u32,
37    /// Flow and counter samples contained in this datagram.
38    pub samples: Vec<SflowSample>,
39}
40
41pub(crate) fn parse_address(input: &[u8]) -> IResult<&[u8], AddressType> {
42    let (input, addr_type) = be_u32(input)?;
43    match addr_type {
44        1 => {
45            let (input, a) = be_u32(input)?;
46            Ok((input, AddressType::IPv4(Ipv4Addr::from(a))))
47        }
48        2 => {
49            let (input, hi) = be_u64(input)?;
50            let (input, lo) = be_u64(input)?;
51            let mut octets = [0u8; 16];
52            octets[..8].copy_from_slice(&hi.to_be_bytes());
53            octets[8..].copy_from_slice(&lo.to_be_bytes());
54            Ok((input, AddressType::IPv6(Ipv6Addr::from(octets))))
55        }
56        _ => Err(nom::Err::Error(nom::error::Error::new(
57            input,
58            nom::error::ErrorKind::Switch,
59        ))),
60    }
61}
62
63pub(crate) fn parse_datagram(
64    input: &[u8],
65    max_samples: Option<u32>,
66) -> Result<(&[u8], SflowDatagram), SflowError> {
67    let original = input;
68
69    let (input, version) = be_u32(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
70        SflowError::Incomplete {
71            available: original.len(),
72            expected: None,
73            context: ParseContext::DatagramHeaderVersion,
74        }
75    })?;
76
77    if version != 5 {
78        return Err(SflowError::UnsupportedVersion { version });
79    }
80
81    let (input, agent_address) =
82        parse_address(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
83            SflowError::ParseError {
84                offset: original.len() - input.len(),
85                context: ParseContext::AgentAddress,
86                kind: ParseErrorKind::InvalidAddressType,
87            }
88        })?;
89
90    let (input, sub_agent_id) =
91        be_u32(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
92            SflowError::Incomplete {
93                available: input.len(),
94                expected: None,
95                context: ParseContext::SubAgentId,
96            }
97        })?;
98
99    let (input, sequence_number) =
100        be_u32(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
101            SflowError::Incomplete {
102                available: input.len(),
103                expected: None,
104                context: ParseContext::SequenceNumber,
105            }
106        })?;
107
108    let (input, uptime) = be_u32(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
109        SflowError::Incomplete {
110            available: input.len(),
111            expected: None,
112            context: ParseContext::Uptime,
113        }
114    })?;
115
116    let (input, num_samples) =
117        be_u32(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
118            SflowError::Incomplete {
119                available: input.len(),
120                expected: None,
121                context: ParseContext::NumSamples,
122            }
123        })?;
124
125    // Enforce max_samples limit before parsing to prevent DoS
126    if let Some(max) = max_samples
127        && num_samples > max
128    {
129        return Err(SflowError::TooManySamples {
130            count: num_samples,
131            max,
132        });
133    }
134
135    let (input, samples) = parse_samples(input, num_samples)?;
136
137    Ok((
138        input,
139        SflowDatagram {
140            version,
141            agent_address,
142            sub_agent_id,
143            sequence_number,
144            uptime,
145            samples,
146        },
147    ))
148}