1use crate::MAX_FETCH_TIMEOUT_IN_MS;
17use snarkos_node_bft_ledger_service::LedgerService;
18use snarkvm::{
19 console::network::{Network, consensus_config_value},
20 prelude::Result,
21};
22
23use anyhow::anyhow;
24#[cfg(feature = "locktick")]
25use locktick::parking_lot::RwLock;
26#[cfg(not(feature = "locktick"))]
27use parking_lot::RwLock;
28use std::{
29 collections::{HashMap, HashSet},
30 hash::Hash,
31 net::SocketAddr,
32 sync::Arc,
33};
34use time::OffsetDateTime;
35use tokio::sync::oneshot;
36
/// The age, in seconds, past which a pending callback is treated as expired.
/// `div_ceil` rounds the millisecond timeout up, so converting `MAX_FETCH_TIMEOUT_IN_MS`
/// to whole seconds never truncates a sub-second remainder away.
pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT_IN_MS.div_ceil(1000) as i64;
40
41pub fn max_redundant_requests<N: Network>(ledger: Arc<dyn LedgerService<N>>, round: u64) -> Result<usize> {
43 let num_validators =
45 if let Ok(n) = ledger.get_committee_lookback_for_round(round).map(|committee| committee.num_members()) {
46 n
47 } else {
48 let max_committee_size = consensus_config_value!(N, MAX_CERTIFICATES, ledger.latest_block_height())
49 .ok_or_else(|| anyhow!("Couldn't obtain MAX_CERTIFICATES"))?;
50 max_committee_size as usize
51 };
52
53 Ok(1 + num_validators.saturating_div(3))
57}
58
/// A thread-safe registry of pending items, the peers associated with each item,
/// and the callbacks registered per peer.
#[derive(Debug)]
pub struct Pending<T: PartialEq + Eq + Hash, V: Clone> {
    /// Maps each item to its peers; each peer maps to a list of
    /// `(callback sender, insertion time as a Unix timestamp in seconds, request-sent flag)` entries.
    pending: RwLock<HashMap<T, HashMap<SocketAddr, Vec<(oneshot::Sender<V>, i64, bool)>>>>,
}
65
66impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Default for Pending<T, V> {
67 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
74 pub fn new() -> Self {
76 Self { pending: Default::default() }
77 }
78
79 pub fn is_empty(&self) -> bool {
81 self.pending.read().is_empty()
82 }
83
84 pub fn len(&self) -> usize {
86 self.pending.read().len()
87 }
88
89 pub fn contains(&self, item: impl Into<T>) -> bool {
91 self.pending.read().contains_key(&item.into())
92 }
93
94 pub fn contains_peer(&self, item: impl Into<T>, peer_ip: SocketAddr) -> bool {
96 self.pending.read().get(&item.into()).is_some_and(|peer_ips| peer_ips.contains_key(&peer_ip))
97 }
98
99 pub fn contains_peer_with_sent_request(&self, item: impl Into<T>, peer_ip: SocketAddr) -> bool {
101 self.pending.read().get(&item.into()).is_some_and(|peer_ips| {
102 peer_ips
103 .get(&peer_ip)
104 .map(|callbacks| callbacks.iter().any(|(_, _, request_sent)| *request_sent))
105 .unwrap_or(false)
106 })
107 }
108
109 pub fn get_peers(&self, item: impl Into<T>) -> Option<HashSet<SocketAddr>> {
111 self.pending.read().get(&item.into()).map(|map| map.keys().cloned().collect())
112 }
113
114 pub fn num_callbacks(&self, item: impl Into<T>) -> usize {
116 let item = item.into();
117 let now = OffsetDateTime::now_utc().unix_timestamp();
118 self.clear_expired_callbacks_for_item(now, item);
120 self.pending.read().get(&item).map_or(0, |peers| peers.values().fold(0, |acc, v| acc.saturating_add(v.len())))
122 }
123
124 pub fn num_sent_requests(&self, item: impl Into<T>) -> usize {
126 let item = item.into();
127 let now = OffsetDateTime::now_utc().unix_timestamp();
128 self.clear_expired_callbacks_for_item(now, item);
130 self.pending
132 .read()
133 .get(&item)
134 .map_or(0, |peers| peers.values().flatten().filter(|(_, _, request_sent)| *request_sent).count())
135 }
136
137 pub fn insert(
143 &self,
144 item: impl Into<T>,
145 peer_ip: SocketAddr,
146 callback: Option<(oneshot::Sender<V>, bool)>,
147 ) -> bool {
148 let item = item.into();
149 let now = OffsetDateTime::now_utc().unix_timestamp();
150 let result = {
152 let mut pending = self.pending.write();
154
155 let entry = pending.entry(item).or_default();
157
158 let is_new_peer = !entry.contains_key(&peer_ip);
160
161 let peer_entry = entry.entry(peer_ip).or_default();
163
164 if let Some((callback, request_sent)) = callback {
166 peer_entry.push((callback, now, request_sent));
167 }
168
169 is_new_peer
170 };
171
172 self.clear_expired_callbacks_for_item(now, item);
174
175 result
177 }
178
179 pub fn remove(&self, item: impl Into<T>, callback_value: Option<V>) -> Option<HashSet<SocketAddr>> {
183 let item = item.into();
184 match self.pending.write().remove(&item) {
186 Some(callbacks) => {
187 let peer_ips = callbacks.keys().copied().collect();
189 if let Some(callback_value) = callback_value {
191 for (callback, _, _) in callbacks.into_values().flat_map(|callbacks| callbacks.into_iter()) {
193 callback.send(callback_value.clone()).ok();
194 }
195 }
196 Some(peer_ips)
198 }
199 None => None,
200 }
201 }
202
203 pub fn clear_expired_callbacks_for_item(&self, now: i64, item: impl Into<T>) {
205 let item = item.into();
206
207 let mut pending = self.pending.write();
209
210 if let Some(peer_map) = pending.get_mut(&item) {
212 for (_, callbacks) in peer_map.iter_mut() {
214 callbacks.retain(|(_, timestamp, _)| now - *timestamp <= CALLBACK_EXPIRATION_IN_SECS);
215 }
216
217 peer_map.retain(|_, callbacks| !callbacks.is_empty());
219
220 if peer_map.is_empty() {
222 pending.remove(&item);
223 }
224 }
225 }
226
227 pub fn clear_expired_callbacks(&self) {
229 let now = OffsetDateTime::now_utc().unix_timestamp();
230 let mut pending = self.pending.write();
232
233 pending.retain(|_, peer_map| {
235 for (_, callbacks) in peer_map.iter_mut() {
237 callbacks.retain(|(_, timestamp, _)| now - *timestamp <= CALLBACK_EXPIRATION_IN_SECS);
238 }
239
240 peer_map.retain(|_, callbacks| !callbacks.is_empty());
242
243 !peer_map.is_empty()
245 });
246 }
247}
248
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::{
        ledger::narwhal::TransmissionID,
        prelude::{Rng, TestRng},
    };

    use std::{thread, time::Duration};

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    const ITERATIONS: usize = 100;

    /// Samples a random solution transmission ID.
    fn sample_solution_id(rng: &mut TestRng) -> TransmissionID<CurrentNetwork> {
        TransmissionID::Solution(
            rng.r#gen::<u64>().into(),
            rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
        )
    }

    /// Returns a loopback socket address with the given port.
    fn localhost(port: u16) -> SocketAddr {
        SocketAddr::from(([127, 0, 0, 1], port))
    }

    /// Creates a callback sender; its receiver is dropped right away.
    fn callback_sender() -> oneshot::Sender<()> {
        let (sender, _) = oneshot::channel();
        sender
    }

    /// Creates an empty pending registry and checks that it reports itself as empty.
    fn empty_pending() -> Pending<TransmissionID<CurrentNetwork>, ()> {
        let pending = Pending::<TransmissionID<CurrentNetwork>, ()>::new();
        assert!(pending.is_empty());
        assert_eq!(pending.len(), 0);
        pending
    }

    #[test]
    fn test_pending() {
        let rng = &mut TestRng::default();

        // Start from an empty registry.
        let pending = empty_pending();

        // Each entry is `(item, peer, request_sent)`; only the last one has no request sent.
        let entries = [
            (sample_solution_id(rng), localhost(1234), true),
            (sample_solution_id(rng), localhost(2345), true),
            (sample_solution_id(rng), localhost(3456), true),
            (sample_solution_id(rng), localhost(4567), false),
        ];

        // Every insertion registers a new peer for its item.
        for &(id, addr, request_sent) in &entries {
            assert!(pending.insert(id, addr, Some((callback_sender(), request_sent))));
        }

        assert_eq!(pending.len(), 4);
        assert!(!pending.is_empty());

        // The entries with a sent request are fully visible.
        let (sent_entries, unsent_entry) = (&entries[..3], entries[3]);
        for &(id, addr, _) in sent_entries {
            assert!(pending.contains(id));
            assert!(pending.contains_peer(id, addr));
            assert!(pending.contains_peer_with_sent_request(id, addr));
        }
        // The last entry is known, but without a sent request.
        assert!(pending.contains_peer(unsent_entry.0, unsent_entry.1));
        assert!(!pending.contains_peer_with_sent_request(unsent_entry.0, unsent_entry.1));

        // An item that was never inserted is not reported.
        let unknown_id = sample_solution_id(rng);
        assert!(!pending.contains(unknown_id));

        // Each item maps to exactly its own peer.
        for &(id, addr, _) in &entries {
            assert_eq!(pending.get_peers(id), Some(HashSet::from([addr])));
        }
        assert_eq!(pending.get_peers(unknown_id), None);

        // Removal succeeds only for the known items.
        for &(id, _, _) in &entries {
            assert!(pending.remove(id, None).is_some());
        }
        assert!(pending.remove(unknown_id, None).is_none());

        assert!(pending.is_empty());
    }

    #[test]
    fn test_expired_callbacks() {
        let rng = &mut TestRng::default();

        // Start from an empty registry.
        let pending = empty_pending();

        let solution_id = sample_solution_id(rng);
        let expiration_secs = CALLBACK_EXPIRATION_IN_SECS as u64;

        // Register two callbacks right away.
        assert!(pending.insert(solution_id, localhost(1234), Some((callback_sender(), true))));
        assert!(pending.insert(solution_id, localhost(2345), Some((callback_sender(), true))));

        // Wait until just before the first two callbacks expire.
        thread::sleep(Duration::from_secs(expiration_secs - 1));

        // Register a third, fresh callback.
        assert!(pending.insert(solution_id, localhost(3456), Some((callback_sender(), true))));

        // Nothing has expired yet.
        assert_eq!(pending.num_callbacks(solution_id), 3);

        // Let the first two callbacks expire.
        thread::sleep(Duration::from_secs(2));

        assert_eq!(pending.num_callbacks(solution_id), 1);

        // Let the last callback expire as well.
        thread::sleep(Duration::from_secs(expiration_secs));

        assert_eq!(pending.num_callbacks(solution_id), 0);
    }

    #[test]
    fn test_num_sent_requests() {
        let rng = &mut TestRng::default();

        let pending = Pending::<TransmissionID<CurrentNetwork>, ()>::new();

        for _ in 0..ITERATIONS {
            let solution_id = sample_solution_id(rng);
            // Tally how many of the inserted callbacks are flagged as sent.
            let mut expected_num_sent_requests = 0;
            for i in 0..ITERATIONS {
                let addr = localhost(i as u16);
                let sender = callback_sender();
                let is_sent_request: bool = rng.r#gen();
                expected_num_sent_requests += usize::from(is_sent_request);
                assert!(pending.insert(solution_id, addr, Some((sender, is_sent_request))));
            }
            assert_eq!(pending.num_sent_requests(solution_id), expected_num_sent_requests);
        }
    }

    #[test]
    fn test_expired_items() {
        let rng = &mut TestRng::default();

        // Start from an empty registry.
        let pending = empty_pending();

        let first_id = sample_solution_id(rng);
        let second_id = sample_solution_id(rng);

        // Two peers for the first item, one peer for the second.
        assert!(pending.insert(first_id, localhost(1234), Some((callback_sender(), true))));
        assert!(pending.insert(first_id, localhost(2345), Some((callback_sender(), true))));
        assert!(pending.insert(second_id, localhost(3456), Some((callback_sender(), true))));

        assert_eq!(pending.num_callbacks(first_id), 2);
        assert_eq!(pending.num_callbacks(second_id), 1);
        assert_eq!(pending.len(), 2);

        // Wait until every callback has expired.
        thread::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1));

        // A global sweep removes the expired callbacks and the now-empty items.
        pending.clear_expired_callbacks();

        assert_eq!(pending.num_callbacks(first_id), 0);
        assert_eq!(pending.num_callbacks(second_id), 0);
        assert!(pending.is_empty());
    }
}
492
#[cfg(test)]
mod prop_tests {
    use super::*;

    use test_strategy::{Arbitrary, proptest};

    /// A minimal item type, identified by an index.
    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    pub struct Item {
        pub id: usize,
    }

    /// The number of items to populate the pending registry with.
    #[derive(Arbitrary, Clone, Debug)]
    pub struct PendingInput {
        #[strategy(1..5_000usize)]
        pub count: usize,
    }

    /// Returns the loopback peer address whose port is derived from the item index.
    fn peer_for(index: usize) -> SocketAddr {
        SocketAddr::from(([127, 0, 0, 1], index as u16))
    }

    impl PendingInput {
        /// Builds a registry holding `count` items, each with one peer and one sent-request callback.
        pub fn to_pending(&self) -> Pending<Item, ()> {
            let pending = Pending::<Item, ()>::new();
            (0..self.count).for_each(|id| {
                let (callback, _) = oneshot::channel();
                pending.insert(Item { id }, peer_for(id), Some((callback, true)));
            });
            pending
        }
    }

    #[proptest]
    fn test_pending_proptest(input: PendingInput) {
        let pending = input.to_pending();
        // An item that lies outside of the populated range.
        let absent = Item { id: input.count + 1 };

        assert_eq!(pending.len(), input.count);
        assert!(!pending.is_empty());
        assert!(!pending.contains(absent));
        assert_eq!(pending.get_peers(absent), None);
        assert!(pending.remove(absent, None).is_none());

        // Every populated item is present with exactly its own peer, and can be removed.
        for item in (0..input.count).map(|id| Item { id }) {
            assert!(pending.contains(item));
            let peer_ip = peer_for(item.id);
            assert!(pending.contains_peer(item, peer_ip));
            assert_eq!(pending.get_peers(item), Some(HashSet::from([peer_ip])));
            assert!(pending.remove(item, None).is_some());
        }
        assert!(pending.is_empty());
    }
}