Skip to main content

flowparser_sflow/flow_records/
mod.rs

1pub mod app_initiator;
2pub mod app_operation;
3pub mod app_parent_context;
4pub mod app_target;
5pub mod extended_80211_payload;
6pub mod extended_80211_rx;
7pub mod extended_80211_tx;
8pub mod extended_acl;
9pub mod extended_decapsulate;
10pub mod extended_egress_queue;
11pub mod extended_entities;
12pub mod extended_function;
13pub mod extended_gateway;
14pub mod extended_hw_trap;
15pub mod extended_ib;
16pub mod extended_linux_drop_reason;
17pub mod extended_mpls;
18pub mod extended_mpls_ftn;
19pub mod extended_mpls_ldp_fec;
20pub mod extended_mpls_tunnel;
21pub mod extended_mpls_vc;
22pub mod extended_nat;
23pub mod extended_nat_port;
24pub mod extended_proxy_request;
25pub mod extended_proxy_socket_ipv4;
26pub mod extended_proxy_socket_ipv6;
27pub mod extended_queue;
28pub mod extended_router;
29pub mod extended_socket_ipv4;
30pub mod extended_socket_ipv6;
31pub mod extended_switch;
32pub mod extended_tcp_info;
33pub mod extended_timestamp;
34pub mod extended_transit;
35pub mod extended_url;
36pub mod extended_user;
37pub mod extended_vlan_tunnel;
38pub mod extended_vni;
39pub mod http_request;
40pub mod jvm_runtime;
41pub mod memcache_operation;
42pub mod raw_packet_header;
43pub mod sampled_ethernet;
44pub mod sampled_ipv4;
45pub mod sampled_ipv6;
46
47use nom::IResult;
48use nom::bytes::complete::take;
49use nom::number::complete::be_u32;
50use serde::{Deserialize, Serialize};
51
52pub use app_initiator::AppInitiator;
53pub use app_operation::AppOperation;
54pub use app_parent_context::AppParentContext;
55pub use app_target::AppTarget;
56pub use extended_80211_payload::Extended80211Payload;
57pub use extended_80211_rx::Extended80211Rx;
58pub use extended_80211_tx::Extended80211Tx;
59pub use extended_acl::ExtendedAcl;
60pub use extended_decapsulate::{ExtendedDecapsulateEgress, ExtendedDecapsulateIngress};
61pub use extended_egress_queue::ExtendedEgressQueue;
62pub use extended_entities::ExtendedEntities;
63pub use extended_function::ExtendedFunction;
64pub use extended_gateway::ExtendedGateway;
65pub use extended_hw_trap::ExtendedHwTrap;
66pub use extended_ib::{ExtendedIbBrh, ExtendedIbGrh, ExtendedIbLrh};
67pub use extended_linux_drop_reason::ExtendedLinuxDropReason;
68pub use extended_mpls::ExtendedMpls;
69pub use extended_mpls_ftn::ExtendedMplsFtn;
70pub use extended_mpls_ldp_fec::ExtendedMplsLdpFec;
71pub use extended_mpls_tunnel::ExtendedMplsTunnel;
72pub use extended_mpls_vc::ExtendedMplsVc;
73pub use extended_nat::ExtendedNat;
74pub use extended_nat_port::ExtendedNatPort;
75pub use extended_proxy_request::ExtendedProxyRequest;
76pub use extended_proxy_socket_ipv4::ExtendedProxySocketIpv4;
77pub use extended_proxy_socket_ipv6::ExtendedProxySocketIpv6;
78pub use extended_queue::ExtendedQueue;
79pub use extended_router::ExtendedRouter;
80pub use extended_socket_ipv4::ExtendedSocketIpv4;
81pub use extended_socket_ipv6::ExtendedSocketIpv6;
82pub use extended_switch::ExtendedSwitch;
83pub use extended_tcp_info::ExtendedTcpInfo;
84pub use extended_timestamp::ExtendedTimestamp;
85pub use extended_transit::ExtendedTransit;
86pub use extended_url::ExtendedUrl;
87pub use extended_user::ExtendedUser;
88pub use extended_vlan_tunnel::ExtendedVlanTunnel;
89pub use extended_vni::{ExtendedVniEgress, ExtendedVniIngress};
90pub use http_request::HttpRequest;
91pub use jvm_runtime::JvmRuntime;
92pub use memcache_operation::MemcacheOperation;
93pub use raw_packet_header::RawPacketHeader;
94pub use sampled_ethernet::SampledEthernet;
95pub use sampled_ipv4::SampledIpv4;
96pub use sampled_ipv6::SampledIpv6;
97
98/// A flow record within a flow sample.
99///
100/// Flow records describe properties of a sampled packet, ranging from
101/// raw header bytes to decoded L2/L3/L4 fields and extended routing data.
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
103pub enum FlowRecord {
104    /// Raw packet header bytes (enterprise=0, format=1).
105    RawPacketHeader(RawPacketHeader),
106    /// Sampled Ethernet frame header (enterprise=0, format=2).
107    SampledEthernet(SampledEthernet),
108    /// Sampled IPv4 packet header (enterprise=0, format=3).
109    SampledIpv4(SampledIpv4),
110    /// Sampled IPv6 packet header (enterprise=0, format=4).
111    SampledIpv6(SampledIpv6),
112    /// Extended switch data — VLAN and priority (enterprise=0, format=1001).
113    ExtendedSwitch(ExtendedSwitch),
114    /// Extended router data — next hop and masks (enterprise=0, format=1002).
115    ExtendedRouter(ExtendedRouter),
116    /// Extended gateway data — BGP AS path and communities (enterprise=0, format=1003).
117    ExtendedGateway(ExtendedGateway),
118    /// Extended user data — source and destination user identifiers (enterprise=0, format=1004).
119    ExtendedUser(ExtendedUser),
120    /// Extended URL data — URL and host strings (enterprise=0, format=1005).
121    ExtendedUrl(ExtendedUrl),
122    /// Extended MPLS data — next hop and label stacks (enterprise=0, format=1006).
123    ExtendedMpls(ExtendedMpls),
124    /// Extended NAT data — translated source and destination addresses (enterprise=0, format=1007).
125    ExtendedNat(ExtendedNat),
126    /// Extended NAT port translation data (enterprise=0, format=1020).
127    ExtendedNatPort(ExtendedNatPort),
128    /// Extended MPLS tunnel data (enterprise=0, format=1008).
129    ExtendedMplsTunnel(ExtendedMplsTunnel),
130    /// Extended MPLS virtual circuit data (enterprise=0, format=1009).
131    ExtendedMplsVc(ExtendedMplsVc),
132    /// Extended MPLS FEC to NHLFE mapping (enterprise=0, format=1010).
133    ExtendedMplsFtn(ExtendedMplsFtn),
134    /// Extended MPLS LDP FEC data (enterprise=0, format=1011).
135    ExtendedMplsLdpFec(ExtendedMplsLdpFec),
136    /// Extended VLAN tunnel data — 802.1Q-in-Q stack (enterprise=0, format=1012).
137    ExtendedVlanTunnel(ExtendedVlanTunnel),
138    /// Extended 802.11 payload data (enterprise=0, format=1013).
139    Extended80211Payload(Extended80211Payload),
140    /// Extended 802.11 receive data (enterprise=0, format=1014).
141    Extended80211Rx(Extended80211Rx),
142    /// Extended 802.11 transmit data (enterprise=0, format=1015).
143    Extended80211Tx(Extended80211Tx),
144    /// Extended L2 tunnel egress — reuses sampled Ethernet (enterprise=0, format=1021).
145    ExtendedL2TunnelEgress(SampledEthernet),
146    /// Extended L2 tunnel ingress — reuses sampled Ethernet (enterprise=0, format=1022).
147    ExtendedL2TunnelIngress(SampledEthernet),
148    /// Extended IPv4 tunnel egress — reuses sampled IPv4 (enterprise=0, format=1023).
149    ExtendedIpv4TunnelEgress(SampledIpv4),
150    /// Extended IPv4 tunnel ingress — reuses sampled IPv4 (enterprise=0, format=1024).
151    ExtendedIpv4TunnelIngress(SampledIpv4),
152    /// Extended IPv6 tunnel egress — reuses sampled IPv6 (enterprise=0, format=1025).
153    ExtendedIpv6TunnelEgress(SampledIpv6),
154    /// Extended IPv6 tunnel ingress — reuses sampled IPv6 (enterprise=0, format=1026).
155    ExtendedIpv6TunnelIngress(SampledIpv6),
156    /// Extended decapsulate egress data (enterprise=0, format=1027).
157    ExtendedDecapsulateEgress(ExtendedDecapsulateEgress),
158    /// Extended decapsulate ingress data (enterprise=0, format=1028).
159    ExtendedDecapsulateIngress(ExtendedDecapsulateIngress),
160    /// Extended VNI egress data (enterprise=0, format=1029).
161    ExtendedVniEgress(ExtendedVniEgress),
162    /// Extended VNI ingress data (enterprise=0, format=1030).
163    ExtendedVniIngress(ExtendedVniIngress),
164    /// InfiniBand Local Routing Header (enterprise=0, format=1031).
165    ExtendedIbLrh(ExtendedIbLrh),
166    /// InfiniBand Global Routing Header (enterprise=0, format=1032).
167    ExtendedIbGrh(ExtendedIbGrh),
168    /// InfiniBand Base Transport Header (enterprise=0, format=1033).
169    ExtendedIbBrh(ExtendedIbBrh),
170    /// Extended egress queue identifier (enterprise=0, format=1036).
171    ExtendedEgressQueue(ExtendedEgressQueue),
172    /// Extended ACL data (enterprise=0, format=1037).
173    ExtendedAcl(ExtendedAcl),
174    /// Extended function/symbol data (enterprise=0, format=1038).
175    ExtendedFunction(ExtendedFunction),
176    /// Extended transit delay data (enterprise=0, format=1039).
177    ExtendedTransit(ExtendedTransit),
178    /// Extended queue depth data (enterprise=0, format=1040).
179    ExtendedQueue(ExtendedQueue),
180    /// Extended hardware trap data (enterprise=0, format=1041).
181    ExtendedHwTrap(ExtendedHwTrap),
182    /// Extended Linux kernel drop reason (enterprise=0, format=1042).
183    ExtendedLinuxDropReason(ExtendedLinuxDropReason),
184    /// Extended nanosecond-precision timestamp (enterprise=0, format=1043).
185    ExtendedTimestamp(ExtendedTimestamp),
186    /// Extended socket IPv4 data (enterprise=0, format=2100).
187    ExtendedSocketIpv4(ExtendedSocketIpv4),
188    /// Extended socket IPv6 data (enterprise=0, format=2101).
189    ExtendedSocketIpv6(ExtendedSocketIpv6),
190    /// Extended proxy socket IPv4 data (enterprise=0, format=2102).
191    ExtendedProxySocketIpv4(ExtendedProxySocketIpv4),
192    /// Extended proxy socket IPv6 data (enterprise=0, format=2103).
193    ExtendedProxySocketIpv6(ExtendedProxySocketIpv6),
194    /// JVM runtime information (enterprise=0, format=2105).
195    JvmRuntime(JvmRuntime),
196    /// Memcache operation data (enterprise=0, format=2200).
197    MemcacheOperation(MemcacheOperation),
198    /// Application operation data (enterprise=0, format=2202).
199    AppOperation(AppOperation),
200    /// Application parent context (enterprise=0, format=2203).
201    AppParentContext(AppParentContext),
202    /// Application initiator (enterprise=0, format=2204).
203    AppInitiator(AppInitiator),
204    /// Application target (enterprise=0, format=2205).
205    AppTarget(AppTarget),
206    /// HTTP request data (enterprise=0, format=2206).
207    HttpRequest(HttpRequest),
208    /// Extended proxy request data (enterprise=0, format=2207).
209    ExtendedProxyRequest(ExtendedProxyRequest),
210    /// Extended TCP info (enterprise=0, format=2209).
211    ExtendedTcpInfo(ExtendedTcpInfo),
212    /// Extended entities data (enterprise=0, format=2210).
213    ExtendedEntities(ExtendedEntities),
214    /// Unrecognized flow record type, preserved as raw bytes.
215    Unknown {
216        /// Enterprise code from the record header.
217        enterprise: u32,
218        /// Format code from the record header.
219        format: u32,
220        /// Raw record data.
221        data: Vec<u8>,
222    },
223}
224
225/// Parse an XDR-encoded sFlow string (length-prefixed, padded to 4-byte boundary).
226///
227/// Note: Invalid UTF-8 bytes are replaced with U+FFFD (replacement character).
228pub(crate) fn parse_sflow_string(input: &[u8]) -> IResult<&[u8], String> {
229    let (input, length) = be_u32(input)?;
230    let (input, bytes) = take(length as usize)(input)?;
231    // Pad to 4-byte boundary
232    let padding = (4 - (length as usize % 4)) % 4;
233    let (input, _) = take(padding)(input)?;
234    let s = String::from_utf8_lossy(bytes).into_owned();
235    Ok((input, s))
236}
237
238pub(crate) fn parse_flow_records(
239    mut input: &[u8],
240    num_records: u32,
241) -> IResult<&[u8], Vec<FlowRecord>> {
242    // Cap capacity to prevent DoS: each record needs at least 8 bytes (format + length)
243    let cap = (num_records as usize).min(input.len() / 8);
244    let mut records = Vec::with_capacity(cap);
245
246    for _ in 0..num_records {
247        let (rest, data_format) = be_u32(input)?;
248        let enterprise = data_format >> 12;
249        let format = data_format & 0xFFF;
250
251        let (rest, record_length) = be_u32(rest)?;
252        let record_length = record_length as usize;
253
254        if rest.len() < record_length {
255            return Err(nom::Err::Error(nom::error::Error::new(
256                rest,
257                nom::error::ErrorKind::Eof,
258            )));
259        }
260
261        let record_data = &rest[..record_length];
262        let after_record = &rest[record_length..];
263
264        let record = if enterprise == 0 {
265            match format {
266                1 => {
267                    let (_, r) = raw_packet_header::parse_raw_packet_header(record_data)?;
268                    FlowRecord::RawPacketHeader(r)
269                }
270                2 => {
271                    let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
272                    FlowRecord::SampledEthernet(r)
273                }
274                3 => {
275                    let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
276                    FlowRecord::SampledIpv4(r)
277                }
278                4 => {
279                    let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
280                    FlowRecord::SampledIpv6(r)
281                }
282                1001 => {
283                    let (_, r) = extended_switch::parse_extended_switch(record_data)?;
284                    FlowRecord::ExtendedSwitch(r)
285                }
286                1002 => {
287                    let (_, r) = extended_router::parse_extended_router(record_data)?;
288                    FlowRecord::ExtendedRouter(r)
289                }
290                1003 => {
291                    let (_, r) = extended_gateway::parse_extended_gateway(record_data)?;
292                    FlowRecord::ExtendedGateway(r)
293                }
294                1004 => {
295                    let (_, r) = extended_user::parse_extended_user(record_data)?;
296                    FlowRecord::ExtendedUser(r)
297                }
298                1005 => {
299                    let (_, r) = extended_url::parse_extended_url(record_data)?;
300                    FlowRecord::ExtendedUrl(r)
301                }
302                1006 => {
303                    let (_, r) = extended_mpls::parse_extended_mpls(record_data)?;
304                    FlowRecord::ExtendedMpls(r)
305                }
306                1007 => {
307                    let (_, r) = extended_nat::parse_extended_nat(record_data)?;
308                    FlowRecord::ExtendedNat(r)
309                }
310                1008 => {
311                    let (_, r) = extended_mpls_tunnel::parse_extended_mpls_tunnel(record_data)?;
312                    FlowRecord::ExtendedMplsTunnel(r)
313                }
314                1009 => {
315                    let (_, r) = extended_mpls_vc::parse_extended_mpls_vc(record_data)?;
316                    FlowRecord::ExtendedMplsVc(r)
317                }
318                1010 => {
319                    let (_, r) = extended_mpls_ftn::parse_extended_mpls_ftn(record_data)?;
320                    FlowRecord::ExtendedMplsFtn(r)
321                }
322                1011 => {
323                    let (_, r) =
324                        extended_mpls_ldp_fec::parse_extended_mpls_ldp_fec(record_data)?;
325                    FlowRecord::ExtendedMplsLdpFec(r)
326                }
327                1020 => {
328                    let (_, r) = extended_nat_port::parse_extended_nat_port(record_data)?;
329                    FlowRecord::ExtendedNatPort(r)
330                }
331                1012 => {
332                    let (_, r) = extended_vlan_tunnel::parse_extended_vlan_tunnel(record_data)?;
333                    FlowRecord::ExtendedVlanTunnel(r)
334                }
335                1013 => {
336                    let (_, r) =
337                        extended_80211_payload::parse_extended_80211_payload(record_data)?;
338                    FlowRecord::Extended80211Payload(r)
339                }
340                1014 => {
341                    let (_, r) = extended_80211_rx::parse_extended_80211_rx(record_data)?;
342                    FlowRecord::Extended80211Rx(r)
343                }
344                1015 => {
345                    let (_, r) = extended_80211_tx::parse_extended_80211_tx(record_data)?;
346                    FlowRecord::Extended80211Tx(r)
347                }
348                1021 => {
349                    let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
350                    FlowRecord::ExtendedL2TunnelEgress(r)
351                }
352                1022 => {
353                    let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
354                    FlowRecord::ExtendedL2TunnelIngress(r)
355                }
356                1023 => {
357                    let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
358                    FlowRecord::ExtendedIpv4TunnelEgress(r)
359                }
360                1024 => {
361                    let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
362                    FlowRecord::ExtendedIpv4TunnelIngress(r)
363                }
364                1025 => {
365                    let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
366                    FlowRecord::ExtendedIpv6TunnelEgress(r)
367                }
368                1026 => {
369                    let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
370                    FlowRecord::ExtendedIpv6TunnelIngress(r)
371                }
372                1027 => {
373                    let (_, r) =
374                        extended_decapsulate::parse_extended_decapsulate_egress(record_data)?;
375                    FlowRecord::ExtendedDecapsulateEgress(r)
376                }
377                1028 => {
378                    let (_, r) =
379                        extended_decapsulate::parse_extended_decapsulate_ingress(record_data)?;
380                    FlowRecord::ExtendedDecapsulateIngress(r)
381                }
382                1029 => {
383                    let (_, r) = extended_vni::parse_extended_vni_egress(record_data)?;
384                    FlowRecord::ExtendedVniEgress(r)
385                }
386                1030 => {
387                    let (_, r) = extended_vni::parse_extended_vni_ingress(record_data)?;
388                    FlowRecord::ExtendedVniIngress(r)
389                }
390                1031 => {
391                    let (_, r) = extended_ib::parse_extended_ib_lrh(record_data)?;
392                    FlowRecord::ExtendedIbLrh(r)
393                }
394                1032 => {
395                    let (_, r) = extended_ib::parse_extended_ib_grh(record_data)?;
396                    FlowRecord::ExtendedIbGrh(r)
397                }
398                1033 => {
399                    let (_, r) = extended_ib::parse_extended_ib_brh(record_data)?;
400                    FlowRecord::ExtendedIbBrh(r)
401                }
402                1036 => {
403                    let (_, r) =
404                        extended_egress_queue::parse_extended_egress_queue(record_data)?;
405                    FlowRecord::ExtendedEgressQueue(r)
406                }
407                1037 => {
408                    let (_, r) = extended_acl::parse_extended_acl(record_data)?;
409                    FlowRecord::ExtendedAcl(r)
410                }
411                1038 => {
412                    let (_, r) = extended_function::parse_extended_function(record_data)?;
413                    FlowRecord::ExtendedFunction(r)
414                }
415                1039 => {
416                    let (_, r) = extended_transit::parse_extended_transit(record_data)?;
417                    FlowRecord::ExtendedTransit(r)
418                }
419                1040 => {
420                    let (_, r) = extended_queue::parse_extended_queue(record_data)?;
421                    FlowRecord::ExtendedQueue(r)
422                }
423                1041 => {
424                    let (_, r) = extended_hw_trap::parse_extended_hw_trap(record_data)?;
425                    FlowRecord::ExtendedHwTrap(r)
426                }
427                1042 => {
428                    let (_, r) = extended_linux_drop_reason::parse_extended_linux_drop_reason(
429                        record_data,
430                    )?;
431                    FlowRecord::ExtendedLinuxDropReason(r)
432                }
433                1043 => {
434                    let (_, r) = extended_timestamp::parse_extended_timestamp(record_data)?;
435                    FlowRecord::ExtendedTimestamp(r)
436                }
437                2100 => {
438                    let (_, r) = extended_socket_ipv4::parse_extended_socket_ipv4(record_data)?;
439                    FlowRecord::ExtendedSocketIpv4(r)
440                }
441                2101 => {
442                    let (_, r) = extended_socket_ipv6::parse_extended_socket_ipv6(record_data)?;
443                    FlowRecord::ExtendedSocketIpv6(r)
444                }
445                2102 => {
446                    let (_, r) = extended_proxy_socket_ipv4::parse_extended_proxy_socket_ipv4(
447                        record_data,
448                    )?;
449                    FlowRecord::ExtendedProxySocketIpv4(r)
450                }
451                2103 => {
452                    let (_, r) = extended_proxy_socket_ipv6::parse_extended_proxy_socket_ipv6(
453                        record_data,
454                    )?;
455                    FlowRecord::ExtendedProxySocketIpv6(r)
456                }
457                2105 => {
458                    let (_, r) = jvm_runtime::parse_jvm_runtime(record_data)?;
459                    FlowRecord::JvmRuntime(r)
460                }
461                2200 => {
462                    let (_, r) = memcache_operation::parse_memcache_operation(record_data)?;
463                    FlowRecord::MemcacheOperation(r)
464                }
465                2202 => {
466                    let (_, r) = app_operation::parse_app_operation(record_data)?;
467                    FlowRecord::AppOperation(r)
468                }
469                2203 => {
470                    let (_, r) = app_parent_context::parse_app_parent_context(record_data)?;
471                    FlowRecord::AppParentContext(r)
472                }
473                2204 => {
474                    let (_, r) = app_initiator::parse_app_initiator(record_data)?;
475                    FlowRecord::AppInitiator(r)
476                }
477                2205 => {
478                    let (_, r) = app_target::parse_app_target(record_data)?;
479                    FlowRecord::AppTarget(r)
480                }
481                2206 => {
482                    let (_, r) = http_request::parse_http_request(record_data)?;
483                    FlowRecord::HttpRequest(r)
484                }
485                2207 => {
486                    let (_, r) =
487                        extended_proxy_request::parse_extended_proxy_request(record_data)?;
488                    FlowRecord::ExtendedProxyRequest(r)
489                }
490                2209 => {
491                    let (_, r) = extended_tcp_info::parse_extended_tcp_info(record_data)?;
492                    FlowRecord::ExtendedTcpInfo(r)
493                }
494                2210 => {
495                    let (_, r) = extended_entities::parse_extended_entities(record_data)?;
496                    FlowRecord::ExtendedEntities(r)
497                }
498                _ => FlowRecord::Unknown {
499                    enterprise,
500                    format,
501                    data: record_data.to_vec(),
502                },
503            }
504        } else {
505            FlowRecord::Unknown {
506                enterprise,
507                format,
508                data: record_data.to_vec(),
509            }
510        };
511
512        records.push(record);
513        input = after_record;
514    }
515
516    Ok((input, records))
517}