1use crate::{
17 helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
18 locators::BlockLocators,
19};
20use snarkos_node_bft_ledger_service::LedgerService;
21use snarkos_node_network::PeerPoolHandling;
22use snarkos_node_router::messages::DataBlocks;
23use snarkos_node_sync_communication_service::CommunicationService;
24use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
25
26use snarkvm::{
27 console::network::{ConsensusVersion, Network},
28 prelude::block::Block,
29 utilities::flatten_error,
30};
31
32use anyhow::{Result, bail, ensure};
33use indexmap::{IndexMap, IndexSet};
34use itertools::Itertools;
35#[cfg(feature = "locktick")]
36use locktick::parking_lot::RwLock;
37#[cfg(feature = "locktick")]
38use locktick::tokio::Mutex as TMutex;
39#[cfg(not(feature = "locktick"))]
40use parking_lot::RwLock;
41use rand::seq::{IteratorRandom, SliceRandom};
42use std::{
43 collections::{BTreeMap, HashMap, HashSet, hash_map},
44 net::{IpAddr, Ipv4Addr, SocketAddr},
45 sync::Arc,
46 time::{Duration, Instant},
47};
48#[cfg(not(feature = "locktick"))]
49use tokio::sync::Mutex as TMutex;
50use tokio::sync::Notify;
51
// Internal helpers (e.g. `rangify_heights`, used to render height lists as compact ranges).
mod helpers;
use helpers::rangify_heights;

// Tracks the node's sync height, the greatest known peer height, and sync status.
mod sync_state;
use sync_state::SyncState;

// Sync-speed and request-completion metrics.
mod metrics;
use metrics::BlockSyncMetrics;
60
/// The number of peers a block is requested from under normal operation.
#[cfg(not(test))]
pub const REDUNDANCY_FACTOR: usize = 1;
/// Tests use a higher redundancy so redundancy-dependent logic is exercised.
#[cfg(test)]
pub const REDUNDANCY_FACTOR: usize = 3;

/// The pause between sending successive batches of block requests.
pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);

/// Elevated redundancy applied once dishonest peers have been detected.
const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
/// The upper bound on the number of peers considered as sync candidates.
const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;

/// How long an outstanding block request may go unanswered before timing out.
const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(600);

/// The maximum number of outstanding block-request batches.
const MAX_BLOCK_REQUESTS: usize = 50;

/// How many blocks the node may trail the network while still counting as synced.
pub const MAX_BLOCKS_BEHIND: u32 = 1;

/// Placeholder address representing the local node in `PeerPair` ancestor keys.
pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
92
93#[derive(Clone)]
/// A block request that has been sent out and is awaiting responses.
#[derive(Clone)]
struct OutstandingRequest<N: Network> {
    // The request tuple: (expected hash, expected previous hash, peers the block was requested from).
    request: SyncRequest<N>,
    // When the request was issued; used for timeout detection and latency logging.
    timestamp: Instant,
    // The block received for this request, once a valid response has arrived.
    response: Option<Block<N>>,
}
103
/// Serializable snapshot of a single block request, for status reporting.
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestInfo {
    // Seconds elapsed since the request was issued.
    elapsed: u64,
    // Whether every requested peer has responded (no sync IPs remain).
    done: bool,
}
112
/// Serializable summary of all block requests, with heights rendered as compact ranges.
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestsSummary {
    // Rangified heights of requests still awaiting at least one peer response.
    outstanding: String,
    // Rangified heights of requests that are fully answered but not yet processed.
    completed: String,
}
119
/// Errors that can occur when inserting a batch of block responses from a peer.
#[derive(thiserror::Error, Debug)]
pub enum InsertBlockResponseError {
    /// The peer sent a response containing no blocks.
    #[error("Empty block response")]
    EmptyBlockResponse,
    /// The peer omitted its consensus version where one was required.
    #[error("The peer did not send a consensus version")]
    NoConsensusVersion,
    /// The peer's reported consensus version disagrees with ours at `last_height`.
    #[error(
        "The peer's consensus version for height {last_height} does not match ours: expected {expected_version}, got {peer_version}"
    )]
    ConsensusVersionMismatch { peer_version: ConsensusVersion, expected_version: ConsensusVersion, last_height: u32 },
    /// Any other failure, wrapped from `anyhow::Error`.
    #[error("{}", flatten_error(.0))]
    Other(#[from] anyhow::Error),
}
133
134impl<N: Network> OutstandingRequest<N> {
135 fn sync_ips(&self) -> &IndexSet<SocketAddr> {
137 let (_, _, sync_ips) = &self.request;
138 sync_ips
139 }
140
141 fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
143 let (_, _, sync_ips) = &mut self.request;
144 sync_ips
145 }
146}
147
/// Coordinates block synchronization: tracks peer locators, issues redundant
/// block requests, collects responses, and advances the ledger.
pub struct BlockSync<N: Network> {
    // Read access to the canonical ledger (heights, hashes, block insertion).
    ledger: Arc<dyn LedgerService<N>>,

    // The latest block locators reported by each connected peer.
    locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,

    // The greatest common ancestor height known for each pair of peers;
    // pairs involving `DUMMY_SELF_IP` record the ancestor with the local chain.
    common_ancestors: RwLock<IndexMap<PeerPair, u32>>,

    // Outstanding and completed block requests, keyed by block height.
    requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,

    // The current sync height, sync status, and greatest known peer height.
    sync_state: RwLock<SyncState>,

    // Serializes attempts to advance the ledger with received sync blocks.
    advance_with_sync_blocks_lock: TMutex<()>,

    // Signaled when the peer set or peer locators change.
    peer_notify: Notify,

    // Signaled when a new block response is accepted.
    response_notify: Notify,

    // Sync-speed and completion metrics.
    metrics: BlockSyncMetrics,
}
194
195impl<N: Network> BlockSync<N> {
196 pub fn new(ledger: Arc<dyn LedgerService<N>>) -> Self {
198 let sync_state = SyncState::new_with_height(ledger.latest_block_height());
200
201 Self {
202 ledger,
203 sync_state: RwLock::new(sync_state),
204 peer_notify: Default::default(),
205 response_notify: Default::default(),
206 locators: Default::default(),
207 requests: Default::default(),
208 common_ancestors: Default::default(),
209 advance_with_sync_blocks_lock: Default::default(),
210 metrics: Default::default(),
211 }
212 }
213
214 pub async fn wait_for_peer_update(&self) {
219 self.peer_notify.notified().await
220 }
221
222 pub async fn wait_for_block_responses(&self) {
226 self.response_notify.notified().await
227 }
228
229 #[inline]
231 pub fn is_block_synced(&self) -> bool {
232 self.sync_state.read().is_block_synced()
233 }
234
235 #[inline]
241 pub fn can_block_sync(&self) -> bool {
242 self.sync_state.read().can_block_sync() || self.has_pending_responses()
243 }
244
245 #[inline]
248 pub fn num_blocks_behind(&self) -> Option<u32> {
249 self.sync_state.read().num_blocks_behind()
250 }
251
252 #[inline]
254 pub fn greatest_peer_block_height(&self) -> Option<u32> {
255 self.sync_state.read().get_greatest_peer_height()
256 }
257
258 #[inline]
261 pub fn get_sync_height(&self) -> u32 {
262 self.sync_state.read().get_sync_height()
263 }
264
265 #[inline]
267 pub fn num_outstanding_block_requests(&self) -> usize {
268 self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count()
269 }
270
271 #[inline]
273 pub fn num_total_block_requests(&self) -> usize {
274 self.requests.read().len()
275 }
276
277 pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> {
279 self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect()
280 }
281
282 pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> {
284 self.requests
285 .read()
286 .iter()
287 .map(|(height, request)| {
288 (*height, BlockRequestInfo {
289 done: request.sync_ips().is_empty(),
290 elapsed: request.timestamp.elapsed().as_secs(),
291 })
292 })
293 .collect()
294 }
295
296 pub fn get_block_requests_summary(&self) -> BlockRequestsSummary {
298 let completed = self
299 .requests
300 .read()
301 .iter()
302 .filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None })
303 .collect::<Vec<_>>();
304
305 let outstanding = self
306 .requests
307 .read()
308 .iter()
309 .filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None })
310 .collect::<Vec<_>>();
311
312 BlockRequestsSummary { completed: rangify_heights(&completed), outstanding: rangify_heights(&outstanding) }
313 }
314
315 pub fn get_sync_speed(&self) -> f64 {
316 self.metrics.get_sync_speed()
317 }
318}
319
#[cfg(test)]
impl<N: Network> BlockSync<N> {
    /// Returns the latest locator height known for `peer_ip`, if any.
    fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> {
        self.locators.read().get(peer_ip).map(|loc| loc.latest_locator_height())
    }

    /// Returns the recorded common ancestor height between two peers, if known.
    fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> {
        let key = PeerPair(peer_a, peer_b);
        self.common_ancestors.read().get(&key).copied()
    }

    /// Returns a clone of the request tuple stored for `height`, if any.
    fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> {
        self.requests.read().get(&height).map(|entry| entry.request.clone())
    }

    /// Returns the instant at which the request for `height` was issued, if any.
    fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> {
        self.requests.read().get(&height).map(|entry| entry.timestamp)
    }
}
343
344impl<N: Network> BlockSync<N> {
345 #[inline]
347 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
348 let latest_height = self.ledger.latest_block_height();
350
351 let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
354 for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
356 recents.insert(height, self.ledger.get_block_hash(height)?);
357 }
358
359 let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
361 for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
363 checkpoints.insert(height, self.ledger.get_block_hash(height)?);
364 }
365
366 BlockLocators::new(recents, checkpoints)
368 }
369
370 pub fn has_pending_responses(&self) -> bool {
372 self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0
373 }
374
375 pub async fn send_block_requests<C: CommunicationService>(
377 &self,
378 communication: &C,
379 sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
380 requests: &[(u32, PrepareSyncRequest<N>)],
381 ) -> bool {
382 let (start_height, max_num_sync_ips) = match requests.first() {
383 Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
384 None => {
385 warn!("Block sync failed - no block requests");
386 return false;
387 }
388 };
389
390 debug!("Sending {len} block requests to peer(s) at {peers:?}", len = requests.len(), peers = sync_peers.keys());
391
392 let sync_ips: IndexSet<_> =
394 sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect();
395
396 let end_height = start_height.saturating_add(requests.len() as u32);
398
399 for (height, (hash, previous_hash, _)) in requests.iter() {
401 if let Err(err) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
403 let err = err.context(format!("Failed to insert block request for height {height}"));
404 warn!("{}", flatten_error(&err));
405 return false;
406 }
407 }
408
409 let message = C::prepare_block_request(start_height, end_height);
413
414 let mut tasks = Vec::with_capacity(sync_ips.len());
416 for sync_ip in sync_ips {
417 let sender = communication.send(sync_ip, message.clone()).await;
418 let task = tokio::spawn(async move {
419 match sender {
421 Some(sender) => {
422 if let Err(err) = sender.await {
423 warn!("Failed to send block request to peer '{sync_ip}': {err}");
424 false
425 } else {
426 true
427 }
428 }
429 None => {
430 warn!("Failed to send block request to peer '{sync_ip}': no such peer");
431 false
432 }
433 }
434 });
435
436 tasks.push(task);
437 }
438
439 for result in futures::future::join_all(tasks).await {
441 let success = match result {
442 Ok(success) => success,
443 Err(err) => {
444 error!("tokio join error: {err}");
445 false
446 }
447 };
448
449 if !success {
451 let mut requests = self.requests.write();
453 for height in start_height..end_height {
454 requests.remove(&height);
455 }
456 return false;
458 }
459 }
460 true
461 }
462
463 #[inline]
471 pub fn insert_block_responses(
472 &self,
473 peer_ip: SocketAddr,
474 blocks: Vec<Block<N>>,
475 latest_consensus_version: Option<ConsensusVersion>,
476 ) -> Result<(), InsertBlockResponseError> {
477 let result = 'outer: {
479 let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else {
480 break 'outer Err(InsertBlockResponseError::EmptyBlockResponse);
481 };
482
483 let expected_consensus_version = N::CONSENSUS_VERSION(last_height)?;
484
485 if expected_consensus_version >= ConsensusVersion::V12 {
488 if let Some(peer_version) = latest_consensus_version {
489 if peer_version != expected_consensus_version {
490 break 'outer Err(InsertBlockResponseError::ConsensusVersionMismatch {
491 peer_version,
492 expected_version: expected_consensus_version,
493 last_height,
494 });
495 }
496 } else {
497 break 'outer Err(InsertBlockResponseError::NoConsensusVersion);
498 }
499 }
500
501 for block in blocks {
503 if let Err(error) = self.insert_block_response(peer_ip, block) {
504 break 'outer Err(error.into());
505 }
506 }
507
508 Ok(())
509 };
510
511 if result.is_err() {
513 self.remove_block_requests_to_peer(&peer_ip);
514 }
515
516 result
518 }
519
520 #[inline]
523 pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
524 if let Some(entry) = self.requests.read().get(&next_height) {
527 let is_complete = entry.sync_ips().is_empty();
528 if !is_complete {
529 return None;
530 }
531
532 if entry.response.is_none() {
534 warn!("Request for height {next_height} is complete but no response exists");
535 }
536 entry.response.clone()
537 } else {
538 None
539 }
540 }
541
542 #[inline]
551 pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
552 let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
559 trace!("Skipping attempt to advance block synchronziation as it is already in progress");
560 return Ok(false);
561 };
562
563 let mut current_height = self.ledger.latest_block_height();
565 let start_height = current_height;
566 trace!(
567 "Try advancing with block responses (at block {current_height}, current sync speed is {})",
568 self.get_sync_speed()
569 );
570
571 loop {
572 let next_height = current_height + 1;
573
574 let Some(block) = self.peek_next_block(next_height) else {
575 break;
576 };
577
578 if block.height() != next_height {
580 warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
581 break;
582 }
583
584 let ledger = self.ledger.clone();
585 let advanced = tokio::task::spawn_blocking(move || {
586 match ledger.check_next_block(&block) {
588 Ok(_) => match ledger.advance_to_next_block(&block) {
589 Ok(_) => true,
590 Err(err) => {
591 let err = err.context(format!(
592 "Failed to advance to next block (height: {}, hash: '{}')",
593 block.height(),
594 block.hash()
595 ));
596 warn!("{}", flatten_error(&err));
597 false
598 }
599 },
600 Err(err) => {
601 let err = err.context(format!(
602 "The next block (height: {}, hash: '{}') is invalid",
603 block.height(),
604 block.hash()
605 ));
606 warn!("{}", flatten_error(&err));
607 false
608 }
609 }
610 })
611 .await?;
612
613 if advanced {
615 self.count_request_completed();
616 }
617
618 self.remove_block_response(next_height);
620
621 if !advanced {
623 break;
624 }
625
626 current_height = next_height;
628 }
629
630 if current_height > start_height {
631 self.set_sync_height(current_height);
632 Ok(true)
633 } else {
634 Ok(false)
635 }
636 }
637}
638
impl<N: Network> BlockSync<N> {
    /// Returns a set of consistent sync peers (as latest heights) and the
    /// minimum common ancestor height among them, if any peer is ahead of us.
    pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
        let current_height = self.get_sync_height();

        if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
            // Reduce each peer's locators to its latest height for the caller.
            let sync_peers =
                sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
            Some((sync_peers, min_common_ancestor))
        } else {
            None
        }
    }

    /// Stores the latest block locators for `peer_ip` and recomputes common
    /// ancestors with the local chain and with every other known peer.
    /// Also refreshes the greatest known peer height and wakes waiting tasks.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`; the `Result` is part of the public contract.
    pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
        match self.locators.write().entry(peer_ip) {
            hash_map::Entry::Occupied(mut e) => {
                // Unchanged locators require no recomputation.
                if e.get() == locators {
                    return Ok(());
                }

                let old_height = e.get().latest_locator_height();
                let new_height = locators.latest_locator_height();

                if old_height > new_height {
                    debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
                }
                e.insert(locators.clone());
            }
            hash_map::Entry::Vacant(e) => {
                e.insert(locators.clone());
            }
        }

        // Walk the peer's locators to find the highest height whose hash
        // matches our ledger — the common ancestor with the local chain.
        let new_local_ancestor = {
            let mut ancestor = 0;
            for (height, hash) in locators.clone().into_iter() {
                if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
                    match ledger_hash == hash {
                        true => ancestor = height,
                        false => {
                            debug!("Detected fork with peer \"{peer_ip}\" at height {height}");
                            break;
                        }
                    }
                }
            }
            ancestor
        };

        // Compute this peer's common ancestor with every other known peer.
        // Collected first so the `locators` read lock is released before writing
        // `common_ancestors` below.
        let ancestor_updates: Vec<_> = self
            .locators
            .read()
            .iter()
            .filter_map(|(other_ip, other_locators)| {
                if other_ip == &peer_ip {
                    return None;
                }
                let mut ancestor = 0;
                for (height, hash) in other_locators.clone().into_iter() {
                    if let Some(expected_hash) = locators.get_hash(height) {
                        match expected_hash == hash {
                            true => ancestor = height,
                            false => {
                                debug!(
                                    "Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
                                );
                                break;
                            }
                        }
                    }
                }

                Some((PeerPair(peer_ip, *other_ip), ancestor))
            })
            .collect();

        {
            let mut common_ancestors = self.common_ancestors.write();
            // DUMMY_SELF_IP keys the ancestor between this node and the peer.
            common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);

            for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
                common_ancestors.insert(peer_pair, new_ancestor);
            }
        }

        // Refresh the greatest known peer height in the sync state.
        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
        } else {
            error!("Got new block locators but greatest peer height is zero.");
        }

        self.peer_notify.notify_one();

        Ok(())
    }

    /// Removes all sync state associated with `peer_ip`: locators, common
    /// ancestors, and any block requests directed at it.
    pub fn remove_peer(&self, peer_ip: &SocketAddr) {
        trace!("Removing peer {peer_ip} from block sync");

        self.locators.write().remove(peer_ip);
        self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
        self.remove_block_requests_to_peer(peer_ip);

        // Recompute the greatest known peer height now that this peer is gone.
        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
        } else {
            self.sync_state.write().clear_greatest_peer_height();
        }

        self.peer_notify.notify_one();
    }
}
791
/// A batch of prepared block requests, paired with the peers (and their locators) chosen to serve them.
pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);
794
impl<N: Network> BlockSync<N> {
    /// Prepares the next batch of block requests, honoring the caps on
    /// outstanding and total requests, and returns it with the chosen sync peers.
    /// Returns an empty batch when no new requests can or should be issued.
    pub fn prepare_block_requests(&self) -> BlockRequestBatch<N> {
        // Helper to log the current request state when tracing is enabled.
        let print_requests = || {
            if tracing::enabled!(tracing::Level::TRACE) {
                let summary = self.get_block_requests_summary();

                trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
                trace!("The following requests are still outstanding: {:?}", summary.outstanding);
            }
        };

        let current_height = self.get_sync_height();

        // Cap on blocks awaiting responses (requests × blocks per request).
        let max_outstanding_block_requests =
            (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);

        // Cap on total tracked blocks, including completed-but-unprocessed ones.
        let max_total_requests = 4 * max_outstanding_block_requests;

        let max_new_blocks_to_request =
            max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);

        if self.num_total_block_requests() >= max_total_requests as usize {
            trace!(
                "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
            );

            print_requests();
            Default::default()
        } else if max_new_blocks_to_request == 0 {
            trace!(
                "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
            );

            print_requests();
            Default::default()
        } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
            let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);

            let requests = self.construct_requests(
                &sync_peers,
                current_height,
                min_common_ancestor,
                max_new_blocks_to_request,
                greatest_peer_height,
            );

            (requests, sync_peers)
        } else if self.requests.read().is_empty() {
            // No sync peers and no outstanding requests: nothing to do.
            Default::default()
        } else {
            trace!("No new blocks can be requested, but there are still outstanding requests.");

            print_requests();
            Default::default()
        }
    }

    /// Records a completed request in the sync-speed metrics.
    pub fn count_request_completed(&self) {
        self.metrics.count_request_completed();
    }

    /// Updates the sync height and, if the node is now fully synced,
    /// informs the metrics collector.
    pub fn set_sync_height(&self, new_height: u32) {
        let fully_synced = {
            let mut state = self.sync_state.write();
            state.set_sync_height(new_height);
            !state.can_block_sync()
        };

        if fully_synced {
            self.metrics.mark_fully_synced();
        }
    }

    /// Validates and records a new block request for `height`.
    ///
    /// # Errors
    /// Fails if the height is already in the ledger or already requested,
    /// or if `sync_ips` is empty.
    fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
        self.check_block_request(height)?;
        ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
        self.requests.write().insert(height, OutstandingRequest {
            request: (hash, previous_hash, sync_ips),
            timestamp: Instant::now(),
            response: None,
        });
        Ok(())
    }

    /// Validates a single block response from `peer_ip` against the matching
    /// request, marks the peer as answered, and stores the block.
    ///
    /// # Errors
    /// Fails if the block was not requested (or not from this peer), if its
    /// hashes disagree with the request, or if it conflicts with an earlier response.
    fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<()> {
        let height = block.height();
        let mut requests = self.requests.write();

        // The ledger may have advanced past this height in the meantime.
        if self.ledger.contains_block_height(height) {
            bail!("The sync request was removed because we already advanced");
        }

        let Some(entry) = requests.get_mut(&height) else { bail!("The sync pool did not request block {height}") };

        let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;

        // Verify the block hash and previous hash when the request pinned them.
        if let Some(expected_hash) = expected_hash {
            if block.hash() != *expected_hash {
                bail!("The block hash for candidate block {height} from '{peer_ip}' is incorrect")
            }
        }
        if let Some(expected_previous_hash) = expected_previous_hash {
            if block.previous_hash() != *expected_previous_hash {
                bail!("The previous block hash in candidate block {height} from '{peer_ip}' is incorrect")
            }
        }
        if !sync_ips.contains(&peer_ip) {
            bail!("The sync pool did not request block {height} from '{peer_ip}'")
        }

        // Mark this peer as having responded.
        entry.sync_ips_mut().swap_remove(&peer_ip);

        // All responding peers must agree on the same block.
        if let Some(existing_block) = &entry.response {
            if block != *existing_block {
                bail!("Candidate block {height} from '{peer_ip}' is malformed");
            }
        } else {
            entry.response = Some(block.clone());
        }

        trace!("Received a new and valid block response for height {height}");

        self.response_notify.notify_one();

        Ok(())
    }

    /// Checks that `height` is neither already in the ledger nor already requested.
    ///
    /// # Errors
    /// Fails with a descriptive message when either condition holds.
    fn check_block_request(&self, height: u32) -> Result<()> {
        if self.ledger.contains_block_height(height) {
            bail!("Failed to add block request, as block {height} exists in the ledger");
        }
        if self.requests.read().contains_key(&height) {
            bail!("Failed to add block request, as block {height} exists in the requests map");
        }

        Ok(())
    }

    /// Removes the request (and its stored response) for `height`, logging
    /// its completion time and waking tasks waiting on peer updates.
    pub fn remove_block_response(&self, height: u32) {
        if let Some(e) = self.requests.write().remove(&height) {
            trace!(
                "Block request for height {height} was completed in {}ms (sync speed is {})",
                e.timestamp.elapsed().as_millis(),
                self.get_sync_speed()
            );

            self.peer_notify.notify_one();
        }
    }

    /// Strips `peer_ip` from all outstanding requests, dropping any request
    /// that thereby loses its only remaining peer and has no stored response.
    fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
        trace!("Block sync is removing all block requests to peer {peer_ip}...");

        self.requests.write().retain(|height, e| {
            let had_peer = e.sync_ips_mut().swap_remove(peer_ip);

            // Keep the request if this peer wasn't involved, other peers remain,
            // or a response was already received.
            let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
            if !retain {
                trace!("Removed block request timestamp for {peer_ip} at height {height}");
            }
            retain
        });

    }

    /// Drops timed-out and obsolete block requests, removes the peers that let
    /// requests time out, and prepares re-requests for the resulting gap
    /// between the sync height and the next still-tracked request.
    ///
    /// # Errors
    /// Fails if no suitable peers remain to re-request the missing blocks from.
    pub fn handle_block_request_timeouts<P: PeerPoolHandling<N>>(
        &self,
        _peer_pool_handler: &P,
    ) -> Result<Option<BlockRequestBatch<N>>> {
        let mut requests = self.requests.write();

        let now = Instant::now();

        let current_height = self.ledger.latest_block_height();

        let mut timed_out_requests = vec![];

        // Peers that still owed a reply on a timed-out request.
        let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();

        requests.retain(|height, e| {
            // Obsolete: the ledger already contains this height.
            let is_obsolete = *height <= current_height;
            let timer_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
            let is_complete = e.sync_ips().is_empty();

            // Only incomplete requests can time out.
            let is_timeout = timer_elapsed && !is_complete;

            let retain = !is_timeout && !is_obsolete;

            if is_timeout {
                trace!("Block request at height {height} has timed out: timer_elapsed={timer_elapsed}, is_complete={is_complete}, is_obsolete={is_obsolete}");

                timed_out_requests.push(*height);
            } else if is_obsolete {
                trace!("Block request at height {height} became obsolete (current_height={current_height})");
            }

            if is_timeout {
                for peer_ip in e.sync_ips().iter() {
                    peers_to_ban.insert(*peer_ip);
                }
            }

            retain
        });

        if !timed_out_requests.is_empty() {
            debug!("{num} block requests timed out", num = timed_out_requests.len());
        }

        // The lowest still-tracked request bounds the gap to re-request.
        let next_request_height = requests.iter().next().map(|(h, _)| *h);

        // Release the write lock before touching other locks in remove_peer.
        drop(requests);

        for peer_ip in peers_to_ban {
            self.remove_peer(&peer_ip);
        }

        let sync_height = self.get_sync_height();
        let start_height = sync_height + 1;

        // Only re-request if a gap exists below the next tracked request.
        let end_height = if let Some(next_height) = next_request_height
            && next_height > start_height
        {
            next_height
        } else {
            return Ok(None);
        };

        let max_new_blocks_to_request = end_height - start_height;

        let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start_height) else {
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        let Some(greatest_peer_height) = sync_peers.values().map(|l| l.latest_locator_height()).max() else {
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        let requests = self.construct_requests(
            &sync_peers,
            sync_height,
            min_common_ancestor,
            max_new_blocks_to_request,
            greatest_peer_height,
        );

        if let Some((height, _)) = requests.as_slice().first() {
            debug!("Re-requesting blocks starting at height {height}");
            Ok(Some((requests, sync_peers)))
        } else {
            Ok(None)
        }
    }

    /// Selects a mutually consistent set of sync peers that are ahead of
    /// `current_height`, returning them with the minimum common ancestor
    /// height across the set. Returns `None` if no suitable set exists.
    fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
        let latest_ledger_height = self.ledger.latest_block_height();

        // Candidates: the highest peers that are ahead of us, best first.
        let candidate_locators: IndexMap<_, _> = self
            .locators
            .read()
            .iter()
            .filter(|(_, locators)| locators.latest_locator_height() > current_height)
            .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
            .take(NUM_SYNC_CANDIDATE_PEERS)
            .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
            .collect();

        if candidate_locators.is_empty() {
            trace!("Found no sync peers with height greater {current_height}");
            return None;
        }

        // Require REDUNDANCY_FACTOR agreeing peers, or all candidates if fewer exist.
        let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);

        // Try each candidate as an anchor and gather peers consistent with it.
        for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
            let mut min_common_ancestor = peer_locators.latest_locator_height();

            let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];

            for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
                if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
                    // Only accept peers whose shared history extends past our ledger tip.
                    if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
                        min_common_ancestor = min_common_ancestor.min(*common_ancestor);

                        sync_peers.push((*other_ip, other_locators.clone()));
                    }
                }
            }

            if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
                // Shuffle so request load is spread across the chosen peers.
                sync_peers.shuffle(&mut rand::thread_rng());

                return Some((sync_peers.into_iter().collect(), min_common_ancestor));
            }
        }

        None
    }

    /// Builds the per-height request list from the next unrequested height up
    /// to the minimum common ancestor, bounded by `max_blocks_to_request`.
    /// Each entry carries the agreed hashes and the shared redundancy level.
    fn construct_requests(
        &self,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        sync_height: u32,
        min_common_ancestor: u32,
        max_blocks_to_request: u32,
        greatest_peer_height: u32,
    ) -> Vec<(u32, PrepareSyncRequest<N>)> {
        // Start just past the ledger/sync height, skipping already-requested heights.
        let start_height = {
            let requests = self.requests.read();
            let ledger_height = self.ledger.latest_block_height();

            let mut start_height = ledger_height.max(sync_height + 1);

            while requests.contains_key(&start_height) {
                start_height += 1;
            }

            start_height
        };

        // Nothing to request if the peers' shared history ends before our start.
        if min_common_ancestor < start_height {
            if start_height < greatest_peer_height {
                trace!(
                    "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
                );
            }
            return Default::default();
        }

        let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);

        let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
        let mut max_num_sync_ips = 1;

        for height in start_height..end_height {
            if let Err(err) = self.check_block_request(height) {
                trace!("{err}");

                // Skip leading invalid heights; stop at the first gap after a valid run.
                match request_hashes.is_empty() {
                    true => continue,
                    false => break,
                }
            }

            let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);

            if !is_honest {
                warn!("Detected dishonest peer(s) when preparing block request");
                // Not enough peers to satisfy the elevated redundancy: stop here.
                if sync_peers.len() < num_sync_ips {
                    break;
                }
            }

            // All requests in the batch share the highest redundancy needed.
            max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);

            request_hashes.insert(height, (hash, previous_hash));
        }

        request_hashes
            .into_iter()
            .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
            .collect()
    }
}
1321
1322fn construct_request<N: Network>(
1325 height: u32,
1326 sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1327) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
1328 let mut hash = None;
1329 let mut hash_redundancy: usize = 0;
1330 let mut previous_hash = None;
1331 let mut is_honest = true;
1332
1333 for peer_locators in sync_peers.values() {
1334 if let Some(candidate_hash) = peer_locators.get_hash(height) {
1335 match hash {
1336 Some(hash) if hash == candidate_hash => hash_redundancy += 1,
1338 Some(_) => {
1340 hash = None;
1341 hash_redundancy = 0;
1342 previous_hash = None;
1343 is_honest = false;
1344 break;
1345 }
1346 None => {
1348 hash = Some(candidate_hash);
1349 hash_redundancy = 1;
1350 }
1351 }
1352 }
1353 if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
1354 match previous_hash {
1355 Some(previous_hash) if previous_hash == candidate_previous_hash => (),
1357 Some(_) => {
1359 hash = None;
1360 hash_redundancy = 0;
1361 previous_hash = None;
1362 is_honest = false;
1363 break;
1364 }
1365 None => previous_hash = Some(candidate_previous_hash),
1367 }
1368 }
1369 }
1370
1371 let num_sync_ips = {
1374 if !is_honest {
1376 EXTRA_REDUNDANCY_FACTOR
1378 }
1379 else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
1381 1
1383 }
1384 else {
1386 REDUNDANCY_FACTOR
1388 }
1389 };
1390
1391 (hash, previous_hash, num_sync_ips, is_honest)
1392}
1393
#[cfg(test)]
mod tests {
    use super::*;
    use crate::locators::{
        CHECKPOINT_INTERVAL,
        NUM_RECENT_BLOCKS,
        test_helpers::{sample_block_locators, sample_block_locators_with_fork},
    };

    use snarkos_node_bft_ledger_service::MockLedgerService;
    use snarkos_node_network::{NodeType, Peer, Resolver};
    use snarkos_node_tcp::{P2P, Tcp};
    use snarkvm::{
        ledger::committee::Committee,
        prelude::{Field, TestRng},
    };

    use indexmap::{IndexSet, indexset};
    #[cfg(feature = "locktick")]
    use locktick::parking_lot::RwLock;
    #[cfg(not(feature = "locktick"))]
    use parking_lot::RwLock;
    use rand::Rng;
    use std::net::{IpAddr, Ipv4Addr};

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    /// A no-op `PeerPoolHandling` implementation used by timeout tests.
    /// It only records which peers were asked to be banned; all other
    /// trait methods are unreachable because the tests never exercise them.
    #[derive(Default)]
    struct DummyPeerPoolHandler {
        // Peers passed to `ip_ban_peer`, in call order.
        peers_to_ban: RwLock<Vec<SocketAddr>>,
    }

    impl P2P for DummyPeerPoolHandler {
        fn tcp(&self) -> &Tcp {
            // The tests never open real TCP connections.
            unreachable!();
        }
    }

    impl<N: Network> PeerPoolHandling<N> for DummyPeerPoolHandler {
        const MAXIMUM_POOL_SIZE: usize = 10;
        const OWNER: &str = "[DummyPeerPoolHandler]";
        const PEER_SLASHING_COUNT: usize = 0;

        fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
            // Not used by the block-sync tests.
            unreachable!();
        }

        fn resolver(&self) -> &RwLock<Resolver<N>> {
            // Not used by the block-sync tests.
            unreachable!();
        }

        fn is_dev(&self) -> bool {
            true
        }

        fn trusted_peers_only(&self) -> bool {
            false
        }

        fn node_type(&self) -> NodeType {
            NodeType::Client
        }

        fn ip_ban_peer(&self, listener_addr: SocketAddr, _reason: Option<&str>) {
            // Record the ban request instead of acting on it.
            self.peers_to_ban.write().push(listener_addr);
        }
    }

    /// Returns a loopback socket address whose port is the given peer ID.
    /// ID 0 is rejected because it is reserved for the local IP in testing.
    fn sample_peer_ip(id: u16) -> SocketAddr {
        assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
    }

    /// Returns a deterministic sample committee (seeded by `TestRng::default()`).
    fn sample_committee() -> Committee<CurrentNetwork> {
        let rng = &mut TestRng::default();
        snarkvm::ledger::committee::test_helpers::sample_committee(rng)
    }

    /// Returns a mock ledger service whose chain tip is at `height`.
    fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
        MockLedgerService::new_at_height(sample_committee(), height)
    }

    /// Returns a fresh `BlockSync` backed by a mock ledger at `height`.
    fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)))
    }

    /// Samples `num_values` distinct heights below `max_height - 1` and appends
    /// `max_height` itself, so the returned vector always ends at `max_height`.
    ///
    /// NOTE(review): the result actually contains `num_values + 1` entries
    /// (the sampled set plus the pushed `max_height`), and the sampling range
    /// `0..(max_height - 1)` excludes `max_height - 1` — presumably intentional
    /// so the pushed tip is never duplicated, but worth confirming.
    fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
        assert!(num_values > 0, "Cannot generate an empty vector");
        assert!((max_height as usize) >= num_values);

        let mut rng = TestRng::default();

        // Sample without replacement from below the tip.
        let mut heights: Vec<u32> = (0..(max_height - 1)).choose_multiple(&mut rng, num_values);

        // Always include the tip itself so the maximum is exercised.
        heights.push(max_height);

        heights
    }

    /// Clones the given sync pool's state (locators, ancestors, requests,
    /// sync state) into a new `BlockSync` whose ledger is at a different
    /// `height`, simulating the ledger advancing underneath outstanding state.
    fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork> {
            peer_notify: Notify::new(),
            response_notify: Default::default(),
            ledger: Arc::new(sample_ledger_service(height)),
            locators: RwLock::new(sync.locators.read().clone()),
            common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
            requests: RwLock::new(sync.requests.read().clone()),
            sync_state: RwLock::new(sync.sync_state.read().clone()),
            advance_with_sync_blocks_lock: Default::default(),
            metrics: Default::default(),
        }
    }

    /// Shared assertion driver: given a sync pool at genesis, the minimum
    /// common ancestor across `peers`, and the peer set itself, verifies that
    /// `prepare_block_requests` produces the expected contiguous requests
    /// (heights, hashes, previous hashes) and the expected sync-IP counts.
    fn check_prepare_block_requests(
        sync: BlockSync<CurrentNetwork>,
        min_common_ancestor: u32,
        peers: IndexSet<SocketAddr>,
    ) {
        let rng = &mut TestRng::default();

        assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");

        // Count peers whose height is within the recent-locator range of the
        // (genesis) ledger; past that range, no peer qualifies.
        let num_peers_within_recent_range_of_ledger = {
            if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
                0
            }
            else {
                peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
            }
        };

        let (requests, sync_peers) = sync.prepare_block_requests();

        // With no peers there is nothing to request.
        if peers.is_empty() {
            assert!(requests.is_empty());
            return;
        }

        // Requests are capped at MAX_BLOCK_REQUESTS per round.
        let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
        assert_eq!(requests.len(), expected_num_requests);

        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Requests start at height 1 and are contiguous.
            assert_eq!(height, 1 + idx as u32);
            // Mock locators derive the block hash directly from the height.
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));

            if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
                // Enough redundancy: a single sync IP per request suffices.
                assert_eq!(sync_ips.len(), 1);
            } else {
                // Not enough peers for redundancy: all of them are used.
                assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
                assert_eq!(sync_ips, peers);
            }
        }
    }

    #[test]
    fn test_latest_block_height() {
        for height in generate_block_heights(100_001, 5000) {
            let sync = sample_sync_at_height(height);
            assert_eq!(sync.ledger.latest_block_height(), height);

            // The mock ledger maps hash <-> height via `Field::from_u32`.
            assert_eq!(sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(0)).into()).unwrap(), 0);
            assert_eq!(
                sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(height)).into()).unwrap(),
                height
            );
        }
    }

    #[test]
    fn test_get_block_hash() {
        for height in generate_block_heights(100_001, 5000) {
            let sync = sample_sync_at_height(height);

            // Genesis and tip hashes follow the mock's height-derived scheme.
            assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
            assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::<CurrentNetwork>::from_u32(height)).into());
        }
    }

    #[test]
    fn test_prepare_block_requests() {
        // Sweep peer counts from none up to 110, all reporting height 10.
        for num_peers in 0..111 {
            println!("Testing with {num_peers} peers");

            let sync = sample_sync_at_height(0);

            let mut peers = indexset![];

            for peer_id in 1..=num_peers {
                sync.update_peer_locators(sample_peer_ip(peer_id), &sample_block_locators(10)).unwrap();
                peers.insert(sample_peer_ip(peer_id));
            }

            // All peers agree at height 10, so the common ancestor is 10.
            check_prepare_block_requests(sync, 10, peers);
        }
    }

    #[test]
    fn test_prepare_block_requests_with_leading_fork_at_11() {
        let sync = sample_sync_at_height(0);

        // Peer 1 is on a fork that diverges at height 11 (above the others' tip).
        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 11)).unwrap();

        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();

        // The fork point is above height 10, so blocks 1..=10 are still
        // uncontested and can be requested.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            // All three peers agree below the fork, so one sync IP suffices.
            assert_eq!(num_sync_ips, 1); }
    }

    #[test]
    fn test_prepare_block_requests_with_leading_fork_at_10() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Peer 1 forks exactly at height 10, conflicting with peers 2 and 3.
        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();

        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();

        // With a disagreement at the target height and insufficient honest
        // redundancy, no requests are issued yet.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // A fourth peer agreeing with peers 2 and 3 restores redundancy.
        let peer_4 = sample_peer_ip(4);
        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();

        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            // The forked peer must never be selected as a sync peer.
            assert_eq!(sync_ips.len(), 1); assert_ne!(sync_ips[0], peer_1); }
    }

    #[test]
    fn test_prepare_block_requests_with_trailing_fork_at_9() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators(10)).unwrap();

        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        // Peer 3 is ahead (height 20) but on a fork diverging at height 10.
        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();

        // Disagreement at height 10 without enough redundancy: no requests.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // A fourth agreeing peer tips the balance toward the honest chain.
        let peer_4 = sample_peer_ip(4);
        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();

        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            // The forked peer must never be selected as a sync peer.
            assert_eq!(sync_ips.len(), 1); assert_ne!(sync_ips[0], peer_3); }
    }

    #[test]
    fn test_insert_block_requests() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();

        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // First pass: inserting each request succeeds and is retrievable.
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Second pass: lookups remain stable without re-inserting.
        // (TestRng::default() replays the same draws, so sync_ips match.)
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Third pass: duplicate insertion fails and leaves state untouched.
        for (height, (hash, previous_hash, num_sync_ips)) in requests {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }
    }

    #[test]
    fn test_insert_block_requests_fails() {
        // Ledger is already at height 9.
        let sync = sample_sync_at_height(9);

        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();

        // Requesting a height at or below the ledger tip must fail;
        // the next height (10) must succeed.
        sync.insert_block_request(9, (None, None, indexset![sample_peer_ip(1)])).unwrap_err();
        sync.insert_block_request(10, (None, None, indexset![sample_peer_ip(1)])).unwrap();
    }

    #[test]
    fn test_update_peer_locators() {
        let sync = sample_sync_at_height(0);

        // Exhaustively pair two peers across heights 0..500 and verify the
        // tracked heights and computed common ancestors.
        let peer1_ip = sample_peer_ip(1);
        for peer1_height in 0..500u32 {
            sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap();
            assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height));

            let peer2_ip = sample_peer_ip(2);
            for peer2_height in 0..500u32 {
                println!("Testing peer 1 height at {peer1_height} and peer 2 height at {peer2_height}");

                sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap();
                assert_eq!(sync.get_peer_height(&peer2_ip), Some(peer2_height));

                let distance = peer1_height.abs_diff(peer2_height);

                if distance < NUM_RECENT_BLOCKS as u32 {
                    // Within the recent-locator window, the ancestor is the
                    // lower of the two heights, symmetrically.
                    let expected_ancestor = core::cmp::min(peer1_height, peer2_height);
                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
                } else {
                    // Beyond the recent window, only checkpoint locators
                    // overlap, so the ancestor snaps to the latest shared
                    // checkpoint boundary.
                    let min_checkpoints =
                        core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL);
                    let expected_ancestor = min_checkpoints * CHECKPOINT_INTERVAL;
                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
                }
            }
        }
    }

    #[test]
    fn test_remove_peer() {
        let sync = sample_sync_at_height(0);

        // Add, remove, re-add, remove — height tracking must follow.
        let peer_ip = sample_peer_ip(1);
        sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
        assert_eq!(sync.get_peer_height(&peer_ip), Some(100));

        sync.remove_peer(&peer_ip);
        assert_eq!(sync.get_peer_height(&peer_ip), None);

        sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
        assert_eq!(sync.get_peer_height(&peer_ip), Some(200));

        sync.remove_peer(&peer_ip);
        assert_eq!(sync.get_peer_height(&peer_ip), None);
    }

    #[test]
    fn test_locators_insert_remove_insert() {
        let sync = sample_sync_at_height(0);

        // Re-inserting locators after removal must not retain stale state.
        let peer_ip = sample_peer_ip(1);
        sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
        assert_eq!(sync.get_peer_height(&peer_ip), Some(100));

        sync.remove_peer(&peer_ip);
        assert_eq!(sync.get_peer_height(&peer_ip), None);

        sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
        assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
    }

    #[test]
    fn test_requests_insert_remove_insert() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        let peer_ip = sample_peer_ip(1);
        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Removing the only peer must also clear its outstanding requests.
        sync.remove_peer(&peer_ip);

        for (height, _) in requests {
            assert_eq!(sync.get_block_request(height), None);
            assert!(sync.get_block_request_timestamp(height).is_none());
        }

        // With no peers, nothing can be requested.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // Re-adding the peer makes the full range requestable again.
        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (height, (hash, previous_hash, num_sync_ips)) in requests {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }
    }

    #[test]
    fn test_obsolete_block_requests() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        let locator_height = rng.gen_range(0..50);

        let locators = sample_block_locators(locator_height);
        sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap();

        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), locator_height as usize);

        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Simulate the ledger advancing to a random height at or below the
        // locator height while the requests are still outstanding.
        let ledger_height = rng.gen_range(0..=locator_height);
        let new_sync = duplicate_sync_at_new_height(&sync, ledger_height);

        assert_eq!(new_sync.requests.read().len(), requests.len());

        let c = DummyPeerPoolHandler::default();
        new_sync.handle_block_request_timeouts(&c).unwrap();

        // Requests at or below the new ledger height are now obsolete and
        // must have been pruned; only those above it remain.
        assert_eq!(new_sync.requests.read().len(), (locator_height - ledger_height) as usize);
    }

    #[test]
    fn test_timed_out_block_request() {
        let sync = sample_sync_at_height(0);
        let peer_ip = sample_peer_ip(1);
        let locators = sample_block_locators(10);
        let block_hash = locators.get_hash(1);

        sync.update_peer_locators(peer_ip, &locators).unwrap();

        // Backdate the request past the timeout threshold.
        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);

        sync.requests.write().insert(1, OutstandingRequest {
            request: (block_hash, None, [peer_ip].into()),
            timestamp,
            response: None,
        });

        assert_eq!(sync.requests.read().len(), 1);
        assert_eq!(sync.locators.read().len(), 1);

        let c = DummyPeerPoolHandler::default();
        sync.handle_block_request_timeouts(&c).unwrap();

        // The sole peer timed out, so both the request and the peer's
        // locators are removed.
        assert!(sync.requests.read().is_empty());
        assert!(sync.locators.read().is_empty());
    }

    #[test]
    fn test_reissue_timed_out_block_request() {
        let sync = sample_sync_at_height(0);
        let peer_ip1 = sample_peer_ip(1);
        let peer_ip2 = sample_peer_ip(2);
        let peer_ip3 = sample_peer_ip(3);

        let locators = sample_block_locators(10);
        let block_hash1 = locators.get_hash(1);
        let block_hash2 = locators.get_hash(2);

        sync.update_peer_locators(peer_ip1, &locators).unwrap();
        sync.update_peer_locators(peer_ip2, &locators).unwrap();
        sync.update_peer_locators(peer_ip3, &locators).unwrap();

        assert_eq!(sync.locators.read().len(), 3);

        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);

        // Request 1 is already past the timeout (assigned to peer 1).
        sync.requests.write().insert(1, OutstandingRequest {
            request: (block_hash1, None, [peer_ip1].into()),
            timestamp,
            response: None,
        });

        // Request 2 is fresh (assigned to peer 2) and must be untouched.
        sync.requests.write().insert(2, OutstandingRequest {
            request: (block_hash2, None, [peer_ip2].into()),
            timestamp: Instant::now(),
            response: None,
        });

        assert_eq!(sync.requests.read().len(), 2);

        let c = DummyPeerPoolHandler::default();

        let re_requests = sync.handle_block_request_timeouts(&c).unwrap();

        // The timed-out request was removed, and the offending peer (peer 1)
        // lost its locators; two peers remain.
        assert_eq!(sync.requests.read().len(), 1);
        assert_eq!(sync.locators.read().len(), 2);

        // The timed-out request is reissued for the same height and hash,
        // targeting peers other than the one that timed out.
        let (new_requests, new_sync_ips) = re_requests.unwrap();
        assert_eq!(new_requests.len(), 1);

        let (height, (hash, _, _)) = new_requests.first().unwrap();
        assert_eq!(*height, 1);
        assert_eq!(*hash, block_hash1);
        assert_eq!(new_sync_ips.len(), 2);

        let mut iter = new_sync_ips.iter();
        assert_ne!(iter.next().unwrap().0, &peer_ip1);
        assert_ne!(iter.next().unwrap().0, &peer_ip1);
    }
}