Skip to main content

flowparser_sflow/
datagram.rs

1use nom::IResult;
2use nom::number::complete::{be_u32, be_u64};
3use serde::{Deserialize, Serialize};
4use std::net::{Ipv4Addr, Ipv6Addr};
5
6use crate::error::SflowError;
7use crate::samples::{SflowSample, parse_samples};
8
9/// An sFlow address, either IPv4 or IPv6.
10///
11/// Used for both agent addresses in the datagram header and
12/// next-hop addresses in extended router/gateway records.
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub enum AddressType {
15    /// IPv4 address.
16    IPv4(Ipv4Addr),
17    /// IPv6 address.
18    IPv6(Ipv6Addr),
19}
20
21/// A parsed sFlow v5 datagram containing header fields and samples.
22///
23/// Each datagram is sent by an sFlow agent and contains a header
24/// identifying the agent, plus zero or more flow or counter samples.
25#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
26pub struct SflowDatagram {
27    /// sFlow version (always 5).
28    pub version: u32,
29    /// IP address of the sFlow agent.
30    pub agent_address: AddressType,
31    /// Sub-agent identifier (disambiguates multiple data sources).
32    pub sub_agent_id: u32,
33    /// Sequence number incremented per datagram from this agent.
34    pub sequence_number: u32,
35    /// Agent uptime in milliseconds since boot.
36    pub uptime: u32,
37    /// Flow and counter samples contained in this datagram.
38    pub samples: Vec<SflowSample>,
39}
40
41pub(crate) fn parse_address(input: &[u8]) -> IResult<&[u8], AddressType> {
42    let (input, addr_type) = be_u32(input)?;
43    match addr_type {
44        1 => {
45            let (input, a) = be_u32(input)?;
46            Ok((input, AddressType::IPv4(Ipv4Addr::from(a))))
47        }
48        2 => {
49            let (input, hi) = be_u64(input)?;
50            let (input, lo) = be_u64(input)?;
51            let mut octets = [0u8; 16];
52            octets[..8].copy_from_slice(&hi.to_be_bytes());
53            octets[8..].copy_from_slice(&lo.to_be_bytes());
54            Ok((input, AddressType::IPv6(Ipv6Addr::from(octets))))
55        }
56        _ => Err(nom::Err::Error(nom::error::Error::new(
57            input,
58            nom::error::ErrorKind::Switch,
59        ))),
60    }
61}
62
63pub(crate) fn parse_datagram(
64    input: &[u8],
65    max_samples: Option<u32>,
66) -> Result<(&[u8], SflowDatagram), SflowError> {
67    let original = input;
68
69    let (input, version) = be_u32(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
70        SflowError::Incomplete {
71            available: original.len(),
72            context: "datagram header version".to_string(),
73        }
74    })?;
75
76    if version != 5 {
77        return Err(SflowError::UnsupportedVersion { version });
78    }
79
80    let (input, agent_address) =
81        parse_address(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
82            SflowError::ParseError {
83                offset: original.len() - input.len(),
84                context: "agent address".to_string(),
85                kind: "invalid address type".to_string(),
86            }
87        })?;
88
89    let (input, sub_agent_id) =
90        be_u32(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
91            SflowError::Incomplete {
92                available: input.len(),
93                context: "sub_agent_id".to_string(),
94            }
95        })?;
96
97    let (input, sequence_number) =
98        be_u32(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
99            SflowError::Incomplete {
100                available: input.len(),
101                context: "sequence_number".to_string(),
102            }
103        })?;
104
105    let (input, uptime) = be_u32(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
106        SflowError::Incomplete {
107            available: input.len(),
108            context: "uptime".to_string(),
109        }
110    })?;
111
112    let (input, num_samples) =
113        be_u32(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
114            SflowError::Incomplete {
115                available: input.len(),
116                context: "num_samples".to_string(),
117            }
118        })?;
119
120    // Enforce max_samples limit before parsing to prevent DoS
121    if let Some(max) = max_samples
122        && num_samples > max
123    {
124        return Err(SflowError::TooManySamples {
125            count: num_samples,
126            max,
127        });
128    }
129
130    let (input, samples) = parse_samples(input, num_samples)?;
131
132    Ok((
133        input,
134        SflowDatagram {
135            version,
136            agent_address,
137            sub_agent_id,
138            sequence_number,
139            uptime,
140            samples,
141        },
142    ))
143}