snarkos_node_router/helpers/
cache.rs1use crate::messages::BlockRequest;
17use snarkvm::prelude::{Network, puzzle::SolutionID};
18
19use core::hash::Hash;
20use linked_hash_map::LinkedHashMap;
21#[cfg(feature = "locktick")]
22use locktick::parking_lot::RwLock;
23#[cfg(not(feature = "locktick"))]
24use parking_lot::RwLock;
25use std::{
26 collections::{HashMap, HashSet, VecDeque},
27 net::{IpAddr, SocketAddr},
28};
29use time::{Duration, OffsetDateTime};
30
31const MAX_CACHE_SIZE: usize = 1 << 17;
33
34type SolutionKey<N> = (SocketAddr, SolutionID<N>);
36type TransactionKey<N> = (SocketAddr, <N as Network>::TransactionID);
38
39#[derive(Debug)]
40pub struct Cache<N: Network> {
41 seen_inbound_connections: RwLock<HashMap<IpAddr, VecDeque<OffsetDateTime>>>,
43 seen_inbound_messages: RwLock<HashMap<SocketAddr, VecDeque<OffsetDateTime>>>,
45 seen_inbound_puzzle_requests: RwLock<HashMap<SocketAddr, VecDeque<OffsetDateTime>>>,
47 seen_inbound_block_requests: RwLock<HashMap<SocketAddr, VecDeque<OffsetDateTime>>>,
49 seen_inbound_solutions: RwLock<LinkedHashMap<SolutionKey<N>, OffsetDateTime>>,
51 seen_inbound_transactions: RwLock<LinkedHashMap<TransactionKey<N>, OffsetDateTime>>,
53 seen_outbound_block_requests: RwLock<HashMap<SocketAddr, HashSet<BlockRequest>>>,
55 seen_outbound_puzzle_requests: RwLock<HashMap<SocketAddr, u32>>,
57 seen_outbound_solutions: RwLock<LinkedHashMap<SolutionKey<N>, OffsetDateTime>>,
59 seen_outbound_transactions: RwLock<LinkedHashMap<TransactionKey<N>, OffsetDateTime>>,
61 seen_outbound_peer_requests: RwLock<HashMap<SocketAddr, u32>>,
63}
64
65impl<N: Network> Default for Cache<N> {
66 fn default() -> Self {
68 Self::new()
69 }
70}
71
72impl<N: Network> Cache<N> {
73 const INBOUND_BLOCK_REQUEST_INTERVAL: i64 = 60;
74 const INBOUND_PUZZLE_REQUEST_INTERVAL: i64 = 60;
75
76 pub fn new() -> Self {
78 Self {
79 seen_inbound_connections: Default::default(),
80 seen_inbound_messages: Default::default(),
81 seen_inbound_puzzle_requests: Default::default(),
82 seen_inbound_block_requests: Default::default(),
83 seen_inbound_solutions: RwLock::new(LinkedHashMap::with_capacity(MAX_CACHE_SIZE)),
84 seen_inbound_transactions: RwLock::new(LinkedHashMap::with_capacity(MAX_CACHE_SIZE)),
85 seen_outbound_block_requests: Default::default(),
86 seen_outbound_puzzle_requests: Default::default(),
87 seen_outbound_solutions: RwLock::new(LinkedHashMap::with_capacity(MAX_CACHE_SIZE)),
88 seen_outbound_transactions: RwLock::new(LinkedHashMap::with_capacity(MAX_CACHE_SIZE)),
89 seen_outbound_peer_requests: Default::default(),
90 }
91 }
92}
93
94impl<N: Network> Cache<N> {
95 pub fn insert_inbound_connection(&self, peer_ip: IpAddr, interval_in_secs: i64) -> usize {
97 Self::retain_and_insert(&self.seen_inbound_connections, peer_ip, interval_in_secs)
98 }
99
100 pub fn insert_inbound_message(&self, peer_ip: SocketAddr, interval_in_secs: i64) -> usize {
102 Self::retain_and_insert(&self.seen_inbound_messages, peer_ip, interval_in_secs)
103 }
104
105 pub fn insert_inbound_puzzle_request(&self, peer_ip: SocketAddr) -> usize {
107 Self::retain_and_insert(&self.seen_inbound_puzzle_requests, peer_ip, Self::INBOUND_PUZZLE_REQUEST_INTERVAL)
108 }
109
110 pub fn insert_inbound_block_request(&self, peer_ip: SocketAddr) -> usize {
112 Self::retain_and_insert(&self.seen_inbound_block_requests, peer_ip, Self::INBOUND_BLOCK_REQUEST_INTERVAL)
113 }
114
115 pub fn insert_inbound_solution(&self, peer_ip: SocketAddr, solution_id: SolutionID<N>) -> Option<OffsetDateTime> {
117 Self::refresh_and_insert(&self.seen_inbound_solutions, (peer_ip, solution_id))
118 }
119
120 pub fn insert_inbound_transaction(
122 &self,
123 peer_ip: SocketAddr,
124 transaction: N::TransactionID,
125 ) -> Option<OffsetDateTime> {
126 Self::refresh_and_insert(&self.seen_inbound_transactions, (peer_ip, transaction))
127 }
128}
129
130impl<N: Network> Cache<N> {
131 pub fn contains_inbound_block_request(&self, peer_ip: &SocketAddr) -> bool {
133 Self::retain(&self.seen_inbound_block_requests, *peer_ip, Self::INBOUND_BLOCK_REQUEST_INTERVAL) > 0
134 }
135
136 pub fn num_outbound_block_requests(&self, peer_ip: &SocketAddr) -> usize {
138 self.seen_outbound_block_requests.read().get(peer_ip).map(|r| r.len()).unwrap_or(0)
139 }
140
141 pub fn contains_outbound_block_request(&self, peer_ip: &SocketAddr, request: &BlockRequest) -> bool {
143 self.seen_outbound_block_requests.read().get(peer_ip).map(|r| r.contains(request)).unwrap_or(false)
144 }
145
146 pub fn insert_outbound_block_request(&self, peer_ip: SocketAddr, request: BlockRequest) -> usize {
148 let mut map_write = self.seen_outbound_block_requests.write();
149 let requests = map_write.entry(peer_ip).or_default();
150 requests.insert(request);
151 requests.len()
152 }
153
154 pub fn remove_outbound_block_request(&self, peer_ip: SocketAddr, request: &BlockRequest) -> bool {
156 let mut map_write = self.seen_outbound_block_requests.write();
157 if let Some(requests) = map_write.get_mut(&peer_ip) { requests.remove(request) } else { false }
158 }
159
160 pub fn contains_outbound_puzzle_request(&self, peer_ip: &SocketAddr) -> bool {
162 self.seen_outbound_puzzle_requests.read().get(peer_ip).map(|r| *r > 0).unwrap_or(false)
163 }
164
165 pub fn increment_outbound_puzzle_requests(&self, peer_ip: SocketAddr) -> u32 {
167 Self::increment_counter(&self.seen_outbound_puzzle_requests, peer_ip)
168 }
169
170 pub fn decrement_outbound_puzzle_requests(&self, peer_ip: SocketAddr) -> u32 {
172 Self::decrement_counter(&self.seen_outbound_puzzle_requests, peer_ip)
173 }
174
175 pub fn insert_outbound_solution(&self, peer_ip: SocketAddr, solution_id: SolutionID<N>) -> Option<OffsetDateTime> {
177 Self::refresh_and_insert(&self.seen_outbound_solutions, (peer_ip, solution_id))
178 }
179
180 pub fn insert_outbound_transaction(
182 &self,
183 peer_ip: SocketAddr,
184 transaction: N::TransactionID,
185 ) -> Option<OffsetDateTime> {
186 Self::refresh_and_insert(&self.seen_outbound_transactions, (peer_ip, transaction))
187 }
188
189 pub fn contains_outbound_peer_request(&self, peer_ip: SocketAddr) -> bool {
191 self.seen_outbound_peer_requests.read().get(&peer_ip).map(|r| *r > 0).unwrap_or(false)
192 }
193
194 pub fn increment_outbound_peer_requests(&self, peer_ip: SocketAddr) -> u32 {
196 Self::increment_counter(&self.seen_outbound_peer_requests, peer_ip)
197 }
198
199 pub fn decrement_outbound_peer_requests(&self, peer_ip: SocketAddr) -> u32 {
201 Self::decrement_counter(&self.seen_outbound_peer_requests, peer_ip)
202 }
203
204 pub fn clear_peer_entries(&self, peer_ip: SocketAddr) {
206 self.seen_outbound_block_requests.write().remove(&peer_ip);
207 }
208}
209
210impl<N: Network> Cache<N> {
211 fn retain_and_insert<K: Eq + Hash + Clone>(
213 map: &RwLock<HashMap<K, VecDeque<OffsetDateTime>>>,
214 key: K,
215 interval_in_secs: i64,
216 ) -> usize {
217 let now = OffsetDateTime::now_utc();
219
220 let mut map_write = map.write();
221 let timestamps = map_write.entry(key).or_default();
223 timestamps.push_back(now);
225 while timestamps.front().map_or(false, |t| now - *t > Duration::seconds(interval_in_secs)) {
227 timestamps.pop_front();
228 }
229 timestamps.len()
231 }
232
233 fn retain<K: Eq + Hash + Clone>(
235 map: &RwLock<HashMap<K, VecDeque<OffsetDateTime>>>,
236 key: K,
237 interval_in_secs: i64,
238 ) -> usize {
239 let now = OffsetDateTime::now_utc();
241
242 let mut map_write = map.write();
243 let timestamps = map_write.entry(key).or_default();
245 while timestamps.front().map_or(false, |t| now - *t > Duration::seconds(interval_in_secs)) {
247 timestamps.pop_front();
248 }
249 timestamps.len()
251 }
252
253 fn increment_counter<K: Hash + Eq>(map: &RwLock<HashMap<K, u32>>, key: K) -> u32 {
255 let mut map_write = map.write();
256 let entry = map_write.entry(key).or_default();
258 *entry = entry.saturating_add(1);
259 *entry
261 }
262
263 fn decrement_counter<K: Copy + Hash + Eq>(map: &RwLock<HashMap<K, u32>>, key: K) -> u32 {
265 let mut map_write = map.write();
266 let entry = map_write.entry(key).or_default();
268 let value = entry.saturating_sub(1);
269 if *entry == 0 {
271 map_write.remove(&key);
272 } else {
273 *entry = value;
274 }
275 value
277 }
278
279 fn refresh<K: Eq + Hash, V>(map: &RwLock<LinkedHashMap<K, V>>) {
281 let mut map_write = map.write();
282 while map_write.len() >= MAX_CACHE_SIZE {
283 map_write.pop_front();
284 }
285 }
286
287 fn refresh_and_insert<K: Eq + Hash>(
290 map: &RwLock<LinkedHashMap<K, OffsetDateTime>>,
291 key: K,
292 ) -> Option<OffsetDateTime> {
293 let previous_timestamp = map.write().insert(key, OffsetDateTime::now_utc());
295 Self::refresh(map);
297 previous_timestamp
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::prelude::MainnetV0;

    use std::net::Ipv4Addr;

    type CurrentNetwork = MainnetV0;

    /// The peer address shared by every test in this module.
    fn localhost_peer() -> SocketAddr {
        SocketAddr::from((Ipv4Addr::LOCALHOST, 1234))
    }

    /// Checks that a map starts empty, that the first insertion reports no previous
    /// timestamp, and that inserting the same item again reports one without growing the map.
    macro_rules! assert_seen_once {
        ($cache:ident, $map:ident, $insert:ident, $peer:expr, $item:expr) => {{
            // Nothing recorded yet.
            assert_eq!($cache.$map.read().len(), 0);

            // First insertion: no previous timestamp, one entry stored.
            assert!($cache.$insert($peer, $item).is_none());
            assert_eq!($cache.$map.read().len(), 1);

            // Same item again: previous timestamp returned, still one entry.
            assert!($cache.$insert($peer, $item).is_some());
            assert_eq!($cache.$map.read().len(), 1);
        }};
    }

    #[test]
    fn test_inbound_block_request() {
        let cache = Cache::<CurrentNetwork>::default();
        let peer_ip = localhost_peer();

        // Nothing recorded yet.
        assert_eq!(cache.seen_inbound_block_requests.read().len(), 0);

        // Each insertion bumps the count and keeps the peer visible.
        for expected in [1, 2] {
            assert_eq!(cache.insert_inbound_block_request(peer_ip), expected);
            assert!(cache.contains_inbound_block_request(&peer_ip));
        }
    }

    #[test]
    fn test_inbound_solution() {
        let cache = Cache::<CurrentNetwork>::default();
        let peer_ip = localhost_peer();
        let solution_id = SolutionID::<CurrentNetwork>::from(123456789);

        assert_seen_once!(cache, seen_inbound_solutions, insert_inbound_solution, peer_ip, solution_id);
    }

    #[test]
    fn test_inbound_transaction() {
        let cache = Cache::<CurrentNetwork>::default();
        let peer_ip = localhost_peer();
        let transaction = Default::default();

        assert_seen_once!(cache, seen_inbound_transactions, insert_inbound_transaction, peer_ip, transaction);
    }

    #[test]
    fn test_outbound_solution() {
        let cache = Cache::<CurrentNetwork>::default();
        let peer_ip = localhost_peer();
        let solution_id = SolutionID::<CurrentNetwork>::from(123456789);

        assert_seen_once!(cache, seen_outbound_solutions, insert_outbound_solution, peer_ip, solution_id);
    }

    #[test]
    fn test_outbound_transaction() {
        let cache = Cache::<CurrentNetwork>::default();
        let peer_ip = localhost_peer();
        let transaction = Default::default();

        assert_seen_once!(cache, seen_outbound_transactions, insert_outbound_transaction, peer_ip, transaction);
    }

    #[test]
    fn test_outbound_peer_request() {
        let cache = Cache::<CurrentNetwork>::default();
        let peer_ip = localhost_peer();

        // Fresh cache: no counters, peer unknown.
        assert!(cache.seen_outbound_peer_requests.read().is_empty());
        assert!(!cache.contains_outbound_peer_request(peer_ip));

        // Count up to two; the peer is visible after each increment.
        assert_eq!(cache.increment_outbound_peer_requests(peer_ip), 1);
        assert!(cache.contains_outbound_peer_request(peer_ip));
        assert_eq!(cache.increment_outbound_peer_requests(peer_ip), 2);
        assert!(cache.contains_outbound_peer_request(peer_ip));

        // Count back down to zero; the peer is no longer reported.
        assert_eq!(cache.decrement_outbound_peer_requests(peer_ip), 1);
        assert_eq!(cache.decrement_outbound_peer_requests(peer_ip), 0);
        assert!(!cache.contains_outbound_peer_request(peer_ip));
    }
}