flowparser_sflow/flow_records/
mod.rs1pub mod app_operation;
2pub mod extended_80211_payload;
3pub mod extended_80211_rx;
4pub mod extended_80211_tx;
5pub mod extended_acl;
6pub mod extended_decapsulate;
7pub mod extended_egress_queue;
8pub mod extended_function;
9pub mod extended_gateway;
10pub mod extended_mpls;
11pub mod extended_mpls_ftn;
12pub mod extended_mpls_ldp_fec;
13pub mod extended_mpls_tunnel;
14pub mod extended_mpls_vc;
15pub mod extended_nat;
16pub mod extended_proxy_request;
17pub mod extended_proxy_socket_ipv4;
18pub mod extended_proxy_socket_ipv6;
19pub mod extended_queue;
20pub mod extended_router;
21pub mod extended_socket_ipv4;
22pub mod extended_socket_ipv6;
23pub mod extended_switch;
24pub mod extended_transit;
25pub mod extended_url;
26pub mod extended_user;
27pub mod extended_vlan_tunnel;
28pub mod extended_vni;
29pub mod http_request;
30pub mod jvm_runtime;
31pub mod memcache_operation;
32pub mod raw_packet_header;
33pub mod sampled_ethernet;
34pub mod sampled_ipv4;
35pub mod sampled_ipv6;
36
37use nom::IResult;
38use nom::bytes::complete::take;
39use nom::number::complete::be_u32;
40use serde::{Deserialize, Serialize};
41
42pub use app_operation::AppOperation;
43pub use extended_80211_payload::Extended80211Payload;
44pub use extended_80211_rx::Extended80211Rx;
45pub use extended_80211_tx::Extended80211Tx;
46pub use extended_acl::ExtendedAcl;
47pub use extended_decapsulate::{ExtendedDecapsulateEgress, ExtendedDecapsulateIngress};
48pub use extended_egress_queue::ExtendedEgressQueue;
49pub use extended_function::ExtendedFunction;
50pub use extended_gateway::ExtendedGateway;
51pub use extended_mpls::ExtendedMpls;
52pub use extended_mpls_ftn::ExtendedMplsFtn;
53pub use extended_mpls_ldp_fec::ExtendedMplsLdpFec;
54pub use extended_mpls_tunnel::ExtendedMplsTunnel;
55pub use extended_mpls_vc::ExtendedMplsVc;
56pub use extended_nat::ExtendedNat;
57pub use extended_proxy_request::ExtendedProxyRequest;
58pub use extended_proxy_socket_ipv4::ExtendedProxySocketIpv4;
59pub use extended_proxy_socket_ipv6::ExtendedProxySocketIpv6;
60pub use extended_queue::ExtendedQueue;
61pub use extended_router::ExtendedRouter;
62pub use extended_socket_ipv4::ExtendedSocketIpv4;
63pub use extended_socket_ipv6::ExtendedSocketIpv6;
64pub use extended_switch::ExtendedSwitch;
65pub use extended_transit::ExtendedTransit;
66pub use extended_url::ExtendedUrl;
67pub use extended_user::ExtendedUser;
68pub use extended_vlan_tunnel::ExtendedVlanTunnel;
69pub use extended_vni::{ExtendedVniEgress, ExtendedVniIngress};
70pub use http_request::HttpRequest;
71pub use jvm_runtime::JvmRuntime;
72pub use memcache_operation::MemcacheOperation;
73pub use raw_packet_header::RawPacketHeader;
74pub use sampled_ethernet::SampledEthernet;
75pub use sampled_ipv4::SampledIpv4;
76pub use sampled_ipv6::SampledIpv6;
77
78#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
83pub enum FlowRecord {
84 RawPacketHeader(RawPacketHeader),
86 SampledEthernet(SampledEthernet),
88 SampledIpv4(SampledIpv4),
90 SampledIpv6(SampledIpv6),
92 ExtendedSwitch(ExtendedSwitch),
94 ExtendedRouter(ExtendedRouter),
96 ExtendedGateway(ExtendedGateway),
98 ExtendedUser(ExtendedUser),
100 ExtendedUrl(ExtendedUrl),
102 ExtendedMpls(ExtendedMpls),
104 ExtendedNat(ExtendedNat),
106 ExtendedMplsTunnel(ExtendedMplsTunnel),
108 ExtendedMplsVc(ExtendedMplsVc),
110 ExtendedMplsFtn(ExtendedMplsFtn),
112 ExtendedMplsLdpFec(ExtendedMplsLdpFec),
114 ExtendedVlanTunnel(ExtendedVlanTunnel),
116 Extended80211Payload(Extended80211Payload),
118 Extended80211Rx(Extended80211Rx),
120 Extended80211Tx(Extended80211Tx),
122 ExtendedL2TunnelEgress(SampledEthernet),
124 ExtendedL2TunnelIngress(SampledEthernet),
126 ExtendedIpv4TunnelEgress(SampledIpv4),
128 ExtendedIpv4TunnelIngress(SampledIpv4),
130 ExtendedIpv6TunnelEgress(SampledIpv6),
132 ExtendedIpv6TunnelIngress(SampledIpv6),
134 ExtendedDecapsulateEgress(ExtendedDecapsulateEgress),
136 ExtendedDecapsulateIngress(ExtendedDecapsulateIngress),
138 ExtendedVniEgress(ExtendedVniEgress),
140 ExtendedVniIngress(ExtendedVniIngress),
142 ExtendedEgressQueue(ExtendedEgressQueue),
144 ExtendedAcl(ExtendedAcl),
146 ExtendedFunction(ExtendedFunction),
148 ExtendedTransit(ExtendedTransit),
150 ExtendedQueue(ExtendedQueue),
152 ExtendedSocketIpv4(ExtendedSocketIpv4),
154 ExtendedSocketIpv6(ExtendedSocketIpv6),
156 ExtendedProxySocketIpv4(ExtendedProxySocketIpv4),
158 ExtendedProxySocketIpv6(ExtendedProxySocketIpv6),
160 JvmRuntime(JvmRuntime),
162 MemcacheOperation(MemcacheOperation),
164 AppOperation(AppOperation),
166 HttpRequest(HttpRequest),
168 ExtendedProxyRequest(ExtendedProxyRequest),
170 Unknown {
172 enterprise: u32,
174 format: u32,
176 data: Vec<u8>,
178 },
179}
180
181pub(crate) fn parse_sflow_string(input: &[u8]) -> IResult<&[u8], String> {
185 let (input, length) = be_u32(input)?;
186 let (input, bytes) = take(length as usize)(input)?;
187 let padding = (4 - (length as usize % 4)) % 4;
189 let (input, _) = take(padding)(input)?;
190 let s = String::from_utf8_lossy(bytes).into_owned();
191 Ok((input, s))
192}
193
194pub(crate) fn parse_flow_records(
195 mut input: &[u8],
196 num_records: u32,
197) -> IResult<&[u8], Vec<FlowRecord>> {
198 let cap = (num_records as usize).min(input.len() / 8);
200 let mut records = Vec::with_capacity(cap);
201
202 for _ in 0..num_records {
203 let (rest, data_format) = be_u32(input)?;
204 let enterprise = data_format >> 12;
205 let format = data_format & 0xFFF;
206
207 let (rest, record_length) = be_u32(rest)?;
208 let record_length = record_length as usize;
209
210 if rest.len() < record_length {
211 return Err(nom::Err::Error(nom::error::Error::new(
212 rest,
213 nom::error::ErrorKind::Eof,
214 )));
215 }
216
217 let record_data = &rest[..record_length];
218 let after_record = &rest[record_length..];
219
220 let record = if enterprise == 0 {
221 match format {
222 1 => {
223 let (_, r) = raw_packet_header::parse_raw_packet_header(record_data)?;
224 FlowRecord::RawPacketHeader(r)
225 }
226 2 => {
227 let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
228 FlowRecord::SampledEthernet(r)
229 }
230 3 => {
231 let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
232 FlowRecord::SampledIpv4(r)
233 }
234 4 => {
235 let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
236 FlowRecord::SampledIpv6(r)
237 }
238 1001 => {
239 let (_, r) = extended_switch::parse_extended_switch(record_data)?;
240 FlowRecord::ExtendedSwitch(r)
241 }
242 1002 => {
243 let (_, r) = extended_router::parse_extended_router(record_data)?;
244 FlowRecord::ExtendedRouter(r)
245 }
246 1003 => {
247 let (_, r) = extended_gateway::parse_extended_gateway(record_data)?;
248 FlowRecord::ExtendedGateway(r)
249 }
250 1004 => {
251 let (_, r) = extended_user::parse_extended_user(record_data)?;
252 FlowRecord::ExtendedUser(r)
253 }
254 1005 => {
255 let (_, r) = extended_url::parse_extended_url(record_data)?;
256 FlowRecord::ExtendedUrl(r)
257 }
258 1006 => {
259 let (_, r) = extended_mpls::parse_extended_mpls(record_data)?;
260 FlowRecord::ExtendedMpls(r)
261 }
262 1007 => {
263 let (_, r) = extended_nat::parse_extended_nat(record_data)?;
264 FlowRecord::ExtendedNat(r)
265 }
266 1008 => {
267 let (_, r) = extended_mpls_tunnel::parse_extended_mpls_tunnel(record_data)?;
268 FlowRecord::ExtendedMplsTunnel(r)
269 }
270 1009 => {
271 let (_, r) = extended_mpls_vc::parse_extended_mpls_vc(record_data)?;
272 FlowRecord::ExtendedMplsVc(r)
273 }
274 1010 => {
275 let (_, r) = extended_mpls_ftn::parse_extended_mpls_ftn(record_data)?;
276 FlowRecord::ExtendedMplsFtn(r)
277 }
278 1011 => {
279 let (_, r) =
280 extended_mpls_ldp_fec::parse_extended_mpls_ldp_fec(record_data)?;
281 FlowRecord::ExtendedMplsLdpFec(r)
282 }
283 1012 => {
284 let (_, r) = extended_vlan_tunnel::parse_extended_vlan_tunnel(record_data)?;
285 FlowRecord::ExtendedVlanTunnel(r)
286 }
287 1013 => {
288 let (_, r) =
289 extended_80211_payload::parse_extended_80211_payload(record_data)?;
290 FlowRecord::Extended80211Payload(r)
291 }
292 1014 => {
293 let (_, r) = extended_80211_rx::parse_extended_80211_rx(record_data)?;
294 FlowRecord::Extended80211Rx(r)
295 }
296 1015 => {
297 let (_, r) = extended_80211_tx::parse_extended_80211_tx(record_data)?;
298 FlowRecord::Extended80211Tx(r)
299 }
300 1021 => {
301 let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
302 FlowRecord::ExtendedL2TunnelEgress(r)
303 }
304 1022 => {
305 let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
306 FlowRecord::ExtendedL2TunnelIngress(r)
307 }
308 1023 => {
309 let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
310 FlowRecord::ExtendedIpv4TunnelEgress(r)
311 }
312 1024 => {
313 let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
314 FlowRecord::ExtendedIpv4TunnelIngress(r)
315 }
316 1025 => {
317 let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
318 FlowRecord::ExtendedIpv6TunnelEgress(r)
319 }
320 1026 => {
321 let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
322 FlowRecord::ExtendedIpv6TunnelIngress(r)
323 }
324 1027 => {
325 let (_, r) =
326 extended_decapsulate::parse_extended_decapsulate_egress(record_data)?;
327 FlowRecord::ExtendedDecapsulateEgress(r)
328 }
329 1028 => {
330 let (_, r) =
331 extended_decapsulate::parse_extended_decapsulate_ingress(record_data)?;
332 FlowRecord::ExtendedDecapsulateIngress(r)
333 }
334 1029 => {
335 let (_, r) = extended_vni::parse_extended_vni_egress(record_data)?;
336 FlowRecord::ExtendedVniEgress(r)
337 }
338 1030 => {
339 let (_, r) = extended_vni::parse_extended_vni_ingress(record_data)?;
340 FlowRecord::ExtendedVniIngress(r)
341 }
342 1036 => {
343 let (_, r) =
344 extended_egress_queue::parse_extended_egress_queue(record_data)?;
345 FlowRecord::ExtendedEgressQueue(r)
346 }
347 1037 => {
348 let (_, r) = extended_acl::parse_extended_acl(record_data)?;
349 FlowRecord::ExtendedAcl(r)
350 }
351 1038 => {
352 let (_, r) = extended_function::parse_extended_function(record_data)?;
353 FlowRecord::ExtendedFunction(r)
354 }
355 1039 => {
356 let (_, r) = extended_transit::parse_extended_transit(record_data)?;
357 FlowRecord::ExtendedTransit(r)
358 }
359 1040 => {
360 let (_, r) = extended_queue::parse_extended_queue(record_data)?;
361 FlowRecord::ExtendedQueue(r)
362 }
363 2100 => {
364 let (_, r) = extended_socket_ipv4::parse_extended_socket_ipv4(record_data)?;
365 FlowRecord::ExtendedSocketIpv4(r)
366 }
367 2101 => {
368 let (_, r) = extended_socket_ipv6::parse_extended_socket_ipv6(record_data)?;
369 FlowRecord::ExtendedSocketIpv6(r)
370 }
371 2102 => {
372 let (_, r) = extended_proxy_socket_ipv4::parse_extended_proxy_socket_ipv4(
373 record_data,
374 )?;
375 FlowRecord::ExtendedProxySocketIpv4(r)
376 }
377 2103 => {
378 let (_, r) = extended_proxy_socket_ipv6::parse_extended_proxy_socket_ipv6(
379 record_data,
380 )?;
381 FlowRecord::ExtendedProxySocketIpv6(r)
382 }
383 2105 => {
384 let (_, r) = jvm_runtime::parse_jvm_runtime(record_data)?;
385 FlowRecord::JvmRuntime(r)
386 }
387 2200 => {
388 let (_, r) = memcache_operation::parse_memcache_operation(record_data)?;
389 FlowRecord::MemcacheOperation(r)
390 }
391 2202 => {
392 let (_, r) = app_operation::parse_app_operation(record_data)?;
393 FlowRecord::AppOperation(r)
394 }
395 2206 => {
396 let (_, r) = http_request::parse_http_request(record_data)?;
397 FlowRecord::HttpRequest(r)
398 }
399 2207 => {
400 let (_, r) =
401 extended_proxy_request::parse_extended_proxy_request(record_data)?;
402 FlowRecord::ExtendedProxyRequest(r)
403 }
404 _ => FlowRecord::Unknown {
405 enterprise,
406 format,
407 data: record_data.to_vec(),
408 },
409 }
410 } else {
411 FlowRecord::Unknown {
412 enterprise,
413 format,
414 data: record_data.to_vec(),
415 }
416 };
417
418 records.push(record);
419 input = after_record;
420 }
421
422 Ok((input, records))
423}