Skip to main content

netgauze_pcap_decoder/handlers/
flow.rs

1// Copyright (C) 2025-present The NetGauze Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//    http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12// implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use super::{decode_buffer, serialize_error, serialize_success};
17use crate::protocol_handler::{DecodeOutcome, ProtocolHandler};
18use bytes::BytesMut;
19use netgauze_flow_pkt::FlowInfo;
20use netgauze_flow_pkt::codec::{FlowInfoCodec, FlowInfoCodecDecoderError};
21use netgauze_pcap_reader::TransportProtocol;
22use std::collections::HashMap;
23use std::io;
24use std::net::IpAddr;
25
/// Protocol handler that decodes flow export messages found in captured
/// packets.
///
/// The handler only reacts to UDP packets whose destination port is one of
/// the configured `ports`.
#[derive(Debug, Clone)]
pub struct FlowProtocolHandler {
    /// Destination ports on which packets are treated as flow messages.
    ports: Vec<u16>,
}
29
30impl FlowProtocolHandler {
31    pub fn new(ports: Vec<u16>) -> Self {
32        FlowProtocolHandler { ports }
33    }
34}
35
36impl ProtocolHandler<FlowInfo, FlowInfoCodec, FlowInfoCodecDecoderError> for FlowProtocolHandler {
37    fn decode(
38        &self,
39        flow_key: (IpAddr, u16, IpAddr, u16),
40        protocol: TransportProtocol,
41        packet_data: &[u8],
42        exporter_peers: &mut HashMap<(IpAddr, u16, IpAddr, u16), (FlowInfoCodec, BytesMut)>,
43    ) -> Option<Vec<DecodeOutcome<FlowInfo, FlowInfoCodecDecoderError>>> {
44        let dst_port: u16 = flow_key.3;
45
46        if protocol == TransportProtocol::UDP && self.ports.contains(&dst_port) {
47            let (codec, buffer) = exporter_peers
48                .entry(flow_key)
49                .or_insert((FlowInfoCodec::default(), BytesMut::new()));
50            buffer.extend_from_slice(packet_data);
51
52            let mut results = Vec::new();
53            decode_buffer(buffer, codec, flow_key, &mut results);
54            if !results.is_empty() {
55                return Some(results);
56            }
57        }
58        None
59    }
60
61    fn serialize(
62        &self,
63        decode_outcome: DecodeOutcome<FlowInfo, FlowInfoCodecDecoderError>,
64    ) -> io::Result<serde_json::Value> {
65        match decode_outcome {
66            DecodeOutcome::Success(m) => {
67                let (flow_key, flow_info) = m;
68                serialize_success(flow_key, flow_info)
69            }
70            DecodeOutcome::Error(m) => serialize_error(m),
71        }
72    }
73}
74
75#[cfg(test)]
76mod tests {
77    use super::*;
78    use chrono::{TimeZone, Utc};
79    use ipfix::IpfixPacket;
80    use netgauze_flow_pkt::ie::{Field, IE};
81    use netgauze_flow_pkt::ipfix::{DataRecord, OptionsTemplateRecord, Set};
82    use netgauze_flow_pkt::wire::deserializer::ipfix::IpfixPacketParsingError;
83    use netgauze_flow_pkt::{DataSetId, FlowInfo, ipfix};
84    use serde_json::json;
85    use std::net::Ipv4Addr;
86
87    #[test]
88    fn test_flow_handler_decode_success() {
89        let handler = FlowProtocolHandler::new(vec![9991]);
90        let flow_key = (
91            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
92            12345,
93            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
94            9991,
95        );
96        // A simple IPFIX options template
97        let packet_data = [
98            0x00, 0x0a, 0x00, 0x24, 0x65, 0xa1, 0x4f, 0x56, 0x00, 0x26, 0x10, 0xa0, 0x00, 0x00,
99            0x82, 0x20, 0x00, 0x03, 0x00, 0x14, 0x01, 0x52, 0x00, 0x02, 0x00, 0x01, 0x00, 0x95,
100            0x00, 0x04, 0x00, 0xa0, 0x00, 0x08, 0x00, 0x00,
101        ];
102        let mut exporter_peers = HashMap::new();
103
104        let result = handler.decode(
105            flow_key,
106            TransportProtocol::UDP,
107            &packet_data,
108            &mut exporter_peers,
109        );
110
111        assert_eq!(
112            result,
113            Some(vec![DecodeOutcome::Success((
114                flow_key,
115                FlowInfo::IPFIX(IpfixPacket::new(
116                    Utc.with_ymd_and_hms(2024, 1, 12, 14, 40, 22).unwrap(),
117                    2494624,
118                    33312,
119                    Box::new([Set::OptionsTemplate(Box::new([
120                        OptionsTemplateRecord::new(
121                            338,
122                            Box::new([netgauze_flow_pkt::FieldSpecifier::new(
123                                IE::observationDomainId,
124                                4
125                            )
126                            .unwrap()]),
127                            Box::new([netgauze_flow_pkt::FieldSpecifier::new(
128                                IE::systemInitTimeMilliseconds,
129                                8
130                            )
131                            .unwrap()]),
132                        ),
133                    ]))]),
134                ))
135            ))]),
136        );
137        // Now we should have an empty buffer for this flow key
138        assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
139    }
140
141    #[test]
142    fn test_flow_handler_decode_fragmented_success() {
143        let handler = FlowProtocolHandler::new(vec![9991]);
144        let flow_key = (
145            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
146            12345,
147            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
148            9991,
149        );
150        let packet_data1 = &[0x00, 0x0a, 0x00, 0x24];
151        let packet_data2 = &[
152            0x65, 0xa1, 0x4f, 0x56, 0x00, 0x26, 0x10, 0xa0, 0x00, 0x00, 0x82, 0x20, 0x00, 0x03,
153            0x00, 0x14, 0x01, 0x52, 0x00, 0x02, 0x00, 0x01, 0x00, 0x95, 0x00, 0x04, 0x00, 0xa0,
154            0x00, 0x08, 0x00, 0x00,
155        ];
156        let mut exporter_peers = HashMap::new();
157
158        let result1 = handler.decode(
159            flow_key,
160            TransportProtocol::UDP,
161            packet_data1,
162            &mut exporter_peers,
163        );
164        assert!(result1.is_none());
165        // The buffer for this flow key should now contain the first part, so not empty
166        assert!(!exporter_peers.get(&flow_key).unwrap().1.is_empty());
167
168        // Second packet completes it
169        let result2 = handler.decode(
170            flow_key,
171            TransportProtocol::UDP,
172            packet_data2,
173            &mut exporter_peers,
174        );
175
176        assert_eq!(
177            result2,
178            Some(vec![DecodeOutcome::Success((
179                flow_key,
180                FlowInfo::IPFIX(IpfixPacket::new(
181                    Utc.with_ymd_and_hms(2024, 1, 12, 14, 40, 22).unwrap(),
182                    2494624,
183                    33312,
184                    Box::new([Set::OptionsTemplate(Box::new([
185                        OptionsTemplateRecord::new(
186                            338,
187                            Box::new([netgauze_flow_pkt::FieldSpecifier::new(
188                                IE::observationDomainId,
189                                4
190                            )
191                            .unwrap()]),
192                            Box::new([netgauze_flow_pkt::FieldSpecifier::new(
193                                IE::systemInitTimeMilliseconds,
194                                8
195                            )
196                            .unwrap()]),
197                        ),
198                    ]))]),
199                ))
200            ))]),
201        );
202        // Now we should have an empty buffer for this flow key
203        assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
204    }
205
206    #[test]
207    fn test_flow_handler_decode_multiple_messages_success() {
208        let handler = FlowProtocolHandler::new(vec![9991]);
209        let flow_key = (
210            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
211            12345,
212            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
213            9991,
214        );
215        // Two simple IPFIX options template
216        let packet_data = [
217            0x00, 0x0a, 0x00, 0x24, 0x65, 0xa1, 0x4f, 0x56, 0x00, 0x26, 0x10, 0xa0, 0x00, 0x00,
218            0x82, 0x20, 0x00, 0x03, 0x00, 0x14, 0x01, 0x52, 0x00, 0x02, 0x00, 0x01, 0x00, 0x95,
219            0x00, 0x04, 0x00, 0xa0, 0x00, 0x08, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x24, 0x65, 0xa1,
220            0x4f, 0x56, 0x00, 0x26, 0x10, 0xa0, 0x00, 0x00, 0x82, 0x20, 0x00, 0x03, 0x00, 0x14,
221            0x01, 0x52, 0x00, 0x02, 0x00, 0x01, 0x00, 0x95, 0x00, 0x04, 0x00, 0xa0, 0x00, 0x08,
222            0x00, 0x00,
223        ];
224        let mut exporter_peers = HashMap::new();
225
226        let result = handler.decode(
227            flow_key,
228            TransportProtocol::UDP,
229            &packet_data,
230            &mut exporter_peers,
231        );
232
233        let expected_flow = FlowInfo::IPFIX(IpfixPacket::new(
234            Utc.with_ymd_and_hms(2024, 1, 12, 14, 40, 22).unwrap(),
235            2494624,
236            33312,
237            Box::new([Set::OptionsTemplate(Box::new([
238                OptionsTemplateRecord::new(
239                    338,
240                    Box::new([
241                        netgauze_flow_pkt::FieldSpecifier::new(IE::observationDomainId, 4).unwrap(),
242                    ]),
243                    Box::new([netgauze_flow_pkt::FieldSpecifier::new(
244                        IE::systemInitTimeMilliseconds,
245                        8,
246                    )
247                    .unwrap()]),
248                ),
249            ]))]),
250        ));
251        assert_eq!(
252            result,
253            Some(vec![
254                DecodeOutcome::Success((flow_key, expected_flow.clone())),
255                DecodeOutcome::Success((flow_key, expected_flow))
256            ]),
257        );
258
259        // Now we should have an empty buffer for this flow key
260        assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
261    }
262
263    #[test]
264    fn test_flow_handler_decode_failure() {
265        let handler = FlowProtocolHandler::new(vec![9991]);
266        let flow_key = (
267            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
268            12345,
269            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
270            9991,
271        );
272        // Invalid flow packet, version not supported (0x03)
273        let packet_data = [
274            0x00, 0x03, 0x00, 0x24, 0x65, 0xa1, 0x4f, 0x56, 0x00, 0x26, 0x10, 0xa0, 0x00, 0x00,
275            0x82, 0x20, 0x00, 0x03, 0x00, 0x14, 0x01, 0x52, 0x00, 0x02, 0x00, 0x01, 0x00, 0x95,
276            0x00, 0x04, 0x00, 0xa0, 0x00, 0x08, 0x00, 0x00,
277        ];
278        let mut exporter_peers = HashMap::new();
279
280        let result = handler.decode(
281            flow_key,
282            TransportProtocol::UDP,
283            &packet_data,
284            &mut exporter_peers,
285        );
286
287        assert_eq!(
288            result,
289            Some(vec![DecodeOutcome::Error(
290                FlowInfoCodecDecoderError::UnsupportedVersion(3)
291            )]),
292        );
293        // Now we should have an empty buffer for this flow key
294        assert!(exporter_peers.get(&flow_key).unwrap().1.is_empty());
295    }
296
297    #[test]
298    fn test_flow_handler_decode_ignore_wrong_port() {
299        let handler = FlowProtocolHandler::new(vec![9991]);
300        let flow_key = (
301            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
302            12345,
303            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
304            123, // Wrong port
305        );
306        let packet_data = [0xff; 20];
307        let mut exporter_peers = HashMap::new();
308
309        let result = handler.decode(
310            flow_key,
311            TransportProtocol::TCP,
312            &packet_data,
313            &mut exporter_peers,
314        );
315
316        assert!(result.is_none());
317    }
318
319    #[test]
320    fn test_bgp_handler_decode_ignore_wrong_protocol() {
321        let handler = FlowProtocolHandler::new(vec![9991]);
322        let flow_key = (
323            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
324            12345,
325            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
326            9991,
327        );
328        let packet_data = [0xff; 20];
329        let mut exporter_peers = HashMap::new();
330
331        let result = handler.decode(
332            flow_key,
333            TransportProtocol::TCP, // Wrong protocol
334            &packet_data,
335            &mut exporter_peers,
336        );
337
338        assert!(result.is_none());
339    }
340
341    #[test]
342    fn test_flow_handler_serialize_success() {
343        let handler = FlowProtocolHandler::new(vec![9991]);
344        let flow_key = (
345            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
346            12345,
347            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
348            9991,
349        );
350        let fields = vec![
351            Field::sourceIPv4Address(Ipv4Addr::new(10, 0, 0, 1)),
352            Field::octetDeltaCount(300),
353        ];
354        let record = DataRecord::new(Box::new([]), fields.into_boxed_slice());
355        let set = Set::Data {
356            id: DataSetId::new(600).unwrap(),
357            records: Box::new([record]),
358        };
359        let export_time = Utc.with_ymd_and_hms(2025, 1, 1, 18, 0, 0).unwrap();
360        let ipfix_pkt = IpfixPacket::new(export_time, 15, 400, Box::new([set]));
361        let ipfix_message = FlowInfo::IPFIX(ipfix_pkt);
362        let outcome = DecodeOutcome::Success((flow_key, ipfix_message));
363
364        let result = handler.serialize(outcome);
365
366        assert!(result.is_ok());
367        let json = result.unwrap();
368        let expected = json!({
369          "source_address": "10.0.0.1:12345",
370          "destination_address": "10.0.0.2:9991",
371          "info": {
372            "IPFIX": {
373              "version": 10,
374              "export_time": "2025-01-01T18:00:00Z",
375              "sequence_number": 15,
376              "observation_domain_id": 400,
377              "sets": [
378                {
379                  "Data": {
380                    "id": 600,
381                    "records": [
382                      {
383                        "scope_fields": [],
384                        "fields": [
385                          {
386                            "sourceIPv4Address": "10.0.0.1"
387                          },
388                          {
389                            "octetDeltaCount": 300
390                          }
391                        ]
392                      }
393                    ]
394                  }
395                }
396              ]
397            }
398          }
399        });
400        assert_eq!(json, expected);
401    }
402
403    #[test]
404    fn test_flow_handler_serialize_error() {
405        let handler = FlowProtocolHandler::new(vec![9991]);
406        let error = FlowInfoCodecDecoderError::IpfixParsingError(
407            IpfixPacketParsingError::InvalidLength(10),
408        );
409        let outcome = DecodeOutcome::Error(error);
410
411        let result = handler.serialize(outcome);
412
413        assert!(result.is_ok());
414        let json = result.unwrap();
415        let expected = json!({
416            "IpfixParsingError": {
417                "InvalidLength": 10
418            }
419        });
420        assert_eq!(json, expected);
421    }
422}