1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use crate::{PacketData, ReportKey, ReportValue};
use crossbeam_channel::{select, tick, Receiver, Sender};
use etherparse::PacketHeaders;
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::time::Duration;
pub(crate) fn parser_task(
rx_prod_parser: Receiver<PacketData>,
tx_parser_writer: Sender<HashMap<ReportKey, ReportValue>>,
timeout_s: u32,
) {
let ticker = tick(Duration::from_millis((timeout_s * 1000 / 4) as u64));
let mut map: Option<HashMap<ReportKey, ReportValue>> = Some(HashMap::new());
loop {
select! {
recv(rx_prod_parser) -> packet => {
if let Ok(data) = packet {
let ts = data.timestamp_ms;
let bytes = data.bytes;
let packet = data.data;
match PacketHeaders::from_ethernet_slice(&packet){
Err(_) => continue,
Ok(value) => {
if value.ip.is_some() && value.transport.is_some() {
let map = map.as_mut().unwrap();
let ports = match value.transport.unwrap() {
etherparse::TransportHeader::Tcp(value) => (value.source_port, value.destination_port, 6),
etherparse::TransportHeader::Udp(value) => (value.source_port, value.destination_port, 17),
_ => continue,
};
let ip_header = value.ip.unwrap();
let ip_info = match ip_header {
etherparse::IpHeader::Version4(value, ..) => (IpAddr::V4(Ipv4Addr::from(value.source)), IpAddr::V4(Ipv4Addr::from(value.destination))),
etherparse::IpHeader::Version6(value, ..) => (IpAddr::V6(Ipv6Addr::from(value.source)), IpAddr::V6(Ipv6Addr::from(value.destination))),
};
let key = ReportKey {
source_ip: ip_info.0,
source_port: ports.0,
destination_ip: ip_info.1,
destination_port: ports.1,
};
let mut info = map.entry(key).or_insert(ReportValue {
protocols: HashSet::new(),
first_timestamp_ms: ts,
last_timestamp_ms: ts,
bytes: 0,
packets_count: 0,
});
info.protocols.insert(ports.2);
info.bytes += bytes as u64;
info.last_timestamp_ms = info.last_timestamp_ms.max(ts);
info.first_timestamp_ms = info.first_timestamp_ms.min(ts);
info.packets_count += 1;
}
}
}
} else {
break;
}
}
recv(ticker) -> _ => {
tx_parser_writer.send(map.take().unwrap()).unwrap();
map = Some(HashMap::new());
}
}
}
tx_parser_writer.send(map.take().unwrap()).unwrap();
}