netflow_parser/
netflow_common.rs

1use std::collections::BTreeMap;
2use std::net::IpAddr;
3
4use crate::NetflowPacket;
5use crate::protocol::ProtocolTypes;
6use crate::static_versions::{v5::V5, v7::V7};
7use crate::variable_versions::data_number::FieldValue;
8use crate::variable_versions::ipfix_lookup::IPFixField;
9use crate::variable_versions::v9_lookup::V9Field;
10use crate::variable_versions::{
11    ipfix::{FlowSetBody as IPFixFlowSetBody, IPFix},
12    v9::{FlowSetBody as V9FlowSetBody, V9},
13};
14
15#[derive(Debug)]
16pub enum NetflowCommonError {
17    UnknownVersion(NetflowPacket),
18}
19
20#[derive(Debug, Default)]
21/// Common structure for Netflow
22pub struct NetflowCommon {
23    pub version: u16,
24    pub timestamp: u32,
25    pub flowsets: Vec<NetflowCommonFlowSet>,
26}
27
28impl TryFrom<&NetflowPacket> for NetflowCommon {
29    type Error = NetflowCommonError;
30
31    fn try_from(value: &NetflowPacket) -> Result<Self, NetflowCommonError> {
32        match value {
33            NetflowPacket::V5(v5) => Ok(v5.into()),
34            NetflowPacket::V7(v7) => Ok(v7.into()),
35            NetflowPacket::V9(v9) => Ok(v9.into()),
36            NetflowPacket::IPFix(ipfix) => Ok(ipfix.into()),
37            _ => Err(NetflowCommonError::UnknownVersion(value.clone())),
38        }
39    }
40}
41
42#[derive(Debug, Default)]
43/// Common flow set structure for Netflow
44pub struct NetflowCommonFlowSet {
45    /// Source IP address
46    pub src_addr: Option<IpAddr>,
47    /// Destination IP address
48    pub dst_addr: Option<IpAddr>,
49    /// TCP/UDP source port number or equivalent
50    pub src_port: Option<u16>,
51    /// TCP/UDP destination port number or equivalent
52    pub dst_port: Option<u16>,
53    /// Number of IP protocol type (for example, TCP = 6; UDP = 17)
54    pub protocol_number: Option<u8>,
55    /// IP protocol type itself
56    pub protocol_type: Option<ProtocolTypes>,
57    /// Duration of the flow first
58    pub first_seen: Option<u32>,
59    /// Duration of the flow last
60    pub last_seen: Option<u32>,
61    /// Source MAC address
62    pub src_mac: Option<String>,
63    /// Destination MAC address
64    pub dst_mac: Option<String>,
65}
66
67impl From<&V5> for NetflowCommon {
68    fn from(value: &V5) -> Self {
69        // Convert V5 to NetflowCommon
70        NetflowCommon {
71            version: value.header.version,
72            timestamp: value.header.sys_up_time,
73            flowsets: value
74                .flowsets
75                .iter()
76                .map(|set| NetflowCommonFlowSet {
77                    src_addr: Some(set.src_addr.into()),
78                    dst_addr: Some(set.dst_addr.into()),
79                    src_port: Some(set.src_port),
80                    dst_port: Some(set.dst_port),
81                    protocol_number: Some(set.protocol_number),
82                    protocol_type: Some(set.protocol_type),
83                    first_seen: Some(set.first),
84                    last_seen: Some(set.last),
85                    src_mac: None,
86                    dst_mac: None,
87                })
88                .collect(),
89        }
90    }
91}
92
93impl From<&V7> for NetflowCommon {
94    fn from(value: &V7) -> Self {
95        // Convert V7 to NetflowCommon
96        NetflowCommon {
97            version: value.header.version,
98            timestamp: value.header.sys_up_time,
99            flowsets: value
100                .flowsets
101                .iter()
102                .map(|set| NetflowCommonFlowSet {
103                    src_addr: Some(set.src_addr.into()),
104                    dst_addr: Some(set.dst_addr.into()),
105                    src_port: Some(set.src_port),
106                    dst_port: Some(set.dst_port),
107                    protocol_number: Some(set.protocol_number),
108                    protocol_type: Some(set.protocol_type),
109                    first_seen: Some(set.first),
110                    last_seen: Some(set.last),
111                    src_mac: None,
112                    dst_mac: None,
113                })
114                .collect(),
115        }
116    }
117}
118
119impl From<&V9> for NetflowCommon {
120    fn from(value: &V9) -> Self {
121        // Convert V9 to NetflowCommon
122        let mut flowsets = vec![];
123
124        for flowset in &value.flowsets {
125            if let V9FlowSetBody::Data(data) = &flowset.body {
126                for data_field in &data.fields {
127                    let value_map: BTreeMap<V9Field, FieldValue> =
128                        data_field.values().cloned().collect();
129                    flowsets.push(NetflowCommonFlowSet {
130                        src_addr: value_map
131                            .get(&V9Field::Ipv4SrcAddr)
132                            .or_else(|| value_map.get(&V9Field::Ipv6SrcAddr))
133                            .and_then(|v| v.try_into().ok()),
134                        dst_addr: value_map
135                            .get(&V9Field::Ipv4DstAddr)
136                            .or_else(|| value_map.get(&V9Field::Ipv6DstAddr))
137                            .and_then(|v| v.try_into().ok()),
138                        src_port: value_map
139                            .get(&V9Field::L4SrcPort)
140                            .and_then(|v| v.try_into().ok()),
141                        dst_port: value_map
142                            .get(&V9Field::L4DstPort)
143                            .and_then(|v| v.try_into().ok()),
144                        protocol_number: value_map
145                            .get(&V9Field::Protocol)
146                            .and_then(|v| v.try_into().ok()),
147                        protocol_type: value_map.get(&V9Field::Protocol).and_then(|v| {
148                            v.try_into()
149                                .ok()
150                                .map(|proto: u8| ProtocolTypes::from(proto))
151                        }),
152                        first_seen: value_map
153                            .get(&V9Field::FirstSwitched)
154                            .and_then(|v| v.try_into().ok()),
155                        last_seen: value_map
156                            .get(&V9Field::LastSwitched)
157                            .and_then(|v| v.try_into().ok()),
158                        src_mac: value_map
159                            .get(&V9Field::InSrcMac)
160                            .and_then(|v| v.try_into().ok()),
161                        dst_mac: value_map
162                            .get(&V9Field::InDstMac)
163                            .and_then(|v| v.try_into().ok()),
164                    });
165                }
166            }
167        }
168
169        NetflowCommon {
170            version: value.header.version,
171            timestamp: value.header.sys_up_time,
172            flowsets,
173        }
174    }
175}
176
177impl From<&IPFix> for NetflowCommon {
178    fn from(value: &IPFix) -> Self {
179        // Convert IPFix to NetflowCommon
180
181        let mut flowsets = vec![];
182
183        for flowset in &value.flowsets {
184            if let IPFixFlowSetBody::Data(data) = &flowset.body {
185                for data_field in &data.fields {
186                    let value_map: BTreeMap<IPFixField, FieldValue> =
187                        data_field.values().cloned().collect();
188                    flowsets.push(NetflowCommonFlowSet {
189                        src_addr: value_map
190                            .get(&IPFixField::SourceIpv4address)
191                            .or_else(|| value_map.get(&IPFixField::SourceIpv6address))
192                            .and_then(|v| v.try_into().ok()),
193                        dst_addr: value_map
194                            .get(&IPFixField::DestinationIpv4address)
195                            .or_else(|| value_map.get(&IPFixField::DestinationIpv6address))
196                            .and_then(|v| v.try_into().ok()),
197                        src_port: value_map
198                            .get(&IPFixField::SourceTransportPort)
199                            .and_then(|v| v.try_into().ok()),
200                        dst_port: value_map
201                            .get(&IPFixField::DestinationTransportPort)
202                            .and_then(|v| v.try_into().ok()),
203                        protocol_number: value_map
204                            .get(&IPFixField::ProtocolIdentifier)
205                            .and_then(|v| v.try_into().ok()),
206                        protocol_type: value_map.get(&IPFixField::ProtocolIdentifier).and_then(
207                            |v| {
208                                v.try_into()
209                                    .ok()
210                                    .map(|proto: u8| ProtocolTypes::from(proto))
211                            },
212                        ),
213                        first_seen: value_map
214                            .get(&IPFixField::FlowStartSysUpTime)
215                            .and_then(|v| v.try_into().ok()),
216                        last_seen: value_map
217                            .get(&IPFixField::FlowEndSysUpTime)
218                            .and_then(|v| v.try_into().ok()),
219                        src_mac: value_map
220                            .get(&IPFixField::SourceMacaddress)
221                            .and_then(|v| v.try_into().ok()),
222                        dst_mac: value_map
223                            .get(&IPFixField::DestinationMacaddress)
224                            .and_then(|v| v.try_into().ok()),
225                    });
226                }
227            }
228        }
229
230        NetflowCommon {
231            version: value.header.version,
232            timestamp: value.header.export_time,
233            flowsets,
234        }
235    }
236}
237
238#[cfg(test)]
239mod common_tests {
240
241    use std::collections::BTreeMap;
242    use std::net::{IpAddr, Ipv4Addr};
243
244    use crate::netflow_common::NetflowCommon;
245    use crate::static_versions::v5::{FlowSet as V5FlowSet, Header as V5Header, V5};
246    use crate::static_versions::v7::{FlowSet as V7FlowSet, Header as V7Header, V7};
247    use crate::variable_versions::data_number::{DataNumber, FieldValue};
248    use crate::variable_versions::ipfix::{
249        Data as IPFixData, FlowSet as IPFixFlowSet, FlowSetBody as IPFixFlowSetBody,
250        FlowSetHeader as IPFixFlowSetHeader, Header as IPFixHeader, IPFix,
251    };
252    use crate::variable_versions::ipfix_lookup::IPFixField;
253    use crate::variable_versions::v9::{
254        Data as V9Data, FlowSet as V9FlowSet, FlowSetBody as V9FlowSetBody,
255        FlowSetHeader as V9FlowSetHeader, Header as V9Header, V9,
256    };
257    use crate::variable_versions::v9_lookup::V9Field;
258
259    #[test]
260    fn it_converts_v5_to_common() {
261        let v5 = V5 {
262            header: V5Header {
263                version: 5,
264                count: 1,
265                sys_up_time: 100,
266                unix_secs: 1609459200,
267                unix_nsecs: 0,
268                flow_sequence: 1,
269                engine_type: 0,
270                engine_id: 0,
271                sampling_interval: 0,
272            },
273            flowsets: vec![V5FlowSet {
274                src_addr: Ipv4Addr::new(192, 168, 1, 1),
275                dst_addr: Ipv4Addr::new(192, 168, 1, 2),
276                src_port: 1234,
277                dst_port: 80,
278                protocol_number: 6,
279                protocol_type: crate::protocol::ProtocolTypes::Tcp,
280                next_hop: Ipv4Addr::new(192, 168, 1, 254),
281                input: 0,
282                output: 0,
283                d_pkts: 10,
284                d_octets: 1000,
285                first: 100,
286                last: 200,
287                pad1: 0,
288                tcp_flags: 0,
289                tos: 0,
290                src_as: 0,
291                dst_as: 0,
292                src_mask: 0,
293                dst_mask: 0,
294                pad2: 0,
295            }],
296        };
297
298        let common: NetflowCommon = NetflowCommon::try_from(&v5).unwrap();
299
300        assert_eq!(common.version, 5);
301        assert_eq!(common.timestamp, 100);
302        assert_eq!(common.flowsets.len(), 1);
303        let flowset = &common.flowsets[0];
304        assert_eq!(
305            flowset.src_addr.unwrap(),
306            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))
307        );
308        assert_eq!(
309            flowset.dst_addr.unwrap(),
310            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))
311        );
312        assert_eq!(flowset.src_port.unwrap(), 1234);
313        assert_eq!(flowset.dst_port.unwrap(), 80);
314        assert_eq!(flowset.protocol_number.unwrap(), 6);
315        assert_eq!(
316            flowset.protocol_type.unwrap(),
317            crate::protocol::ProtocolTypes::Tcp
318        );
319        assert_eq!(flowset.first_seen.unwrap(), 100);
320        assert_eq!(flowset.last_seen.unwrap(), 200);
321    }
322
323    #[test]
324    fn it_converts_v7_to_common() {
325        let v7 = V7 {
326            header: V7Header {
327                version: 7,
328                count: 1,
329                sys_up_time: 100,
330                unix_secs: 1609459200,
331                unix_nsecs: 0,
332                flow_sequence: 1,
333                reserved: 0,
334            },
335            flowsets: vec![V7FlowSet {
336                src_addr: Ipv4Addr::new(192, 168, 1, 1),
337                dst_addr: Ipv4Addr::new(192, 168, 1, 2),
338                src_port: 1234,
339                dst_port: 80,
340                protocol_number: 6,
341                protocol_type: crate::protocol::ProtocolTypes::Tcp,
342                next_hop: Ipv4Addr::new(192, 168, 1, 254),
343                input: 0,
344                output: 0,
345                d_pkts: 10,
346                d_octets: 1000,
347                first: 100,
348                last: 200,
349                tcp_flags: 0,
350                tos: 0,
351                src_as: 0,
352                dst_as: 0,
353                src_mask: 0,
354                dst_mask: 0,
355                flags_fields_invalid: 0,
356                flags_fields_valid: 0,
357                router_src: Ipv4Addr::new(192, 168, 1, 254),
358            }],
359        };
360
361        let common: NetflowCommon = NetflowCommon::try_from(&v7).unwrap();
362
363        assert_eq!(common.version, 7);
364        assert_eq!(common.timestamp, 100);
365        assert_eq!(common.flowsets.len(), 1);
366        let flowset = &common.flowsets[0];
367        assert_eq!(
368            flowset.src_addr.unwrap(),
369            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))
370        );
371        assert_eq!(
372            flowset.dst_addr.unwrap(),
373            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))
374        );
375        assert_eq!(flowset.src_port.unwrap(), 1234);
376        assert_eq!(flowset.dst_port.unwrap(), 80);
377        assert_eq!(flowset.protocol_number.unwrap(), 6);
378        assert_eq!(
379            flowset.protocol_type.unwrap(),
380            crate::protocol::ProtocolTypes::Tcp
381        );
382        assert_eq!(flowset.first_seen.unwrap(), 100);
383        assert_eq!(flowset.last_seen.unwrap(), 200);
384    }
385
386    #[test]
387    fn it_converts_v9_to_common() {
388        // Test for V9 conversion
389        let v9 = V9 {
390            header: V9Header {
391                version: 9,
392                count: 1,
393                sys_up_time: 100,
394                unix_secs: 1609459200,
395                sequence_number: 1,
396                source_id: 0,
397            },
398            flowsets: vec![V9FlowSet {
399                header: V9FlowSetHeader {
400                    flowset_id: 0,
401                    length: 0,
402                },
403                body: V9FlowSetBody::Data(V9Data {
404                    padding: vec![],
405                    fields: vec![BTreeMap::from([
406                        (
407                            0,
408                            (
409                                V9Field::Ipv4SrcAddr,
410                                FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 1)),
411                            ),
412                        ),
413                        (
414                            1,
415                            (
416                                V9Field::Ipv4DstAddr,
417                                FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 2)),
418                            ),
419                        ),
420                        (
421                            2,
422                            (
423                                V9Field::L4SrcPort,
424                                FieldValue::DataNumber(DataNumber::U16(1234)),
425                            ),
426                        ),
427                        (
428                            3,
429                            (
430                                V9Field::L4DstPort,
431                                FieldValue::DataNumber(DataNumber::U16(80)),
432                            ),
433                        ),
434                        (
435                            4,
436                            (V9Field::Protocol, FieldValue::DataNumber(DataNumber::U8(6))),
437                        ),
438                        (
439                            5,
440                            (
441                                V9Field::FirstSwitched,
442                                FieldValue::DataNumber(DataNumber::U32(100)),
443                            ),
444                        ),
445                        (
446                            6,
447                            (
448                                V9Field::LastSwitched,
449                                FieldValue::DataNumber(DataNumber::U32(200)),
450                            ),
451                        ),
452                        (
453                            7,
454                            (
455                                V9Field::InSrcMac,
456                                FieldValue::MacAddr("00:00:00:00:00:01".to_string()),
457                            ),
458                        ),
459                        (
460                            8,
461                            (
462                                V9Field::InDstMac,
463                                FieldValue::MacAddr("00:00:00:00:00:02".to_string()),
464                            ),
465                        ),
466                    ])],
467                }),
468            }],
469        };
470
471        let common: NetflowCommon = NetflowCommon::try_from(&v9).unwrap();
472        assert_eq!(common.version, 9);
473        assert_eq!(common.timestamp, 100);
474        assert_eq!(common.flowsets.len(), 1);
475        let flowset = &common.flowsets[0];
476        assert_eq!(
477            flowset.src_addr.unwrap(),
478            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))
479        );
480        assert_eq!(
481            flowset.dst_addr.unwrap(),
482            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))
483        );
484        assert_eq!(flowset.src_port.unwrap(), 1234);
485        assert_eq!(flowset.dst_port.unwrap(), 80);
486        assert_eq!(flowset.protocol_number.unwrap(), 6);
487        assert_eq!(
488            flowset.protocol_type.unwrap(),
489            crate::protocol::ProtocolTypes::Tcp
490        );
491        assert_eq!(flowset.first_seen.unwrap(), 100);
492        assert_eq!(flowset.last_seen.unwrap(), 200);
493        assert_eq!(flowset.src_mac.as_ref().unwrap(), "00:00:00:00:00:01");
494        assert_eq!(flowset.dst_mac.as_ref().unwrap(), "00:00:00:00:00:02");
495    }
496
497    #[test]
498    fn it_converts_ipfix_to_common() {
499        // Test for IPFix conversion
500        let ipfix = IPFix {
501            header: IPFixHeader {
502                version: 10,
503                length: 0,
504                export_time: 100,
505                sequence_number: 1,
506                observation_domain_id: 0,
507            },
508            flowsets: vec![IPFixFlowSet {
509                header: IPFixFlowSetHeader {
510                    header_id: 0,
511                    length: 0,
512                },
513                body: IPFixFlowSetBody::Data(IPFixData {
514                    padding: vec![],
515                    fields: vec![BTreeMap::from([
516                        (
517                            0,
518                            (
519                                IPFixField::SourceIpv4address,
520                                FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 1)),
521                            ),
522                        ),
523                        (
524                            1,
525                            (
526                                IPFixField::DestinationIpv4address,
527                                FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 2)),
528                            ),
529                        ),
530                        (
531                            2,
532                            (
533                                IPFixField::SourceTransportPort,
534                                FieldValue::DataNumber(DataNumber::U16(1234)),
535                            ),
536                        ),
537                        (
538                            3,
539                            (
540                                IPFixField::DestinationTransportPort,
541                                FieldValue::DataNumber(DataNumber::U16(80)),
542                            ),
543                        ),
544                        (
545                            4,
546                            (
547                                IPFixField::ProtocolIdentifier,
548                                FieldValue::DataNumber(DataNumber::U8(6)),
549                            ),
550                        ),
551                        (
552                            5,
553                            (
554                                IPFixField::FlowStartSysUpTime,
555                                FieldValue::DataNumber(DataNumber::U32(100)),
556                            ),
557                        ),
558                        (
559                            6,
560                            (
561                                IPFixField::FlowEndSysUpTime,
562                                FieldValue::DataNumber(DataNumber::U32(200)),
563                            ),
564                        ),
565                        (
566                            7,
567                            (
568                                IPFixField::SourceMacaddress,
569                                FieldValue::MacAddr("00:00:00:00:00:01".to_string()),
570                            ),
571                        ),
572                        (
573                            8,
574                            (
575                                IPFixField::DestinationMacaddress,
576                                FieldValue::MacAddr("00:00:00:00:00:02".to_string()),
577                            ),
578                        ),
579                    ])],
580                }),
581            }],
582        };
583
584        let common: NetflowCommon = NetflowCommon::try_from(&ipfix).unwrap();
585        assert_eq!(common.version, 10);
586        assert_eq!(common.timestamp, 100);
587        assert_eq!(common.flowsets.len(), 1);
588        let flowset = &common.flowsets[0];
589        assert_eq!(
590            flowset.src_addr.unwrap(),
591            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))
592        );
593        assert_eq!(
594            flowset.dst_addr.unwrap(),
595            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))
596        );
597        assert_eq!(flowset.src_port.unwrap(), 1234);
598        assert_eq!(flowset.dst_port.unwrap(), 80);
599        assert_eq!(flowset.protocol_number.unwrap(), 6);
600        assert_eq!(
601            flowset.protocol_type.unwrap(),
602            crate::protocol::ProtocolTypes::Tcp
603        );
604        assert_eq!(flowset.first_seen.unwrap(), 100);
605        assert_eq!(flowset.last_seen.unwrap(), 200);
606        assert_eq!(flowset.src_mac.as_ref().unwrap(), "00:00:00:00:00:01");
607        assert_eq!(flowset.dst_mac.as_ref().unwrap(), "00:00:00:00:00:02");
608    }
609}