Skip to main content

flowparser_sflow/samples/
mod.rs

1pub mod counter_sample;
2pub mod flow_sample;
3
4use nom::number::complete::be_u32;
5use serde::{Deserialize, Serialize};
6
7use crate::error::SflowError;
8pub use counter_sample::{CounterSample, ExpandedCounterSample};
9pub use flow_sample::{ExpandedFlowSample, FlowSample};
10
11/// An sFlow sample carried within a datagram.
12///
13/// Each datagram can contain a mix of flow samples (packet-level data)
14/// and counter samples (interface statistics). Expanded variants use
15/// separate fields for source ID type and index instead of a packed u32.
16#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
17pub enum SflowSample {
18    /// Standard flow sample (enterprise=0, format=1).
19    Flow(FlowSample),
20    /// Standard counter sample (enterprise=0, format=2).
21    Counter(CounterSample),
22    /// Expanded flow sample with unpacked source ID (enterprise=0, format=3).
23    ExpandedFlow(ExpandedFlowSample),
24    /// Expanded counter sample with unpacked source ID (enterprise=0, format=4).
25    ExpandedCounter(ExpandedCounterSample),
26    /// Unrecognized sample type, preserved as raw bytes.
27    Unknown {
28        /// Enterprise code from the sample header.
29        enterprise: u32,
30        /// Format code from the sample header.
31        format: u32,
32        /// Raw sample data.
33        data: Vec<u8>,
34    },
35}
36
37pub(crate) fn parse_samples(
38    mut input: &[u8],
39    num_samples: u32,
40) -> Result<(&[u8], Vec<SflowSample>), SflowError> {
41    // Cap capacity to prevent DoS: each sample needs at least 8 bytes (format + length)
42    let cap = (num_samples as usize).min(input.len() / 8);
43    let mut samples = Vec::with_capacity(cap);
44
45    for _ in 0..num_samples {
46        let (rest, data_format) =
47            be_u32(input).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
48                SflowError::Incomplete {
49                    available: input.len(),
50                    context: "sample data_format".to_string(),
51                }
52            })?;
53
54        let enterprise = data_format >> 12;
55        let format = data_format & 0xFFF;
56
57        let (rest, sample_length) =
58            be_u32(rest).map_err(|_: nom::Err<nom::error::Error<&[u8]>>| {
59                SflowError::Incomplete {
60                    available: rest.len(),
61                    context: "sample length".to_string(),
62                }
63            })?;
64
65        let sample_length = sample_length as usize;
66        if rest.len() < sample_length {
67            return Err(SflowError::Incomplete {
68                available: rest.len(),
69                context: format!("sample data (need {sample_length} bytes)"),
70            });
71        }
72
73        let sample_data = &rest[..sample_length];
74        let after_sample = &rest[sample_length..];
75
76        let sample = if enterprise == 0 {
77            match format {
78                1 => {
79                    let (_, fs) = flow_sample::parse_flow_sample(sample_data).map_err(|e| {
80                        SflowError::ParseError {
81                            offset: 0,
82                            context: "flow sample".to_string(),
83                            kind: nom_error_kind(&e),
84                        }
85                    })?;
86                    SflowSample::Flow(fs)
87                }
88                2 => {
89                    let (_, cs) =
90                        counter_sample::parse_counter_sample(sample_data).map_err(|e| {
91                            SflowError::ParseError {
92                                offset: 0,
93                                context: "counter sample".to_string(),
94                                kind: nom_error_kind(&e),
95                            }
96                        })?;
97                    SflowSample::Counter(cs)
98                }
99                3 => {
100                    let (_, efs) = flow_sample::parse_expanded_flow_sample(sample_data)
101                        .map_err(|e| SflowError::ParseError {
102                            offset: 0,
103                            context: "expanded flow sample".to_string(),
104                            kind: nom_error_kind(&e),
105                        })?;
106                    SflowSample::ExpandedFlow(efs)
107                }
108                4 => {
109                    let (_, ecs) = counter_sample::parse_expanded_counter_sample(sample_data)
110                        .map_err(|e| SflowError::ParseError {
111                        offset: 0,
112                        context: "expanded counter sample".to_string(),
113                        kind: nom_error_kind(&e),
114                    })?;
115                    SflowSample::ExpandedCounter(ecs)
116                }
117                _ => SflowSample::Unknown {
118                    enterprise,
119                    format,
120                    data: sample_data.to_vec(),
121                },
122            }
123        } else {
124            SflowSample::Unknown {
125                enterprise,
126                format,
127                data: sample_data.to_vec(),
128            }
129        };
130
131        samples.push(sample);
132        input = after_sample;
133    }
134
135    Ok((input, samples))
136}
137
138fn nom_error_kind(e: &nom::Err<nom::error::Error<&[u8]>>) -> String {
139    match e {
140        nom::Err::Error(e) | nom::Err::Failure(e) => format!("{:?}", e.code),
141        nom::Err::Incomplete(_) => "incomplete".to_string(),
142    }
143}