use crate::messages::BlockRequest;
use snarkvm::prelude::{puzzle::SolutionID, Network};
use core::hash::Hash;
use linked_hash_map::LinkedHashMap;
use parking_lot::RwLock;
use std::{
collections::{HashMap, HashSet, VecDeque},
net::{IpAddr, SocketAddr},
};
use time::{Duration, OffsetDateTime};
const MAX_CACHE_SIZE: usize = 1 << 17;
type SolutionKey<N> = (SocketAddr, SolutionID<N>);
type TransactionKey<N> = (SocketAddr, <N as Network>::TransactionID);
#[derive(Debug)]
pub struct Cache<N: Network> {
seen_inbound_connections: RwLock<HashMap<IpAddr, VecDeque<OffsetDateTime>>>,
seen_inbound_messages: RwLock<HashMap<SocketAddr, VecDeque<OffsetDateTime>>>,
seen_inbound_puzzle_requests: RwLock<HashMap<SocketAddr, VecDeque<OffsetDateTime>>>,
seen_inbound_block_requests: RwLock<HashMap<SocketAddr, VecDeque<OffsetDateTime>>>,
seen_inbound_solutions: RwLock<LinkedHashMap<SolutionKey<N>, OffsetDateTime>>,
seen_inbound_transactions: RwLock<LinkedHashMap<TransactionKey<N>, OffsetDateTime>>,
seen_outbound_block_requests: RwLock<HashMap<SocketAddr, HashSet<BlockRequest>>>,
seen_outbound_puzzle_requests: RwLock<HashMap<SocketAddr, u32>>,
seen_outbound_solutions: RwLock<LinkedHashMap<SolutionKey<N>, OffsetDateTime>>,
seen_outbound_transactions: RwLock<LinkedHashMap<TransactionKey<N>, OffsetDateTime>>,
seen_outbound_peer_requests: RwLock<HashMap<SocketAddr, u32>>,
}
impl<N: Network> Default for Cache<N> {
fn default() -> Self {
Self::new()
}
}
impl<N: Network> Cache<N> {
const INBOUND_BLOCK_REQUEST_INTERVAL: i64 = 60;
const INBOUND_PUZZLE_REQUEST_INTERVAL: i64 = 60;
pub fn new() -> Self {
Self {
seen_inbound_connections: Default::default(),
seen_inbound_messages: Default::default(),
seen_inbound_puzzle_requests: Default::default(),
seen_inbound_block_requests: Default::default(),
seen_inbound_solutions: RwLock::new(LinkedHashMap::with_capacity(MAX_CACHE_SIZE)),
seen_inbound_transactions: RwLock::new(LinkedHashMap::with_capacity(MAX_CACHE_SIZE)),
seen_outbound_block_requests: Default::default(),
seen_outbound_puzzle_requests: Default::default(),
seen_outbound_solutions: RwLock::new(LinkedHashMap::with_capacity(MAX_CACHE_SIZE)),
seen_outbound_transactions: RwLock::new(LinkedHashMap::with_capacity(MAX_CACHE_SIZE)),
seen_outbound_peer_requests: Default::default(),
}
}
}
impl<N: Network> Cache<N> {
pub fn insert_inbound_connection(&self, peer_ip: IpAddr, interval_in_secs: i64) -> usize {
Self::retain_and_insert(&self.seen_inbound_connections, peer_ip, interval_in_secs)
}
pub fn insert_inbound_message(&self, peer_ip: SocketAddr, interval_in_secs: i64) -> usize {
Self::retain_and_insert(&self.seen_inbound_messages, peer_ip, interval_in_secs)
}
pub fn insert_inbound_puzzle_request(&self, peer_ip: SocketAddr) -> usize {
Self::retain_and_insert(&self.seen_inbound_puzzle_requests, peer_ip, Self::INBOUND_PUZZLE_REQUEST_INTERVAL)
}
pub fn insert_inbound_block_request(&self, peer_ip: SocketAddr) -> usize {
Self::retain_and_insert(&self.seen_inbound_block_requests, peer_ip, Self::INBOUND_BLOCK_REQUEST_INTERVAL)
}
pub fn insert_inbound_solution(&self, peer_ip: SocketAddr, solution_id: SolutionID<N>) -> Option<OffsetDateTime> {
Self::refresh_and_insert(&self.seen_inbound_solutions, (peer_ip, solution_id))
}
pub fn insert_inbound_transaction(
&self,
peer_ip: SocketAddr,
transaction: N::TransactionID,
) -> Option<OffsetDateTime> {
Self::refresh_and_insert(&self.seen_inbound_transactions, (peer_ip, transaction))
}
}
impl<N: Network> Cache<N> {
pub fn contains_inbound_block_request(&self, peer_ip: &SocketAddr) -> bool {
Self::retain(&self.seen_inbound_block_requests, *peer_ip, Self::INBOUND_BLOCK_REQUEST_INTERVAL) > 0
}
pub fn num_outbound_block_requests(&self, peer_ip: &SocketAddr) -> usize {
self.seen_outbound_block_requests.read().get(peer_ip).map(|r| r.len()).unwrap_or(0)
}
pub fn contains_outbound_block_request(&self, peer_ip: &SocketAddr, request: &BlockRequest) -> bool {
self.seen_outbound_block_requests.read().get(peer_ip).map(|r| r.contains(request)).unwrap_or(false)
}
pub fn insert_outbound_block_request(&self, peer_ip: SocketAddr, request: BlockRequest) -> usize {
let mut map_write = self.seen_outbound_block_requests.write();
let requests = map_write.entry(peer_ip).or_default();
requests.insert(request);
requests.len()
}
pub fn remove_outbound_block_request(&self, peer_ip: SocketAddr, request: &BlockRequest) -> bool {
let mut map_write = self.seen_outbound_block_requests.write();
if let Some(requests) = map_write.get_mut(&peer_ip) { requests.remove(request) } else { false }
}
pub fn contains_outbound_puzzle_request(&self, peer_ip: &SocketAddr) -> bool {
self.seen_outbound_puzzle_requests.read().get(peer_ip).map(|r| *r > 0).unwrap_or(false)
}
pub fn increment_outbound_puzzle_requests(&self, peer_ip: SocketAddr) -> u32 {
Self::increment_counter(&self.seen_outbound_puzzle_requests, peer_ip)
}
pub fn decrement_outbound_puzzle_requests(&self, peer_ip: SocketAddr) -> u32 {
Self::decrement_counter(&self.seen_outbound_puzzle_requests, peer_ip)
}
pub fn insert_outbound_solution(&self, peer_ip: SocketAddr, solution_id: SolutionID<N>) -> Option<OffsetDateTime> {
Self::refresh_and_insert(&self.seen_outbound_solutions, (peer_ip, solution_id))
}
pub fn insert_outbound_transaction(
&self,
peer_ip: SocketAddr,
transaction: N::TransactionID,
) -> Option<OffsetDateTime> {
Self::refresh_and_insert(&self.seen_outbound_transactions, (peer_ip, transaction))
}
pub fn contains_outbound_peer_request(&self, peer_ip: SocketAddr) -> bool {
self.seen_outbound_peer_requests.read().get(&peer_ip).map(|r| *r > 0).unwrap_or(false)
}
pub fn increment_outbound_peer_requests(&self, peer_ip: SocketAddr) -> u32 {
Self::increment_counter(&self.seen_outbound_peer_requests, peer_ip)
}
pub fn decrement_outbound_peer_requests(&self, peer_ip: SocketAddr) -> u32 {
Self::decrement_counter(&self.seen_outbound_peer_requests, peer_ip)
}
}
impl<N: Network> Cache<N> {
fn retain_and_insert<K: Eq + Hash + Clone>(
map: &RwLock<HashMap<K, VecDeque<OffsetDateTime>>>,
key: K,
interval_in_secs: i64,
) -> usize {
let now = OffsetDateTime::now_utc();
let mut map_write = map.write();
let timestamps = map_write.entry(key).or_default();
timestamps.push_back(now);
while timestamps.front().map_or(false, |t| now - *t > Duration::seconds(interval_in_secs)) {
timestamps.pop_front();
}
timestamps.len()
}
fn retain<K: Eq + Hash + Clone>(
map: &RwLock<HashMap<K, VecDeque<OffsetDateTime>>>,
key: K,
interval_in_secs: i64,
) -> usize {
let now = OffsetDateTime::now_utc();
let mut map_write = map.write();
let timestamps = map_write.entry(key).or_default();
while timestamps.front().map_or(false, |t| now - *t > Duration::seconds(interval_in_secs)) {
timestamps.pop_front();
}
timestamps.len()
}
fn increment_counter<K: Hash + Eq>(map: &RwLock<HashMap<K, u32>>, key: K) -> u32 {
let mut map_write = map.write();
let entry = map_write.entry(key).or_default();
*entry = entry.saturating_add(1);
*entry
}
fn decrement_counter<K: Copy + Hash + Eq>(map: &RwLock<HashMap<K, u32>>, key: K) -> u32 {
let mut map_write = map.write();
let entry = map_write.entry(key).or_default();
let value = entry.saturating_sub(1);
if *entry == 0 {
map_write.remove(&key);
} else {
*entry = value;
}
value
}
fn refresh<K: Eq + Hash, V>(map: &RwLock<LinkedHashMap<K, V>>) {
let mut map_write = map.write();
while map_write.len() >= MAX_CACHE_SIZE {
map_write.pop_front();
}
}
fn refresh_and_insert<K: Eq + Hash>(
map: &RwLock<LinkedHashMap<K, OffsetDateTime>>,
key: K,
) -> Option<OffsetDateTime> {
let previous_timestamp = map.write().insert(key, OffsetDateTime::now_utc());
Self::refresh(map);
previous_timestamp
}
}
#[cfg(test)]
mod tests {
use super::*;
use snarkvm::prelude::MainnetV0;
use std::net::Ipv4Addr;
type CurrentNetwork = MainnetV0;
#[test]
fn test_inbound_block_request() {
let cache = Cache::<CurrentNetwork>::default();
let peer_ip = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234);
assert_eq!(cache.seen_inbound_block_requests.read().len(), 0);
assert_eq!(cache.insert_inbound_block_request(peer_ip), 1);
assert!(cache.contains_inbound_block_request(&peer_ip));
assert_eq!(cache.insert_inbound_block_request(peer_ip), 2);
assert!(cache.contains_inbound_block_request(&peer_ip));
}
#[test]
fn test_inbound_solution() {
let cache = Cache::<CurrentNetwork>::default();
let peer_ip = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234);
let solution_id = SolutionID::<CurrentNetwork>::from(123456789);
assert_eq!(cache.seen_inbound_solutions.read().len(), 0);
assert!(cache.insert_inbound_solution(peer_ip, solution_id).is_none());
assert_eq!(cache.seen_inbound_solutions.read().len(), 1);
assert!(cache.insert_inbound_solution(peer_ip, solution_id).is_some());
assert_eq!(cache.seen_inbound_solutions.read().len(), 1);
}
#[test]
fn test_inbound_transaction() {
let cache = Cache::<CurrentNetwork>::default();
let peer_ip = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234);
let transaction = Default::default();
assert_eq!(cache.seen_inbound_transactions.read().len(), 0);
assert!(cache.insert_inbound_transaction(peer_ip, transaction).is_none());
assert_eq!(cache.seen_inbound_transactions.read().len(), 1);
assert!(cache.insert_inbound_transaction(peer_ip, transaction).is_some());
assert_eq!(cache.seen_inbound_transactions.read().len(), 1);
}
#[test]
fn test_outbound_solution() {
let cache = Cache::<CurrentNetwork>::default();
let peer_ip = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234);
let solution_id = SolutionID::<CurrentNetwork>::from(123456789);
assert_eq!(cache.seen_outbound_solutions.read().len(), 0);
assert!(cache.insert_outbound_solution(peer_ip, solution_id).is_none());
assert_eq!(cache.seen_outbound_solutions.read().len(), 1);
assert!(cache.insert_outbound_solution(peer_ip, solution_id).is_some());
assert_eq!(cache.seen_outbound_solutions.read().len(), 1);
}
#[test]
fn test_outbound_transaction() {
let cache = Cache::<CurrentNetwork>::default();
let peer_ip = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234);
let transaction = Default::default();
assert_eq!(cache.seen_outbound_transactions.read().len(), 0);
assert!(cache.insert_outbound_transaction(peer_ip, transaction).is_none());
assert_eq!(cache.seen_outbound_transactions.read().len(), 1);
assert!(cache.insert_outbound_transaction(peer_ip, transaction).is_some());
assert_eq!(cache.seen_outbound_transactions.read().len(), 1);
}
#[test]
fn test_outbound_peer_request() {
let cache = Cache::<CurrentNetwork>::default();
let peer_ip = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234);
assert!(cache.seen_outbound_peer_requests.read().is_empty());
assert!(!cache.contains_outbound_peer_request(peer_ip));
assert_eq!(cache.increment_outbound_peer_requests(peer_ip), 1);
assert!(cache.contains_outbound_peer_request(peer_ip));
assert_eq!(cache.increment_outbound_peer_requests(peer_ip), 2);
assert!(cache.contains_outbound_peer_request(peer_ip));
assert_eq!(cache.decrement_outbound_peer_requests(peer_ip), 1);
assert_eq!(cache.decrement_outbound_peer_requests(peer_ip), 0);
assert!(!cache.contains_outbound_peer_request(peer_ip));
}
}