Skip to main content

flowparser_sflow/flow_records/
mod.rs

1pub mod app_operation;
2pub mod extended_80211_payload;
3pub mod extended_80211_rx;
4pub mod extended_80211_tx;
5pub mod extended_acl;
6pub mod extended_decapsulate;
7pub mod extended_egress_queue;
8pub mod extended_function;
9pub mod extended_gateway;
10pub mod extended_mpls;
11pub mod extended_mpls_ftn;
12pub mod extended_mpls_ldp_fec;
13pub mod extended_mpls_tunnel;
14pub mod extended_mpls_vc;
15pub mod extended_nat;
16pub mod extended_proxy_request;
17pub mod extended_proxy_socket_ipv4;
18pub mod extended_proxy_socket_ipv6;
19pub mod extended_queue;
20pub mod extended_router;
21pub mod extended_socket_ipv4;
22pub mod extended_socket_ipv6;
23pub mod extended_switch;
24pub mod extended_transit;
25pub mod extended_url;
26pub mod extended_user;
27pub mod extended_vlan_tunnel;
28pub mod extended_vni;
29pub mod http_request;
30pub mod jvm_runtime;
31pub mod memcache_operation;
32pub mod raw_packet_header;
33pub mod sampled_ethernet;
34pub mod sampled_ipv4;
35pub mod sampled_ipv6;
36
37use nom::IResult;
38use nom::bytes::complete::take;
39use nom::number::complete::be_u32;
40use serde::{Deserialize, Serialize};
41
42pub use app_operation::AppOperation;
43pub use extended_80211_payload::Extended80211Payload;
44pub use extended_80211_rx::Extended80211Rx;
45pub use extended_80211_tx::Extended80211Tx;
46pub use extended_acl::ExtendedAcl;
47pub use extended_decapsulate::{ExtendedDecapsulateEgress, ExtendedDecapsulateIngress};
48pub use extended_egress_queue::ExtendedEgressQueue;
49pub use extended_function::ExtendedFunction;
50pub use extended_gateway::ExtendedGateway;
51pub use extended_mpls::ExtendedMpls;
52pub use extended_mpls_ftn::ExtendedMplsFtn;
53pub use extended_mpls_ldp_fec::ExtendedMplsLdpFec;
54pub use extended_mpls_tunnel::ExtendedMplsTunnel;
55pub use extended_mpls_vc::ExtendedMplsVc;
56pub use extended_nat::ExtendedNat;
57pub use extended_proxy_request::ExtendedProxyRequest;
58pub use extended_proxy_socket_ipv4::ExtendedProxySocketIpv4;
59pub use extended_proxy_socket_ipv6::ExtendedProxySocketIpv6;
60pub use extended_queue::ExtendedQueue;
61pub use extended_router::ExtendedRouter;
62pub use extended_socket_ipv4::ExtendedSocketIpv4;
63pub use extended_socket_ipv6::ExtendedSocketIpv6;
64pub use extended_switch::ExtendedSwitch;
65pub use extended_transit::ExtendedTransit;
66pub use extended_url::ExtendedUrl;
67pub use extended_user::ExtendedUser;
68pub use extended_vlan_tunnel::ExtendedVlanTunnel;
69pub use extended_vni::{ExtendedVniEgress, ExtendedVniIngress};
70pub use http_request::HttpRequest;
71pub use jvm_runtime::JvmRuntime;
72pub use memcache_operation::MemcacheOperation;
73pub use raw_packet_header::RawPacketHeader;
74pub use sampled_ethernet::SampledEthernet;
75pub use sampled_ipv4::SampledIpv4;
76pub use sampled_ipv6::SampledIpv6;
77
78/// A flow record within a flow sample.
79///
80/// Flow records describe properties of a sampled packet, ranging from
81/// raw header bytes to decoded L2/L3/L4 fields and extended routing data.
82#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
83pub enum FlowRecord {
84    /// Raw packet header bytes (enterprise=0, format=1).
85    RawPacketHeader(RawPacketHeader),
86    /// Sampled Ethernet frame header (enterprise=0, format=2).
87    SampledEthernet(SampledEthernet),
88    /// Sampled IPv4 packet header (enterprise=0, format=3).
89    SampledIpv4(SampledIpv4),
90    /// Sampled IPv6 packet header (enterprise=0, format=4).
91    SampledIpv6(SampledIpv6),
92    /// Extended switch data — VLAN and priority (enterprise=0, format=1001).
93    ExtendedSwitch(ExtendedSwitch),
94    /// Extended router data — next hop and masks (enterprise=0, format=1002).
95    ExtendedRouter(ExtendedRouter),
96    /// Extended gateway data — BGP AS path and communities (enterprise=0, format=1003).
97    ExtendedGateway(ExtendedGateway),
98    /// Extended user data — source and destination user identifiers (enterprise=0, format=1004).
99    ExtendedUser(ExtendedUser),
100    /// Extended URL data — URL and host strings (enterprise=0, format=1005).
101    ExtendedUrl(ExtendedUrl),
102    /// Extended MPLS data — next hop and label stacks (enterprise=0, format=1006).
103    ExtendedMpls(ExtendedMpls),
104    /// Extended NAT data — translated source and destination addresses (enterprise=0, format=1007).
105    ExtendedNat(ExtendedNat),
106    /// Extended MPLS tunnel data (enterprise=0, format=1008).
107    ExtendedMplsTunnel(ExtendedMplsTunnel),
108    /// Extended MPLS virtual circuit data (enterprise=0, format=1009).
109    ExtendedMplsVc(ExtendedMplsVc),
110    /// Extended MPLS FEC to NHLFE mapping (enterprise=0, format=1010).
111    ExtendedMplsFtn(ExtendedMplsFtn),
112    /// Extended MPLS LDP FEC data (enterprise=0, format=1011).
113    ExtendedMplsLdpFec(ExtendedMplsLdpFec),
114    /// Extended VLAN tunnel data — 802.1Q-in-Q stack (enterprise=0, format=1012).
115    ExtendedVlanTunnel(ExtendedVlanTunnel),
116    /// Extended 802.11 payload data (enterprise=0, format=1013).
117    Extended80211Payload(Extended80211Payload),
118    /// Extended 802.11 receive data (enterprise=0, format=1014).
119    Extended80211Rx(Extended80211Rx),
120    /// Extended 802.11 transmit data (enterprise=0, format=1015).
121    Extended80211Tx(Extended80211Tx),
122    /// Extended L2 tunnel egress — reuses sampled Ethernet (enterprise=0, format=1021).
123    ExtendedL2TunnelEgress(SampledEthernet),
124    /// Extended L2 tunnel ingress — reuses sampled Ethernet (enterprise=0, format=1022).
125    ExtendedL2TunnelIngress(SampledEthernet),
126    /// Extended IPv4 tunnel egress — reuses sampled IPv4 (enterprise=0, format=1023).
127    ExtendedIpv4TunnelEgress(SampledIpv4),
128    /// Extended IPv4 tunnel ingress — reuses sampled IPv4 (enterprise=0, format=1024).
129    ExtendedIpv4TunnelIngress(SampledIpv4),
130    /// Extended IPv6 tunnel egress — reuses sampled IPv6 (enterprise=0, format=1025).
131    ExtendedIpv6TunnelEgress(SampledIpv6),
132    /// Extended IPv6 tunnel ingress — reuses sampled IPv6 (enterprise=0, format=1026).
133    ExtendedIpv6TunnelIngress(SampledIpv6),
134    /// Extended decapsulate egress data (enterprise=0, format=1027).
135    ExtendedDecapsulateEgress(ExtendedDecapsulateEgress),
136    /// Extended decapsulate ingress data (enterprise=0, format=1028).
137    ExtendedDecapsulateIngress(ExtendedDecapsulateIngress),
138    /// Extended VNI egress data (enterprise=0, format=1029).
139    ExtendedVniEgress(ExtendedVniEgress),
140    /// Extended VNI ingress data (enterprise=0, format=1030).
141    ExtendedVniIngress(ExtendedVniIngress),
142    /// Extended egress queue identifier (enterprise=0, format=1036).
143    ExtendedEgressQueue(ExtendedEgressQueue),
144    /// Extended ACL data (enterprise=0, format=1037).
145    ExtendedAcl(ExtendedAcl),
146    /// Extended function/symbol data (enterprise=0, format=1038).
147    ExtendedFunction(ExtendedFunction),
148    /// Extended transit delay data (enterprise=0, format=1039).
149    ExtendedTransit(ExtendedTransit),
150    /// Extended queue depth data (enterprise=0, format=1040).
151    ExtendedQueue(ExtendedQueue),
152    /// Extended socket IPv4 data (enterprise=0, format=2100).
153    ExtendedSocketIpv4(ExtendedSocketIpv4),
154    /// Extended socket IPv6 data (enterprise=0, format=2101).
155    ExtendedSocketIpv6(ExtendedSocketIpv6),
156    /// Extended proxy socket IPv4 data (enterprise=0, format=2102).
157    ExtendedProxySocketIpv4(ExtendedProxySocketIpv4),
158    /// Extended proxy socket IPv6 data (enterprise=0, format=2103).
159    ExtendedProxySocketIpv6(ExtendedProxySocketIpv6),
160    /// JVM runtime information (enterprise=0, format=2105).
161    JvmRuntime(JvmRuntime),
162    /// Memcache operation data (enterprise=0, format=2200).
163    MemcacheOperation(MemcacheOperation),
164    /// Application operation data (enterprise=0, format=2202).
165    AppOperation(AppOperation),
166    /// HTTP request data (enterprise=0, format=2206).
167    HttpRequest(HttpRequest),
168    /// Extended proxy request data (enterprise=0, format=2207).
169    ExtendedProxyRequest(ExtendedProxyRequest),
170    /// Unrecognized flow record type, preserved as raw bytes.
171    Unknown {
172        /// Enterprise code from the record header.
173        enterprise: u32,
174        /// Format code from the record header.
175        format: u32,
176        /// Raw record data.
177        data: Vec<u8>,
178    },
179}
180
181/// Parse an XDR-encoded sFlow string (length-prefixed, padded to 4-byte boundary).
182///
183/// Note: Invalid UTF-8 bytes are replaced with U+FFFD (replacement character).
184pub(crate) fn parse_sflow_string(input: &[u8]) -> IResult<&[u8], String> {
185    let (input, length) = be_u32(input)?;
186    let (input, bytes) = take(length as usize)(input)?;
187    // Pad to 4-byte boundary
188    let padding = (4 - (length as usize % 4)) % 4;
189    let (input, _) = take(padding)(input)?;
190    let s = String::from_utf8_lossy(bytes).into_owned();
191    Ok((input, s))
192}
193
194pub(crate) fn parse_flow_records(
195    mut input: &[u8],
196    num_records: u32,
197) -> IResult<&[u8], Vec<FlowRecord>> {
198    // Cap capacity to prevent DoS: each record needs at least 8 bytes (format + length)
199    let cap = (num_records as usize).min(input.len() / 8);
200    let mut records = Vec::with_capacity(cap);
201
202    for _ in 0..num_records {
203        let (rest, data_format) = be_u32(input)?;
204        let enterprise = data_format >> 12;
205        let format = data_format & 0xFFF;
206
207        let (rest, record_length) = be_u32(rest)?;
208        let record_length = record_length as usize;
209
210        if rest.len() < record_length {
211            return Err(nom::Err::Error(nom::error::Error::new(
212                rest,
213                nom::error::ErrorKind::Eof,
214            )));
215        }
216
217        let record_data = &rest[..record_length];
218        let after_record = &rest[record_length..];
219
220        let record = if enterprise == 0 {
221            match format {
222                1 => {
223                    let (_, r) = raw_packet_header::parse_raw_packet_header(record_data)?;
224                    FlowRecord::RawPacketHeader(r)
225                }
226                2 => {
227                    let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
228                    FlowRecord::SampledEthernet(r)
229                }
230                3 => {
231                    let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
232                    FlowRecord::SampledIpv4(r)
233                }
234                4 => {
235                    let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
236                    FlowRecord::SampledIpv6(r)
237                }
238                1001 => {
239                    let (_, r) = extended_switch::parse_extended_switch(record_data)?;
240                    FlowRecord::ExtendedSwitch(r)
241                }
242                1002 => {
243                    let (_, r) = extended_router::parse_extended_router(record_data)?;
244                    FlowRecord::ExtendedRouter(r)
245                }
246                1003 => {
247                    let (_, r) = extended_gateway::parse_extended_gateway(record_data)?;
248                    FlowRecord::ExtendedGateway(r)
249                }
250                1004 => {
251                    let (_, r) = extended_user::parse_extended_user(record_data)?;
252                    FlowRecord::ExtendedUser(r)
253                }
254                1005 => {
255                    let (_, r) = extended_url::parse_extended_url(record_data)?;
256                    FlowRecord::ExtendedUrl(r)
257                }
258                1006 => {
259                    let (_, r) = extended_mpls::parse_extended_mpls(record_data)?;
260                    FlowRecord::ExtendedMpls(r)
261                }
262                1007 => {
263                    let (_, r) = extended_nat::parse_extended_nat(record_data)?;
264                    FlowRecord::ExtendedNat(r)
265                }
266                1008 => {
267                    let (_, r) = extended_mpls_tunnel::parse_extended_mpls_tunnel(record_data)?;
268                    FlowRecord::ExtendedMplsTunnel(r)
269                }
270                1009 => {
271                    let (_, r) = extended_mpls_vc::parse_extended_mpls_vc(record_data)?;
272                    FlowRecord::ExtendedMplsVc(r)
273                }
274                1010 => {
275                    let (_, r) = extended_mpls_ftn::parse_extended_mpls_ftn(record_data)?;
276                    FlowRecord::ExtendedMplsFtn(r)
277                }
278                1011 => {
279                    let (_, r) =
280                        extended_mpls_ldp_fec::parse_extended_mpls_ldp_fec(record_data)?;
281                    FlowRecord::ExtendedMplsLdpFec(r)
282                }
283                1012 => {
284                    let (_, r) = extended_vlan_tunnel::parse_extended_vlan_tunnel(record_data)?;
285                    FlowRecord::ExtendedVlanTunnel(r)
286                }
287                1013 => {
288                    let (_, r) =
289                        extended_80211_payload::parse_extended_80211_payload(record_data)?;
290                    FlowRecord::Extended80211Payload(r)
291                }
292                1014 => {
293                    let (_, r) = extended_80211_rx::parse_extended_80211_rx(record_data)?;
294                    FlowRecord::Extended80211Rx(r)
295                }
296                1015 => {
297                    let (_, r) = extended_80211_tx::parse_extended_80211_tx(record_data)?;
298                    FlowRecord::Extended80211Tx(r)
299                }
300                1021 => {
301                    let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
302                    FlowRecord::ExtendedL2TunnelEgress(r)
303                }
304                1022 => {
305                    let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
306                    FlowRecord::ExtendedL2TunnelIngress(r)
307                }
308                1023 => {
309                    let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
310                    FlowRecord::ExtendedIpv4TunnelEgress(r)
311                }
312                1024 => {
313                    let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
314                    FlowRecord::ExtendedIpv4TunnelIngress(r)
315                }
316                1025 => {
317                    let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
318                    FlowRecord::ExtendedIpv6TunnelEgress(r)
319                }
320                1026 => {
321                    let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
322                    FlowRecord::ExtendedIpv6TunnelIngress(r)
323                }
324                1027 => {
325                    let (_, r) =
326                        extended_decapsulate::parse_extended_decapsulate_egress(record_data)?;
327                    FlowRecord::ExtendedDecapsulateEgress(r)
328                }
329                1028 => {
330                    let (_, r) =
331                        extended_decapsulate::parse_extended_decapsulate_ingress(record_data)?;
332                    FlowRecord::ExtendedDecapsulateIngress(r)
333                }
334                1029 => {
335                    let (_, r) = extended_vni::parse_extended_vni_egress(record_data)?;
336                    FlowRecord::ExtendedVniEgress(r)
337                }
338                1030 => {
339                    let (_, r) = extended_vni::parse_extended_vni_ingress(record_data)?;
340                    FlowRecord::ExtendedVniIngress(r)
341                }
342                1036 => {
343                    let (_, r) =
344                        extended_egress_queue::parse_extended_egress_queue(record_data)?;
345                    FlowRecord::ExtendedEgressQueue(r)
346                }
347                1037 => {
348                    let (_, r) = extended_acl::parse_extended_acl(record_data)?;
349                    FlowRecord::ExtendedAcl(r)
350                }
351                1038 => {
352                    let (_, r) = extended_function::parse_extended_function(record_data)?;
353                    FlowRecord::ExtendedFunction(r)
354                }
355                1039 => {
356                    let (_, r) = extended_transit::parse_extended_transit(record_data)?;
357                    FlowRecord::ExtendedTransit(r)
358                }
359                1040 => {
360                    let (_, r) = extended_queue::parse_extended_queue(record_data)?;
361                    FlowRecord::ExtendedQueue(r)
362                }
363                2100 => {
364                    let (_, r) = extended_socket_ipv4::parse_extended_socket_ipv4(record_data)?;
365                    FlowRecord::ExtendedSocketIpv4(r)
366                }
367                2101 => {
368                    let (_, r) = extended_socket_ipv6::parse_extended_socket_ipv6(record_data)?;
369                    FlowRecord::ExtendedSocketIpv6(r)
370                }
371                2102 => {
372                    let (_, r) = extended_proxy_socket_ipv4::parse_extended_proxy_socket_ipv4(
373                        record_data,
374                    )?;
375                    FlowRecord::ExtendedProxySocketIpv4(r)
376                }
377                2103 => {
378                    let (_, r) = extended_proxy_socket_ipv6::parse_extended_proxy_socket_ipv6(
379                        record_data,
380                    )?;
381                    FlowRecord::ExtendedProxySocketIpv6(r)
382                }
383                2105 => {
384                    let (_, r) = jvm_runtime::parse_jvm_runtime(record_data)?;
385                    FlowRecord::JvmRuntime(r)
386                }
387                2200 => {
388                    let (_, r) = memcache_operation::parse_memcache_operation(record_data)?;
389                    FlowRecord::MemcacheOperation(r)
390                }
391                2202 => {
392                    let (_, r) = app_operation::parse_app_operation(record_data)?;
393                    FlowRecord::AppOperation(r)
394                }
395                2206 => {
396                    let (_, r) = http_request::parse_http_request(record_data)?;
397                    FlowRecord::HttpRequest(r)
398                }
399                2207 => {
400                    let (_, r) =
401                        extended_proxy_request::parse_extended_proxy_request(record_data)?;
402                    FlowRecord::ExtendedProxyRequest(r)
403                }
404                _ => FlowRecord::Unknown {
405                    enterprise,
406                    format,
407                    data: record_data.to_vec(),
408                },
409            }
410        } else {
411            FlowRecord::Unknown {
412                enterprise,
413                format,
414                data: record_data.to_vec(),
415            }
416        };
417
418        records.push(record);
419        input = after_record;
420    }
421
422    Ok((input, records))
423}