Skip to main content

flowparser_sflow/flow_records/
mod.rs

1pub mod extended_gateway;
2pub mod extended_router;
3pub mod extended_switch;
4pub mod extended_url;
5pub mod extended_user;
6pub mod raw_packet_header;
7pub mod sampled_ethernet;
8pub mod sampled_ipv4;
9pub mod sampled_ipv6;
10
11use nom::IResult;
12use nom::bytes::complete::take;
13use nom::number::complete::be_u32;
14use serde::{Deserialize, Serialize};
15
16pub use extended_gateway::ExtendedGateway;
17pub use extended_router::ExtendedRouter;
18pub use extended_switch::ExtendedSwitch;
19pub use extended_url::ExtendedUrl;
20pub use extended_user::ExtendedUser;
21pub use raw_packet_header::RawPacketHeader;
22pub use sampled_ethernet::SampledEthernet;
23pub use sampled_ipv4::SampledIpv4;
24pub use sampled_ipv6::SampledIpv6;
25
26/// A flow record within a flow sample.
27///
28/// Flow records describe properties of a sampled packet, ranging from
29/// raw header bytes to decoded L2/L3/L4 fields and extended routing data.
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31pub enum FlowRecord {
32    /// Raw packet header bytes (enterprise=0, format=1).
33    RawPacketHeader(RawPacketHeader),
34    /// Sampled Ethernet frame header (enterprise=0, format=2).
35    SampledEthernet(SampledEthernet),
36    /// Sampled IPv4 packet header (enterprise=0, format=3).
37    SampledIpv4(SampledIpv4),
38    /// Sampled IPv6 packet header (enterprise=0, format=4).
39    SampledIpv6(SampledIpv6),
40    /// Extended switch data — VLAN and priority (enterprise=0, format=1001).
41    ExtendedSwitch(ExtendedSwitch),
42    /// Extended router data — next hop and masks (enterprise=0, format=1002).
43    ExtendedRouter(ExtendedRouter),
44    /// Extended gateway data — BGP AS path and communities (enterprise=0, format=1003).
45    ExtendedGateway(ExtendedGateway),
46    /// Extended user data — source and destination user identifiers (enterprise=0, format=1004).
47    ExtendedUser(ExtendedUser),
48    /// Extended URL data — URL and host strings (enterprise=0, format=1005).
49    ExtendedUrl(ExtendedUrl),
50    /// Unrecognized flow record type, preserved as raw bytes.
51    Unknown {
52        /// Enterprise code from the record header.
53        enterprise: u32,
54        /// Format code from the record header.
55        format: u32,
56        /// Raw record data.
57        data: Vec<u8>,
58    },
59}
60
61/// Parse an XDR-encoded sFlow string (length-prefixed, padded to 4-byte boundary).
62///
63/// Note: Invalid UTF-8 bytes are replaced with U+FFFD (replacement character).
64pub(crate) fn parse_sflow_string(input: &[u8]) -> IResult<&[u8], String> {
65    let (input, length) = be_u32(input)?;
66    let (input, bytes) = take(length as usize)(input)?;
67    // Pad to 4-byte boundary
68    let padding = (4 - (length as usize % 4)) % 4;
69    let (input, _) = take(padding)(input)?;
70    let s = String::from_utf8_lossy(bytes).into_owned();
71    Ok((input, s))
72}
73
74pub(crate) fn parse_flow_records(
75    mut input: &[u8],
76    num_records: u32,
77) -> IResult<&[u8], Vec<FlowRecord>> {
78    // Cap capacity to prevent DoS: each record needs at least 8 bytes (format + length)
79    let cap = (num_records as usize).min(input.len() / 8);
80    let mut records = Vec::with_capacity(cap);
81
82    for _ in 0..num_records {
83        let (rest, data_format) = be_u32(input)?;
84        let enterprise = data_format >> 12;
85        let format = data_format & 0xFFF;
86
87        let (rest, record_length) = be_u32(rest)?;
88        let record_length = record_length as usize;
89
90        if rest.len() < record_length {
91            return Err(nom::Err::Error(nom::error::Error::new(
92                rest,
93                nom::error::ErrorKind::Eof,
94            )));
95        }
96
97        let record_data = &rest[..record_length];
98        let after_record = &rest[record_length..];
99
100        let record = if enterprise == 0 {
101            match format {
102                1 => {
103                    let (_, r) = raw_packet_header::parse_raw_packet_header(record_data)?;
104                    FlowRecord::RawPacketHeader(r)
105                }
106                2 => {
107                    let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
108                    FlowRecord::SampledEthernet(r)
109                }
110                3 => {
111                    let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
112                    FlowRecord::SampledIpv4(r)
113                }
114                4 => {
115                    let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
116                    FlowRecord::SampledIpv6(r)
117                }
118                1001 => {
119                    let (_, r) = extended_switch::parse_extended_switch(record_data)?;
120                    FlowRecord::ExtendedSwitch(r)
121                }
122                1002 => {
123                    let (_, r) = extended_router::parse_extended_router(record_data)?;
124                    FlowRecord::ExtendedRouter(r)
125                }
126                1003 => {
127                    let (_, r) = extended_gateway::parse_extended_gateway(record_data)?;
128                    FlowRecord::ExtendedGateway(r)
129                }
130                1004 => {
131                    let (_, r) = extended_user::parse_extended_user(record_data)?;
132                    FlowRecord::ExtendedUser(r)
133                }
134                1005 => {
135                    let (_, r) = extended_url::parse_extended_url(record_data)?;
136                    FlowRecord::ExtendedUrl(r)
137                }
138                _ => FlowRecord::Unknown {
139                    enterprise,
140                    format,
141                    data: record_data.to_vec(),
142                },
143            }
144        } else {
145            FlowRecord::Unknown {
146                enterprise,
147                format,
148                data: record_data.to_vec(),
149            }
150        };
151
152        records.push(record);
153        input = after_record;
154    }
155
156    Ok((input, records))
157}