flowparser_sflow/flow_records/
mod.rs1pub mod extended_gateway;
2pub mod extended_router;
3pub mod extended_switch;
4pub mod extended_url;
5pub mod extended_user;
6pub mod raw_packet_header;
7pub mod sampled_ethernet;
8pub mod sampled_ipv4;
9pub mod sampled_ipv6;
10
11use nom::IResult;
12use nom::bytes::complete::take;
13use nom::number::complete::be_u32;
14use serde::{Deserialize, Serialize};
15
16pub use extended_gateway::ExtendedGateway;
17pub use extended_router::ExtendedRouter;
18pub use extended_switch::ExtendedSwitch;
19pub use extended_url::ExtendedUrl;
20pub use extended_user::ExtendedUser;
21pub use raw_packet_header::RawPacketHeader;
22pub use sampled_ethernet::SampledEthernet;
23pub use sampled_ipv4::SampledIpv4;
24pub use sampled_ipv6::SampledIpv6;
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31pub enum FlowRecord {
32 RawPacketHeader(RawPacketHeader),
34 SampledEthernet(SampledEthernet),
36 SampledIpv4(SampledIpv4),
38 SampledIpv6(SampledIpv6),
40 ExtendedSwitch(ExtendedSwitch),
42 ExtendedRouter(ExtendedRouter),
44 ExtendedGateway(ExtendedGateway),
46 ExtendedUser(ExtendedUser),
48 ExtendedUrl(ExtendedUrl),
50 Unknown {
52 enterprise: u32,
54 format: u32,
56 data: Vec<u8>,
58 },
59}
60
61pub(crate) fn parse_sflow_string(input: &[u8]) -> IResult<&[u8], String> {
65 let (input, length) = be_u32(input)?;
66 let (input, bytes) = take(length as usize)(input)?;
67 let padding = (4 - (length as usize % 4)) % 4;
69 let (input, _) = take(padding)(input)?;
70 let s = String::from_utf8_lossy(bytes).into_owned();
71 Ok((input, s))
72}
73
74pub(crate) fn parse_flow_records(
75 mut input: &[u8],
76 num_records: u32,
77) -> IResult<&[u8], Vec<FlowRecord>> {
78 let cap = (num_records as usize).min(input.len() / 8);
80 let mut records = Vec::with_capacity(cap);
81
82 for _ in 0..num_records {
83 let (rest, data_format) = be_u32(input)?;
84 let enterprise = data_format >> 12;
85 let format = data_format & 0xFFF;
86
87 let (rest, record_length) = be_u32(rest)?;
88 let record_length = record_length as usize;
89
90 if rest.len() < record_length {
91 return Err(nom::Err::Error(nom::error::Error::new(
92 rest,
93 nom::error::ErrorKind::Eof,
94 )));
95 }
96
97 let record_data = &rest[..record_length];
98 let after_record = &rest[record_length..];
99
100 let record = if enterprise == 0 {
101 match format {
102 1 => {
103 let (_, r) = raw_packet_header::parse_raw_packet_header(record_data)?;
104 FlowRecord::RawPacketHeader(r)
105 }
106 2 => {
107 let (_, r) = sampled_ethernet::parse_sampled_ethernet(record_data)?;
108 FlowRecord::SampledEthernet(r)
109 }
110 3 => {
111 let (_, r) = sampled_ipv4::parse_sampled_ipv4(record_data)?;
112 FlowRecord::SampledIpv4(r)
113 }
114 4 => {
115 let (_, r) = sampled_ipv6::parse_sampled_ipv6(record_data)?;
116 FlowRecord::SampledIpv6(r)
117 }
118 1001 => {
119 let (_, r) = extended_switch::parse_extended_switch(record_data)?;
120 FlowRecord::ExtendedSwitch(r)
121 }
122 1002 => {
123 let (_, r) = extended_router::parse_extended_router(record_data)?;
124 FlowRecord::ExtendedRouter(r)
125 }
126 1003 => {
127 let (_, r) = extended_gateway::parse_extended_gateway(record_data)?;
128 FlowRecord::ExtendedGateway(r)
129 }
130 1004 => {
131 let (_, r) = extended_user::parse_extended_user(record_data)?;
132 FlowRecord::ExtendedUser(r)
133 }
134 1005 => {
135 let (_, r) = extended_url::parse_extended_url(record_data)?;
136 FlowRecord::ExtendedUrl(r)
137 }
138 _ => FlowRecord::Unknown {
139 enterprise,
140 format,
141 data: record_data.to_vec(),
142 },
143 }
144 } else {
145 FlowRecord::Unknown {
146 enterprise,
147 format,
148 data: record_data.to_vec(),
149 }
150 };
151
152 records.push(record);
153 input = after_record;
154 }
155
156 Ok((input, records))
157}