use std::net::{UdpSocket, SocketAddr};
use std::net::Ipv4Addr;
use std::convert::TryInto;
use std::io::{Error,ErrorKind};
use std::sync::{Arc, Mutex};
use rusqlite::{Connection};
use crate::fields::*;
use crate::senders::*;
use crate::settings::ServerSettings;
use crate::templates::*;
use crate::utils::*;
use crate::sql::*;
pub struct NetflowServer {
pub initial_template_received: bool,
pub socket: UdpSocket,
pub receive_buffer: [u8; 2500],
byte_count: usize,
pub senders: Vec<NetflowSender>,
pub db_conn: Arc<Mutex<Connection>>
}
impl NetflowServer {
pub fn new(addr_and_port: String, db_conn_srv: Arc<Mutex<Connection>>) -> Self {
NetflowServer {
initial_template_received: false,
socket: UdpSocket::bind(addr_and_port)
.expect("Unable to bind socket"),
receive_buffer: [0; 2500],
byte_count: 0,
senders: Vec::new(),
db_conn: db_conn_srv
}
}
pub fn run(&mut self) {
let source_address = self.wait_for_initial_template();
let template: NetflowTemplate = self.parse_flow_template();
self.update_or_create_sender(source_address, template);
loop {
let source_address = self.start_receiving();
let sender_ip = convert_socket_to_ipv4(source_address);
let packet_type = self.determine_packet_type();
match packet_type {
PacketType::Template => {
let template: NetflowTemplate = self.parse_flow_template();
self.update_or_create_sender(source_address, template);
},
PacketType::Data => {
let sender_index_result = self.match_sender(sender_ip);
let sender_index = match sender_index_result {
Ok(o) => o,
Err(e) => {
continue;
}
};
self.parse_data_to_packet(self.byte_count, sender_index);
let senders_len = self.senders.len();
for x in 0..senders_len {
self.senders[x].parse_packet_to_flow();
self.senders[x].prepare_and_update_flow_in_db(&mut self.db_conn);
}
},
}
}
}
pub fn update_or_create_sender(&mut self, source_address: SocketAddr, template: NetflowTemplate) {
let mut found_sender = false;
let new_sender_ip = convert_socket_to_ipv4(source_address);
let vec_len = self.senders.len();
for x in 0..vec_len {
if self.senders[x].ip_addr == new_sender_ip {
found_sender = true;
break;
}
}
if !found_sender {
let ip_as_str = convert_ipv4_to_string(new_sender_ip);
update_senders_in_db(&mut self.db_conn, ip_as_str.as_str());
let new_sender = NetflowSender {
ip_addr: new_sender_ip,
active_template: template,
flow_packets: Vec::new(),
flow_stats: Vec::new(),
};
self.senders.push(new_sender);
}
}
fn decode_field_order(&self, field_id: u16, received_template: &mut NetflowTemplate) {
match field_id {
1 => {
received_template.order_vec.push(FlowField::InOctets);
},
2 => {
received_template.order_vec.push(FlowField::InPkts);
},
3 => {
received_template.order_vec.push(FlowField::Flows);
},
4 => {
received_template.order_vec.push(FlowField::Protocol);
},
5 => {
received_template.order_vec.push(FlowField::SrcTOS);
},
6 => {
received_template.order_vec.push(FlowField::TCPFlags);
},
7 => {
received_template.order_vec.push(FlowField::SrcPort);
},
8 => {
received_template.order_vec.push(FlowField::SrcAddr);
},
9 => {
received_template.order_vec.push(FlowField::SrcMask);
},
10 => {
received_template.order_vec.push(FlowField::InputSNMP);
},
11 => {
received_template.order_vec.push(FlowField::DstPort);
},
12 => {
received_template.order_vec.push(FlowField::DstAddr);
},
13 => {
received_template.order_vec.push(FlowField::DstMask);
},
14 => {
received_template.order_vec.push(FlowField::OutputSNMP);
},
15 => {
received_template.order_vec.push(FlowField::NextHop);
},
80 => {
received_template.order_vec.push(FlowField::InDstMac);
},
81 => {
received_template.order_vec.push(FlowField::OutSrcMac);
},
_ => {
},
}
}
fn get_field_type(&self, flow_field: FlowField) -> FlowField {
match flow_field {
FlowField::InOctets => {
FlowField::InOctets
},
FlowField::InPkts => {
FlowField::InPkts
},
FlowField::Flows => {
FlowField::Flows
},
FlowField::Protocol => {
FlowField::Protocol
},
FlowField::SrcTOS => {
FlowField::SrcTOS
},
FlowField::TCPFlags => {
FlowField::TCPFlags
},
FlowField::SrcPort => {
FlowField::SrcPort
},
FlowField::SrcAddr => {
FlowField::SrcAddr
},
FlowField::SrcMask => {
FlowField::SrcMask
},
FlowField::InputSNMP => {
FlowField::InputSNMP
},
FlowField::DstPort => {
FlowField::DstPort
},
FlowField::DstAddr => {
FlowField::DstAddr
},
FlowField::DstMask => {
FlowField::DstMask
},
FlowField::OutputSNMP => {
FlowField::OutputSNMP
},
FlowField::NextHop => {
FlowField::NextHop
},
FlowField::InDstMac => {
FlowField::InDstMac
},
FlowField::OutSrcMac => {
FlowField::OutSrcMac
},
_ => {
FlowField::None
},
}
}
fn get_field_size(&self, flow_field: FlowField) -> usize {
match flow_field {
FlowField::InOctets => {
4
},
FlowField::InPkts => {
4
},
FlowField::Flows => {
4
},
FlowField::Protocol => {
1
},
FlowField::SrcTOS => {
1
},
FlowField::TCPFlags => {
1
},
FlowField::SrcPort => {
2
},
FlowField::SrcAddr => {
4
},
FlowField::SrcMask => {
1
},
FlowField::InputSNMP => {
4
},
FlowField::DstPort => {
2
},
FlowField::DstAddr => {
4
},
FlowField::DstMask => {
1
},
FlowField::OutputSNMP => {
4
},
FlowField::NextHop => {
4
},
FlowField::InDstMac => {
6
},
FlowField::OutSrcMac => {
6
},
_ => {
0
},
}
}
fn set_field_value(&self, flow_field: FlowField, new_packet: &mut NetflowTemplate, field_slice: &[u8]) {
match flow_field {
FlowField::InOctets => {
let field_array: [u8; 4] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u32::from_be_bytes(field_array);
new_packet.in_octets = Some(U32Field::Value(field_data));
},
FlowField::InPkts => {
let field_array: [u8; 4] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u32::from_be_bytes(field_array);
new_packet.in_packets = Some(U32Field::Value(field_data));
},
FlowField::Protocol => {
let field_array: [u8; 1] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u8::from_be_bytes(field_array);
new_packet.protocol = Some(U8Field::Value(field_data));
},
FlowField::SrcTOS => {
let field_array: [u8; 1] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u8::from_be_bytes(field_array);
new_packet.src_tos = Some(U8Field::Value(field_data));
},
FlowField::TCPFlags => {
let field_array: [u8; 1] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u8::from_be_bytes(field_array);
new_packet.tcp_flags = Some(U8Field::Value(field_data));
},
FlowField::SrcPort => {
let field_array: [u8; 2] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u16::from_be_bytes(field_array);
new_packet.src_port = Some(U16Field::Value(field_data));
},
FlowField::SrcAddr => {
let field_array: [u8; 4] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u32::from_be_bytes(field_array);
let field_data_ipv4: Ipv4Addr = Ipv4Addr::from_bits(field_data);
new_packet.src_addr = Some(Ipv4Field::Value(field_data_ipv4));
},
FlowField::SrcMask => {
let field_array: [u8; 1] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u8::from_be_bytes(field_array);
new_packet.src_mask = Some(U8Field::Value(field_data));
},
FlowField::InputSNMP => {
let field_array: [u8; 4] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u32::from_be_bytes(field_array);
new_packet.input_snmp = Some(U32Field::Value(field_data));
},
FlowField::DstPort => {
let field_array: [u8; 2] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u16::from_be_bytes(field_array);
new_packet.dst_port = Some(U16Field::Value(field_data));
},
FlowField::DstAddr => {
let field_array: [u8; 4] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data: u32 = u32::from_be_bytes(field_array);
let field_data_ipv4: Ipv4Addr = Ipv4Addr::from_bits(field_data);
new_packet.dst_addr = Some(Ipv4Field::Value(field_data_ipv4));
},
FlowField::DstMask => {
let field_array: [u8; 1] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u8::from_be_bytes(field_array);
new_packet.dst_mask = Some(U8Field::Value(field_data));
},
FlowField::OutputSNMP => {
let field_array: [u8; 4] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data = u32::from_be_bytes(field_array);
new_packet.output_snmp = Some(U32Field::Value(field_data));
},
FlowField::NextHop => {
let field_array: [u8; 4] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data: u32 = u32::from_be_bytes(field_array);
let field_data_ipv4: Ipv4Addr = Ipv4Addr::from_bits(field_data);
new_packet.next_hop = Some(Ipv4Field::Value(field_data_ipv4));
},
FlowField::InDstMac => {
let field_array: [u8; 6] = field_slice.try_into().expect("Unable to convert field_slice to array");
let mut field_array_64: [u8; 8] = [0; 8];
field_array_64[..6].clone_from_slice(&field_array);
let field_data: u64 = u64::from_be_bytes(field_array_64);
new_packet.in_dst_mac = Some(U64Field::Value(field_data));
},
FlowField::OutSrcMac => {
let field_array: [u8; 6] = field_slice.try_into().expect("Unable to convert field_slice to array");
let mut field_array_64: [u8; 8] = [0; 8];
field_array_64[..6].clone_from_slice(&field_array);
let field_data: u64 = u64::from_be_bytes(field_array_64);
new_packet.out_src_mac = Some(U64Field::Value(field_data));
},
_ => {
},
}
}
pub fn start_receiving(&mut self) -> (SocketAddr) {
let (byte_count, socket) = self.socket.recv_from(&mut self.receive_buffer)
.expect("Error receiving from the socket");
self.byte_count = byte_count;
socket
}
pub fn parse_flow_length(&self, message: &[u8]) -> u16 {
let data_len_slice: &[u8] = &message[22..24];
let data_len_array: [u8; 2] = data_len_slice.try_into().expect("Unable to convert data_len_slice to array");
let data_len: u16 = u16::from_be_bytes(data_len_array);
data_len
}
pub fn parse_flow_template_id_from_template(&self, message: &[u8]) -> u16 {
let template_id_slice: &[u8] = &message[24..26];
let template_id_array: [u8; 2] = template_id_slice.try_into().expect("Unable to convert template_id_slice to array");
let template_id: u16 = u16::from_be_bytes(template_id_array);
template_id
}
pub fn parse_flow_template_id_from_data(&self, message: &[u8]) -> u16 {
let template_id_slice: &[u8] = &message[20..22];
let template_id_array: [u8; 2] = template_id_slice.try_into().expect("Unable to convert template_id_slice to array");
let template_id: u16 = u16::from_be_bytes(template_id_array);
template_id
}
pub fn parse_flow_field_count(&self, message: &[u8]) -> u16 {
let field_count_slice: &[u8] = &message[26..28];
let field_count_array: [u8; 2] = field_count_slice.try_into().expect("Unable to convert field_count_slice to array");
let field_count: u16 = u16::from_be_bytes(field_count_array);
field_count
}
pub fn parse_flow_template(&mut self) -> NetflowTemplate {
let message: &[u8] = &self.receive_buffer[..self.byte_count];
let mut received_template = NetflowTemplate::default();
self.parse_flow_length(message);
let template_id = self.parse_flow_template_id_from_template(message);
received_template.id = Some(template_id);
let field_count = self.parse_flow_field_count(message);
received_template.field_count = Some(field_count);
let mut start_slice: usize = 28;
let mut end_slice: usize = 30;
let inc_size: usize = 4;
for x in 0..field_count {
let field_slice: &[u8] = &message[start_slice..end_slice];
let field_array: [u8; 2] = field_slice.try_into().expect("Unable to convert field_slice to array");
let field_data: u16 = u16::from_be_bytes(field_array);
self.decode_field_order(field_data, &mut received_template);
start_slice += inc_size;
end_slice += inc_size;
}
self.initial_template_received = true;
received_template
}
pub fn parse_data_to_packet(&mut self, byte_count: usize, sender_index: usize) {
let message: &[u8] = &self.receive_buffer[..byte_count];
self.parse_flow_length(message);
let template_id = self.parse_flow_template_id_from_data(message);
if template_id != self.senders[sender_index].active_template.id.expect("sender.active_template.id is None") {
return;
}
let field_count = self.senders[sender_index].active_template.field_count.unwrap();
let mut start_slice: usize = 24;
let mut field_type = self.get_field_type(self.senders[sender_index].active_template.order_vec[0]);
let mut inc_size: usize = self.get_field_size(self.senders[sender_index].active_template.order_vec[0]);
let mut end_slice: usize = start_slice + inc_size;
let mut initial_field_parsed = false;
let vec_len: u16 = self.senders[sender_index].active_template.order_vec.len().try_into().unwrap();
if field_count != vec_len {
return;
}
let mut new_packet: NetflowTemplate = NetflowTemplate::default();
let field_count_size: usize = field_count.into();
for x in 0..field_count_size {
if initial_field_parsed {
field_type = self.get_field_type(self.senders[sender_index].active_template.order_vec[x]);
inc_size = self.get_field_size(self.senders[sender_index].active_template.order_vec[x]);
start_slice = end_slice;
end_slice += inc_size;
}
let field_slice: &[u8] = &message[start_slice..end_slice];
self.set_field_value(field_type, &mut new_packet, field_slice);
if !initial_field_parsed {
initial_field_parsed = true;
}
}
self.senders[sender_index].flow_packets.push(new_packet);
}
pub fn determine_packet_type(&self) -> PacketType {
let received_message: &[u8] = &self.receive_buffer[..self.byte_count];
if received_message[20] == 0 && received_message[21] == 0 {
PacketType::Template
}
else {
PacketType::Data
}
}
pub fn wait_for_initial_template(&mut self) -> SocketAddr {
loop {
let source_address = self.start_receiving();
match check_packet_size(self.byte_count) {
Ok(x) => {
},
Err(e) => {
continue;
}
}
let received_message: &[u8] = &self.receive_buffer[..self.byte_count];
if received_message[20] == 0 && received_message[21] == 0 {
return source_address;
}
else {
}
}
}
pub fn wait_for_netflow_data(&mut self) -> SocketAddr {
loop {
let source_address = self.start_receiving();
match check_packet_size(self.byte_count) {
Ok(x) => {
},
Err(e) => {
continue;
}
}
let received_message: &[u8] = &self.receive_buffer[..self.byte_count];
if received_message[20] != 0 && received_message[21] != 0 {
return source_address;
}
else {
}
}
}
pub fn match_sender(&mut self, sender_ip: Ipv4Addr) -> std::result::Result<usize, std::io::Error> {
let vec_len = self.senders.len();
for x in 0..vec_len {
if self.senders[x].ip_addr == sender_ip {
return Ok(x);
}
}
Err(Error::new(ErrorKind::AddrNotAvailable, "Sender not found"))
}
}