netflow_parser/
netflow_common.rs

1use std::collections::BTreeMap;
2use std::net::IpAddr;
3
4use crate::NetflowPacket;
5use crate::protocol::ProtocolTypes;
6use crate::static_versions::{v5::V5, v7::V7};
7use crate::variable_versions::data_number::FieldValue;
8use crate::variable_versions::ipfix_lookup::{IANAIPFixField, IPFixField};
9use crate::variable_versions::v9_lookup::V9Field;
10use crate::variable_versions::{
11    ipfix::{FlowSetBody as IPFixFlowSetBody, IPFix},
12    v9::{FlowSetBody as V9FlowSetBody, V9},
13};
14
15#[derive(Debug)]
16pub enum NetflowCommonError {
17    UnknownVersion(NetflowPacket),
18}
19
20#[derive(Debug, Default)]
21/// Common structure for Netflow
22pub struct NetflowCommon {
23    pub version: u16,
24    pub timestamp: u32,
25    pub flowsets: Vec<NetflowCommonFlowSet>,
26}
27
28impl TryFrom<&NetflowPacket> for NetflowCommon {
29    type Error = NetflowCommonError;
30
31    fn try_from(value: &NetflowPacket) -> Result<Self, NetflowCommonError> {
32        match value {
33            NetflowPacket::V5(v5) => Ok(v5.into()),
34            NetflowPacket::V7(v7) => Ok(v7.into()),
35            NetflowPacket::V9(v9) => Ok(v9.into()),
36            NetflowPacket::IPFix(ipfix) => Ok(ipfix.into()),
37            _ => Err(NetflowCommonError::UnknownVersion(value.clone())),
38        }
39    }
40}
41
42#[derive(Debug, Default)]
43/// Common flow set structure for Netflow
44pub struct NetflowCommonFlowSet {
45    /// Source IP address
46    pub src_addr: Option<IpAddr>,
47    /// Destination IP address
48    pub dst_addr: Option<IpAddr>,
49    /// TCP/UDP source port number or equivalent
50    pub src_port: Option<u16>,
51    /// TCP/UDP destination port number or equivalent
52    pub dst_port: Option<u16>,
53    /// Number of IP protocol type (for example, TCP = 6; UDP = 17)
54    pub protocol_number: Option<u8>,
55    /// IP protocol type itself
56    pub protocol_type: Option<ProtocolTypes>,
57    /// Duration of the flow first
58    pub first_seen: Option<u32>,
59    /// Duration of the flow last
60    pub last_seen: Option<u32>,
61    /// Source MAC address
62    pub src_mac: Option<String>,
63    /// Destination MAC address
64    pub dst_mac: Option<String>,
65}
66
67impl From<&V5> for NetflowCommon {
68    fn from(value: &V5) -> Self {
69        // Convert V5 to NetflowCommon
70        NetflowCommon {
71            version: value.header.version,
72            timestamp: value.header.sys_up_time,
73            flowsets: value
74                .flowsets
75                .iter()
76                .map(|set| NetflowCommonFlowSet {
77                    src_addr: Some(set.src_addr.into()),
78                    dst_addr: Some(set.dst_addr.into()),
79                    src_port: Some(set.src_port),
80                    dst_port: Some(set.dst_port),
81                    protocol_number: Some(set.protocol_number),
82                    protocol_type: Some(set.protocol_type),
83                    first_seen: Some(set.first),
84                    last_seen: Some(set.last),
85                    src_mac: None,
86                    dst_mac: None,
87                })
88                .collect(),
89        }
90    }
91}
92
93impl From<&V7> for NetflowCommon {
94    fn from(value: &V7) -> Self {
95        // Convert V7 to NetflowCommon
96        NetflowCommon {
97            version: value.header.version,
98            timestamp: value.header.sys_up_time,
99            flowsets: value
100                .flowsets
101                .iter()
102                .map(|set| NetflowCommonFlowSet {
103                    src_addr: Some(set.src_addr.into()),
104                    dst_addr: Some(set.dst_addr.into()),
105                    src_port: Some(set.src_port),
106                    dst_port: Some(set.dst_port),
107                    protocol_number: Some(set.protocol_number),
108                    protocol_type: Some(set.protocol_type),
109                    first_seen: Some(set.first),
110                    last_seen: Some(set.last),
111                    src_mac: None,
112                    dst_mac: None,
113                })
114                .collect(),
115        }
116    }
117}
118
119impl From<&V9> for NetflowCommon {
120    fn from(value: &V9) -> Self {
121        // Convert V9 to NetflowCommon
122        let mut flowsets = vec![];
123
124        for flowset in &value.flowsets {
125            if let V9FlowSetBody::Data(data) = &flowset.body {
126                for data_field in &data.fields {
127                    let value_map: BTreeMap<V9Field, FieldValue> =
128                        data_field.clone().into_iter().collect();
129                    flowsets.push(NetflowCommonFlowSet {
130                        src_addr: value_map
131                            .get(&V9Field::Ipv4SrcAddr)
132                            .or_else(|| value_map.get(&V9Field::Ipv6SrcAddr))
133                            .and_then(|v| v.try_into().ok()),
134                        dst_addr: value_map
135                            .get(&V9Field::Ipv4DstAddr)
136                            .or_else(|| value_map.get(&V9Field::Ipv6DstAddr))
137                            .and_then(|v| v.try_into().ok()),
138                        src_port: value_map
139                            .get(&V9Field::L4SrcPort)
140                            .and_then(|v| v.try_into().ok()),
141                        dst_port: value_map
142                            .get(&V9Field::L4DstPort)
143                            .and_then(|v| v.try_into().ok()),
144                        protocol_number: value_map
145                            .get(&V9Field::Protocol)
146                            .and_then(|v| v.try_into().ok()),
147                        protocol_type: value_map.get(&V9Field::Protocol).and_then(|v| {
148                            v.try_into()
149                                .ok()
150                                .map(|proto: u8| ProtocolTypes::from(proto))
151                        }),
152                        first_seen: value_map
153                            .get(&V9Field::FirstSwitched)
154                            .and_then(|v| v.try_into().ok()),
155                        last_seen: value_map
156                            .get(&V9Field::LastSwitched)
157                            .and_then(|v| v.try_into().ok()),
158                        src_mac: value_map
159                            .get(&V9Field::InSrcMac)
160                            .and_then(|v| v.try_into().ok()),
161                        dst_mac: value_map
162                            .get(&V9Field::InDstMac)
163                            .and_then(|v| v.try_into().ok()),
164                    });
165                }
166            }
167        }
168
169        NetflowCommon {
170            version: value.header.version,
171            timestamp: value.header.sys_up_time,
172            flowsets,
173        }
174    }
175}
176
177impl From<&IPFix> for NetflowCommon {
178    fn from(value: &IPFix) -> Self {
179        // Convert IPFix to NetflowCommon
180
181        let mut flowsets = vec![];
182
183        for flowset in &value.flowsets {
184            if let IPFixFlowSetBody::Data(data) = &flowset.body {
185                for data_field in &data.fields {
186                    let value_map: BTreeMap<IPFixField, FieldValue> =
187                        data_field.clone().into_iter().collect();
188                    flowsets.push(NetflowCommonFlowSet {
189                        src_addr: value_map
190                            .get(&IPFixField::IANA(IANAIPFixField::SourceIpv4address))
191                            .or_else(|| {
192                                value_map
193                                    .get(&IPFixField::IANA(IANAIPFixField::SourceIpv6address))
194                            })
195                            .and_then(|v| v.try_into().ok()),
196                        dst_addr: value_map
197                            .get(&IPFixField::IANA(IANAIPFixField::DestinationIpv4address))
198                            .or_else(|| {
199                                value_map.get(&IPFixField::IANA(
200                                    IANAIPFixField::DestinationIpv6address,
201                                ))
202                            })
203                            .and_then(|v| v.try_into().ok()),
204                        src_port: value_map
205                            .get(&IPFixField::IANA(IANAIPFixField::SourceTransportPort))
206                            .and_then(|v| v.try_into().ok()),
207                        dst_port: value_map
208                            .get(&IPFixField::IANA(IANAIPFixField::DestinationTransportPort))
209                            .and_then(|v| v.try_into().ok()),
210                        protocol_number: value_map
211                            .get(&IPFixField::IANA(IANAIPFixField::ProtocolIdentifier))
212                            .and_then(|v| v.try_into().ok()),
213                        protocol_type: value_map
214                            .get(&IPFixField::IANA(IANAIPFixField::ProtocolIdentifier))
215                            .and_then(|v| {
216                                v.try_into()
217                                    .ok()
218                                    .map(|proto: u8| ProtocolTypes::from(proto))
219                            }),
220                        first_seen: value_map
221                            .get(&IPFixField::IANA(IANAIPFixField::FlowStartSysUpTime))
222                            .and_then(|v| v.try_into().ok()),
223                        last_seen: value_map
224                            .get(&IPFixField::IANA(IANAIPFixField::FlowEndSysUpTime))
225                            .and_then(|v| v.try_into().ok()),
226                        src_mac: value_map
227                            .get(&IPFixField::IANA(IANAIPFixField::SourceMacaddress))
228                            .and_then(|v| v.try_into().ok()),
229                        dst_mac: value_map
230                            .get(&IPFixField::IANA(IANAIPFixField::DestinationMacaddress))
231                            .and_then(|v| v.try_into().ok()),
232                    });
233                }
234            }
235        }
236
237        NetflowCommon {
238            version: value.header.version,
239            timestamp: value.header.export_time,
240            flowsets,
241        }
242    }
243}
244
245#[cfg(test)]
246mod common_tests {
247    use std::net::{IpAddr, Ipv4Addr};
248
249    use crate::netflow_common::NetflowCommon;
250    use crate::static_versions::v5::{FlowSet as V5FlowSet, Header as V5Header, V5};
251    use crate::static_versions::v7::{FlowSet as V7FlowSet, Header as V7Header, V7};
252    use crate::variable_versions::data_number::{DataNumber, FieldValue};
253    use crate::variable_versions::ipfix::{
254        Data as IPFixData, FlowSet as IPFixFlowSet, FlowSetBody as IPFixFlowSetBody,
255        FlowSetHeader as IPFixFlowSetHeader, Header as IPFixHeader, IPFix,
256    };
257    use crate::variable_versions::ipfix_lookup::{IANAIPFixField, IPFixField};
258    use crate::variable_versions::v9::{
259        Data as V9Data, FlowSet as V9FlowSet, FlowSetBody as V9FlowSetBody,
260        FlowSetHeader as V9FlowSetHeader, Header as V9Header, V9,
261    };
262    use crate::variable_versions::v9_lookup::V9Field;
263
264    #[test]
265    fn it_converts_v5_to_common() {
266        let v5 = V5 {
267            header: V5Header {
268                version: 5,
269                count: 1,
270                sys_up_time: 100,
271                unix_secs: 1609459200,
272                unix_nsecs: 0,
273                flow_sequence: 1,
274                engine_type: 0,
275                engine_id: 0,
276                sampling_interval: 0,
277            },
278            flowsets: vec![V5FlowSet {
279                src_addr: Ipv4Addr::new(192, 168, 1, 1),
280                dst_addr: Ipv4Addr::new(192, 168, 1, 2),
281                src_port: 1234,
282                dst_port: 80,
283                protocol_number: 6,
284                protocol_type: crate::protocol::ProtocolTypes::Tcp,
285                next_hop: Ipv4Addr::new(192, 168, 1, 254),
286                input: 0,
287                output: 0,
288                d_pkts: 10,
289                d_octets: 1000,
290                first: 100,
291                last: 200,
292                pad1: 0,
293                tcp_flags: 0,
294                tos: 0,
295                src_as: 0,
296                dst_as: 0,
297                src_mask: 0,
298                dst_mask: 0,
299                pad2: 0,
300            }],
301        };
302
303        let common: NetflowCommon = NetflowCommon::try_from(&v5).unwrap();
304
305        assert_eq!(common.version, 5);
306        assert_eq!(common.timestamp, 100);
307        assert_eq!(common.flowsets.len(), 1);
308        let flowset = &common.flowsets[0];
309        assert_eq!(
310            flowset.src_addr.unwrap(),
311            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))
312        );
313        assert_eq!(
314            flowset.dst_addr.unwrap(),
315            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))
316        );
317        assert_eq!(flowset.src_port.unwrap(), 1234);
318        assert_eq!(flowset.dst_port.unwrap(), 80);
319        assert_eq!(flowset.protocol_number.unwrap(), 6);
320        assert_eq!(
321            flowset.protocol_type.unwrap(),
322            crate::protocol::ProtocolTypes::Tcp
323        );
324        assert_eq!(flowset.first_seen.unwrap(), 100);
325        assert_eq!(flowset.last_seen.unwrap(), 200);
326    }
327
328    #[test]
329    fn it_converts_v7_to_common() {
330        let v7 = V7 {
331            header: V7Header {
332                version: 7,
333                count: 1,
334                sys_up_time: 100,
335                unix_secs: 1609459200,
336                unix_nsecs: 0,
337                flow_sequence: 1,
338                reserved: 0,
339            },
340            flowsets: vec![V7FlowSet {
341                src_addr: Ipv4Addr::new(192, 168, 1, 1),
342                dst_addr: Ipv4Addr::new(192, 168, 1, 2),
343                src_port: 1234,
344                dst_port: 80,
345                protocol_number: 6,
346                protocol_type: crate::protocol::ProtocolTypes::Tcp,
347                next_hop: Ipv4Addr::new(192, 168, 1, 254),
348                input: 0,
349                output: 0,
350                d_pkts: 10,
351                d_octets: 1000,
352                first: 100,
353                last: 200,
354                tcp_flags: 0,
355                tos: 0,
356                src_as: 0,
357                dst_as: 0,
358                src_mask: 0,
359                dst_mask: 0,
360                flags_fields_invalid: 0,
361                flags_fields_valid: 0,
362                router_src: Ipv4Addr::new(192, 168, 1, 254),
363            }],
364        };
365
366        let common: NetflowCommon = NetflowCommon::try_from(&v7).unwrap();
367
368        assert_eq!(common.version, 7);
369        assert_eq!(common.timestamp, 100);
370        assert_eq!(common.flowsets.len(), 1);
371        let flowset = &common.flowsets[0];
372        assert_eq!(
373            flowset.src_addr.unwrap(),
374            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))
375        );
376        assert_eq!(
377            flowset.dst_addr.unwrap(),
378            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))
379        );
380        assert_eq!(flowset.src_port.unwrap(), 1234);
381        assert_eq!(flowset.dst_port.unwrap(), 80);
382        assert_eq!(flowset.protocol_number.unwrap(), 6);
383        assert_eq!(
384            flowset.protocol_type.unwrap(),
385            crate::protocol::ProtocolTypes::Tcp
386        );
387        assert_eq!(flowset.first_seen.unwrap(), 100);
388        assert_eq!(flowset.last_seen.unwrap(), 200);
389    }
390
391    #[test]
392    fn it_converts_v9_to_common() {
393        // Test for V9 conversion
394        let v9 = V9 {
395            header: V9Header {
396                version: 9,
397                count: 1,
398                sys_up_time: 100,
399                unix_secs: 1609459200,
400                sequence_number: 1,
401                source_id: 0,
402            },
403            flowsets: vec![V9FlowSet {
404                header: V9FlowSetHeader {
405                    flowset_id: 0,
406                    length: 0,
407                },
408                body: V9FlowSetBody::Data(V9Data {
409                    padding: vec![],
410                    fields: vec![Vec::from([
411                        (
412                            V9Field::Ipv4SrcAddr,
413                            FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 1)),
414                        ),
415                        (
416                            V9Field::Ipv4DstAddr,
417                            FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 2)),
418                        ),
419                        (
420                            V9Field::L4SrcPort,
421                            FieldValue::DataNumber(DataNumber::U16(1234)),
422                        ),
423                        (
424                            V9Field::L4DstPort,
425                            FieldValue::DataNumber(DataNumber::U16(80)),
426                        ),
427                        (V9Field::Protocol, FieldValue::DataNumber(DataNumber::U8(6))),
428                        (
429                            V9Field::FirstSwitched,
430                            FieldValue::DataNumber(DataNumber::U32(100)),
431                        ),
432                        (
433                            V9Field::LastSwitched,
434                            FieldValue::DataNumber(DataNumber::U32(200)),
435                        ),
436                        (
437                            V9Field::InSrcMac,
438                            FieldValue::MacAddr("00:00:00:00:00:01".to_string()),
439                        ),
440                        (
441                            V9Field::InDstMac,
442                            FieldValue::MacAddr("00:00:00:00:00:02".to_string()),
443                        ),
444                    ])],
445                }),
446            }],
447        };
448
449        let common: NetflowCommon = NetflowCommon::try_from(&v9).unwrap();
450        assert_eq!(common.version, 9);
451        assert_eq!(common.timestamp, 100);
452        assert_eq!(common.flowsets.len(), 1);
453        let flowset = &common.flowsets[0];
454        assert_eq!(
455            flowset.src_addr.unwrap(),
456            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))
457        );
458        assert_eq!(
459            flowset.dst_addr.unwrap(),
460            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))
461        );
462        assert_eq!(flowset.src_port.unwrap(), 1234);
463        assert_eq!(flowset.dst_port.unwrap(), 80);
464        assert_eq!(flowset.protocol_number.unwrap(), 6);
465        assert_eq!(
466            flowset.protocol_type.unwrap(),
467            crate::protocol::ProtocolTypes::Tcp
468        );
469        assert_eq!(flowset.first_seen.unwrap(), 100);
470        assert_eq!(flowset.last_seen.unwrap(), 200);
471        assert_eq!(flowset.src_mac.as_ref().unwrap(), "00:00:00:00:00:01");
472        assert_eq!(flowset.dst_mac.as_ref().unwrap(), "00:00:00:00:00:02");
473    }
474
475    #[test]
476    fn it_converts_ipfix_to_common() {
477        // Test for IPFix conversion
478        let ipfix = IPFix {
479            header: IPFixHeader {
480                version: 10,
481                length: 0,
482                export_time: 100,
483                sequence_number: 1,
484                observation_domain_id: 0,
485            },
486            flowsets: vec![IPFixFlowSet {
487                header: IPFixFlowSetHeader {
488                    header_id: 0,
489                    length: 0,
490                },
491                body: IPFixFlowSetBody::Data(IPFixData {
492                    fields: vec![Vec::from([
493                        (
494                            IPFixField::IANA(IANAIPFixField::SourceIpv4address),
495                            FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 1)),
496                        ),
497                        (
498                            IPFixField::IANA(IANAIPFixField::DestinationIpv4address),
499                            FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 2)),
500                        ),
501                        (
502                            IPFixField::IANA(IANAIPFixField::SourceTransportPort),
503                            FieldValue::DataNumber(DataNumber::U16(1234)),
504                        ),
505                        (
506                            IPFixField::IANA(IANAIPFixField::DestinationTransportPort),
507                            FieldValue::DataNumber(DataNumber::U16(80)),
508                        ),
509                        (
510                            IPFixField::IANA(IANAIPFixField::ProtocolIdentifier),
511                            FieldValue::DataNumber(DataNumber::U8(6)),
512                        ),
513                        (
514                            IPFixField::IANA(IANAIPFixField::FlowStartSysUpTime),
515                            FieldValue::DataNumber(DataNumber::U32(100)),
516                        ),
517                        (
518                            IPFixField::IANA(IANAIPFixField::FlowEndSysUpTime),
519                            FieldValue::DataNumber(DataNumber::U32(200)),
520                        ),
521                        (
522                            IPFixField::IANA(IANAIPFixField::SourceMacaddress),
523                            FieldValue::MacAddr("00:00:00:00:00:01".to_string()),
524                        ),
525                        (
526                            IPFixField::IANA(IANAIPFixField::DestinationMacaddress),
527                            FieldValue::MacAddr("00:00:00:00:00:02".to_string()),
528                        ),
529                    ])],
530                }),
531            }],
532        };
533
534        let common: NetflowCommon = NetflowCommon::try_from(&ipfix).unwrap();
535        assert_eq!(common.version, 10);
536        assert_eq!(common.timestamp, 100);
537        assert_eq!(common.flowsets.len(), 1);
538        let flowset = &common.flowsets[0];
539        assert_eq!(
540            flowset.src_addr.unwrap(),
541            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))
542        );
543        assert_eq!(
544            flowset.dst_addr.unwrap(),
545            IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2))
546        );
547        assert_eq!(flowset.src_port.unwrap(), 1234);
548        assert_eq!(flowset.dst_port.unwrap(), 80);
549        assert_eq!(flowset.protocol_number.unwrap(), 6);
550        assert_eq!(
551            flowset.protocol_type.unwrap(),
552            crate::protocol::ProtocolTypes::Tcp
553        );
554        assert_eq!(flowset.first_seen.unwrap(), 100);
555        assert_eq!(flowset.last_seen.unwrap(), 200);
556        assert_eq!(flowset.src_mac.as_ref().unwrap(), "00:00:00:00:00:01");
557        assert_eq!(flowset.dst_mac.as_ref().unwrap(), "00:00:00:00:00:02");
558    }
559}