use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::net::SocketAddr;
use bip_util::bt::InfoHash;
use chrono::{UTC, DateTime, Duration};
const MAX_ITEMS_STORED: usize = 500;
pub struct AnnounceStorage {
storage: HashMap<InfoHash, Vec<AnnounceItem>>,
expires: Vec<ItemExpiration>,
}
impl AnnounceStorage {
pub fn new() -> AnnounceStorage {
AnnounceStorage {
storage: HashMap::new(),
expires: Vec::new(),
}
}
pub fn add_item(&mut self, info_hash: InfoHash, address: SocketAddr) -> bool {
self.add(info_hash, address, UTC::now())
}
fn add(&mut self, info_hash: InfoHash, address: SocketAddr, curr_time: DateTime<UTC>) -> bool {
self.remove_expired_items(curr_time);
let item = AnnounceItem::new(info_hash, address);
let item_expiration = item.expiration();
match self.insert_contact(item) {
Some(true) => {
self.expires.retain(|i| i != &item_expiration);
self.expires.push(item_expiration);
true
}
Some(false) => {
self.expires.push(item_expiration);
true
}
None => false,
}
}
pub fn find_items<F>(&mut self, info_hash: &InfoHash, item_func: F)
where F: FnMut(SocketAddr)
{
self.find(info_hash, item_func, UTC::now())
}
fn find<F>(&mut self, info_hash: &InfoHash, mut item_func: F, curr_time: DateTime<UTC>)
where F: FnMut(SocketAddr)
{
self.remove_expired_items(curr_time);
if let Some(items) = self.storage.get(info_hash) {
for item in items {
item_func(item.address());
}
}
}
fn insert_contact(&mut self, item: AnnounceItem) -> Option<bool> {
let item_info_hash = item.info_hash();
let already_in_list = if let Some(items) = self.storage.get_mut(&item_info_hash) {
items.iter().any(|a| a == &item)
} else {
false
};
match (already_in_list, self.expires.len() < MAX_ITEMS_STORED) {
(false, true) => {
match self.storage.entry(item_info_hash) {
Entry::Occupied(mut occ) => occ.get_mut().push(item),
Entry::Vacant(vac) => {
vac.insert(vec![item]);
}
};
Some(false)
}
(false, false) => None,
(true, false) => Some(true),
(true, true) => Some(true),
}
}
fn remove_expired_items(&mut self, curr_time: DateTime<UTC>) {
let num_expired_items = self.expires.iter().take_while(|i| i.is_expired(curr_time)).count();
for item_expiration in self.expires.drain(0..num_expired_items) {
let info_hash = item_expiration.info_hash();
let remove_info_hash = if let Some(items) = self.storage.get_mut(&info_hash) {
items.retain(|a| a.expiration() != item_expiration);
items.is_empty()
} else {
false
};
if remove_info_hash {
self.storage.remove(&info_hash);
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct AnnounceItem {
expiration: ItemExpiration,
}
impl AnnounceItem {
pub fn new(info_hash: InfoHash, address: SocketAddr) -> AnnounceItem {
AnnounceItem { expiration: ItemExpiration::new(info_hash, address) }
}
pub fn expiration(&self) -> ItemExpiration {
self.expiration.clone()
}
pub fn address(&self) -> SocketAddr {
self.expiration.address()
}
pub fn info_hash(&self) -> InfoHash {
self.expiration.info_hash()
}
}
const EXPIRATION_TIME_HOURS: i64 = 24;
#[derive(Debug, Clone)]
struct ItemExpiration {
address: SocketAddr,
inserted: DateTime<UTC>,
info_hash: InfoHash,
}
impl ItemExpiration {
pub fn new(info_hash: InfoHash, address: SocketAddr) -> ItemExpiration {
ItemExpiration {
address: address,
inserted: UTC::now(),
info_hash: info_hash,
}
}
pub fn is_expired(&self, now: DateTime<UTC>) -> bool {
now - self.inserted >= Duration::hours(EXPIRATION_TIME_HOURS)
}
pub fn info_hash(&self) -> InfoHash {
self.info_hash
}
pub fn address(&self) -> SocketAddr {
self.address
}
}
impl PartialEq for ItemExpiration {
fn eq(&self, other: &ItemExpiration) -> bool {
self.address() == other.address() && self.info_hash() == other.info_hash()
}
}
impl Eq for ItemExpiration {}
#[cfg(test)]
mod tests {
use bip_util::bt;
use bip_util::test as bip_test;
use chrono::Duration;
use storage::{self, AnnounceStorage};
#[test]
fn positive_add_and_retrieve_contact() {
let mut announce_store = AnnounceStorage::new();
let info_hash = [0u8; bt::INFO_HASH_LEN].into();
let sock_addr = bip_test::dummy_socket_addr_v4();
assert!(announce_store.add_item(info_hash, sock_addr));
let mut items = Vec::new();
announce_store.find_items(&info_hash, |a| items.push(a));
assert_eq!(items.len(), 1);
assert_eq!(items[0], sock_addr);
}
#[test]
fn positive_add_and_retrieve_contacts() {
let mut announce_store = AnnounceStorage::new();
let info_hash = [0u8; bt::INFO_HASH_LEN].into();
let sock_addrs = bip_test::dummy_block_socket_addrs(storage::MAX_ITEMS_STORED as u16);
for sock_addr in sock_addrs.iter() {
assert!(announce_store.add_item(info_hash, *sock_addr));
}
let mut items = Vec::new();
announce_store.find_items(&info_hash, |a| items.push(a));
assert_eq!(items.len(), storage::MAX_ITEMS_STORED);
for item in items.iter() {
assert!(sock_addrs.iter().any(|s| s == item));
}
}
#[test]
fn positive_renew_contacts() {
let mut announce_store = AnnounceStorage::new();
let info_hash = [0u8; bt::INFO_HASH_LEN].into();
let sock_addrs = bip_test::dummy_block_socket_addrs((storage::MAX_ITEMS_STORED + 1) as u16);
for sock_addr in sock_addrs.iter().take(storage::MAX_ITEMS_STORED) {
assert!(announce_store.add_item(info_hash, *sock_addr));
}
let other_info_hash = [1u8; bt::INFO_HASH_LEN].into();
assert!(!announce_store.add_item(other_info_hash, sock_addrs[sock_addrs.len() - 1]));
let mut times_invoked = 0;
announce_store.find_items(&other_info_hash, |_| times_invoked += 1);
assert_eq!(times_invoked, 0);
for sock_addr in sock_addrs.iter().take(storage::MAX_ITEMS_STORED) {
assert!(announce_store.add_item(info_hash, *sock_addr));
}
}
#[test]
fn positive_full_storage_expire_one_infohash() {
let mut announce_store = AnnounceStorage::new();
let info_hash = [0u8; bt::INFO_HASH_LEN].into();
let sock_addrs = bip_test::dummy_block_socket_addrs((storage::MAX_ITEMS_STORED + 1) as u16);
for sock_addr in sock_addrs.iter().take(storage::MAX_ITEMS_STORED) {
assert!(announce_store.add_item(info_hash, *sock_addr));
}
let other_info_hash = [1u8; bt::INFO_HASH_LEN].into();
assert!(!announce_store.add_item(other_info_hash, sock_addrs[sock_addrs.len() - 1]));
let mut times_invoked = 0;
announce_store.find_items(&other_info_hash, |_| times_invoked += 1);
assert_eq!(times_invoked, 0);
let mock_current_time =
bip_test::travel_into_future(Duration::hours(storage::EXPIRATION_TIME_HOURS));
assert!(announce_store.add(other_info_hash,
sock_addrs[sock_addrs.len() - 1],
mock_current_time));
announce_store.find_items(&other_info_hash, |_| times_invoked += 1);
assert_eq!(times_invoked, 1);
}
#[test]
fn positive_full_storage_expire_two_infohash() {
let mut announce_store = AnnounceStorage::new();
let info_hash_one = [0u8; bt::INFO_HASH_LEN].into();
let info_hash_two = [1u8; bt::INFO_HASH_LEN].into();
let sock_addrs = bip_test::dummy_block_socket_addrs((storage::MAX_ITEMS_STORED + 1) as u16);
let num_contacts_first = storage::MAX_ITEMS_STORED / 2;
for sock_addr in sock_addrs.iter().take(num_contacts_first) {
assert!(announce_store.add_item(info_hash_one, *sock_addr));
}
let num_contacts_second = storage::MAX_ITEMS_STORED - num_contacts_first;
for sock_addr in sock_addrs.iter().skip(num_contacts_first).take(num_contacts_second) {
assert!(announce_store.add_item(info_hash_two, *sock_addr));
}
let info_hash_three = [2u8; bt::INFO_HASH_LEN].into();
assert!(!announce_store.add_item(info_hash_three, sock_addrs[sock_addrs.len() - 1]));
let mut times_invoked = 0;
announce_store.find_items(&info_hash_three, |_| times_invoked += 1);
assert_eq!(times_invoked, 0);
let mock_current_time =
bip_test::travel_into_future(Duration::hours(storage::EXPIRATION_TIME_HOURS));
assert!(announce_store.add(info_hash_three,
sock_addrs[sock_addrs.len() - 1],
mock_current_time));
announce_store.find_items(&info_hash_three, |_| times_invoked += 1);
assert_eq!(times_invoked, 1);
}
}