1use crate::{
17 helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
18 locators::BlockLocators,
19};
20use snarkos_node_bft_ledger_service::LedgerService;
21use snarkos_node_router::{PeerPoolHandling, messages::DataBlocks};
22use snarkos_node_sync_communication_service::CommunicationService;
23use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
24use snarkvm::prelude::{Network, block::Block};
25
26use anyhow::{Result, bail, ensure};
27use indexmap::{IndexMap, IndexSet};
28use itertools::Itertools;
29#[cfg(feature = "locktick")]
30use locktick::parking_lot::RwLock;
31#[cfg(feature = "locktick")]
32use locktick::tokio::Mutex as TMutex;
33#[cfg(not(feature = "locktick"))]
34use parking_lot::RwLock;
35use rand::seq::{IteratorRandom, SliceRandom};
36use std::{
37 collections::{BTreeMap, HashMap, HashSet, hash_map},
38 net::{IpAddr, Ipv4Addr, SocketAddr},
39 sync::Arc,
40 time::{Duration, Instant},
41};
42#[cfg(not(feature = "locktick"))]
43use tokio::sync::Mutex as TMutex;
44use tokio::sync::Notify;
45
46mod helpers;
47use helpers::rangify_heights;
48
49mod sync_state;
50use sync_state::SyncState;
51
52mod metrics;
53use metrics::BlockSyncMetrics;
54
55#[cfg(not(test))]
59pub const REDUNDANCY_FACTOR: usize = 1;
60#[cfg(test)]
61pub const REDUNDANCY_FACTOR: usize = 3;
62
63pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);
70
71const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
72const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;
73
74const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(600);
75
76const MAX_BLOCK_REQUESTS: usize = 50; pub const MAX_BLOCKS_BEHIND: u32 = 1; pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
86
87#[derive(Clone)]
90struct OutstandingRequest<N: Network> {
91 request: SyncRequest<N>,
92 timestamp: Instant,
93 response: Option<Block<N>>,
96}
97
98#[derive(Clone, serde::Serialize)]
100pub struct BlockRequestInfo {
101 elapsed: u64,
103 done: bool,
105}
106
107#[derive(Clone, serde::Serialize)]
109pub struct BlockRequestsSummary {
110 outstanding: String,
111 completed: String,
112}
113
114impl<N: Network> OutstandingRequest<N> {
115 fn sync_ips(&self) -> &IndexSet<SocketAddr> {
117 let (_, _, sync_ips) = &self.request;
118 sync_ips
119 }
120
121 fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
123 let (_, _, sync_ips) = &mut self.request;
124 sync_ips
125 }
126}
127
128pub struct BlockSync<N: Network> {
141 ledger: Arc<dyn LedgerService<N>>,
143
144 locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,
147
148 common_ancestors: RwLock<IndexMap<PeerPair, u32>>,
153
154 requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,
156
157 sync_state: RwLock<SyncState>,
161
162 advance_with_sync_blocks_lock: TMutex<()>,
164
165 peer_notify: Notify,
167
168 response_notify: Notify,
170
171 metrics: BlockSyncMetrics,
173}
174
175impl<N: Network> BlockSync<N> {
176 pub fn new(ledger: Arc<dyn LedgerService<N>>) -> Self {
178 let sync_state = SyncState::new_with_height(ledger.latest_block_height());
180
181 Self {
182 ledger,
183 sync_state: RwLock::new(sync_state),
184 peer_notify: Default::default(),
185 response_notify: Default::default(),
186 locators: Default::default(),
187 requests: Default::default(),
188 common_ancestors: Default::default(),
189 advance_with_sync_blocks_lock: Default::default(),
190 metrics: Default::default(),
191 }
192 }
193
194 pub async fn wait_for_peer_update(&self) {
199 self.peer_notify.notified().await
200 }
201
202 pub async fn wait_for_block_responses(&self) {
206 self.response_notify.notified().await
207 }
208
209 #[inline]
211 pub fn is_block_synced(&self) -> bool {
212 self.sync_state.read().is_block_synced()
213 }
214
215 #[inline]
221 pub fn can_block_sync(&self) -> bool {
222 self.sync_state.read().can_block_sync() || self.has_pending_responses()
223 }
224
225 #[inline]
228 pub fn num_blocks_behind(&self) -> Option<u32> {
229 self.sync_state.read().num_blocks_behind()
230 }
231
232 #[inline]
234 pub fn greatest_peer_block_height(&self) -> Option<u32> {
235 self.sync_state.read().get_greatest_peer_height()
236 }
237
238 #[inline]
241 pub fn get_sync_height(&self) -> u32 {
242 self.sync_state.read().get_sync_height()
243 }
244
245 #[inline]
247 pub fn num_outstanding_block_requests(&self) -> usize {
248 self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count()
249 }
250
251 #[inline]
253 pub fn num_total_block_requests(&self) -> usize {
254 self.requests.read().len()
255 }
256
257 pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> {
259 self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect()
260 }
261
262 pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> {
264 self.requests
265 .read()
266 .iter()
267 .map(|(height, request)| {
268 (*height, BlockRequestInfo {
269 done: request.sync_ips().is_empty(),
270 elapsed: request.timestamp.elapsed().as_secs(),
271 })
272 })
273 .collect()
274 }
275
276 pub fn get_block_requests_summary(&self) -> BlockRequestsSummary {
278 let completed = self
279 .requests
280 .read()
281 .iter()
282 .filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None })
283 .collect::<Vec<_>>();
284
285 let outstanding = self
286 .requests
287 .read()
288 .iter()
289 .filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None })
290 .collect::<Vec<_>>();
291
292 BlockRequestsSummary { completed: rangify_heights(&completed), outstanding: rangify_heights(&outstanding) }
293 }
294
295 pub fn get_sync_speed(&self) -> f64 {
296 self.metrics.get_sync_speed()
297 }
298}
299
300#[cfg(test)]
302impl<N: Network> BlockSync<N> {
303 fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> {
305 self.locators.read().get(peer_ip).map(|locators| locators.latest_locator_height())
306 }
307
308 fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> {
310 self.common_ancestors.read().get(&PeerPair(peer_a, peer_b)).copied()
311 }
312
313 fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> {
315 self.requests.read().get(&height).map(|e| e.request.clone())
316 }
317
318 fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> {
320 self.requests.read().get(&height).map(|e| e.timestamp)
321 }
322}
323
324impl<N: Network> BlockSync<N> {
325 #[inline]
327 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
328 let latest_height = self.ledger.latest_block_height();
330
331 let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
334 for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
336 recents.insert(height, self.ledger.get_block_hash(height)?);
337 }
338
339 let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
341 for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
343 checkpoints.insert(height, self.ledger.get_block_hash(height)?);
344 }
345
346 BlockLocators::new(recents, checkpoints)
348 }
349
350 pub fn has_pending_responses(&self) -> bool {
352 self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0
353 }
354
355 pub async fn send_block_requests<C: CommunicationService>(
357 &self,
358 communication: &C,
359 sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
360 requests: &[(u32, PrepareSyncRequest<N>)],
361 ) -> bool {
362 let (start_height, max_num_sync_ips) = match requests.first() {
363 Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
364 None => {
365 warn!("Block sync failed - no block requests");
366 return false;
367 }
368 };
369
370 debug!("Sending {len} block requests to peers {peers:?}", len = requests.len(), peers = sync_peers.keys());
371
372 let sync_ips: IndexSet<_> =
374 sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect();
375
376 let end_height = start_height.saturating_add(requests.len() as u32);
378
379 for (height, (hash, previous_hash, _)) in requests.iter() {
381 if let Err(error) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
383 warn!("Block sync failed - {error}");
384 return false;
385 }
386 }
387
388 let message = C::prepare_block_request(start_height, end_height);
392
393 let mut tasks = Vec::with_capacity(sync_ips.len());
395 for sync_ip in sync_ips {
396 let sender = communication.send(sync_ip, message.clone()).await;
397 let task = tokio::spawn(async move {
398 match sender {
400 Some(sender) => {
401 if let Err(err) = sender.await {
402 warn!("Failed to send block request to peer '{sync_ip}': {err}");
403 false
404 } else {
405 true
406 }
407 }
408 None => {
409 warn!("Failed to send block request to peer '{sync_ip}': no such peer");
410 false
411 }
412 }
413 });
414
415 tasks.push(task);
416 }
417
418 for result in futures::future::join_all(tasks).await {
420 let success = match result {
421 Ok(success) => success,
422 Err(err) => {
423 error!("tokio join error: {err}");
424 false
425 }
426 };
427
428 if !success {
430 let mut requests = self.requests.write();
432 for height in start_height..end_height {
433 requests.remove(&height);
434 }
435 return false;
437 }
438 }
439 true
440 }
441
442 #[inline]
450 pub fn insert_block_responses(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> Result<()> {
451 for block in blocks {
453 if let Err(error) = self.insert_block_response(peer_ip, block) {
454 self.remove_block_requests_to_peer(&peer_ip);
455 bail!("{error}");
456 }
457 }
458 Ok(())
459 }
460
461 #[inline]
464 pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
465 if let Some(entry) = self.requests.read().get(&next_height) {
468 let is_complete = entry.sync_ips().is_empty();
469 if !is_complete {
470 return None;
471 }
472
473 if entry.response.is_none() {
475 warn!("Request for height {next_height} is complete but no response exists");
476 }
477 entry.response.clone()
478 } else {
479 None
480 }
481 }
482
483 #[inline]
492 pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
493 let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
500 trace!("Skipping attempt to advance block synchronziation as it is already in progress");
501 return Ok(false);
502 };
503
504 let mut current_height = self.ledger.latest_block_height();
506 let start_height = current_height;
507 trace!(
508 "Try advancing with block responses (at block {current_height}, current sync speed is {})",
509 self.get_sync_speed()
510 );
511
512 loop {
513 let next_height = current_height + 1;
514
515 let Some(block) = self.peek_next_block(next_height) else {
516 break;
517 };
518
519 if block.height() != next_height {
521 warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
522 break;
523 }
524
525 let ledger = self.ledger.clone();
526 let advanced = tokio::task::spawn_blocking(move || {
527 match ledger.check_next_block(&block) {
529 Ok(_) => match ledger.advance_to_next_block(&block) {
530 Ok(_) => true,
531 Err(err) => {
532 warn!(
533 "Failed to advance to next block (height: {}, hash: '{}'): {err}",
534 block.height(),
535 block.hash()
536 );
537 false
538 }
539 },
540 Err(err) => {
541 warn!(
542 "The next block (height: {}, hash: '{}') is invalid - {err}",
543 block.height(),
544 block.hash()
545 );
546 false
547 }
548 }
549 })
550 .await?;
551
552 if advanced {
554 self.count_request_completed();
555 }
556
557 self.remove_block_response(next_height);
559
560 if !advanced {
562 break;
563 }
564
565 current_height = next_height;
567 }
568
569 if current_height > start_height {
570 self.set_sync_height(current_height);
571 Ok(true)
572 } else {
573 Ok(false)
574 }
575 }
576}
577
578impl<N: Network> BlockSync<N> {
579 pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
586 let current_height = self.get_sync_height();
588
589 if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
590 let sync_peers =
592 sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
593 Some((sync_peers, min_common_ancestor))
595 } else {
596 None
597 }
598 }
599
600 pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
608 match self.locators.write().entry(peer_ip) {
611 hash_map::Entry::Occupied(mut e) => {
612 if e.get() == locators {
614 return Ok(());
615 }
616
617 let old_height = e.get().latest_locator_height();
618 let new_height = locators.latest_locator_height();
619
620 if old_height > new_height {
621 debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
622 }
623 e.insert(locators.clone());
624 }
625 hash_map::Entry::Vacant(e) => {
626 e.insert(locators.clone());
627 }
628 }
629
630 let new_local_ancestor = {
632 let mut ancestor = 0;
633 for (height, hash) in locators.clone().into_iter() {
637 if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
638 match ledger_hash == hash {
639 true => ancestor = height,
640 false => {
641 debug!("Detected fork with peer \"{peer_ip}\" at height {height}");
642 break;
643 }
644 }
645 }
646 }
647 ancestor
648 };
649
650 let ancestor_updates: Vec<_> = self
653 .locators
654 .read()
655 .iter()
656 .filter_map(|(other_ip, other_locators)| {
657 if other_ip == &peer_ip {
659 return None;
660 }
661 let mut ancestor = 0;
663 for (height, hash) in other_locators.clone().into_iter() {
664 if let Some(expected_hash) = locators.get_hash(height) {
665 match expected_hash == hash {
666 true => ancestor = height,
667 false => {
668 debug!(
669 "Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
670 );
671 break;
672 }
673 }
674 }
675 }
676
677 Some((PeerPair(peer_ip, *other_ip), ancestor))
678 })
679 .collect();
680
681 {
684 let mut common_ancestors = self.common_ancestors.write();
685 common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);
686
687 for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
688 common_ancestors.insert(peer_pair, new_ancestor);
689 }
690 }
691
692 if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
694 self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
695 }
696
697 self.peer_notify.notify_one();
699
700 Ok(())
701 }
702
703 pub fn remove_peer(&self, peer_ip: &SocketAddr) {
707 trace!("Removing peer {peer_ip} from block sync");
708
709 self.locators.write().remove(peer_ip);
711 self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
713 self.remove_block_requests_to_peer(peer_ip);
715
716 self.peer_notify.notify_one();
718 }
719}
720
721pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);
723
724impl<N: Network> BlockSync<N> {
725 pub fn prepare_block_requests(&self) -> BlockRequestBatch<N> {
737 let print_requests = || {
739 if tracing::enabled!(tracing::Level::TRACE) {
740 let summary = self.get_block_requests_summary();
741
742 trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
743 trace!("The following requests are still outstanding: {:?}", summary.outstanding);
744 }
745 };
746
747 let current_height = self.get_sync_height();
749
750 let max_outstanding_block_requests =
752 (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);
753 let max_total_requests = 4 * max_outstanding_block_requests;
754 let max_new_blocks_to_request =
755 max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);
756
757 if self.num_total_block_requests() >= max_total_requests as usize {
759 trace!(
760 "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
761 );
762
763 print_requests();
764
765 (Default::default(), Default::default())
767 } else if max_new_blocks_to_request == 0 {
768 trace!(
769 "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
770 );
771 print_requests();
772
773 (Default::default(), Default::default())
775 } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
776 let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);
778 self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
780 (
782 self.construct_requests(
783 &sync_peers,
784 current_height,
785 min_common_ancestor,
786 max_new_blocks_to_request,
787 greatest_peer_height,
788 ),
789 sync_peers,
790 )
791 } else {
792 if self.requests.read().is_empty() {
794 trace!("All requests have been processed. Will set block synced to true.");
795 self.sync_state.write().set_greatest_peer_height(0);
798 } else {
799 trace!("No new blocks can be requests, but there are still outstanding requests.");
800 }
801
802 (Default::default(), Default::default())
804 }
805 }
806
807 pub fn count_request_completed(&self) {
812 self.metrics.count_request_completed();
813 }
814
815 pub fn set_sync_height(&self, new_height: u32) {
818 let fully_synced = {
820 let mut state = self.sync_state.write();
821 state.set_sync_height(new_height);
822 !state.can_block_sync()
823 };
824
825 if fully_synced {
826 self.metrics.mark_fully_synced();
827 }
828 }
829
830 fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
832 self.check_block_request(height)?;
834 ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
836 self.requests.write().insert(height, OutstandingRequest {
838 request: (hash, previous_hash, sync_ips),
839 timestamp: Instant::now(),
840 response: None,
841 });
842 Ok(())
843 }
844
845 fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<()> {
848 let height = block.height();
850 let mut requests = self.requests.write();
851
852 if self.ledger.contains_block_height(height) {
853 bail!("The sync request was removed because we already advanced");
854 }
855
856 let Some(entry) = requests.get_mut(&height) else { bail!("The sync pool did not request block {height}") };
857
858 let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;
860
861 if let Some(expected_hash) = expected_hash {
863 if block.hash() != *expected_hash {
864 bail!("The block hash for candidate block {height} from '{peer_ip}' is incorrect")
865 }
866 }
867 if let Some(expected_previous_hash) = expected_previous_hash {
869 if block.previous_hash() != *expected_previous_hash {
870 bail!("The previous block hash in candidate block {height} from '{peer_ip}' is incorrect")
871 }
872 }
873 if !sync_ips.contains(&peer_ip) {
875 bail!("The sync pool did not request block {height} from '{peer_ip}'")
876 }
877
878 entry.sync_ips_mut().swap_remove(&peer_ip);
880
881 if let Some(existing_block) = &entry.response {
882 if block != *existing_block {
884 bail!("Candidate block {height} from '{peer_ip}' is malformed");
885 }
886 } else {
887 entry.response = Some(block.clone());
888 }
889
890 self.response_notify.notify_one();
892
893 Ok(())
894 }
895
896 fn check_block_request(&self, height: u32) -> Result<()> {
898 if self.ledger.contains_block_height(height) {
900 bail!("Failed to add block request, as block {height} exists in the ledger");
901 }
902 if self.requests.read().contains_key(&height) {
904 bail!("Failed to add block request, as block {height} exists in the requests map");
905 }
906
907 Ok(())
908 }
909
910 pub fn remove_block_response(&self, height: u32) {
917 if let Some(e) = self.requests.write().remove(&height) {
919 trace!(
920 "Block request for height {height} was completed in {}ms (sync speed is {})",
921 e.timestamp.elapsed().as_millis(),
922 self.get_sync_speed()
923 );
924
925 self.peer_notify.notify_one();
927 }
928 }
929
930 fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
934 trace!("Block sync is removing all block requests to peer {peer_ip}...");
935
936 self.requests.write().retain(|height, e| {
939 let had_peer = e.sync_ips_mut().swap_remove(peer_ip);
940
941 let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
944 if !retain {
945 trace!("Removed block request timestamp for {peer_ip} at height {height}");
946 }
947 retain
948 });
949
950 }
952
953 pub fn handle_block_request_timeouts<P: PeerPoolHandling<N>>(
960 &self,
961 peer_pool_handler: &P,
962 ) -> Option<BlockRequestBatch<N>> {
963 let mut requests = self.requests.write();
965
966 let now = Instant::now();
968
969 let current_height = self.ledger.latest_block_height();
971
972 let mut timed_out_requests = vec![];
974
975 let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();
977
978 requests.retain(|height, e| {
980 let is_obsolete = *height <= current_height;
981 let timer_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
983 let is_complete = e.sync_ips().is_empty();
985
986 let is_timeout = timer_elapsed && !is_complete;
988
989 let retain = !is_timeout && !is_obsolete;
991
992 if is_timeout {
993 trace!("Block request at height {height} has timed out: timer_elapsed={timer_elapsed}, is_complete={is_complete}, is_obsolete={is_obsolete}");
994
995 timed_out_requests.push(*height);
997 } else if is_obsolete {
998 trace!("Block request at height {height} became obsolete (current_height={current_height})");
999 }
1000
1001 if is_timeout {
1003 for peer_ip in e.sync_ips().iter() {
1004 peers_to_ban.insert(*peer_ip);
1005 }
1006 }
1007
1008 retain
1009 });
1010
1011 if !timed_out_requests.is_empty() {
1012 debug!("{num} block requests timed out", num = timed_out_requests.len());
1013 }
1014
1015 let next_request_height = requests.iter().next().map(|(h, _)| *h);
1016
1017 drop(requests);
1019
1020 for peer_ip in peers_to_ban {
1022 self.remove_peer(&peer_ip);
1023 peer_pool_handler.ip_ban_peer(peer_ip, Some("timed out on block requests"));
1024 }
1025
1026 let sync_height = self.get_sync_height();
1032 if let Some(next_height) = next_request_height {
1033 let start = sync_height + 1;
1034
1035 if next_height > start {
1037 let end = next_height; let max_new_blocks_to_request = end - start;
1040
1041 let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start) else {
1042 warn!("Block requests timed out, but found no other peers to re-request from");
1043 return None;
1044 };
1045
1046 let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);
1048
1049 debug!("Re-requesting blocks starting at height {start}");
1050
1051 return Some((
1052 self.construct_requests(
1053 &sync_peers,
1054 sync_height,
1055 min_common_ancestor,
1056 max_new_blocks_to_request,
1057 greatest_peer_height,
1058 ),
1059 sync_peers,
1060 ));
1061 }
1062 }
1063
1064 None
1065 }
1066
1067 fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
1075 let latest_ledger_height = self.ledger.latest_block_height();
1077
1078 let candidate_locators: IndexMap<_, _> = self
1081 .locators
1082 .read()
1083 .iter()
1084 .filter(|(_, locators)| locators.latest_locator_height() > current_height)
1085 .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
1086 .take(NUM_SYNC_CANDIDATE_PEERS)
1087 .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
1088 .collect();
1089
1090 if candidate_locators.is_empty() {
1092 trace!("Found no sync peers with height greater {current_height}");
1093 return None;
1094 }
1095
1096 let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);
1103
1104 for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
1107 let mut min_common_ancestor = peer_locators.latest_locator_height();
1109
1110 let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];
1113
1114 for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
1116 if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
1118 if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
1120 min_common_ancestor = min_common_ancestor.min(*common_ancestor);
1122
1123 sync_peers.push((*other_ip, other_locators.clone()));
1125 }
1126 }
1127 }
1128
1129 if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
1131 sync_peers.shuffle(&mut rand::thread_rng());
1134
1135 return Some((sync_peers.into_iter().collect(), min_common_ancestor));
1137 }
1138 }
1139
1140 None
1142 }
1143
1144 fn construct_requests(
1146 &self,
1147 sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1148 sync_height: u32,
1149 min_common_ancestor: u32,
1150 max_blocks_to_request: u32,
1151 greatest_peer_height: u32,
1152 ) -> Vec<(u32, PrepareSyncRequest<N>)> {
1153 let start_height = {
1155 let requests = self.requests.read();
1156 let mut start_height = sync_height + 1;
1157
1158 loop {
1159 if requests.contains_key(&start_height) {
1160 start_height += 1;
1161 } else {
1162 break;
1163 }
1164 }
1165
1166 start_height
1167 };
1168
1169 if min_common_ancestor < start_height {
1171 if start_height < greatest_peer_height {
1172 trace!(
1173 "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
1174 );
1175 }
1176 return Default::default();
1177 }
1178
1179 let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);
1181
1182 let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
1184 let mut max_num_sync_ips = 1;
1186
1187 for height in start_height..end_height {
1188 if let Err(err) = self.check_block_request(height) {
1190 trace!("Failed to issue new request for height {height}: {err}");
1191
1192 match request_hashes.is_empty() {
1195 true => continue,
1196 false => break,
1197 }
1198 }
1199
1200 let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);
1202
1203 if !is_honest {
1205 warn!("Detected dishonest peer(s) when preparing block request");
1207 if sync_peers.len() < num_sync_ips {
1209 break;
1210 }
1211 }
1212
1213 max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);
1215
1216 request_hashes.insert(height, (hash, previous_hash));
1218 }
1219
1220 request_hashes
1222 .into_iter()
1223 .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
1224 .collect()
1225 }
1226}
1227
1228fn construct_request<N: Network>(
1231 height: u32,
1232 sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1233) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
1234 let mut hash = None;
1235 let mut hash_redundancy: usize = 0;
1236 let mut previous_hash = None;
1237 let mut is_honest = true;
1238
1239 for peer_locators in sync_peers.values() {
1240 if let Some(candidate_hash) = peer_locators.get_hash(height) {
1241 match hash {
1242 Some(hash) if hash == candidate_hash => hash_redundancy += 1,
1244 Some(_) => {
1246 hash = None;
1247 hash_redundancy = 0;
1248 previous_hash = None;
1249 is_honest = false;
1250 break;
1251 }
1252 None => {
1254 hash = Some(candidate_hash);
1255 hash_redundancy = 1;
1256 }
1257 }
1258 }
1259 if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
1260 match previous_hash {
1261 Some(previous_hash) if previous_hash == candidate_previous_hash => (),
1263 Some(_) => {
1265 hash = None;
1266 hash_redundancy = 0;
1267 previous_hash = None;
1268 is_honest = false;
1269 break;
1270 }
1271 None => previous_hash = Some(candidate_previous_hash),
1273 }
1274 }
1275 }
1276
1277 let num_sync_ips = {
1280 if !is_honest {
1282 EXTRA_REDUNDANCY_FACTOR
1284 }
1285 else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
1287 1
1289 }
1290 else {
1292 REDUNDANCY_FACTOR
1294 }
1295 };
1296
1297 (hash, previous_hash, num_sync_ips, is_honest)
1298}
1299
1300#[cfg(test)]
1301mod tests {
1302 use super::*;
1303 use crate::locators::{
1304 CHECKPOINT_INTERVAL,
1305 NUM_RECENT_BLOCKS,
1306 test_helpers::{sample_block_locators, sample_block_locators_with_fork},
1307 };
1308
1309 use snarkos_node_bft_ledger_service::MockLedgerService;
1310 use snarkos_node_router::{Peer, Resolver};
1311 use snarkos_node_tcp::{P2P, Tcp};
1312 use snarkvm::{
1313 ledger::committee::Committee,
1314 prelude::{Field, TestRng},
1315 };
1316
1317 use indexmap::{IndexSet, indexset};
1318 #[cfg(feature = "locktick")]
1319 use locktick::parking_lot::RwLock;
1320 #[cfg(not(feature = "locktick"))]
1321 use parking_lot::RwLock;
1322 use rand::Rng;
1323 use std::net::{IpAddr, Ipv4Addr};
1324
1325 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1326
1327 #[derive(Default)]
1328 struct DummyPeerPoolHandler {
1329 peers_to_ban: RwLock<Vec<SocketAddr>>,
1330 }
1331
1332 impl P2P for DummyPeerPoolHandler {
1333 fn tcp(&self) -> &Tcp {
1334 unreachable!();
1335 }
1336 }
1337
1338 impl<N: Network> PeerPoolHandling<N> for DummyPeerPoolHandler {
1339 const MAXIMUM_POOL_SIZE: usize = 10;
1340 const OWNER: &str = "[DummyPeerPoolHandler]";
1341 const PEER_SLASHING_COUNT: usize = 0;
1342
1343 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
1344 unreachable!();
1345 }
1346
1347 fn resolver(&self) -> &RwLock<Resolver<N>> {
1348 unreachable!();
1349 }
1350
1351 fn is_dev(&self) -> bool {
1352 true
1353 }
1354
1355 fn ip_ban_peer(&self, listener_addr: SocketAddr, _reason: Option<&str>) {
1356 self.peers_to_ban.write().push(listener_addr);
1357 }
1358 }
1359
1360 fn sample_peer_ip(id: u16) -> SocketAddr {
1362 assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
1363 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
1364 }
1365
1366 fn sample_committee() -> Committee<CurrentNetwork> {
1368 let rng = &mut TestRng::default();
1369 snarkvm::ledger::committee::test_helpers::sample_committee(rng)
1370 }
1371
1372 fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
1374 MockLedgerService::new_at_height(sample_committee(), height)
1375 }
1376
1377 fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
1379 BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)))
1380 }
1381
1382 fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
1386 assert!(num_values > 0, "Cannot generate an empty vector");
1387 assert!((max_height as usize) >= num_values);
1388
1389 let mut rng = TestRng::default();
1390
1391 let mut heights: Vec<u32> = (0..(max_height - 1)).choose_multiple(&mut rng, num_values);
1392
1393 heights.push(max_height);
1394
1395 heights
1396 }
1397
1398 fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
1400 BlockSync::<CurrentNetwork> {
1401 peer_notify: Notify::new(),
1402 response_notify: Default::default(),
1403 ledger: Arc::new(sample_ledger_service(height)),
1404 locators: RwLock::new(sync.locators.read().clone()),
1405 common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
1406 requests: RwLock::new(sync.requests.read().clone()),
1407 sync_state: RwLock::new(sync.sync_state.read().clone()),
1408 advance_with_sync_blocks_lock: Default::default(),
1409 metrics: Default::default(),
1410 }
1411 }
1412
1413 fn check_prepare_block_requests(
1415 sync: BlockSync<CurrentNetwork>,
1416 min_common_ancestor: u32,
1417 peers: IndexSet<SocketAddr>,
1418 ) {
1419 let rng = &mut TestRng::default();
1420
1421 assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");
1423
1424 let num_peers_within_recent_range_of_ledger = {
1426 if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
1428 0
1429 }
1430 else {
1432 peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
1433 }
1434 };
1435
1436 let (requests, sync_peers) = sync.prepare_block_requests();
1438
1439 if peers.is_empty() {
1441 assert!(requests.is_empty());
1442 return;
1443 }
1444
1445 let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
1447 assert_eq!(requests.len(), expected_num_requests);
1448
1449 for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1450 let sync_ips: IndexSet<_> =
1452 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1453 assert_eq!(height, 1 + idx as u32);
1454 assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1455 assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1456
1457 if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
1458 assert_eq!(sync_ips.len(), 1);
1459 } else {
1460 assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
1461 assert_eq!(sync_ips, peers);
1462 }
1463 }
1464 }
1465
1466 #[test]
1468 fn test_latest_block_height() {
1469 for height in generate_block_heights(100_001, 5000) {
1470 let sync = sample_sync_at_height(height);
1471 assert_eq!(sync.ledger.latest_block_height(), height);
1473
1474 assert_eq!(sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(0)).into()).unwrap(), 0);
1476 assert_eq!(
1477 sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(height)).into()).unwrap(),
1478 height
1479 );
1480 }
1481 }
1482
1483 #[test]
1484 fn test_get_block_hash() {
1485 for height in generate_block_heights(100_001, 5000) {
1486 let sync = sample_sync_at_height(height);
1487
1488 assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
1490 assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::<CurrentNetwork>::from_u32(height)).into());
1491 }
1492 }
1493
1494 #[test]
1495 fn test_prepare_block_requests() {
1496 for num_peers in 0..111 {
1497 println!("Testing with {num_peers} peers");
1498
1499 let sync = sample_sync_at_height(0);
1500
1501 let mut peers = indexset![];
1502
1503 for peer_id in 1..=num_peers {
1504 sync.update_peer_locators(sample_peer_ip(peer_id), &sample_block_locators(10)).unwrap();
1506 peers.insert(sample_peer_ip(peer_id));
1508 }
1509
1510 check_prepare_block_requests(sync, 10, peers);
1512 }
1513 }
1514
1515 #[test]
1516 fn test_prepare_block_requests_with_leading_fork_at_11() {
1517 let sync = sample_sync_at_height(0);
1518
1519 let peer_1 = sample_peer_ip(1);
1530 sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 11)).unwrap();
1531
1532 let peer_2 = sample_peer_ip(2);
1534 sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1535
1536 let peer_3 = sample_peer_ip(3);
1538 sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1539
1540 let (requests, _) = sync.prepare_block_requests();
1542 assert_eq!(requests.len(), 10);
1543
1544 for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1546 assert_eq!(height, 1 + idx as u32);
1547 assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1548 assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1549 assert_eq!(num_sync_ips, 1); }
1551 }
1552
1553 #[test]
1554 fn test_prepare_block_requests_with_leading_fork_at_10() {
1555 let rng = &mut TestRng::default();
1556 let sync = sample_sync_at_height(0);
1557
1558 let peer_1 = sample_peer_ip(1);
1573 sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();
1574
1575 let peer_2 = sample_peer_ip(2);
1577 sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1578
1579 let peer_3 = sample_peer_ip(3);
1581 sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1582
1583 let (requests, _) = sync.prepare_block_requests();
1585 assert_eq!(requests.len(), 0);
1586
1587 let peer_4 = sample_peer_ip(4);
1591 sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
1592
1593 let (requests, sync_peers) = sync.prepare_block_requests();
1595 assert_eq!(requests.len(), 10);
1596
1597 for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1599 let sync_ips: IndexSet<_> =
1601 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1602 assert_eq!(height, 1 + idx as u32);
1603 assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1604 assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1605 assert_eq!(sync_ips.len(), 1); assert_ne!(sync_ips[0], peer_1); }
1608 }
1609
1610 #[test]
1611 fn test_prepare_block_requests_with_trailing_fork_at_9() {
1612 let rng = &mut TestRng::default();
1613 let sync = sample_sync_at_height(0);
1614
1615 let peer_1 = sample_peer_ip(1);
1621 sync.update_peer_locators(peer_1, &sample_block_locators(10)).unwrap();
1622
1623 let peer_2 = sample_peer_ip(2);
1625 sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1626
1627 let peer_3 = sample_peer_ip(3);
1629 sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();
1630
1631 let (requests, _) = sync.prepare_block_requests();
1633 assert_eq!(requests.len(), 0);
1634
1635 let peer_4 = sample_peer_ip(4);
1639 sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
1640
1641 let (requests, sync_peers) = sync.prepare_block_requests();
1643 assert_eq!(requests.len(), 10);
1644
1645 for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1647 let sync_ips: IndexSet<_> =
1649 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1650 assert_eq!(height, 1 + idx as u32);
1651 assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1652 assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1653 assert_eq!(sync_ips.len(), 1); assert_ne!(sync_ips[0], peer_3); }
1656 }
1657
1658 #[test]
1659 fn test_insert_block_requests() {
1660 let rng = &mut TestRng::default();
1661 let sync = sample_sync_at_height(0);
1662
1663 sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
1665
1666 let (requests, sync_peers) = sync.prepare_block_requests();
1668 assert_eq!(requests.len(), 10);
1669
1670 for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1671 let sync_ips: IndexSet<_> =
1673 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1674 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1676 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1678 assert!(sync.get_block_request_timestamp(height).is_some());
1679 }
1680
1681 for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1682 let sync_ips: IndexSet<_> =
1684 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1685 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1687 assert!(sync.get_block_request_timestamp(height).is_some());
1688 }
1689
1690 for (height, (hash, previous_hash, num_sync_ips)) in requests {
1691 let sync_ips: IndexSet<_> =
1693 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1694 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err();
1696 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1698 assert!(sync.get_block_request_timestamp(height).is_some());
1699 }
1700 }
1701
1702 #[test]
1703 fn test_insert_block_requests_fails() {
1704 let sync = sample_sync_at_height(9);
1705
1706 sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
1708
1709 sync.insert_block_request(9, (None, None, indexset![sample_peer_ip(1)])).unwrap_err();
1711 sync.insert_block_request(10, (None, None, indexset![sample_peer_ip(1)])).unwrap();
1713 }
1714
1715 #[test]
1716 fn test_update_peer_locators() {
1717 let sync = sample_sync_at_height(0);
1718
1719 let peer1_ip = sample_peer_ip(1);
1721 for peer1_height in 0..500u32 {
1722 sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap();
1723 assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height));
1724
1725 let peer2_ip = sample_peer_ip(2);
1726 for peer2_height in 0..500u32 {
1727 println!("Testing peer 1 height at {peer1_height} and peer 2 height at {peer2_height}");
1728
1729 sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap();
1730 assert_eq!(sync.get_peer_height(&peer2_ip), Some(peer2_height));
1731
1732 let distance = peer1_height.abs_diff(peer2_height);
1734
1735 if distance < NUM_RECENT_BLOCKS as u32 {
1737 let expected_ancestor = core::cmp::min(peer1_height, peer2_height);
1738 assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
1739 assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
1740 } else {
1741 let min_checkpoints =
1742 core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL);
1743 let expected_ancestor = min_checkpoints * CHECKPOINT_INTERVAL;
1744 assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
1745 assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
1746 }
1747 }
1748 }
1749 }
1750
1751 #[test]
1752 fn test_remove_peer() {
1753 let sync = sample_sync_at_height(0);
1754
1755 let peer_ip = sample_peer_ip(1);
1756 sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1757 assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1758
1759 sync.remove_peer(&peer_ip);
1760 assert_eq!(sync.get_peer_height(&peer_ip), None);
1761
1762 sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1763 assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1764
1765 sync.remove_peer(&peer_ip);
1766 assert_eq!(sync.get_peer_height(&peer_ip), None);
1767 }
1768
1769 #[test]
1770 fn test_locators_insert_remove_insert() {
1771 let sync = sample_sync_at_height(0);
1772
1773 let peer_ip = sample_peer_ip(1);
1774 sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1775 assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1776
1777 sync.remove_peer(&peer_ip);
1778 assert_eq!(sync.get_peer_height(&peer_ip), None);
1779
1780 sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1781 assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1782 }
1783
1784 #[test]
1785 fn test_requests_insert_remove_insert() {
1786 let rng = &mut TestRng::default();
1787 let sync = sample_sync_at_height(0);
1788
1789 let peer_ip = sample_peer_ip(1);
1791 sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();
1792
1793 let (requests, sync_peers) = sync.prepare_block_requests();
1795 assert_eq!(requests.len(), 10);
1796
1797 for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1798 let sync_ips: IndexSet<_> =
1800 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1801 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1803 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1805 assert!(sync.get_block_request_timestamp(height).is_some());
1806 }
1807
1808 sync.remove_peer(&peer_ip);
1810
1811 for (height, _) in requests {
1812 assert_eq!(sync.get_block_request(height), None);
1814 assert!(sync.get_block_request_timestamp(height).is_none());
1815 }
1816
1817 let (requests, _) = sync.prepare_block_requests();
1819 assert_eq!(requests.len(), 0);
1820
1821 sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();
1823
1824 let (requests, _) = sync.prepare_block_requests();
1826 assert_eq!(requests.len(), 10);
1827
1828 for (height, (hash, previous_hash, num_sync_ips)) in requests {
1829 let sync_ips: IndexSet<_> =
1831 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1832 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1834 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1836 assert!(sync.get_block_request_timestamp(height).is_some());
1837 }
1838 }
1839
1840 #[test]
1841 fn test_obsolete_block_requests() {
1842 let rng = &mut TestRng::default();
1843 let sync = sample_sync_at_height(0);
1844
1845 let locator_height = rng.gen_range(0..50);
1846
1847 let locators = sample_block_locators(locator_height);
1849 sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap();
1850
1851 let (requests, sync_peers) = sync.prepare_block_requests();
1853 assert_eq!(requests.len(), locator_height as usize);
1854
1855 for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1857 let sync_ips: IndexSet<_> =
1859 sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1860 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1862 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1864 assert!(sync.get_block_request_timestamp(height).is_some());
1865 }
1866
1867 let ledger_height = rng.gen_range(0..=locator_height);
1871 let new_sync = duplicate_sync_at_new_height(&sync, ledger_height);
1872
1873 assert_eq!(new_sync.requests.read().len(), requests.len());
1875
1876 let c = DummyPeerPoolHandler::default();
1878 new_sync.handle_block_request_timeouts(&c);
1879
1880 assert_eq!(new_sync.requests.read().len(), (locator_height - ledger_height) as usize);
1882 }
1883
1884 #[test]
1885 fn test_timed_out_block_request() {
1886 let sync = sample_sync_at_height(0);
1887 let peer_ip = sample_peer_ip(1);
1888 let locators = sample_block_locators(10);
1889 let block_hash = locators.get_hash(1);
1890
1891 sync.update_peer_locators(peer_ip, &locators).unwrap();
1892
1893 let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
1894
1895 sync.requests.write().insert(1, OutstandingRequest {
1897 request: (block_hash, None, [peer_ip].into()),
1898 timestamp,
1899 response: None,
1900 });
1901
1902 assert_eq!(sync.requests.read().len(), 1);
1903 assert_eq!(sync.locators.read().len(), 1);
1904
1905 let c = DummyPeerPoolHandler::default();
1907 sync.handle_block_request_timeouts(&c);
1908
1909 let ban_list = c.peers_to_ban.write();
1910 assert_eq!(ban_list.len(), 1);
1911 assert_eq!(ban_list.iter().next(), Some(&peer_ip));
1912
1913 assert!(sync.requests.read().is_empty());
1914 assert!(sync.locators.read().is_empty());
1915 }
1916
1917 #[test]
1918 fn test_reissue_timed_out_block_request() {
1919 let sync = sample_sync_at_height(0);
1920 let peer_ip1 = sample_peer_ip(1);
1921 let peer_ip2 = sample_peer_ip(2);
1922 let peer_ip3 = sample_peer_ip(3);
1923
1924 let locators = sample_block_locators(10);
1925 let block_hash1 = locators.get_hash(1);
1926 let block_hash2 = locators.get_hash(2);
1927
1928 sync.update_peer_locators(peer_ip1, &locators).unwrap();
1929 sync.update_peer_locators(peer_ip2, &locators).unwrap();
1930 sync.update_peer_locators(peer_ip3, &locators).unwrap();
1931
1932 assert_eq!(sync.locators.read().len(), 3);
1933
1934 let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
1935
1936 sync.requests.write().insert(1, OutstandingRequest {
1938 request: (block_hash1, None, [peer_ip1].into()),
1939 timestamp,
1940 response: None,
1941 });
1942
1943 sync.requests.write().insert(2, OutstandingRequest {
1945 request: (block_hash2, None, [peer_ip2].into()),
1946 timestamp: Instant::now(),
1947 response: None,
1948 });
1949
1950 assert_eq!(sync.requests.read().len(), 2);
1951
1952 let c = DummyPeerPoolHandler::default();
1954
1955 let re_requests = sync.handle_block_request_timeouts(&c);
1956
1957 let ban_list = c.peers_to_ban.write();
1958 assert_eq!(ban_list.len(), 1);
1959 assert_eq!(ban_list.iter().next(), Some(&peer_ip1));
1960
1961 assert_eq!(sync.requests.read().len(), 1);
1962 assert_eq!(sync.locators.read().len(), 2);
1963
1964 let (new_requests, new_sync_ips) = re_requests.unwrap();
1965 assert_eq!(new_requests.len(), 1);
1966
1967 let (height, (hash, _, _)) = new_requests.first().unwrap();
1968 assert_eq!(*height, 1);
1969 assert_eq!(*hash, block_hash1);
1970 assert_eq!(new_sync_ips.len(), 2);
1971
1972 let mut iter = new_sync_ips.iter();
1974 assert_ne!(iter.next().unwrap().0, &peer_ip1);
1975 assert_ne!(iter.next().unwrap().0, &peer_ip1);
1976 }
1977}