1use super::{decode_buffer, serialize_error, serialize_success};
17use crate::protocol_handler::{DecodeOutcome, ProtocolHandler};
18use bytes::BytesMut;
19use netgauze_flow_pkt::FlowInfo;
20use netgauze_flow_pkt::codec::{FlowInfoCodec, FlowInfoCodecDecoderError};
21use netgauze_pcap_reader::TransportProtocol;
22use std::collections::HashMap;
23use std::io;
24use std::net::IpAddr;
25
26pub struct FlowProtocolHandler {
27 ports: Vec<u16>,
28}
29
30impl FlowProtocolHandler {
31 pub fn new(ports: Vec<u16>) -> Self {
32 FlowProtocolHandler { ports }
33 }
34}
35
36impl ProtocolHandler<FlowInfo, FlowInfoCodec, FlowInfoCodecDecoderError> for FlowProtocolHandler {
37 fn decode(
38 &self,
39 flow_key: (IpAddr, u16, IpAddr, u16),
40 protocol: TransportProtocol,
41 packet_data: &[u8],
42 exporter_peers: &mut HashMap<(IpAddr, u16, IpAddr, u16), (FlowInfoCodec, BytesMut)>,
43 ) -> Option<Vec<DecodeOutcome<FlowInfo, FlowInfoCodecDecoderError>>> {
44 let dst_port: u16 = flow_key.3;
45
46 if protocol == TransportProtocol::UDP && self.ports.contains(&dst_port) {
47 let (codec, buffer) = exporter_peers
48 .entry(flow_key)
49 .or_insert((FlowInfoCodec::default(), BytesMut::new()));
50 buffer.extend_from_slice(packet_data);
51
52 let mut results = Vec::new();
53 decode_buffer(buffer, codec, flow_key, &mut results);
54 if !results.is_empty() {
55 return Some(results);
56 }
57 }
58 None
59 }
60
61 fn serialize(
62 &self,
63 decode_outcome: DecodeOutcome<FlowInfo, FlowInfoCodecDecoderError>,
64 ) -> io::Result<serde_json::Value> {
65 match decode_outcome {
66 DecodeOutcome::Success(m) => {
67 let (flow_key, flow_info) = m;
68 serialize_success(flow_key, flow_info)
69 }
70 DecodeOutcome::Error(m) => serialize_error(m),
71 }
72 }
73}
74
75#[cfg(test)]
76mod tests {
77 use super::*;
78 use chrono::{TimeZone, Utc};
79 use ipfix::IpfixPacket;
80 use netgauze_flow_pkt::ie::{Field, IE};
81 use netgauze_flow_pkt::ipfix::{DataRecord, OptionsTemplateRecord, Set};
82 use netgauze_flow_pkt::wire::deserializer::ipfix::IpfixPacketParsingError;
83 use netgauze_flow_pkt::{DataSetId, FlowInfo, ipfix};
84 use serde_json::json;
85 use std::net::Ipv4Addr;
86
87 #[test]
88 fn test_flow_handler_decode_success() {
89 let handler = FlowProtocolHandler::new(vec![9991]);
90 let flow_key = (
91 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
92 12345,
93 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
94 9991,
95 );
96 let packet_data = [
98 0x00, 0x0a, 0x00, 0x24, 0x65, 0xa1, 0x4f, 0x56, 0x00, 0x26, 0x10, 0xa0, 0x00, 0x00,
99 0x82, 0x20, 0x00, 0x03, 0x00, 0x14, 0x01, 0x52, 0x00, 0x02, 0x00, 0x01, 0x00, 0x95,
100 0x00, 0x04, 0x00, 0xa0, 0x00, 0x08, 0x00, 0x00,
101 ];
102 let mut exporter_peers = HashMap::new();
103
104 let result = handler.decode(
105 flow_key,
106 TransportProtocol::UDP,
107 &packet_data,
108 &mut exporter_peers,
109 );
110
111 assert_eq!(
112 result,
113 Some(vec![DecodeOutcome::Success((
114 flow_key,
115 FlowInfo::IPFIX(IpfixPacket::new(
116 Utc.with_ymd_and_hms(2024, 1, 12, 14, 40, 22).unwrap(),
117 2494624,
118 33312,
119 Box::new([Set::OptionsTemplate(Box::new([
120 OptionsTemplateRecord::new(
121 338,
122 Box::new([netgauze_flow_pkt::FieldSpecifier::new(
123 IE::observationDomainId,
124 4
125 )
126 .unwrap()]),
127 Box::new([netgauze_flow_pkt::FieldSpecifier::new(
128 IE::systemInitTimeMilliseconds,
129 8
130 )
131 .unwrap()]),
132 ),
133 ]))]),
134 ))
135 ))]),
136 );
137 assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
139 }
140
141 #[test]
142 fn test_flow_handler_decode_fragmented_success() {
143 let handler = FlowProtocolHandler::new(vec![9991]);
144 let flow_key = (
145 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
146 12345,
147 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
148 9991,
149 );
150 let packet_data1 = &[0x00, 0x0a, 0x00, 0x24];
151 let packet_data2 = &[
152 0x65, 0xa1, 0x4f, 0x56, 0x00, 0x26, 0x10, 0xa0, 0x00, 0x00, 0x82, 0x20, 0x00, 0x03,
153 0x00, 0x14, 0x01, 0x52, 0x00, 0x02, 0x00, 0x01, 0x00, 0x95, 0x00, 0x04, 0x00, 0xa0,
154 0x00, 0x08, 0x00, 0x00,
155 ];
156 let mut exporter_peers = HashMap::new();
157
158 let result1 = handler.decode(
159 flow_key,
160 TransportProtocol::UDP,
161 packet_data1,
162 &mut exporter_peers,
163 );
164 assert!(result1.is_none());
165 assert!(!exporter_peers.get(&flow_key).unwrap().1.is_empty());
167
168 let result2 = handler.decode(
170 flow_key,
171 TransportProtocol::UDP,
172 packet_data2,
173 &mut exporter_peers,
174 );
175
176 assert_eq!(
177 result2,
178 Some(vec![DecodeOutcome::Success((
179 flow_key,
180 FlowInfo::IPFIX(IpfixPacket::new(
181 Utc.with_ymd_and_hms(2024, 1, 12, 14, 40, 22).unwrap(),
182 2494624,
183 33312,
184 Box::new([Set::OptionsTemplate(Box::new([
185 OptionsTemplateRecord::new(
186 338,
187 Box::new([netgauze_flow_pkt::FieldSpecifier::new(
188 IE::observationDomainId,
189 4
190 )
191 .unwrap()]),
192 Box::new([netgauze_flow_pkt::FieldSpecifier::new(
193 IE::systemInitTimeMilliseconds,
194 8
195 )
196 .unwrap()]),
197 ),
198 ]))]),
199 ))
200 ))]),
201 );
202 assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
204 }
205
206 #[test]
207 fn test_flow_handler_decode_multiple_messages_success() {
208 let handler = FlowProtocolHandler::new(vec![9991]);
209 let flow_key = (
210 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
211 12345,
212 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
213 9991,
214 );
215 let packet_data = [
217 0x00, 0x0a, 0x00, 0x24, 0x65, 0xa1, 0x4f, 0x56, 0x00, 0x26, 0x10, 0xa0, 0x00, 0x00,
218 0x82, 0x20, 0x00, 0x03, 0x00, 0x14, 0x01, 0x52, 0x00, 0x02, 0x00, 0x01, 0x00, 0x95,
219 0x00, 0x04, 0x00, 0xa0, 0x00, 0x08, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x24, 0x65, 0xa1,
220 0x4f, 0x56, 0x00, 0x26, 0x10, 0xa0, 0x00, 0x00, 0x82, 0x20, 0x00, 0x03, 0x00, 0x14,
221 0x01, 0x52, 0x00, 0x02, 0x00, 0x01, 0x00, 0x95, 0x00, 0x04, 0x00, 0xa0, 0x00, 0x08,
222 0x00, 0x00,
223 ];
224 let mut exporter_peers = HashMap::new();
225
226 let result = handler.decode(
227 flow_key,
228 TransportProtocol::UDP,
229 &packet_data,
230 &mut exporter_peers,
231 );
232
233 let expected_flow = FlowInfo::IPFIX(IpfixPacket::new(
234 Utc.with_ymd_and_hms(2024, 1, 12, 14, 40, 22).unwrap(),
235 2494624,
236 33312,
237 Box::new([Set::OptionsTemplate(Box::new([
238 OptionsTemplateRecord::new(
239 338,
240 Box::new([
241 netgauze_flow_pkt::FieldSpecifier::new(IE::observationDomainId, 4).unwrap(),
242 ]),
243 Box::new([netgauze_flow_pkt::FieldSpecifier::new(
244 IE::systemInitTimeMilliseconds,
245 8,
246 )
247 .unwrap()]),
248 ),
249 ]))]),
250 ));
251 assert_eq!(
252 result,
253 Some(vec![
254 DecodeOutcome::Success((flow_key, expected_flow.clone())),
255 DecodeOutcome::Success((flow_key, expected_flow))
256 ]),
257 );
258
259 assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
261 }
262
263 #[test]
264 fn test_flow_handler_decode_failure() {
265 let handler = FlowProtocolHandler::new(vec![9991]);
266 let flow_key = (
267 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
268 12345,
269 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
270 9991,
271 );
272 let packet_data = [
274 0x00, 0x03, 0x00, 0x24, 0x65, 0xa1, 0x4f, 0x56, 0x00, 0x26, 0x10, 0xa0, 0x00, 0x00,
275 0x82, 0x20, 0x00, 0x03, 0x00, 0x14, 0x01, 0x52, 0x00, 0x02, 0x00, 0x01, 0x00, 0x95,
276 0x00, 0x04, 0x00, 0xa0, 0x00, 0x08, 0x00, 0x00,
277 ];
278 let mut exporter_peers = HashMap::new();
279
280 let result = handler.decode(
281 flow_key,
282 TransportProtocol::UDP,
283 &packet_data,
284 &mut exporter_peers,
285 );
286
287 assert_eq!(
288 result,
289 Some(vec![DecodeOutcome::Error(
290 FlowInfoCodecDecoderError::UnsupportedVersion(3)
291 )]),
292 );
293 assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
295 }
296
297 #[test]
298 fn test_flow_handler_decode_ignore_wrong_port() {
299 let handler = FlowProtocolHandler::new(vec![9991]);
300 let flow_key = (
301 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
302 12345,
303 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
304 123, );
306 let packet_data = [0xff; 20];
307 let mut exporter_peers = HashMap::new();
308
309 let result = handler.decode(
310 flow_key,
311 TransportProtocol::TCP,
312 &packet_data,
313 &mut exporter_peers,
314 );
315
316 assert!(result.is_none());
317 }
318
319 #[test]
320 fn test_bgp_handler_decode_ignore_wrong_protocol() {
321 let handler = FlowProtocolHandler::new(vec![9991]);
322 let flow_key = (
323 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
324 12345,
325 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
326 9991,
327 );
328 let packet_data = [0xff; 20];
329 let mut exporter_peers = HashMap::new();
330
331 let result = handler.decode(
332 flow_key,
333 TransportProtocol::TCP, &packet_data,
335 &mut exporter_peers,
336 );
337
338 assert!(result.is_none());
339 }
340
341 #[test]
342 fn test_flow_handler_serialize_success() {
343 let handler = FlowProtocolHandler::new(vec![9991]);
344 let flow_key = (
345 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
346 12345,
347 IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
348 9991,
349 );
350 let fields = vec![
351 Field::sourceIPv4Address(Ipv4Addr::new(10, 0, 0, 1)),
352 Field::octetDeltaCount(300),
353 ];
354 let record = DataRecord::new(Box::new([]), fields.into_boxed_slice());
355 let set = Set::Data {
356 id: DataSetId::new(600).unwrap(),
357 records: Box::new([record]),
358 };
359 let export_time = Utc.with_ymd_and_hms(2025, 1, 1, 18, 0, 0).unwrap();
360 let ipfix_pkt = IpfixPacket::new(export_time, 15, 400, Box::new([set]));
361 let ipfix_message = FlowInfo::IPFIX(ipfix_pkt);
362 let outcome = DecodeOutcome::Success((flow_key, ipfix_message));
363
364 let result = handler.serialize(outcome);
365
366 assert!(result.is_ok());
367 let json = result.unwrap();
368 let expected = json!({
369 "source_address": "10.0.0.1:12345",
370 "destination_address": "10.0.0.2:9991",
371 "info": {
372 "IPFIX": {
373 "version": 10,
374 "export_time": "2025-01-01T18:00:00Z",
375 "sequence_number": 15,
376 "observation_domain_id": 400,
377 "sets": [
378 {
379 "Data": {
380 "id": 600,
381 "records": [
382 {
383 "scope_fields": [],
384 "fields": [
385 {
386 "sourceIPv4Address": "10.0.0.1"
387 },
388 {
389 "octetDeltaCount": 300
390 }
391 ]
392 }
393 ]
394 }
395 }
396 ]
397 }
398 }
399 });
400 assert_eq!(json, expected);
401 }
402
403 #[test]
404 fn test_flow_handler_serialize_error() {
405 let handler = FlowProtocolHandler::new(vec![9991]);
406 let error = FlowInfoCodecDecoderError::IpfixParsingError(
407 IpfixPacketParsingError::InvalidLength(10),
408 );
409 let outcome = DecodeOutcome::Error(error);
410
411 let result = handler.serialize(outcome);
412
413 assert!(result.is_ok());
414 let json = result.unwrap();
415 let expected = json!({
416 "IpfixParsingError": {
417 "InvalidLength": 10
418 }
419 });
420 assert_eq!(json, expected);
421 }
422}