1use std::collections::BTreeMap;
2use std::net::IpAddr;
3
4use crate::NetflowPacket;
5use crate::protocol::ProtocolTypes;
6use crate::static_versions::{v5::V5, v7::V7};
7use crate::variable_versions::data_number::FieldValue;
8use crate::variable_versions::ipfix_lookup::IPFixField;
9use crate::variable_versions::v9_lookup::V9Field;
10use crate::variable_versions::{
11 ipfix::{FlowSetBody as IPFixFlowSetBody, IPFix},
12 v9::{FlowSetBody as V9FlowSetBody, V9},
13};
14
15#[derive(Debug)]
16pub enum NetflowCommonError {
17 UnknownVersion(NetflowPacket),
18}
19
/// Version-independent view of a NetFlow packet.
///
/// Built from V5, V7, V9 or IPFix packets through the `From`/`TryFrom`
/// implementations in this module.
#[derive(Debug, Default)]
pub struct NetflowCommon {
    /// Version number copied from the source packet's header.
    pub version: u16,
    /// Header timestamp: `sys_up_time` for V5, V7 and V9 packets, `export_time` for IPFix.
    pub timestamp: u32,
    /// One entry per flow record found in the source packet.
    pub flowsets: Vec<NetflowCommonFlowSet>,
}
27
28impl TryFrom<&NetflowPacket> for NetflowCommon {
29 type Error = NetflowCommonError;
30
31 fn try_from(value: &NetflowPacket) -> Result<Self, NetflowCommonError> {
32 match value {
33 NetflowPacket::V5(v5) => Ok(v5.into()),
34 NetflowPacket::V7(v7) => Ok(v7.into()),
35 NetflowPacket::V9(v9) => Ok(v9.into()),
36 NetflowPacket::IPFix(ipfix) => Ok(ipfix.into()),
37 _ => Err(NetflowCommonError::UnknownVersion(value.clone())),
38 }
39 }
40}
41
/// A single flow record expressed with the fields shared across NetFlow versions.
///
/// Every field is optional: V9 and IPFix records only populate a field when the
/// record contains it and its value converts to the target type.
#[derive(Debug, Default)]
pub struct NetflowCommonFlowSet {
    /// Source IP address. For V9/IPFix the IPv4 source field is looked up first,
    /// then the IPv6 source field.
    pub src_addr: Option<IpAddr>,
    /// Destination IP address. For V9/IPFix the IPv4 destination field is looked up first,
    /// then the IPv6 destination field.
    pub dst_addr: Option<IpAddr>,
    /// Source transport-layer port.
    pub src_port: Option<u16>,
    /// Destination transport-layer port.
    pub dst_port: Option<u16>,
    /// Protocol number as carried in the record.
    pub protocol_number: Option<u8>,
    /// Protocol as a [`ProtocolTypes`] value; for V9/IPFix it is derived from the
    /// same field as `protocol_number`.
    pub protocol_type: Option<ProtocolTypes>,
    /// Flow start value: `first` for V5/V7, `FirstSwitched` for V9,
    /// `FlowStartSysUpTime` for IPFix.
    pub first_seen: Option<u32>,
    /// Flow end value: `last` for V5/V7, `LastSwitched` for V9,
    /// `FlowEndSysUpTime` for IPFix.
    pub last_seen: Option<u32>,
    /// Source MAC address. Always `None` for V5 and V7; `InSrcMac` for V9,
    /// `SourceMacaddress` for IPFix.
    pub src_mac: Option<String>,
    /// Destination MAC address. Always `None` for V5 and V7; `InDstMac` for V9,
    /// `DestinationMacaddress` for IPFix.
    pub dst_mac: Option<String>,
}
66
67impl From<&V5> for NetflowCommon {
68 fn from(value: &V5) -> Self {
69 NetflowCommon {
71 version: value.header.version,
72 timestamp: value.header.sys_up_time,
73 flowsets: value
74 .flowsets
75 .iter()
76 .map(|set| NetflowCommonFlowSet {
77 src_addr: Some(set.src_addr.into()),
78 dst_addr: Some(set.dst_addr.into()),
79 src_port: Some(set.src_port),
80 dst_port: Some(set.dst_port),
81 protocol_number: Some(set.protocol_number),
82 protocol_type: Some(set.protocol_type),
83 first_seen: Some(set.first),
84 last_seen: Some(set.last),
85 src_mac: None,
86 dst_mac: None,
87 })
88 .collect(),
89 }
90 }
91}
92
93impl From<&V7> for NetflowCommon {
94 fn from(value: &V7) -> Self {
95 NetflowCommon {
97 version: value.header.version,
98 timestamp: value.header.sys_up_time,
99 flowsets: value
100 .flowsets
101 .iter()
102 .map(|set| NetflowCommonFlowSet {
103 src_addr: Some(set.src_addr.into()),
104 dst_addr: Some(set.dst_addr.into()),
105 src_port: Some(set.src_port),
106 dst_port: Some(set.dst_port),
107 protocol_number: Some(set.protocol_number),
108 protocol_type: Some(set.protocol_type),
109 first_seen: Some(set.first),
110 last_seen: Some(set.last),
111 src_mac: None,
112 dst_mac: None,
113 })
114 .collect(),
115 }
116 }
117}
118
119impl From<&V9> for NetflowCommon {
120 fn from(value: &V9) -> Self {
121 let mut flowsets = vec![];
123
124 for flowset in &value.flowsets {
125 if let V9FlowSetBody::Data(data) = &flowset.body {
126 for data_field in &data.fields {
127 let value_map: BTreeMap<V9Field, FieldValue> =
128 data_field.values().cloned().collect();
129 flowsets.push(NetflowCommonFlowSet {
130 src_addr: value_map
131 .get(&V9Field::Ipv4SrcAddr)
132 .or_else(|| value_map.get(&V9Field::Ipv6SrcAddr))
133 .and_then(|v| v.try_into().ok()),
134 dst_addr: value_map
135 .get(&V9Field::Ipv4DstAddr)
136 .or_else(|| value_map.get(&V9Field::Ipv6DstAddr))
137 .and_then(|v| v.try_into().ok()),
138 src_port: value_map
139 .get(&V9Field::L4SrcPort)
140 .and_then(|v| v.try_into().ok()),
141 dst_port: value_map
142 .get(&V9Field::L4DstPort)
143 .and_then(|v| v.try_into().ok()),
144 protocol_number: value_map
145 .get(&V9Field::Protocol)
146 .and_then(|v| v.try_into().ok()),
147 protocol_type: value_map.get(&V9Field::Protocol).and_then(|v| {
148 v.try_into()
149 .ok()
150 .map(|proto: u8| ProtocolTypes::from(proto))
151 }),
152 first_seen: value_map
153 .get(&V9Field::FirstSwitched)
154 .and_then(|v| v.try_into().ok()),
155 last_seen: value_map
156 .get(&V9Field::LastSwitched)
157 .and_then(|v| v.try_into().ok()),
158 src_mac: value_map
159 .get(&V9Field::InSrcMac)
160 .and_then(|v| v.try_into().ok()),
161 dst_mac: value_map
162 .get(&V9Field::InDstMac)
163 .and_then(|v| v.try_into().ok()),
164 });
165 }
166 }
167 }
168
169 NetflowCommon {
170 version: value.header.version,
171 timestamp: value.header.sys_up_time,
172 flowsets,
173 }
174 }
175}
176
177impl From<&IPFix> for NetflowCommon {
178 fn from(value: &IPFix) -> Self {
179 let mut flowsets = vec![];
182
183 for flowset in &value.flowsets {
184 if let IPFixFlowSetBody::Data(data) = &flowset.body {
185 for data_field in &data.fields {
186 let value_map: BTreeMap<IPFixField, FieldValue> =
187 data_field.values().cloned().collect();
188 flowsets.push(NetflowCommonFlowSet {
189 src_addr: value_map
190 .get(&IPFixField::SourceIpv4address)
191 .or_else(|| value_map.get(&IPFixField::SourceIpv6address))
192 .and_then(|v| v.try_into().ok()),
193 dst_addr: value_map
194 .get(&IPFixField::DestinationIpv4address)
195 .or_else(|| value_map.get(&IPFixField::DestinationIpv6address))
196 .and_then(|v| v.try_into().ok()),
197 src_port: value_map
198 .get(&IPFixField::SourceTransportPort)
199 .and_then(|v| v.try_into().ok()),
200 dst_port: value_map
201 .get(&IPFixField::DestinationTransportPort)
202 .and_then(|v| v.try_into().ok()),
203 protocol_number: value_map
204 .get(&IPFixField::ProtocolIdentifier)
205 .and_then(|v| v.try_into().ok()),
206 protocol_type: value_map.get(&IPFixField::ProtocolIdentifier).and_then(
207 |v| {
208 v.try_into()
209 .ok()
210 .map(|proto: u8| ProtocolTypes::from(proto))
211 },
212 ),
213 first_seen: value_map
214 .get(&IPFixField::FlowStartSysUpTime)
215 .and_then(|v| v.try_into().ok()),
216 last_seen: value_map
217 .get(&IPFixField::FlowEndSysUpTime)
218 .and_then(|v| v.try_into().ok()),
219 src_mac: value_map
220 .get(&IPFixField::SourceMacaddress)
221 .and_then(|v| v.try_into().ok()),
222 dst_mac: value_map
223 .get(&IPFixField::DestinationMacaddress)
224 .and_then(|v| v.try_into().ok()),
225 });
226 }
227 }
228 }
229
230 NetflowCommon {
231 version: value.header.version,
232 timestamp: value.header.export_time,
233 flowsets,
234 }
235 }
236}
237
/// Unit tests for the conversions from each NetFlow version into `NetflowCommon`.
#[cfg(test)]
mod common_tests {

    use std::collections::BTreeMap;
    use std::net::{IpAddr, Ipv4Addr};

    use crate::NetflowPacket;
    use crate::netflow_common::{NetflowCommon, NetflowCommonFlowSet};
    use crate::protocol::ProtocolTypes;
    use crate::static_versions::v5::{FlowSet as V5FlowSet, Header as V5Header, V5};
    use crate::static_versions::v7::{FlowSet as V7FlowSet, Header as V7Header, V7};
    use crate::variable_versions::data_number::{DataNumber, FieldValue};
    use crate::variable_versions::ipfix::{
        Data as IPFixData, FlowSet as IPFixFlowSet, FlowSetBody as IPFixFlowSetBody,
        FlowSetHeader as IPFixFlowSetHeader, Header as IPFixHeader, IPFix,
    };
    use crate::variable_versions::ipfix_lookup::IPFixField;
    use crate::variable_versions::v9::{
        Data as V9Data, FlowSet as V9FlowSet, FlowSetBody as V9FlowSetBody,
        FlowSetHeader as V9FlowSetHeader, Header as V9Header, V9,
    };
    use crate::variable_versions::v9_lookup::V9Field;

    const SRC_MAC: &str = "00:00:00:00:00:01";
    const DST_MAC: &str = "00:00:00:00:00:02";

    /// Checks the flow values that every fixture in this module shares.
    ///
    /// `macs` is the expected `(src_mac, dst_mac)` pair, or `None` when the
    /// source format carries no MAC addresses.
    fn assert_expected_flow(flowset: &NetflowCommonFlowSet, macs: Option<(&str, &str)>) {
        assert_eq!(
            flowset.src_addr,
            Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)))
        );
        assert_eq!(
            flowset.dst_addr,
            Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)))
        );
        assert_eq!(flowset.src_port, Some(1234));
        assert_eq!(flowset.dst_port, Some(80));
        assert_eq!(flowset.protocol_number, Some(6));
        assert_eq!(flowset.protocol_type, Some(ProtocolTypes::Tcp));
        assert_eq!(flowset.first_seen, Some(100));
        assert_eq!(flowset.last_seen, Some(200));
        assert_eq!(flowset.src_mac.as_deref(), macs.map(|(src, _)| src));
        assert_eq!(flowset.dst_mac.as_deref(), macs.map(|(_, dst)| dst));
    }

    /// Builds the V5 packet fixture used by the V5 and `NetflowPacket` tests.
    fn sample_v5() -> V5 {
        V5 {
            header: V5Header {
                version: 5,
                count: 1,
                sys_up_time: 100,
                unix_secs: 1609459200,
                unix_nsecs: 0,
                flow_sequence: 1,
                engine_type: 0,
                engine_id: 0,
                sampling_interval: 0,
            },
            flowsets: vec![V5FlowSet {
                src_addr: Ipv4Addr::new(192, 168, 1, 1),
                dst_addr: Ipv4Addr::new(192, 168, 1, 2),
                src_port: 1234,
                dst_port: 80,
                protocol_number: 6,
                protocol_type: ProtocolTypes::Tcp,
                next_hop: Ipv4Addr::new(192, 168, 1, 254),
                input: 0,
                output: 0,
                d_pkts: 10,
                d_octets: 1000,
                first: 100,
                last: 200,
                pad1: 0,
                tcp_flags: 0,
                tos: 0,
                src_as: 0,
                dst_as: 0,
                src_mask: 0,
                dst_mask: 0,
                pad2: 0,
            }],
        }
    }

    #[test]
    fn it_converts_v5_to_common() {
        let v5 = sample_v5();

        // The per-version conversions are infallible, so use `From` directly.
        let common = NetflowCommon::from(&v5);

        assert_eq!(common.version, 5);
        assert_eq!(common.timestamp, 100);
        assert_eq!(common.flowsets.len(), 1);
        assert_expected_flow(&common.flowsets[0], None);
    }

    #[test]
    fn it_converts_v7_to_common() {
        let v7 = V7 {
            header: V7Header {
                version: 7,
                count: 1,
                sys_up_time: 100,
                unix_secs: 1609459200,
                unix_nsecs: 0,
                flow_sequence: 1,
                reserved: 0,
            },
            flowsets: vec![V7FlowSet {
                src_addr: Ipv4Addr::new(192, 168, 1, 1),
                dst_addr: Ipv4Addr::new(192, 168, 1, 2),
                src_port: 1234,
                dst_port: 80,
                protocol_number: 6,
                protocol_type: ProtocolTypes::Tcp,
                next_hop: Ipv4Addr::new(192, 168, 1, 254),
                input: 0,
                output: 0,
                d_pkts: 10,
                d_octets: 1000,
                first: 100,
                last: 200,
                tcp_flags: 0,
                tos: 0,
                src_as: 0,
                dst_as: 0,
                src_mask: 0,
                dst_mask: 0,
                flags_fields_invalid: 0,
                flags_fields_valid: 0,
                router_src: Ipv4Addr::new(192, 168, 1, 254),
            }],
        };

        let common = NetflowCommon::from(&v7);

        assert_eq!(common.version, 7);
        assert_eq!(common.timestamp, 100);
        assert_eq!(common.flowsets.len(), 1);
        assert_expected_flow(&common.flowsets[0], None);
    }

    #[test]
    fn it_converts_v9_to_common() {
        let v9 = V9 {
            header: V9Header {
                version: 9,
                count: 1,
                sys_up_time: 100,
                unix_secs: 1609459200,
                sequence_number: 1,
                source_id: 0,
            },
            flowsets: vec![V9FlowSet {
                header: V9FlowSetHeader {
                    flowset_id: 0,
                    length: 0,
                },
                body: V9FlowSetBody::Data(V9Data {
                    padding: vec![],
                    fields: vec![BTreeMap::from([
                        (
                            0,
                            (
                                V9Field::Ipv4SrcAddr,
                                FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 1)),
                            ),
                        ),
                        (
                            1,
                            (
                                V9Field::Ipv4DstAddr,
                                FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 2)),
                            ),
                        ),
                        (
                            2,
                            (
                                V9Field::L4SrcPort,
                                FieldValue::DataNumber(DataNumber::U16(1234)),
                            ),
                        ),
                        (
                            3,
                            (
                                V9Field::L4DstPort,
                                FieldValue::DataNumber(DataNumber::U16(80)),
                            ),
                        ),
                        (
                            4,
                            (V9Field::Protocol, FieldValue::DataNumber(DataNumber::U8(6))),
                        ),
                        (
                            5,
                            (
                                V9Field::FirstSwitched,
                                FieldValue::DataNumber(DataNumber::U32(100)),
                            ),
                        ),
                        (
                            6,
                            (
                                V9Field::LastSwitched,
                                FieldValue::DataNumber(DataNumber::U32(200)),
                            ),
                        ),
                        (
                            7,
                            (
                                V9Field::InSrcMac,
                                FieldValue::MacAddr(SRC_MAC.to_string()),
                            ),
                        ),
                        (
                            8,
                            (
                                V9Field::InDstMac,
                                FieldValue::MacAddr(DST_MAC.to_string()),
                            ),
                        ),
                    ])],
                }),
            }],
        };

        let common = NetflowCommon::from(&v9);

        assert_eq!(common.version, 9);
        assert_eq!(common.timestamp, 100);
        assert_eq!(common.flowsets.len(), 1);
        assert_expected_flow(&common.flowsets[0], Some((SRC_MAC, DST_MAC)));
    }

    #[test]
    fn it_converts_ipfix_to_common() {
        let ipfix = IPFix {
            header: IPFixHeader {
                version: 10,
                length: 0,
                export_time: 100,
                sequence_number: 1,
                observation_domain_id: 0,
            },
            flowsets: vec![IPFixFlowSet {
                header: IPFixFlowSetHeader {
                    header_id: 0,
                    length: 0,
                },
                body: IPFixFlowSetBody::Data(IPFixData {
                    padding: vec![],
                    fields: vec![BTreeMap::from([
                        (
                            0,
                            (
                                IPFixField::SourceIpv4address,
                                FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 1)),
                            ),
                        ),
                        (
                            1,
                            (
                                IPFixField::DestinationIpv4address,
                                FieldValue::Ip4Addr(Ipv4Addr::new(192, 168, 1, 2)),
                            ),
                        ),
                        (
                            2,
                            (
                                IPFixField::SourceTransportPort,
                                FieldValue::DataNumber(DataNumber::U16(1234)),
                            ),
                        ),
                        (
                            3,
                            (
                                IPFixField::DestinationTransportPort,
                                FieldValue::DataNumber(DataNumber::U16(80)),
                            ),
                        ),
                        (
                            4,
                            (
                                IPFixField::ProtocolIdentifier,
                                FieldValue::DataNumber(DataNumber::U8(6)),
                            ),
                        ),
                        (
                            5,
                            (
                                IPFixField::FlowStartSysUpTime,
                                FieldValue::DataNumber(DataNumber::U32(100)),
                            ),
                        ),
                        (
                            6,
                            (
                                IPFixField::FlowEndSysUpTime,
                                FieldValue::DataNumber(DataNumber::U32(200)),
                            ),
                        ),
                        (
                            7,
                            (
                                IPFixField::SourceMacaddress,
                                FieldValue::MacAddr(SRC_MAC.to_string()),
                            ),
                        ),
                        (
                            8,
                            (
                                IPFixField::DestinationMacaddress,
                                FieldValue::MacAddr(DST_MAC.to_string()),
                            ),
                        ),
                    ])],
                }),
            }],
        };

        let common = NetflowCommon::from(&ipfix);

        assert_eq!(common.version, 10);
        assert_eq!(common.timestamp, 100);
        assert_eq!(common.flowsets.len(), 1);
        assert_expected_flow(&common.flowsets[0], Some((SRC_MAC, DST_MAC)));
    }

    #[test]
    fn it_converts_netflow_packet_to_common() {
        // Exercises the fallible `TryFrom<&NetflowPacket>` dispatch rather than
        // the per-version `From` implementations.
        let packet = NetflowPacket::V5(sample_v5());

        let common =
            NetflowCommon::try_from(&packet).expect("V5 packets convert to NetflowCommon");

        assert_eq!(common.version, 5);
        assert_eq!(common.timestamp, 100);
        assert_eq!(common.flowsets.len(), 1);
        assert_expected_flow(&common.flowsets[0], None);
    }
}