flowparser_sflow/flow_records/
mod.rs1pub mod app_initiator;
2pub mod app_operation;
3pub mod app_parent_context;
4pub mod app_target;
5pub mod extended_80211_payload;
6pub mod extended_80211_rx;
7pub mod extended_80211_tx;
8pub mod extended_acl;
9pub mod extended_decapsulate;
10pub mod extended_egress_queue;
11pub mod extended_entities;
12pub mod extended_function;
13pub mod extended_gateway;
14pub mod extended_hw_trap;
15pub mod extended_ib;
16pub mod extended_linux_drop_reason;
17pub mod extended_mpls;
18pub mod extended_mpls_ftn;
19pub mod extended_mpls_ldp_fec;
20pub mod extended_mpls_tunnel;
21pub mod extended_mpls_vc;
22pub mod extended_nat;
23pub mod extended_nat_port;
24pub mod extended_proxy_request;
25pub mod extended_proxy_socket_ipv4;
26pub mod extended_proxy_socket_ipv6;
27pub mod extended_queue;
28pub mod extended_router;
29pub mod extended_socket_ipv4;
30pub mod extended_socket_ipv6;
31pub mod extended_switch;
32pub mod extended_tcp_info;
33pub mod extended_timestamp;
34pub mod extended_transit;
35pub mod extended_url;
36pub mod extended_user;
37pub mod extended_vlan_tunnel;
38pub mod extended_vni;
39pub mod http_request;
40pub mod jvm_runtime;
41pub mod memcache_operation;
42pub mod raw_packet_header;
43pub mod sampled_ethernet;
44pub mod sampled_ipv4;
45pub mod sampled_ipv6;
46
47use nom::IResult;
48use nom::bytes::complete::take;
49use nom::number::complete::be_u32;
50use serde::{Deserialize, Serialize};
51
52pub use app_initiator::AppInitiator;
53pub use app_operation::AppOperation;
54pub use app_parent_context::AppParentContext;
55pub use app_target::AppTarget;
56pub use extended_80211_payload::Extended80211Payload;
57pub use extended_80211_rx::Extended80211Rx;
58pub use extended_80211_tx::Extended80211Tx;
59pub use extended_acl::ExtendedAcl;
60pub use extended_decapsulate::{ExtendedDecapsulateEgress, ExtendedDecapsulateIngress};
61pub use extended_egress_queue::ExtendedEgressQueue;
62pub use extended_entities::ExtendedEntities;
63pub use extended_function::ExtendedFunction;
64pub use extended_gateway::ExtendedGateway;
65pub use extended_hw_trap::ExtendedHwTrap;
66pub use extended_ib::{ExtendedIbBrh, ExtendedIbGrh, ExtendedIbLrh};
67pub use extended_linux_drop_reason::ExtendedLinuxDropReason;
68pub use extended_mpls::ExtendedMpls;
69pub use extended_mpls_ftn::ExtendedMplsFtn;
70pub use extended_mpls_ldp_fec::ExtendedMplsLdpFec;
71pub use extended_mpls_tunnel::ExtendedMplsTunnel;
72pub use extended_mpls_vc::ExtendedMplsVc;
73pub use extended_nat::ExtendedNat;
74pub use extended_nat_port::ExtendedNatPort;
75pub use extended_proxy_request::ExtendedProxyRequest;
76pub use extended_proxy_socket_ipv4::ExtendedProxySocketIpv4;
77pub use extended_proxy_socket_ipv6::ExtendedProxySocketIpv6;
78pub use extended_queue::ExtendedQueue;
79pub use extended_router::ExtendedRouter;
80pub use extended_socket_ipv4::ExtendedSocketIpv4;
81pub use extended_socket_ipv6::ExtendedSocketIpv6;
82pub use extended_switch::ExtendedSwitch;
83pub use extended_tcp_info::ExtendedTcpInfo;
84pub use extended_timestamp::ExtendedTimestamp;
85pub use extended_transit::ExtendedTransit;
86pub use extended_url::ExtendedUrl;
87pub use extended_user::ExtendedUser;
88pub use extended_vlan_tunnel::ExtendedVlanTunnel;
89pub use extended_vni::{ExtendedVniEgress, ExtendedVniIngress};
90pub use http_request::HttpRequest;
91pub use jvm_runtime::JvmRuntime;
92pub use memcache_operation::MemcacheOperation;
93pub use raw_packet_header::RawPacketHeader;
94pub use sampled_ethernet::SampledEthernet;
95pub use sampled_ipv4::SampledIpv4;
96pub use sampled_ipv6::SampledIpv6;
97
98#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
103pub enum FlowRecord {
104 RawPacketHeader(RawPacketHeader),
106 SampledEthernet(SampledEthernet),
108 SampledIpv4(SampledIpv4),
110 SampledIpv6(SampledIpv6),
112 ExtendedSwitch(ExtendedSwitch),
114 ExtendedRouter(ExtendedRouter),
116 ExtendedGateway(ExtendedGateway),
118 ExtendedUser(ExtendedUser),
120 ExtendedUrl(ExtendedUrl),
122 ExtendedMpls(ExtendedMpls),
124 ExtendedNat(ExtendedNat),
126 ExtendedNatPort(ExtendedNatPort),
128 ExtendedMplsTunnel(ExtendedMplsTunnel),
130 ExtendedMplsVc(ExtendedMplsVc),
132 ExtendedMplsFtn(ExtendedMplsFtn),
134 ExtendedMplsLdpFec(ExtendedMplsLdpFec),
136 ExtendedVlanTunnel(ExtendedVlanTunnel),
138 Extended80211Payload(Extended80211Payload),
140 Extended80211Rx(Extended80211Rx),
142 Extended80211Tx(Extended80211Tx),
144 ExtendedL2TunnelEgress(SampledEthernet),
146 ExtendedL2TunnelIngress(SampledEthernet),
148 ExtendedIpv4TunnelEgress(SampledIpv4),
150 ExtendedIpv4TunnelIngress(SampledIpv4),
152 ExtendedIpv6TunnelEgress(SampledIpv6),
154 ExtendedIpv6TunnelIngress(SampledIpv6),
156 ExtendedDecapsulateEgress(ExtendedDecapsulateEgress),
158 ExtendedDecapsulateIngress(ExtendedDecapsulateIngress),
160 ExtendedVniEgress(ExtendedVniEgress),
162 ExtendedVniIngress(ExtendedVniIngress),
164 ExtendedIbLrh(ExtendedIbLrh),
166 ExtendedIbGrh(ExtendedIbGrh),
168 ExtendedIbBrh(ExtendedIbBrh),
170 ExtendedEgressQueue(ExtendedEgressQueue),
172 ExtendedAcl(ExtendedAcl),
174 ExtendedFunction(ExtendedFunction),
176 ExtendedTransit(ExtendedTransit),
178 ExtendedQueue(ExtendedQueue),
180 ExtendedHwTrap(ExtendedHwTrap),
182 ExtendedLinuxDropReason(ExtendedLinuxDropReason),
184 ExtendedTimestamp(ExtendedTimestamp),
186 ExtendedSocketIpv4(ExtendedSocketIpv4),
188 ExtendedSocketIpv6(ExtendedSocketIpv6),
190 ExtendedProxySocketIpv4(ExtendedProxySocketIpv4),
192 ExtendedProxySocketIpv6(ExtendedProxySocketIpv6),
194 JvmRuntime(JvmRuntime),
196 MemcacheOperation(MemcacheOperation),
198 AppOperation(AppOperation),
200 AppParentContext(AppParentContext),
202 AppInitiator(AppInitiator),
204 AppTarget(AppTarget),
206 HttpRequest(HttpRequest),
208 ExtendedProxyRequest(ExtendedProxyRequest),
210 ExtendedTcpInfo(ExtendedTcpInfo),
212 ExtendedEntities(ExtendedEntities),
214 Unknown {
216 enterprise: u32,
218 format: u32,
220 data: Vec<u8>,
222 },
223}
224
225pub(crate) fn parse_sflow_string(input: &[u8]) -> IResult<&[u8], String> {
229 let (input, length) = be_u32(input)?;
230 let (input, bytes) = take(length as usize)(input)?;
231 let padding = (4 - (length as usize % 4)) % 4;
233 let (input, _) = take(padding)(input)?;
234 let s = String::from_utf8_lossy(bytes).into_owned();
235 Ok((input, s))
236}
237
238pub(crate) fn parse_flow_records(
239 mut input: &[u8],
240 num_records: u32,
241) -> IResult<&[u8], Vec<FlowRecord>> {
242 let cap = (num_records as usize).min(input.len() / 8);
244 let mut records = Vec::with_capacity(cap);
245
246 for _ in 0..num_records {
247 let (rest, data_format) = be_u32(input)?;
248 let enterprise = data_format >> 12;
249 let format = data_format & 0xFFF;
250
251 let (rest, record_length) = be_u32(rest)?;
252 let record_length = record_length as usize;
253
254 if rest.len() < record_length {
255 return Err(nom::Err::Error(nom::error::Error::new(
256 rest,
257 nom::error::ErrorKind::Eof,
258 )));
259 }
260
261 let record_data = &rest[..record_length];
262 let after_record = &rest[record_length..];
263
264 let record = if enterprise == 0 {
265 match format {
266 1 => {
267 let (_, r) = raw_packet_header::parse_raw_packet_header(record_data)?;
268 FlowRecord::RawPacketHeader(r)
269 }
270 2 => {
271 let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
272 FlowRecord::SampledEthernet(r)
273 }
274 3 => {
275 let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
276 FlowRecord::SampledIpv4(r)
277 }
278 4 => {
279 let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
280 FlowRecord::SampledIpv6(r)
281 }
282 1001 => {
283 let (_, r) = extended_switch::parse_extended_switch(record_data)?;
284 FlowRecord::ExtendedSwitch(r)
285 }
286 1002 => {
287 let (_, r) = extended_router::parse_extended_router(record_data)?;
288 FlowRecord::ExtendedRouter(r)
289 }
290 1003 => {
291 let (_, r) = extended_gateway::parse_extended_gateway(record_data)?;
292 FlowRecord::ExtendedGateway(r)
293 }
294 1004 => {
295 let (_, r) = extended_user::parse_extended_user(record_data)?;
296 FlowRecord::ExtendedUser(r)
297 }
298 1005 => {
299 let (_, r) = extended_url::parse_extended_url(record_data)?;
300 FlowRecord::ExtendedUrl(r)
301 }
302 1006 => {
303 let (_, r) = extended_mpls::parse_extended_mpls(record_data)?;
304 FlowRecord::ExtendedMpls(r)
305 }
306 1007 => {
307 let (_, r) = extended_nat::parse_extended_nat(record_data)?;
308 FlowRecord::ExtendedNat(r)
309 }
310 1008 => {
311 let (_, r) = extended_mpls_tunnel::parse_extended_mpls_tunnel(record_data)?;
312 FlowRecord::ExtendedMplsTunnel(r)
313 }
314 1009 => {
315 let (_, r) = extended_mpls_vc::parse_extended_mpls_vc(record_data)?;
316 FlowRecord::ExtendedMplsVc(r)
317 }
318 1010 => {
319 let (_, r) = extended_mpls_ftn::parse_extended_mpls_ftn(record_data)?;
320 FlowRecord::ExtendedMplsFtn(r)
321 }
322 1011 => {
323 let (_, r) =
324 extended_mpls_ldp_fec::parse_extended_mpls_ldp_fec(record_data)?;
325 FlowRecord::ExtendedMplsLdpFec(r)
326 }
327 1020 => {
328 let (_, r) = extended_nat_port::parse_extended_nat_port(record_data)?;
329 FlowRecord::ExtendedNatPort(r)
330 }
331 1012 => {
332 let (_, r) = extended_vlan_tunnel::parse_extended_vlan_tunnel(record_data)?;
333 FlowRecord::ExtendedVlanTunnel(r)
334 }
335 1013 => {
336 let (_, r) =
337 extended_80211_payload::parse_extended_80211_payload(record_data)?;
338 FlowRecord::Extended80211Payload(r)
339 }
340 1014 => {
341 let (_, r) = extended_80211_rx::parse_extended_80211_rx(record_data)?;
342 FlowRecord::Extended80211Rx(r)
343 }
344 1015 => {
345 let (_, r) = extended_80211_tx::parse_extended_80211_tx(record_data)?;
346 FlowRecord::Extended80211Tx(r)
347 }
348 1021 => {
349 let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
350 FlowRecord::ExtendedL2TunnelEgress(r)
351 }
352 1022 => {
353 let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
354 FlowRecord::ExtendedL2TunnelIngress(r)
355 }
356 1023 => {
357 let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
358 FlowRecord::ExtendedIpv4TunnelEgress(r)
359 }
360 1024 => {
361 let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
362 FlowRecord::ExtendedIpv4TunnelIngress(r)
363 }
364 1025 => {
365 let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
366 FlowRecord::ExtendedIpv6TunnelEgress(r)
367 }
368 1026 => {
369 let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
370 FlowRecord::ExtendedIpv6TunnelIngress(r)
371 }
372 1027 => {
373 let (_, r) =
374 extended_decapsulate::parse_extended_decapsulate_egress(record_data)?;
375 FlowRecord::ExtendedDecapsulateEgress(r)
376 }
377 1028 => {
378 let (_, r) =
379 extended_decapsulate::parse_extended_decapsulate_ingress(record_data)?;
380 FlowRecord::ExtendedDecapsulateIngress(r)
381 }
382 1029 => {
383 let (_, r) = extended_vni::parse_extended_vni_egress(record_data)?;
384 FlowRecord::ExtendedVniEgress(r)
385 }
386 1030 => {
387 let (_, r) = extended_vni::parse_extended_vni_ingress(record_data)?;
388 FlowRecord::ExtendedVniIngress(r)
389 }
390 1031 => {
391 let (_, r) = extended_ib::parse_extended_ib_lrh(record_data)?;
392 FlowRecord::ExtendedIbLrh(r)
393 }
394 1032 => {
395 let (_, r) = extended_ib::parse_extended_ib_grh(record_data)?;
396 FlowRecord::ExtendedIbGrh(r)
397 }
398 1033 => {
399 let (_, r) = extended_ib::parse_extended_ib_brh(record_data)?;
400 FlowRecord::ExtendedIbBrh(r)
401 }
402 1036 => {
403 let (_, r) =
404 extended_egress_queue::parse_extended_egress_queue(record_data)?;
405 FlowRecord::ExtendedEgressQueue(r)
406 }
407 1037 => {
408 let (_, r) = extended_acl::parse_extended_acl(record_data)?;
409 FlowRecord::ExtendedAcl(r)
410 }
411 1038 => {
412 let (_, r) = extended_function::parse_extended_function(record_data)?;
413 FlowRecord::ExtendedFunction(r)
414 }
415 1039 => {
416 let (_, r) = extended_transit::parse_extended_transit(record_data)?;
417 FlowRecord::ExtendedTransit(r)
418 }
419 1040 => {
420 let (_, r) = extended_queue::parse_extended_queue(record_data)?;
421 FlowRecord::ExtendedQueue(r)
422 }
423 1041 => {
424 let (_, r) = extended_hw_trap::parse_extended_hw_trap(record_data)?;
425 FlowRecord::ExtendedHwTrap(r)
426 }
427 1042 => {
428 let (_, r) = extended_linux_drop_reason::parse_extended_linux_drop_reason(
429 record_data,
430 )?;
431 FlowRecord::ExtendedLinuxDropReason(r)
432 }
433 1043 => {
434 let (_, r) = extended_timestamp::parse_extended_timestamp(record_data)?;
435 FlowRecord::ExtendedTimestamp(r)
436 }
437 2100 => {
438 let (_, r) = extended_socket_ipv4::parse_extended_socket_ipv4(record_data)?;
439 FlowRecord::ExtendedSocketIpv4(r)
440 }
441 2101 => {
442 let (_, r) = extended_socket_ipv6::parse_extended_socket_ipv6(record_data)?;
443 FlowRecord::ExtendedSocketIpv6(r)
444 }
445 2102 => {
446 let (_, r) = extended_proxy_socket_ipv4::parse_extended_proxy_socket_ipv4(
447 record_data,
448 )?;
449 FlowRecord::ExtendedProxySocketIpv4(r)
450 }
451 2103 => {
452 let (_, r) = extended_proxy_socket_ipv6::parse_extended_proxy_socket_ipv6(
453 record_data,
454 )?;
455 FlowRecord::ExtendedProxySocketIpv6(r)
456 }
457 2105 => {
458 let (_, r) = jvm_runtime::parse_jvm_runtime(record_data)?;
459 FlowRecord::JvmRuntime(r)
460 }
461 2200 => {
462 let (_, r) = memcache_operation::parse_memcache_operation(record_data)?;
463 FlowRecord::MemcacheOperation(r)
464 }
465 2202 => {
466 let (_, r) = app_operation::parse_app_operation(record_data)?;
467 FlowRecord::AppOperation(r)
468 }
469 2203 => {
470 let (_, r) = app_parent_context::parse_app_parent_context(record_data)?;
471 FlowRecord::AppParentContext(r)
472 }
473 2204 => {
474 let (_, r) = app_initiator::parse_app_initiator(record_data)?;
475 FlowRecord::AppInitiator(r)
476 }
477 2205 => {
478 let (_, r) = app_target::parse_app_target(record_data)?;
479 FlowRecord::AppTarget(r)
480 }
481 2206 => {
482 let (_, r) = http_request::parse_http_request(record_data)?;
483 FlowRecord::HttpRequest(r)
484 }
485 2207 => {
486 let (_, r) =
487 extended_proxy_request::parse_extended_proxy_request(record_data)?;
488 FlowRecord::ExtendedProxyRequest(r)
489 }
490 2209 => {
491 let (_, r) = extended_tcp_info::parse_extended_tcp_info(record_data)?;
492 FlowRecord::ExtendedTcpInfo(r)
493 }
494 2210 => {
495 let (_, r) = extended_entities::parse_extended_entities(record_data)?;
496 FlowRecord::ExtendedEntities(r)
497 }
498 _ => FlowRecord::Unknown {
499 enterprise,
500 format,
501 data: record_data.to_vec(),
502 },
503 }
504 } else {
505 FlowRecord::Unknown {
506 enterprise,
507 format,
508 data: record_data.to_vec(),
509 }
510 };
511
512 records.push(record);
513 input = after_record;
514 }
515
516 Ok((input, records))
517}