#![doc(html_logo_url =
"https://raw.githubusercontent.com/maidsafe/QA/master/Images/maidsafe_logo.png",
html_favicon_url = "http://maidsafe.net/img/favicon.ico",
html_root_url = "http://maidsafe.github.io/message_filter")]
#![forbid(bad_style, exceeding_bitshifts, mutable_transmutes, no_mangle_const_items,
unknown_crate_types, warnings)]
#![deny(deprecated, drop_with_repr_extern, improper_ctypes, missing_docs,
non_shorthand_field_patterns, overflowing_literals, plugin_as_library,
private_no_mangle_fns, private_no_mangle_statics, stable_features, unconditional_recursion,
unknown_lints, unsafe_code, unused, unused_allocation, unused_attributes,
unused_comparisons, unused_features, unused_parens, while_true)]
#![warn(trivial_casts, trivial_numeric_casts, unused_extern_crates, unused_import_braces,
unused_qualifications, unused_results)]
#![allow(box_pointers, fat_ptr_transmutes, missing_copy_implementations,
missing_debug_implementations, variant_size_differences)]
#![cfg_attr(feature="clippy", feature(plugin))]
#![cfg_attr(feature="clippy", plugin(clippy))]
#![cfg_attr(feature="clippy", deny(clippy, clippy_pedantic))]
#![cfg_attr(feature="clippy", allow(use_debug))]
#[cfg(test)]
extern crate rand;
use std::hash::{Hash, Hasher, SipHasher};
use std::marker::PhantomData;
use std::time::{Duration, SystemTime};
/// Reduces any hashable value to a 64-bit digest.
///
/// A freshly constructed `SipHasher` is used on every call, so equal values always map to the
/// same digest for the lifetime of the process.
fn hash<T: Hash>(t: &T) -> u64 {
    let mut hasher = SipHasher::new();
    Hash::hash(t, &mut hasher);
    hasher.finish()
}
/// Filter that remembers which messages it has been shown, bounded by a maximum number of
/// entries, a maximum age per entry, or both.
///
/// Only a 64-bit hash of each message is stored, never the message itself; the `Message` type
/// parameter just fixes which type the filter accepts.
pub struct MessageFilter<Message> {
/// Remembered messages; new and refreshed entries are pushed to the back, and capacity
/// eviction removes from the front.
entries: Vec<TimestampedMessage>,
/// Maximum number of entries to keep, or `None` for no size limit.
capacity: Option<usize>,
/// How long an entry stays valid after its last insertion, or `None` for no age limit.
time_to_live: Option<Duration>,
/// Ties the filter to `Message` without storing a value of that type.
phantom: PhantomData<Message>,
}
impl<Message: Hash> MessageFilter<Message> {
    /// Creates a filter bounded only by size.
    ///
    /// When an insertion takes the filter above `capacity` entries, the oldest entry is
    /// evicted.  Entries never expire by age.
    pub fn with_capacity(capacity: usize) -> MessageFilter<Message> {
        MessageFilter {
            entries: vec![],
            capacity: Some(capacity),
            time_to_live: None,
            phantom: PhantomData,
        }
    }

    /// Creates a filter bounded only by age.
    ///
    /// An entry is dropped once `time_to_live` has elapsed since it was last inserted.  There
    /// is no limit on the number of entries.
    pub fn with_expiry_duration(time_to_live: Duration) -> MessageFilter<Message> {
        MessageFilter {
            entries: vec![],
            capacity: None,
            time_to_live: Some(time_to_live),
            phantom: PhantomData,
        }
    }

    /// Creates a filter bounded by both age and size.
    ///
    /// Entries are dropped when `time_to_live` has elapsed since their last insertion, and the
    /// oldest entry is evicted when an insertion takes the filter above `capacity` entries.
    pub fn with_expiry_duration_and_capacity(time_to_live: Duration,
                                             capacity: usize)
                                             -> MessageFilter<Message> {
        MessageFilter {
            entries: vec![],
            capacity: Some(capacity),
            time_to_live: Some(time_to_live),
            phantom: PhantomData,
        }
    }

    /// Adds `message` to the filter and returns how many times it had already been inserted.
    ///
    /// Expired entries are purged first.  If the message is already held, its expiry time is
    /// refreshed, it is moved to the back of the queue (making it the newest entry) and its
    /// counter is incremented.  Otherwise a new entry is appended and, if that exceeds the
    /// capacity, the oldest entry is evicted.
    ///
    /// Returns `0` for a message that is not currently held, `1` on its second insertion, `2`
    /// on its third, and so on.  Messages are identified by their 64-bit hash only, so two
    /// distinct messages with the same hash are treated as one.
    pub fn insert(&mut self, message: &Message) -> usize {
        self.remove_expired();
        let hash_code = hash(message);
        let existing = self.entries.iter().position(|t| t.hash_code == hash_code);
        if let Some(index) = existing {
            // Re-queue at the back so that capacity eviction treats it as the newest entry.
            let mut timestamped_message = self.entries.remove(index);
            timestamped_message.update_expiry_point(self.time_to_live);
            let count = timestamped_message.increment_count();
            self.entries.push(timestamped_message);
            count
        } else {
            self.entries.push(TimestampedMessage::new(hash_code, self.time_to_live));
            self.remove_excess();
            0
        }
    }

    /// Removes `message` from the filter if it is present.
    ///
    /// Expired entries are purged first.
    pub fn remove(&mut self, message: &Message) {
        self.remove_expired();
        let hash_code = hash(message);
        let existing = self.entries.iter().position(|t| t.hash_code == hash_code);
        if let Some(index) = existing {
            let _ = self.entries.remove(index);
        }
    }

    /// Returns how many times `message` has been re-inserted since it entered the filter, i.e.
    /// the value returned by the most recent `insert` of it.
    ///
    /// Returns `0` if the message is not held.  Because this method takes `&self` it cannot
    /// purge expired entries; instead an entry whose time to live has elapsed is treated as
    /// absent, so the answer agrees with `contains`.
    pub fn count(&self, message: &Message) -> usize {
        let hash_code = hash(message);
        match self.entries.iter().find(|t| t.hash_code == hash_code) {
            Some(entry) => {
                // `expiry_point` is only meaningful when the filter has a time to live.
                if self.time_to_live.is_some() && entry.expiry_point <= SystemTime::now() {
                    0
                } else {
                    entry.count
                }
            }
            None => 0,
        }
    }

    /// Returns whether `message` is currently held by the filter.
    ///
    /// Expired entries are purged first, which is why this takes `&mut self`.
    pub fn contains(&mut self, message: &Message) -> bool {
        self.remove_expired();
        let hash_code = hash(message);
        self.entries.iter().any(|entry| entry.hash_code == hash_code)
    }

    /// Returns the number of stored entries.
    ///
    /// Expired entries are only purged by `insert`, `remove` and `contains`, so this may
    /// include entries whose time to live has already elapsed.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Removes every entry from the filter.
    pub fn clear(&mut self) {
        self.entries.clear();
    }

    /// Returns whether the filter stores no entries.
    ///
    /// Like `len`, this does not purge expired entries first.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Evicts the oldest entry if the filter has grown past its capacity.
    ///
    /// Called after a single push, so at most one entry can be in excess.
    fn remove_excess(&mut self) {
        if let Some(capacity) = self.capacity {
            if self.entries.len() > capacity {
                let _ = self.entries.remove(0);
                debug_assert!(self.entries.len() == capacity);
            }
        }
    }

    /// Drops every entry whose expiry point has been reached.
    ///
    /// Does nothing for filters without a time to live.  `retain` filters in place, so no new
    /// vector is allocated, and it does not rely on the entries being ordered by expiry time:
    /// `SystemTime` is not monotonic, so a clock adjustment can leave an expired entry behind
    /// a live one.
    fn remove_expired(&mut self) {
        if self.time_to_live.is_some() {
            let now = SystemTime::now();
            self.entries.retain(|entry| entry.expiry_point > now);
        }
    }
}
/// Bookkeeping record for one remembered message.
struct TimestampedMessage {
/// 64-bit hash identifying the message; the message itself is not stored.
pub hash_code: u64,
/// Point in time after which the entry counts as expired.  Set to the creation/refresh time
/// itself when no time to live is given.
pub expiry_point: SystemTime,
/// Number of times `increment_count` has been called on this entry; starts at 0.
pub count: usize,
}
impl TimestampedMessage {
    /// Builds a record for the message identified by `hash_code` with a counter of zero.
    ///
    /// The expiry point is the current system time plus `time_to_live`, or just the current
    /// system time when no time to live is given.
    pub fn new(hash_code: u64, time_to_live: Option<Duration>) -> TimestampedMessage {
        let created_at = SystemTime::now();
        let expiry_point = time_to_live.map_or(created_at, |ttl| created_at + ttl);
        TimestampedMessage {
            hash_code: hash_code,
            expiry_point: expiry_point,
            count: 0,
        }
    }

    /// Restarts the lifetime of the record, measured from the current system time.
    ///
    /// With `None` the expiry point becomes the current system time itself.
    pub fn update_expiry_point(&mut self, time_to_live: Option<Duration>) {
        let refreshed_at = SystemTime::now();
        self.expiry_point = if let Some(ttl) = time_to_live {
            refreshed_at + ttl
        } else {
            refreshed_at
        };
    }

    /// Bumps the counter by one and returns the new value.
    pub fn increment_count(&mut self) -> usize {
        let bumped = self.count + 1;
        self.count = bumped;
        bumped
    }
}
// Unit tests for `MessageFilter`.  They rely on the external `rand` crate and, for the
// time-based cases, on real sleeps, so their run time depends on the randomly chosen durations.
#[cfg(test)]
mod test {
use super::*;
use rand;
use rand::Rng;
use std::thread;
use std::time::Duration;
// Capacity-only filter: fills it up, then checks that every further insertion keeps the length
// at the capacity and evicts exactly the oldest value.
#[test]
fn size_only() {
// Random capacity in 1..=256.
let size = rand::random::<u8>() as usize + 1;
let mut msg_filter = MessageFilter::<usize>::with_capacity(size);
assert!(msg_filter.time_to_live.is_none());
assert_eq!(Some(size), msg_filter.capacity);
// Add `size` messages - all should be added.
for i in 0..size {
assert_eq!(msg_filter.len(), i);
assert_eq!(0, msg_filter.insert(&i));
assert_eq!(msg_filter.len(), i + 1);
}
// Check all added messages remain.
assert!((0..size).all(|index| msg_filter.contains(&index)));
// Add further messages - each insertion should push out the oldest one.
for i in size..1000 {
assert_eq!(0, msg_filter.insert(&i));
assert_eq!(msg_filter.len(), size);
assert!(msg_filter.contains(&i));
if size > 1 {
// The previous value and the oldest surviving value must still be present.
assert!(msg_filter.contains(&(i - 1)));
assert!(msg_filter.contains(&(i - size + 1)));
}
// The value inserted `size` insertions ago must have been evicted.
assert!(!msg_filter.contains(&(i - size)));
}
}
// Time-only filter: entries survive until the time to live has elapsed and are purged by the
// first insertion afterwards.
#[test]
fn time_only() {
// Time to live in milliseconds, drawn from `gen_range(50, 150)`.
let time_to_live = Duration::from_millis(rand::thread_rng().gen_range(50, 150));
let mut msg_filter = MessageFilter::<usize>::with_expiry_duration(time_to_live);
assert_eq!(Some(time_to_live), msg_filter.time_to_live);
assert_eq!(None, msg_filter.capacity);
// Add 10 messages - all should be added.
for i in 0..10 {
assert_eq!(0, msg_filter.insert(&i));
assert!(msg_filter.contains(&i));
}
assert_eq!(msg_filter.len(), 10);
// Sleep for the time to live plus a 10 ms margin.  Only the sub-second part of the duration
// is used, which is sufficient because the time to live is below one second here.
let sleep_duration =
Duration::from_millis(time_to_live.subsec_nanos() as u64 / 1000000 + 10);
thread::sleep(sleep_duration);
// Add a new message - the insertion purges all ten expired entries.
assert_eq!(0, msg_filter.insert(&11));
assert!(msg_filter.contains(&11));
assert_eq!(msg_filter.len(), 1);
// The previously expired values are treated as new again (insert returns 0).
for i in 0..10 {
assert_eq!(msg_filter.len(), i + 1);
assert_eq!(0, msg_filter.insert(&i));
assert!(msg_filter.contains(&i));
assert_eq!(msg_filter.len(), i + 2);
}
}
// Filter with both limits: the length is capped by the capacity while inserting, and
// everything is gone once the time to live has elapsed.
#[test]
fn time_and_size() {
let size = rand::random::<u8>() as usize + 1;
let time_to_live = Duration::from_millis(rand::thread_rng().gen_range(50, 150));
let mut msg_filter =
MessageFilter::<usize>::with_expiry_duration_and_capacity(time_to_live, size);
assert_eq!(Some(time_to_live), msg_filter.time_to_live);
assert_eq!(Some(size), msg_filter.capacity);
for i in 0..1000 {
// Length before the insertion: grows until the capacity is reached, then stays there.
if i < size {
assert_eq!(msg_filter.len(), i);
} else {
assert_eq!(msg_filter.len(), size);
}
assert_eq!(0, msg_filter.insert(&i));
assert!(msg_filter.contains(&i));
// Length after the insertion.
if i < size {
assert_eq!(msg_filter.len(), i + 1);
} else {
assert_eq!(msg_filter.len(), size);
}
}
// Sleep for the time to live plus a 10 ms margin.
let sleep_duration =
Duration::from_millis(time_to_live.subsec_nanos() as u64 / 1000000 + 10);
thread::sleep(sleep_duration);
// `contains` purges expired entries, so the subsequent `len` sees an empty filter.
assert!(!msg_filter.contains(&1000));
assert_eq!(msg_filter.len(), 0);
}
// Same scenario as `time_and_size`, but with a user-defined struct as the message type.
#[test]
fn time_size_struct_value() {
#[derive(PartialEq, PartialOrd, Ord, Clone, Eq, Hash)]
struct Temp {
id: Vec<u8>,
}
impl Default for Temp {
// Note: despite the name this produces a randomly generated value on every call.
fn default() -> Temp {
let mut rng = rand::thread_rng();
Temp { id: rand::sample(&mut rng, 0u8..255, 64) }
}
}
let size = rand::random::<u8>() as usize + 1;
let time_to_live = Duration::from_millis(rand::thread_rng().gen_range(50, 150));
let mut msg_filter = MessageFilter::<Temp>::with_expiry_duration_and_capacity(time_to_live,
size);
assert_eq!(Some(time_to_live), msg_filter.time_to_live);
assert_eq!(Some(size), msg_filter.capacity);
for i in 0..1000 {
// Length before the insertion: grows until the capacity is reached, then stays there.
if i < size {
assert_eq!(msg_filter.len(), i);
} else {
assert_eq!(msg_filter.len(), size);
}
// Each iteration inserts a freshly generated random value; the test expects it to be new.
let temp: Temp = Default::default();
assert_eq!(0, msg_filter.insert(&temp));
assert!(msg_filter.contains(&temp));
// Length after the insertion.
if i < size {
assert_eq!(msg_filter.len(), i + 1);
} else {
assert_eq!(msg_filter.len(), size);
}
}
// Sleep for the time to live plus a 10 ms margin.
let sleep_duration =
Duration::from_millis(time_to_live.subsec_nanos() as u64 / 1000000 + 10);
thread::sleep(sleep_duration);
// The insertion purges every expired entry, leaving only the new value.
let temp: Temp = Default::default();
assert_eq!(0, msg_filter.insert(&temp));
assert_eq!(msg_filter.len(), 1);
assert!(msg_filter.contains(&temp));
}
// Re-inserting a value that is already held: checks the returned counter, the refreshed queue
// position (capacity filter) and the refreshed expiry time (time filter).
#[test]
fn add_duplicate() {
// Check re-adding a message to a capacity-based filter doesn't change the length, and
// moves the message to the back of the eviction order.
let size = 3;
let mut capacity_filter = MessageFilter::<usize>::with_capacity(size);
for i in 0..size {
assert_eq!(0, capacity_filter.insert(&i));
}
assert!((0..size).all(|index| capacity_filter.contains(&index)));
// Re-insert 0: the counter goes from 0 to 1.
assert_eq!(0, capacity_filter.count(&0));
assert_eq!(1, capacity_filter.insert(&0));
assert_eq!(1, capacity_filter.count(&0));
// Inserting a new value now evicts 1 rather than 0, because 0 was moved to the back.
assert_eq!(0, capacity_filter.insert(&3));
assert!(capacity_filter.contains(&0));
assert!(!capacity_filter.contains(&1));
assert!(capacity_filter.contains(&2));
assert!(capacity_filter.contains(&3));
assert_eq!(2, capacity_filter.insert(&0));
assert_eq!(2, capacity_filter.count(&0));
// Check re-adding a message to a time-based filter refreshes its expiry time.
let time_to_live = Duration::from_millis(200);
let mut time_filter = MessageFilter::<usize>::with_expiry_duration(time_to_live);
assert_eq!(0, time_filter.insert(&0));
// Each sleep lasts half the time to live plus 10 ms (110 ms).
let sleep_duration =
Duration::from_millis(time_to_live.subsec_nanos() as u64 / 1000000 / 2 + 10);
thread::sleep(sleep_duration);
// Re-insert after ~110 ms: still held, and the expiry time restarts from here.
assert_eq!(1, time_filter.insert(&0));
thread::sleep(sleep_duration);
// ~110 ms after the refresh: still within the 200 ms time to live.
assert!(time_filter.contains(&0));
thread::sleep(sleep_duration);
// ~220 ms after the refresh: expired.
assert!(!time_filter.contains(&0));
}
}