#![deny(missing_docs)]
#[macro_use]
extern crate log;
extern crate fnv;
extern crate time;
extern crate mowl;
extern crate linked_hash_map;
#[macro_use]
pub mod error;
use error::{PathResult, ErrorType};
use std::fmt;
use std::hash::{BuildHasherDefault, Hash};
use std::net::IpAddr;
use time::{Duration, precise_time_ns};
use fnv::FnvHasher;
use linked_hash_map::LinkedHashMap;
use log::LogLevel;
/// Linked hash map from connection `Identifier`s to their `Data`, hashed with
/// the FNV hasher instead of the standard library default.
type HashMapFnv<K, C> = LinkedHashMap<Identifier<K>, Data<C>, BuildHasherDefault<FnvHasher>>;
/// Connection tracking table.
///
/// `K` is the type of the user-defined key stored in every `Identifier`, `C`
/// is the type of the optional custom payload stored in every `Data`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Path<K, C>
where K: Hash + Eq + PartialEq
{
// Tracked connections, keyed by identifier. `track` refreshes or inserts
// entries and evicts from the front of the map when the limit is reached.
hashmap: HashMapFnv<K, C>,
/// Maximum time since a connection was last tracked before `track` and
/// `flush` treat it as timed out.
pub timeout: Duration,
/// Maximum number of tracked connections. When the limit is reached,
/// `track` evicts from the front of the map before inserting a new
/// connection. A value of `0` disables the limit.
pub max_connections: u64,
}
impl<K, C> Path<K, C>
where C: Clone,
K: fmt::Debug + Clone + Hash + Eq + PartialEq
{
pub fn new() -> Self {
Path {
hashmap: HashMapFnv::default(),
timeout: Duration::minutes(10),
max_connections: 1_000_000, }
}
pub fn set_log_level(self, level: LogLevel) -> Self {
if mowl::init_with_level(level).is_err() {
error!("Logger already set.");
} else {
info!("Log level set to: {:?}", level);
}
self
}
pub fn track(&mut self, identifier: Identifier<K>) -> PathResult<Connection<K, C>> {
let now = precise_time_ns();
let connection_state = match self.hashmap.get_refresh(&identifier) {
Some(data) => {
if Duration::nanoseconds((now - data.timestamp) as i64) <= self.timeout {
match data.packet_counter.checked_add(1) {
Some(value) => data.packet_counter = value,
None => bail!(ErrorType::PacketCounterOverflow, "Packet counter overflow"),
}
data.timestamp = now;
ConnectionState::Ok
} else {
ConnectionState::Timeout
}
}
None => ConnectionState::New,
};
match connection_state {
ConnectionState::Timeout => {
self.hashmap.remove(&identifier);
warn!("Connection removed (timeout): {}", identifier);
bail!(ErrorType::Timeout, "Connection removed because of timeout");
}
ConnectionState::New => {
if self.max_connections > 0 && self.hashmap.len() as u64 >= self.max_connections {
let removed = self.hashmap.pop_front();
warn!("Connection removed (HashMap full): {}", removed.unwrap().0);
}
debug!("Connection inserted: {}", identifier);
self.hashmap.insert(identifier, Data::new());
}
ConnectionState::Ok => {}
}
Ok(self.last_mut().unwrap())
}
pub fn connection_count(&self) -> usize {
self.hashmap.len()
}
pub fn last_mut(&mut self) -> Option<Connection<K, C>> {
self.hashmap.iter_mut().rev().next().map(|(i, d)| Connection::new(i, d))
}
pub fn remove(&mut self, identifier: &Identifier<K>) {
self.hashmap.remove(identifier);
}
pub fn flush(&mut self) -> Vec<Identifier<K>> {
let now = precise_time_ns();
let identifiers = self.hashmap
.iter()
.filter_map(|(identifier, data)| if Duration::nanoseconds((now - data.timestamp) as i64) > self.timeout {
Some(identifier.clone())
} else {
None
})
.collect::<Vec<Identifier<K>>>();
for identifier in &identifiers {
self.remove(identifier);
}
identifiers
}
}
/// Borrowed view of a single tracked connection: its identifier together
/// with mutable access to its data.
#[derive(Debug, Eq, PartialEq)]
pub struct Connection<'a, 'b, K: 'a, C: 'b> {
/// Identifier of the connection.
pub identifier: &'a Identifier<K>,
/// Mutable reference to the data stored for the connection.
pub data: &'b mut Data<C>,
}
impl<'a, 'b, K, C> Connection<'a, 'b, K, C> {
    /// Bundles a borrowed `identifier` and its mutably borrowed `data` into a
    /// `Connection`.
    pub fn new(identifier: &'a Identifier<K>, data: &'b mut Data<C>) -> Self {
        Connection { identifier, data }
    }
}
impl<'a, 'b, K, C> fmt::Display for Connection<'a, 'b, K, C>
    where K: fmt::Debug
{
    /// Displays the connection as its identifier; the data is not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("{}", self.identifier))
    }
}
/// Key identifying a connection: its two endpoints plus a user-defined key.
///
/// All three fields take part in the derived `Hash` and `Eq`
/// implementations.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Identifier<K> {
/// Endpoint that orders first by `(address, port)` when built via `new`.
pub lower: Subscriber,
/// Endpoint that orders last by `(address, port)` when built via `new`.
pub greater: Subscriber,
/// User-defined key stored alongside the endpoints.
pub key: K,
}
impl<K> Identifier<K> {
    /// Builds an identifier from the two endpoints of a connection and a
    /// custom `key`.
    ///
    /// The endpoints are ordered by comparing their `(address, port)` tuples,
    /// so swapping source and destination yields the same identifier.
    pub fn new(source_ip: IpAddr, source_port: u16, destination_ip: IpAddr, destination_port: u16, key: K) -> Self {
        let source = (source_ip, source_port);
        let destination = (destination_ip, destination_port);

        // Keep the given order unless the source sorts strictly after the
        // destination.
        let ((lower_address, lower_port), (greater_address, greater_port)) = if source <= destination {
            (source, destination)
        } else {
            (destination, source)
        };

        Identifier {
            lower: Subscriber {
                address: lower_address,
                port: lower_port,
            },
            greater: Subscriber {
                address: greater_address,
                port: greater_port,
            },
            key,
        }
    }
}
impl<K: fmt::Debug> fmt::Display for Identifier<K> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f,
"{}:{} ↹ {}:{} ({:?})",
self.lower.address,
self.lower.port,
self.greater.address,
self.greater.port,
self.key)
}
}
/// One endpoint of a connection: an IP address and a port.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct Subscriber {
/// IP address of the endpoint.
pub address: IpAddr,
/// Port of the endpoint.
pub port: u16,
}
/// Per-connection state stored in the tracking table.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Data<C> {
/// Optional custom payload; `None` for a freshly created `Data`.
pub custom: Option<C>,
// Number of packets counted for the connection; starts at 1 and is
// incremented by `Path::track`.
packet_counter: u64,
// `precise_time_ns` value (nanoseconds) of creation or of the last
// successful `Path::track` call for the connection.
timestamp: u64,
}
impl<C> Data<C> {
    /// Creates the data for a new connection: a packet counter of 1, the
    /// current `precise_time_ns` timestamp and no custom payload.
    pub fn new() -> Self {
        let created = precise_time_ns();
        Data {
            custom: None,
            packet_counter: 1,
            timestamp: created,
        }
    }

    /// Returns the current packet counter of the connection.
    pub fn packet_counter(&self) -> u64 {
        let Data { packet_counter, .. } = *self;
        packet_counter
    }
}
/// Result of looking up an identifier in `Path::track`.
enum ConnectionState {
/// The connection is tracked and within the timeout.
Ok,
/// The connection is not tracked yet.
New,
/// The connection is tracked but older than the timeout.
Timeout,
}