1use crate::{
17 helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
18 locators::BlockLocators,
19};
20use snarkos_node_bft_ledger_service::LedgerService;
21use snarkos_node_network::PeerPoolHandling;
22use snarkos_node_router::messages::DataBlocks;
23use snarkos_node_sync_communication_service::CommunicationService;
24use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
25
26use snarkvm::{
27 console::network::{ConsensusVersion, Network},
28 prelude::block::Block,
29 utilities::flatten_error,
30};
31
32use anyhow::{Result, bail, ensure};
33use indexmap::{IndexMap, IndexSet};
34use itertools::Itertools;
35#[cfg(feature = "locktick")]
36use locktick::parking_lot::RwLock;
37#[cfg(feature = "locktick")]
38use locktick::tokio::Mutex as TMutex;
39#[cfg(not(feature = "locktick"))]
40use parking_lot::RwLock;
41use rand::seq::{IteratorRandom, SliceRandom};
42use std::{
43 collections::{BTreeMap, HashMap, HashSet, hash_map},
44 net::{IpAddr, Ipv4Addr, SocketAddr},
45 sync::Arc,
46 time::{Duration, Instant},
47};
48#[cfg(not(feature = "locktick"))]
49use tokio::sync::Mutex as TMutex;
50use tokio::sync::Notify;
51
52mod helpers;
53use helpers::rangify_heights;
54
55mod sync_state;
56use sync_state::SyncState;
57
58mod metrics;
59use metrics::BlockSyncMetrics;
60
/// The number of peers that must agree on (and be asked for) a block before it is trusted.
/// Tests use a higher factor to exercise the multi-peer paths.
#[cfg(not(test))]
pub const REDUNDANCY_FACTOR: usize = 1;
#[cfg(test)]
pub const REDUNDANCY_FACTOR: usize = 3;

/// The delay inserted between batches of outgoing block requests.
pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);

/// The elevated redundancy used when dishonest peers are suspected (see `construct_request`).
const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
/// The maximum number of candidate peers considered when selecting sync peers.
const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;

/// How long an outstanding block request may wait for responses before it times out.
const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(600);

/// The maximum number of in-flight block request batches.
const MAX_BLOCK_REQUESTS: usize = 50;
/// The number of blocks a node may lag behind before it is no longer considered synced.
pub const MAX_BLOCKS_BEHIND: u32 = 1;
/// Placeholder address representing this node in `PeerPair` common-ancestor entries.
pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
92
/// A single tracked block request: the request itself, when it was issued,
/// and the first valid block received for it (if any).
#[derive(Clone)]
struct OutstandingRequest<N: Network> {
    // The request tuple; its third element is the set of peer IPs that have
    // not yet responded (accessed via `sync_ips()`/`sync_ips_mut()` below).
    request: SyncRequest<N>,
    // When the request was issued; used for timeout detection and latency logging.
    timestamp: Instant,
    // The block received for this height, once a valid response arrives.
    response: Option<Block<N>>,
}
103
/// Serializable per-request status, used for diagnostics (see `get_block_requests_info`).
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestInfo {
    // Seconds elapsed since the request was issued.
    elapsed: u64,
    // Whether all queried peers have responded (no sync IPs remain outstanding).
    done: bool,
}
112
/// Serializable summary of the requests map, with heights compacted into
/// human-readable range strings via `rangify_heights`.
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestsSummary {
    // Heights whose requests are still waiting on at least one peer.
    outstanding: String,
    // Heights whose requests are complete but not yet processed.
    completed: String,
}
119
/// Errors that can occur when inserting a peer's block response into the sync pool.
/// Some variants are benign races (see `is_benign`), others indicate peer misbehavior.
#[derive(thiserror::Error, Debug)]
pub enum InsertBlockResponseError<N: Network> {
    #[error("Empty block response")]
    EmptyBlockResponse,
    #[error("The peer did not send a consensus version")]
    NoConsensusVersion,
    #[error(
        "The peer's consensus version for height {last_height} does not match ours: expected {expected_version}, got {peer_version}"
    )]
    ConsensusVersionMismatch { peer_version: ConsensusVersion, expected_version: ConsensusVersion, last_height: u32 },
    // Benign: the local ledger already contains this height.
    #[error("Block Sync already advanced to block {height}")]
    BlockSyncAlreadyAdvanced { height: u32 },
    // Benign: the request was already removed (e.g. completed or timed out).
    #[error("No such request for height {height}")]
    NoSuchRequest { height: u32 },
    #[error("Invalid block hash for height {height} from '{peer_ip}'")]
    InvalidBlockHash { height: u32, peer_ip: SocketAddr },
    #[error(
        "The previous block hash in candidate block {height} from '{peer_ip}' is incorrect: expected {expected}, but got {actual}"
    )]
    InvalidPreviousBlockHash { height: u32, peer_ip: SocketAddr, expected: N::BlockHash, actual: N::BlockHash },
    #[error("Candidate block {height} from '{peer_ip}' is malformed")]
    MalformedBlock { height: u32, peer_ip: SocketAddr },
    #[error("The sync pool did not request block {height} from '{peer_ip}'")]
    WrongSyncPeer { height: u32, peer_ip: SocketAddr },
    // Catch-all wrapper for other failures.
    #[error("{}", flatten_error(.0))]
    Other(#[from] anyhow::Error),
}
147
148impl<N: Network> InsertBlockResponseError<N> {
149 pub fn is_benign(&self) -> bool {
151 matches!(self, Self::NoSuchRequest { .. } | Self::BlockSyncAlreadyAdvanced { .. })
152 }
153
154 pub fn is_invalid_consensus_version(&self) -> bool {
156 matches!(self, Self::ConsensusVersionMismatch { .. } | Self::NoConsensusVersion)
157 }
158}
159
160impl<N: Network> OutstandingRequest<N> {
161 fn sync_ips(&self) -> &IndexSet<SocketAddr> {
163 let (_, _, sync_ips) = &self.request;
164 sync_ips
165 }
166
167 fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
169 let (_, _, sync_ips) = &mut self.request;
170 sync_ips
171 }
172}
173
/// Coordinates block synchronization: tracks peer block locators, issues and
/// tracks block requests, collects responses, and drives the ledger forward.
pub struct BlockSync<N: Network> {
    // Handle to the ledger used to read and advance the canonical chain.
    ledger: Arc<dyn LedgerService<N>>,

    // The latest known block locators per connected peer.
    locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,

    // The highest known common-ancestor height per peer pair.
    // Pairs containing `DUMMY_SELF_IP` describe this node vs. a peer.
    common_ancestors: RwLock<IndexMap<PeerPair, u32>>,

    // Outstanding and completed-but-unprocessed block requests, keyed by height.
    requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,

    // The current sync state (sync height, greatest peer height, ...).
    sync_state: RwLock<SyncState>,

    // Ensures only one task runs `try_advancing_block_synchronization` at a time.
    advance_with_sync_blocks_lock: TMutex<()>,

    // Signaled when peer/request state changes; see `wait_for_peer_update`.
    peer_notify: Notify,

    // Signaled when a new block response arrives; see `wait_for_block_responses`.
    response_notify: Notify,

    // Sync-speed and progress metrics.
    metrics: BlockSyncMetrics,
}
220
221impl<N: Network> BlockSync<N> {
222 pub fn new(ledger: Arc<dyn LedgerService<N>>) -> Self {
224 let sync_state = SyncState::new_with_height(ledger.latest_block_height());
226
227 Self {
228 ledger,
229 sync_state: RwLock::new(sync_state),
230 peer_notify: Default::default(),
231 response_notify: Default::default(),
232 locators: Default::default(),
233 requests: Default::default(),
234 common_ancestors: Default::default(),
235 advance_with_sync_blocks_lock: Default::default(),
236 metrics: Default::default(),
237 }
238 }
239
    /// Waits until peer-related state changes (locators updated, peer removed,
    /// or a request completed), as signaled via `peer_notify`.
    pub async fn wait_for_peer_update(&self) {
        self.peer_notify.notified().await
    }

    /// Waits until a new block response has been received, as signaled via `response_notify`.
    pub async fn wait_for_block_responses(&self) {
        self.response_notify.notified().await
    }
254
    /// Returns `true` if this node is considered synced with the network.
    #[inline]
    pub fn is_block_synced(&self) -> bool {
        self.sync_state.read().is_block_synced()
    }

    /// Returns `true` if the node can make sync progress: either the sync state
    /// allows syncing, or completed responses are waiting to be processed.
    #[inline]
    pub fn can_block_sync(&self) -> bool {
        self.sync_state.read().can_block_sync() || self.has_pending_responses()
    }

    /// Returns the number of blocks this node is behind the greatest peer height, if known.
    #[inline]
    pub fn num_blocks_behind(&self) -> Option<u32> {
        self.sync_state.read().num_blocks_behind()
    }

    /// Returns the greatest block height reported by any connected peer, if any.
    #[inline]
    pub fn greatest_peer_block_height(&self) -> Option<u32> {
        self.sync_state.read().get_greatest_peer_height()
    }

    /// Returns the current sync height (the height up to which requests have been resolved).
    #[inline]
    pub fn get_sync_height(&self) -> u32 {
        self.sync_state.read().get_sync_height()
    }

    /// Returns the number of block requests still waiting on at least one peer.
    #[inline]
    pub fn num_outstanding_block_requests(&self) -> usize {
        self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count()
    }

    /// Returns the total number of tracked block requests (outstanding and completed).
    #[inline]
    pub fn num_total_block_requests(&self) -> usize {
        self.requests.read().len()
    }
302
    /// Returns the latest locator height reported by each connected peer.
    pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> {
        self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect()
    }

    /// Returns per-height status for every tracked block request (for diagnostics).
    pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> {
        self.requests
            .read()
            .iter()
            .map(|(height, request)| {
                (*height, BlockRequestInfo {
                    // A request is done once no sync IPs remain outstanding.
                    done: request.sync_ips().is_empty(),
                    elapsed: request.timestamp.elapsed().as_secs(),
                })
            })
            .collect()
    }
321
322 pub fn get_block_requests_summary(&self) -> BlockRequestsSummary {
324 let completed = self
325 .requests
326 .read()
327 .iter()
328 .filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None })
329 .collect::<Vec<_>>();
330
331 let outstanding = self
332 .requests
333 .read()
334 .iter()
335 .filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None })
336 .collect::<Vec<_>>();
337
338 BlockRequestsSummary { completed: rangify_heights(&completed), outstanding: rangify_heights(&outstanding) }
339 }
340
    /// Returns the current sync speed as tracked by the metrics (blocks per unit time;
    /// see `BlockSyncMetrics` for the exact definition).
    pub fn get_sync_speed(&self) -> f64 {
        self.metrics.get_sync_speed()
    }
344}
345
// Test-only accessors for inspecting internal sync state.
#[cfg(test)]
impl<N: Network> BlockSync<N> {
    // Returns the latest locator height known for the given peer, if any.
    fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> {
        self.locators.read().get(peer_ip).map(|locators| locators.latest_locator_height())
    }

    // Returns the recorded common-ancestor height for the given peer pair, if any.
    // NOTE(review): lookup is by `PeerPair(peer_a, peer_b)` as given; presumably
    // `PeerPair` normalizes the order — verify in `helpers`.
    fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> {
        self.common_ancestors.read().get(&PeerPair(peer_a, peer_b)).copied()
    }

    // Returns a clone of the request tuple stored for the given height, if any.
    fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> {
        self.requests.read().get(&height).map(|e| e.request.clone())
    }

    // Returns the issuance timestamp of the request for the given height, if any.
    fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> {
        self.requests.read().get(&height).map(|e| e.timestamp)
    }
}
369
370impl<N: Network> BlockSync<N> {
    /// Constructs this node's block locators from the ledger: the most recent
    /// `NUM_RECENT_BLOCKS` block hashes plus checkpoint hashes every
    /// `CHECKPOINT_INTERVAL` blocks starting at genesis.
    ///
    /// Returns an error if any required block hash cannot be fetched.
    #[inline]
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        let latest_height = self.ledger.latest_block_height();

        // Collect the recent window [latest - (NUM_RECENT_BLOCKS - 1), latest].
        let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
        for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
            recents.insert(height, self.ledger.get_block_hash(height)?);
        }

        // Collect checkpoints at 0, CHECKPOINT_INTERVAL, 2*CHECKPOINT_INTERVAL, ...
        let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
        for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
            checkpoints.insert(height, self.ledger.get_block_hash(height)?);
        }

        BlockLocators::new(recents, checkpoints)
    }
395
396 pub fn has_pending_responses(&self) -> bool {
398 self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0
399 }
400
    /// Sends the prepared block requests to a randomly chosen subset of `sync_peers`.
    ///
    /// Records each request in the requests map before sending. Returns `false`
    /// (and removes the just-inserted requests) if insertion fails or any send
    /// fails; returns `true` only if every peer send succeeded.
    pub async fn send_block_requests<C: CommunicationService>(
        &self,
        communication: &C,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        requests: &[(u32, PrepareSyncRequest<N>)],
    ) -> bool {
        // The batch starts at the first request's height; all requests in the
        // batch share the maximum sync-IP count carried by the first entry.
        let (start_height, max_num_sync_ips) = match requests.first() {
            Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
            None => {
                warn!("Block sync failed - no block requests");
                return false;
            }
        };

        debug!("Sending {len} block requests to peer(s) at {peers:?}", len = requests.len(), peers = sync_peers.keys());

        // Randomly select up to `max_num_sync_ips` peers to query for this batch.
        let sync_ips: IndexSet<_> =
            sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect();

        // `end_height` is exclusive.
        let end_height = start_height.saturating_add(requests.len() as u32);

        // Record every request before any network send, so responses arriving
        // early can be matched.
        for (height, (hash, previous_hash, _)) in requests.iter() {
            if let Err(err) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
                let err = err.context(format!("Failed to insert block request for height {height}"));
                warn!("{}", flatten_error(&err));
                return false;
            }
        }

        let message = C::prepare_block_request(start_height, end_height);

        // Fan the send out to all selected peers concurrently.
        let mut tasks = Vec::with_capacity(sync_ips.len());
        for sync_ip in sync_ips {
            let sender = communication.send(sync_ip, message.clone()).await;
            let task = tokio::spawn(async move {
                match sender {
                    Some(sender) => {
                        if let Err(err) = sender.await {
                            warn!("Failed to send block request to peer '{sync_ip}': {err}");
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        warn!("Failed to send block request to peer '{sync_ip}': no such peer");
                        false
                    }
                }
            });

            tasks.push(task);
        }

        // If any send failed, roll back the whole batch of requests so it can
        // be re-issued later.
        for result in futures::future::join_all(tasks).await {
            let success = match result {
                Ok(success) => success,
                Err(err) => {
                    error!("tokio join error: {err}");
                    false
                }
            };

            if !success {
                let mut requests = self.requests.write();
                for height in start_height..end_height {
                    requests.remove(&height);
                }
                return false;
            }
        }
        true
    }
488
489 #[inline]
497 pub fn insert_block_responses(
498 &self,
499 peer_ip: SocketAddr,
500 blocks: Vec<Block<N>>,
501 latest_consensus_version: Option<ConsensusVersion>,
502 ) -> Result<(), InsertBlockResponseError<N>> {
503 let result = 'outer: {
505 let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else {
506 break 'outer Err(InsertBlockResponseError::EmptyBlockResponse);
507 };
508
509 let expected_consensus_version = N::CONSENSUS_VERSION(last_height)?;
510
511 if expected_consensus_version >= ConsensusVersion::V12 {
514 if let Some(peer_version) = latest_consensus_version {
515 if peer_version != expected_consensus_version {
516 break 'outer Err(InsertBlockResponseError::ConsensusVersionMismatch {
517 peer_version,
518 expected_version: expected_consensus_version,
519 last_height,
520 });
521 }
522 } else {
523 break 'outer Err(InsertBlockResponseError::NoConsensusVersion);
524 }
525 }
526
527 for block in blocks {
529 self.insert_block_response(peer_ip, block)?;
530 }
531
532 Ok(())
533 };
534
535 if result.is_err() {
537 self.remove_block_requests_to_peer(&peer_ip);
538 }
539
540 result
542 }
543
544 #[inline]
547 pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
548 if let Some(entry) = self.requests.read().get(&next_height) {
551 let is_complete = entry.sync_ips().is_empty();
552 if !is_complete {
553 return None;
554 }
555
556 if entry.response.is_none() {
558 warn!("Request for height {next_height} is complete but no response exists");
559 }
560 entry.response.clone()
561 } else {
562 None
563 }
564 }
565
566 #[inline]
575 pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
576 let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
583 trace!("Skipping attempt to advance block synchronziation as it is already in progress");
584 return Ok(false);
585 };
586
587 let mut current_height = self.ledger.latest_block_height();
589 let start_height = current_height;
590 trace!(
591 "Try advancing with block responses (at block {current_height}, current sync speed is {})",
592 self.get_sync_speed()
593 );
594
595 loop {
596 let next_height = current_height + 1;
597
598 let Some(block) = self.peek_next_block(next_height) else {
599 break;
600 };
601
602 if block.height() != next_height {
604 warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
605 break;
606 }
607
608 let ledger = self.ledger.clone();
609 let advanced = tokio::task::spawn_blocking(move || {
610 match ledger.check_next_block(&block) {
612 Ok(_) => match ledger.advance_to_next_block(&block) {
613 Ok(_) => true,
614 Err(err) => {
615 let err = err.context(format!(
616 "Failed to advance to next block (height: {}, hash: '{}')",
617 block.height(),
618 block.hash()
619 ));
620 warn!("{}", flatten_error(&err));
621 false
622 }
623 },
624 Err(err) => {
625 let err = err.context(format!(
626 "The next block (height: {}, hash: '{}') is invalid",
627 block.height(),
628 block.hash()
629 ));
630 warn!("{}", flatten_error(&err));
631 false
632 }
633 }
634 })
635 .await?;
636
637 if advanced {
639 self.count_request_completed();
640 }
641
642 self.remove_block_response(next_height);
644
645 if !advanced {
647 break;
648 }
649
650 current_height = next_height;
652 }
653
654 if current_height > start_height {
655 self.set_sync_height(current_height);
656 Ok(true)
657 } else {
658 Ok(false)
659 }
660 }
661}
662
663impl<N: Network> BlockSync<N> {
664 pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
671 let current_height = self.get_sync_height();
673
674 if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
675 let sync_peers =
677 sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
678 Some((sync_peers, min_common_ancestor))
680 } else {
681 None
682 }
683 }
684
    /// Records new block locators for `peer_ip` and refreshes derived state:
    /// the common ancestor between this node and the peer, the common ancestors
    /// between the peer and every other known peer, and the greatest peer height.
    ///
    /// Returns early (Ok) if the locators are unchanged.
    pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
        // Store the new locators, skipping all further work if nothing changed.
        match self.locators.write().entry(peer_ip) {
            hash_map::Entry::Occupied(mut e) => {
                if e.get() == locators {
                    return Ok(());
                }

                let old_height = e.get().latest_locator_height();
                let new_height = locators.latest_locator_height();

                // A decreasing height may indicate a peer rollback; log it.
                if old_height > new_height {
                    debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
                }
                e.insert(locators.clone());
            }
            hash_map::Entry::Vacant(e) => {
                e.insert(locators.clone());
            }
        }

        // Compute the common ancestor between this node's ledger and the peer:
        // the greatest locator height whose hash matches our ledger.
        let new_local_ancestor = {
            let mut ancestor = 0;
            for (height, hash) in locators.clone().into_iter() {
                if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
                    match ledger_hash == hash {
                        true => ancestor = height,
                        false => {
                            warn!("Detected fork between this node and peer \"{peer_ip}\" at height {height}");
                            break;
                        }
                    }
                }
            }
            ancestor
        };

        // Compute common ancestors between this peer and every other peer.
        // NOTE: this iterates under a read lock taken after the write above,
        // so the map already contains this peer's new locators (skipped below).
        let ancestor_updates: Vec<_> = self
            .locators
            .read()
            .iter()
            .filter_map(|(other_ip, other_locators)| {
                if other_ip == &peer_ip {
                    return None;
                }
                let mut ancestor = 0;
                for (height, hash) in other_locators.clone().into_iter() {
                    if let Some(expected_hash) = locators.get_hash(height) {
                        match expected_hash == hash {
                            true => ancestor = height,
                            false => {
                                debug!(
                                    "Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
                                );
                                break;
                            }
                        }
                    }
                }

                Some((PeerPair(peer_ip, *other_ip), ancestor))
            })
            .collect();

        // Apply all ancestor updates under a single write lock.
        {
            let mut common_ancestors = self.common_ancestors.write();
            common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);

            for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
                common_ancestors.insert(peer_pair, new_ancestor);
            }
        }

        // Refresh the greatest known peer height (always Some here, since this
        // peer's locators were just inserted).
        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
        } else {
            error!("Got new block locators but greatest peer height is zero.");
        }
        // Wake any task waiting on peer updates.
        self.peer_notify.notify_one();

        Ok(())
    }
789
    /// Removes all sync state for `peer_ip`: its locators, every common-ancestor
    /// entry involving it, and its outstanding block requests; then refreshes
    /// the greatest peer height and wakes waiters.
    pub fn remove_peer(&self, peer_ip: &SocketAddr) {
        trace!("Removing peer {peer_ip} from block sync");

        self.locators.write().remove(peer_ip);
        self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
        self.remove_block_requests_to_peer(peer_ip);

        // Recompute the greatest peer height; clear it if no peers remain.
        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
        } else {
            self.sync_state.write().clear_greatest_peer_height();
        }

        self.peer_notify.notify_one();
    }
814}
815
/// A batch of prepared block requests (by height) together with the sync peers
/// (and their locators) the requests should be sent to.
pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);
818
819impl<N: Network> BlockSync<N> {
    /// Prepares the next batch of block requests, respecting two budgets:
    /// the maximum number of outstanding (unanswered) blocks and the maximum
    /// total number of tracked (unprocessed) requests. Returns an empty batch
    /// when a budget is exhausted or no suitable sync peers exist.
    pub fn prepare_block_requests(&self) -> BlockRequestBatch<N> {
        // Lazily-evaluated trace output of the current request backlog.
        let print_requests = || {
            if tracing::enabled!(tracing::Level::TRACE) {
                let summary = self.get_block_requests_summary();

                trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
                trace!("The following requests are still outstanding: {:?}", summary.outstanding);
            }
        };

        let current_height = self.get_sync_height();

        // Budget of blocks that may be outstanding (unanswered) at once.
        let max_outstanding_block_requests =
            (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);

        // Larger budget covering answered-but-unprocessed requests as well.
        let max_total_requests = 4 * max_outstanding_block_requests;

        let max_new_blocks_to_request =
            max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);

        if self.num_total_block_requests() >= max_total_requests as usize {
            trace!(
                "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
            );

            print_requests();
            Default::default()
        } else if max_new_blocks_to_request == 0 {
            trace!(
                "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
            );

            print_requests();
            Default::default()
        } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
            let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);

            let requests = self.construct_requests(
                &sync_peers,
                current_height,
                min_common_ancestor,
                max_new_blocks_to_request,
                greatest_peer_height,
            );

            (requests, sync_peers)
        } else if self.requests.read().is_empty() {
            // Nothing to request and nothing pending: quietly idle.
            Default::default()
        } else {
            trace!("No new blocks can be requested, but there are still outstanding requests.");

            print_requests();
            Default::default()
        }
    }
902
    /// Records a completed block request in the metrics (feeds the sync-speed estimate).
    pub fn count_request_completed(&self) {
        self.metrics.count_request_completed();
    }
910
911 pub fn set_sync_height(&self, new_height: u32) {
914 let fully_synced = {
916 let mut state = self.sync_state.write();
917 state.set_sync_height(new_height);
918 !state.can_block_sync()
919 };
920
921 if fully_synced {
922 self.metrics.mark_fully_synced();
923 }
924 }
925
926 fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
928 self.check_block_request(height)?;
930 ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
932 self.requests.write().insert(height, OutstandingRequest {
934 request: (hash, previous_hash, sync_ips),
935 timestamp: Instant::now(),
936 response: None,
937 });
938 Ok(())
939 }
940
941 fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<(), InsertBlockResponseError<N>> {
944 let height = block.height();
946 let mut requests = self.requests.write();
947
948 if self.ledger.contains_block_height(height) {
949 return Err(InsertBlockResponseError::BlockSyncAlreadyAdvanced { height });
950 }
951
952 let Some(entry) = requests.get_mut(&height) else {
953 return Err(InsertBlockResponseError::NoSuchRequest { height });
954 };
955
956 let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;
958
959 if let Some(expected_hash) = expected_hash
961 && block.hash() != *expected_hash
962 {
963 return Err(InsertBlockResponseError::InvalidBlockHash { height, peer_ip });
964 }
965 if let Some(expected_previous_hash) = expected_previous_hash
967 && block.previous_hash() != *expected_previous_hash
968 {
969 return Err(InsertBlockResponseError::InvalidPreviousBlockHash {
970 height,
971 peer_ip,
972 expected: *expected_previous_hash,
973 actual: block.previous_hash(),
974 });
975 }
976 if !sync_ips.contains(&peer_ip) {
978 return Err(InsertBlockResponseError::WrongSyncPeer { height, peer_ip });
979 }
980
981 entry.sync_ips_mut().swap_remove(&peer_ip);
983
984 if let Some(existing_block) = &entry.response {
985 if block != *existing_block {
987 return Err(InsertBlockResponseError::MalformedBlock { height, peer_ip });
988 }
989 } else {
990 entry.response = Some(block.clone());
991 }
992
993 trace!("Received a new and valid block response for height {height}");
994
995 self.response_notify.notify_one();
997
998 Ok(())
999 }
1000
1001 fn check_block_request(&self, height: u32) -> Result<()> {
1003 if self.ledger.contains_block_height(height) {
1005 bail!("Failed to add block request, as block {height} exists in the ledger");
1006 }
1007 if self.requests.read().contains_key(&height) {
1009 bail!("Failed to add block request, as block {height} exists in the requests map");
1010 }
1011
1012 Ok(())
1013 }
1014
1015 pub fn remove_block_response(&self, height: u32) {
1022 if let Some(e) = self.requests.write().remove(&height) {
1024 trace!(
1025 "Block request for height {height} was completed in {}ms (sync speed is {})",
1026 e.timestamp.elapsed().as_millis(),
1027 self.get_sync_speed()
1028 );
1029
1030 self.peer_notify.notify_one();
1032 }
1033 }
1034
    /// Removes `peer_ip` from every tracked request's outstanding sync IPs,
    /// and drops requests that were waiting solely on this peer with no
    /// response received yet.
    fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
        trace!("Block sync is removing all block requests to peer {peer_ip}...");

        self.requests.write().retain(|height, e| {
            // `swap_remove` both removes the peer and reports whether it was present.
            let had_peer = e.sync_ips_mut().swap_remove(peer_ip);

            // Keep the request if this peer wasn't involved, other peers are
            // still expected to answer, or a response already arrived.
            let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
            if !retain {
                trace!("Removed block request timestamp for {peer_ip} at height {height}");
            }
            retain
        });

    }
1057
    /// Sweeps the requests map for timed-out and obsolete requests, removes the
    /// peers responsible for timeouts from sync, and — if a gap opened between
    /// the sync height and the earliest remaining request — prepares a batch of
    /// re-requests to fill it.
    ///
    /// NOTE(review): `_peer_pool_handler` is currently unused; timed-out peers
    /// are only removed from sync state (`remove_peer`), not banned — confirm
    /// whether banning via the handler is intended.
    pub fn handle_block_request_timeouts<P: PeerPoolHandling<N>>(
        &self,
        _peer_pool_handler: &P,
    ) -> Result<Option<BlockRequestBatch<N>>> {
        let mut requests = self.requests.write();

        let now = Instant::now();

        let current_height = self.ledger.latest_block_height();

        // Heights of requests dropped due to timeout (used for logging only).
        let mut timed_out_requests = vec![];

        // Peers that failed to answer a timed-out request.
        let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();

        requests.retain(|height, e| {
            // Obsolete: the ledger already contains this height.
            let is_obsolete = *height <= current_height;
            let timer_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
            let is_complete = e.sync_ips().is_empty();

            // Only incomplete requests can time out.
            let is_timeout = timer_elapsed && !is_complete;

            let retain = !is_timeout && !is_obsolete;

            if is_timeout {
                trace!("Block request at height {height} has timed out: timer_elapsed={timer_elapsed}, is_complete={is_complete}, is_obsolete={is_obsolete}");

                timed_out_requests.push(*height);
            } else if is_obsolete {
                trace!("Block request at height {height} became obsolete (current_height={current_height})");
            }

            // Record every peer that still owed a response on a timed-out request.
            if is_timeout {
                for peer_ip in e.sync_ips().iter() {
                    peers_to_ban.insert(*peer_ip);
                }
            }

            retain
        });

        if !timed_out_requests.is_empty() {
            debug!("{num} block requests timed out", num = timed_out_requests.len());
        }

        // The earliest height still tracked after the sweep (BTreeMap is height-ordered).
        let next_request_height = requests.iter().next().map(|(h, _)| *h);

        // Release the write lock before calling into `remove_peer`, which
        // takes the requests lock again internally.
        drop(requests);

        for peer_ip in peers_to_ban {
            self.remove_peer(&peer_ip);
        }

        let sync_height = self.get_sync_height();
        let start_height = sync_height + 1;

        // Only re-request if a gap opened below the earliest tracked request.
        let end_height = if let Some(next_height) = next_request_height
            && next_height > start_height
        {
            next_height
        } else {
            return Ok(None);
        };

        let max_new_blocks_to_request = end_height - start_height;

        let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start_height) else {
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        let Some(greatest_peer_height) = sync_peers.values().map(|l| l.latest_locator_height()).max() else {
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        let requests = self.construct_requests(
            &sync_peers,
            sync_height,
            min_common_ancestor,
            max_new_blocks_to_request,
            greatest_peer_height,
        );

        if let Some((height, _)) = requests.as_slice().first() {
            debug!("Re-requesting blocks starting at height {height}");
            Ok(Some((requests, sync_peers)))
        } else {
            Ok(None)
        }
    }
1191
    /// Searches for a group of mutually consistent peers that are ahead of
    /// `current_height`, returning the group (in shuffled order) and the
    /// minimum common-ancestor height across it.
    ///
    /// Candidates are the up-to-`NUM_SYNC_CANDIDATE_PEERS` highest peers above
    /// `current_height`. For each candidate (highest first), peers consistent
    /// with it are gathered; the first group of size >= `REDUNDANCY_FACTOR`
    /// (capped at the candidate count) whose common ancestor exceeds the
    /// ledger height is returned.
    fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
        let latest_ledger_height = self.ledger.latest_block_height();

        // Highest peers above the current height, best first.
        let candidate_locators: IndexMap<_, _> = self
            .locators
            .read()
            .iter()
            .filter(|(_, locators)| locators.latest_locator_height() > current_height)
            .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
            .take(NUM_SYNC_CANDIDATE_PEERS)
            .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
            .collect();

        if candidate_locators.is_empty() {
            trace!("Found no sync peers with height greater {current_height}");
            return None;
        }

        // Require REDUNDANCY_FACTOR agreeing peers, unless fewer candidates exist.
        let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);

        for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
            let mut min_common_ancestor = peer_locators.latest_locator_height();

            let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];

            // Gather later candidates consistent with this anchor peer.
            for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
                if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
                    // Only accept peers whose shared history extends beyond our ledger.
                    if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
                        min_common_ancestor = min_common_ancestor.min(*common_ancestor);

                        sync_peers.push((*other_ip, other_locators.clone()));
                    }
                }
            }

            if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
                // Shuffle so request load is spread across the group.
                sync_peers.shuffle(&mut rand::thread_rng());

                return Some((sync_peers.into_iter().collect(), min_common_ancestor));
            }
        }

        None
    }
1268
    /// Builds the list of per-height block requests to issue next.
    ///
    /// The range starts just above the sync height (skipping heights already
    /// requested) and extends to the minimum common ancestor, capped by
    /// `max_blocks_to_request`. Each entry carries the expected (previous)
    /// hash when peers agree on it, plus the number of peers to query
    /// (uniform across the batch: the max required by any height).
    fn construct_requests(
        &self,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        sync_height: u32,
        min_common_ancestor: u32,
        max_blocks_to_request: u32,
        greatest_peer_height: u32,
    ) -> Vec<(u32, PrepareSyncRequest<N>)> {
        let start_height = {
            let requests = self.requests.read();
            let ledger_height = self.ledger.latest_block_height();

            // Start above both the ledger and the sync height...
            let mut start_height = ledger_height.max(sync_height + 1);

            // ...and skip past heights that already have requests.
            while requests.contains_key(&start_height) {
                start_height += 1;
            }

            start_height
        };

        // Nothing requestable if peers' common history ends below our start.
        if min_common_ancestor < start_height {
            if start_height < greatest_peer_height {
                trace!(
                    "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
                );
            }
            return Default::default();
        }

        // `end_height` is exclusive; bounded by the common ancestor and the budget.
        let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);

        let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
        // Every request in the batch uses the same (maximum) number of sync IPs.
        let mut max_num_sync_ips = 1;

        for height in start_height..end_height {
            if let Err(err) = self.check_block_request(height) {
                trace!("{err}");

                // Skip invalid heights at the front; stop at the first gap
                // once requests have been collected (batch must be contiguous).
                match request_hashes.is_empty() {
                    true => continue,
                    false => break,
                }
            }

            let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);

            if !is_honest {
                warn!("Detected dishonest peer(s) when preparing block request");
                // Not enough peers to reach the elevated redundancy: stop here.
                if sync_peers.len() < num_sync_ips {
                    break;
                }
            }

            max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);

            request_hashes.insert(height, (hash, previous_hash));
        }

        request_hashes
            .into_iter()
            .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
            .collect()
    }
1351}
1352
/// Determines, for a single `height`, the block hash (and previous hash) the
/// sync peers agree on, how many peers should be queried, and whether the
/// peers appear honest.
///
/// Returns `(hash, previous_hash, num_sync_ips, is_honest)`:
/// - `hash`/`previous_hash` are `Some` only while all reporting peers agree;
///   any disagreement clears both and flags dishonesty.
/// - `num_sync_ips` is 1 when the hash is confirmed by at least
///   `REDUNDANCY_FACTOR` peers, `EXTRA_REDUNDANCY_FACTOR` when dishonesty was
///   detected, and `REDUNDANCY_FACTOR` otherwise.
fn construct_request<N: Network>(
    height: u32,
    sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
    let mut hash = None;
    // How many peers reported the agreed-upon hash.
    let mut hash_redundancy: usize = 0;
    let mut previous_hash = None;
    let mut is_honest = true;

    for peer_locators in sync_peers.values() {
        // Tally agreement on the hash at `height`.
        if let Some(candidate_hash) = peer_locators.get_hash(height) {
            match hash {
                Some(hash) if hash == candidate_hash => hash_redundancy += 1,
                // Conflict: discard everything and flag dishonest peers.
                Some(_) => {
                    hash = None;
                    hash_redundancy = 0;
                    previous_hash = None;
                    is_honest = false;
                    break;
                }
                None => {
                    hash = Some(candidate_hash);
                    hash_redundancy = 1;
                }
            }
        }
        // Check agreement on the previous hash (height - 1, saturating at 0).
        if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
            match previous_hash {
                Some(previous_hash) if previous_hash == candidate_previous_hash => (),
                // Conflict on the previous hash is treated the same as on the hash.
                Some(_) => {
                    hash = None;
                    hash_redundancy = 0;
                    previous_hash = None;
                    is_honest = false;
                    break;
                }
                None => previous_hash = Some(candidate_previous_hash),
            }
        }
    }

    // Decide how many peers to query for this height.
    let num_sync_ips = {
        if !is_honest {
            // Conflicting reports: query extra peers to resolve the dispute.
            EXTRA_REDUNDANCY_FACTOR
        }
        else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
            // Hash confirmed by enough peers: a single download suffices.
            1
        }
        else {
            REDUNDANCY_FACTOR
        }
    };

    (hash, previous_hash, num_sync_ips, is_honest)
}
1424
1425#[cfg(test)]
1426mod tests {
1427 use super::*;
1428 use crate::locators::{
1429 CHECKPOINT_INTERVAL,
1430 NUM_RECENT_BLOCKS,
1431 test_helpers::{sample_block_locators, sample_block_locators_with_fork},
1432 };
1433
1434 use snarkos_node_bft_ledger_service::MockLedgerService;
1435 use snarkos_node_network::{NodeType, Peer, Resolver};
1436 use snarkos_node_tcp::{P2P, Tcp};
1437 use snarkvm::{
1438 ledger::committee::Committee,
1439 prelude::{Field, TestRng},
1440 };
1441
1442 use indexmap::{IndexSet, indexset};
1443 #[cfg(feature = "locktick")]
1444 use locktick::parking_lot::RwLock;
1445 #[cfg(not(feature = "locktick"))]
1446 use parking_lot::RwLock;
1447 use rand::Rng;
1448 use std::net::{IpAddr, Ipv4Addr};
1449
1450 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1451
    /// A minimal peer-pool handler for tests: it only records which peers it
    /// was asked to ban, and panics on any other pool interaction.
    #[derive(Default)]
    struct DummyPeerPoolHandler {
        // Listener addresses passed to `ip_ban_peer`, in call order.
        peers_to_ban: RwLock<Vec<SocketAddr>>,
    }
1456
    /// `P2P` is a supertrait requirement of `PeerPoolHandling`; these tests
    /// never touch the TCP stack.
    impl P2P for DummyPeerPoolHandler {
        fn tcp(&self) -> &Tcp {
            // No TCP instance exists in these tests.
            unreachable!();
        }
    }
1462
    /// Stub `PeerPoolHandling` implementation: only `ip_ban_peer` has real
    /// behavior (it records the address); the pool/resolver accessors are
    /// never called by the tests and panic if reached.
    impl<N: Network> PeerPoolHandling<N> for DummyPeerPoolHandler {
        const MAXIMUM_POOL_SIZE: usize = 10;
        const OWNER: &str = "[DummyPeerPoolHandler]";
        // NOTE(review): presumably a ban threshold — 0 here so bans apply
        // without accumulation; confirm against the trait's documentation.
        const PEER_SLASHING_COUNT: usize = 0;

        fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
            // Tests never inspect the peer pool.
            unreachable!();
        }

        fn resolver(&self) -> &RwLock<Resolver<N>> {
            // Tests never resolve peer addresses.
            unreachable!();
        }

        fn is_dev(&self) -> bool {
            true
        }

        fn trusted_peers_only(&self) -> bool {
            false
        }

        fn node_type(&self) -> NodeType {
            NodeType::Client
        }

        fn ip_ban_peer(&self, listener_addr: SocketAddr, _reason: Option<&str>) {
            // Record the ban request so tests can assert on it.
            self.peers_to_ban.write().push(listener_addr);
        }
    }
1492
1493 fn sample_peer_ip(id: u16) -> SocketAddr {
1495 assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
1496 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
1497 }
1498
    /// Samples a committee using the default (deterministic) test RNG.
    fn sample_committee() -> Committee<CurrentNetwork> {
        let rng = &mut TestRng::default();
        snarkvm::ledger::committee::test_helpers::sample_committee(rng)
    }
1504
    /// Returns a mock ledger service whose latest block height is `height`.
    fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
        MockLedgerService::new_at_height(sample_committee(), height)
    }
1509
    /// Returns a `BlockSync` backed by a mock ledger at the given height.
    fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)))
    }
1514
1515 fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
1519 assert!(num_values > 0, "Cannot generate an empty vector");
1520 assert!((max_height as usize) >= num_values);
1521
1522 let mut rng = TestRng::default();
1523
1524 let mut heights: Vec<u32> = (0..(max_height - 1)).choose_multiple(&mut rng, num_values);
1525
1526 heights.push(max_height);
1527
1528 heights
1529 }
1530
    /// Clones the given sync pool's state into a new `BlockSync` whose ledger
    /// is at `height`, simulating a node whose ledger advanced while block
    /// requests were still outstanding.
    fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork> {
            peer_notify: Notify::new(),
            response_notify: Default::default(),
            // A fresh mock ledger at the new height.
            ledger: Arc::new(sample_ledger_service(height)),
            // Carry over the locators, ancestors, requests, and sync state.
            locators: RwLock::new(sync.locators.read().clone()),
            common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
            requests: RwLock::new(sync.requests.read().clone()),
            sync_state: RwLock::new(sync.sync_state.read().clone()),
            advance_with_sync_blocks_lock: Default::default(),
            metrics: Default::default(),
        }
    }
1545
    /// Asserts that `prepare_block_requests` on a genesis-height sync pool
    /// issues one request per block up to `min_common_ancestor` (capped at
    /// `MAX_BLOCK_REQUESTS`), with the expected hashes and sync-IP counts.
    fn check_prepare_block_requests(
        sync: BlockSync<CurrentNetwork>,
        min_common_ancestor: u32,
        peers: IndexSet<SocketAddr>,
    ) {
        let rng = &mut TestRng::default();

        assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");

        // Count the peers whose height is within the recent-locator window of
        // the (genesis) ledger; beyond that window no peer qualifies.
        let num_peers_within_recent_range_of_ledger = {
            if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
                0
            }
            else {
                peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
            }
        };

        let (requests, sync_peers) = sync.prepare_block_requests();

        // With no peers there is nothing to request.
        if peers.is_empty() {
            assert!(requests.is_empty());
            return;
        }

        // One request per missing block, capped by the per-round maximum.
        let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
        assert_eq!(requests.len(), expected_num_requests);

        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            // Draw the sync IPs the caller would dispatch this request to.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Requests are contiguous starting at height 1, and the mock
            // ledger derives each hash from its height.
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));

            if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
                // Enough corroborating peers: a single sync IP suffices.
                assert_eq!(sync_ips.len(), 1);
            } else {
                // Otherwise, every qualifying peer is used.
                assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
                assert_eq!(sync_ips, peers);
            }
        }
    }
1598
1599 #[test]
1601 fn test_latest_block_height() {
1602 for height in generate_block_heights(100_001, 5000) {
1603 let sync = sample_sync_at_height(height);
1604 assert_eq!(sync.ledger.latest_block_height(), height);
1606
1607 assert_eq!(sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(0)).into()).unwrap(), 0);
1609 assert_eq!(
1610 sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(height)).into()).unwrap(),
1611 height
1612 );
1613 }
1614 }
1615
1616 #[test]
1617 fn test_get_block_hash() {
1618 for height in generate_block_heights(100_001, 5000) {
1619 let sync = sample_sync_at_height(height);
1620
1621 assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
1623 assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::<CurrentNetwork>::from_u32(height)).into());
1624 }
1625 }
1626
    #[test]
    fn test_prepare_block_requests() {
        // Exercise request preparation across a growing number of peers.
        for num_peers in 0..111 {
            println!("Testing with {num_peers} peers");

            // Start from a sync pool at genesis.
            let sync = sample_sync_at_height(0);

            let mut peers = indexset![];

            // Every peer reports the same locators at height 10.
            for peer_id in 1..=num_peers {
                sync.update_peer_locators(sample_peer_ip(peer_id), &sample_block_locators(10)).unwrap();
                peers.insert(sample_peer_ip(peer_id));
            }

            // The common ancestor across all peers is height 10.
            check_prepare_block_requests(sync, 10, peers);
        }
    }
1647
    #[test]
    fn test_prepare_block_requests_with_leading_fork_at_11() {
        let sync = sample_sync_at_height(0);

        // Peer 1 is ahead (height 20) but diverges onto a fork at height 11.
        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 11)).unwrap();

        // Peers 2 and 3 follow the canonical chain up to height 10.
        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();

        // All three peers agree below the fork point, so blocks 1..=10 are requested.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            // With the hash corroborated by enough peers, one sync IP suffices.
            assert_eq!(num_sync_ips, 1);
        }
    }
1685
    #[test]
    fn test_prepare_block_requests_with_leading_fork_at_10() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Peer 1 is ahead (height 20) but diverges onto a fork at height 10.
        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();

        // Peers 2 and 3 follow the canonical chain up to height 10.
        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();

        // With only two canonical peers, the test redundancy factor (3) is
        // not met, so no requests are issued yet.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // A fourth canonical peer restores sufficient agreement.
        let peer_4 = sample_peer_ip(4);
        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();

        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            // Exactly one sync IP is drawn, and the forked peer is excluded.
            assert_eq!(sync_ips.len(), 1);
            assert_ne!(sync_ips[0], peer_1);
        }
    }
1742
    #[test]
    fn test_prepare_block_requests_with_trailing_fork_at_9() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Peers 1 and 2 follow the canonical chain up to height 10.
        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators(10)).unwrap();

        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        // Peer 3 is ahead (height 20) but on a forked chain.
        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();

        // With only two canonical peers, the test redundancy factor (3) is
        // not met, so no requests are issued yet.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // A fourth canonical peer restores sufficient agreement.
        let peer_4 = sample_peer_ip(4);
        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();

        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            // Exactly one sync IP is drawn, and the forked peer is excluded.
            assert_eq!(sync_ips.len(), 1);
            assert_ne!(sync_ips[0], peer_3);
        }
    }
1790
    #[test]
    fn test_insert_block_requests() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // A single peer at height 10 yields requests for blocks 1..=10.
        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();

        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // First pass: each request inserts successfully and is retrievable.
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Second pass: the getters are read-only — repeated reads are stable.
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Third pass: re-inserting an existing request fails and leaves the
        // stored request intact.
        for (height, (hash, previous_hash, num_sync_ips)) in requests {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }
    }
1834
    #[test]
    fn test_insert_block_requests_fails() {
        // The ledger is already at height 9.
        let sync = sample_sync_at_height(9);

        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();

        // Requesting a block the ledger already holds (height 9) is rejected.
        sync.insert_block_request(9, (None, None, indexset![sample_peer_ip(1)])).unwrap_err();
        // Requesting the next block (height 10) succeeds.
        sync.insert_block_request(10, (None, None, indexset![sample_peer_ip(1)])).unwrap();
    }
1847
    #[test]
    fn test_update_peer_locators() {
        let sync = sample_sync_at_height(0);

        // Sweep both peers over a range of heights and verify the tracked
        // heights and pairwise common ancestors.
        let peer1_ip = sample_peer_ip(1);
        for peer1_height in 0..500u32 {
            sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap();
            assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height));

            let peer2_ip = sample_peer_ip(2);
            for peer2_height in 0..500u32 {
                println!("Testing peer 1 height at {peer1_height} and peer 2 height at {peer2_height}");

                sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap();
                assert_eq!(sync.get_peer_height(&peer2_ip), Some(peer2_height));

                let distance = peer1_height.abs_diff(peer2_height);

                if distance < NUM_RECENT_BLOCKS as u32 {
                    // Within the recent-locator window, the common ancestor is
                    // simply the lower of the two heights (symmetric).
                    let expected_ancestor = core::cmp::min(peer1_height, peer2_height);
                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
                } else {
                    // Beyond the recent window, only checkpoint locators are
                    // shared, so the ancestor is the highest common checkpoint.
                    let min_checkpoints =
                        core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL);
                    let expected_ancestor = min_checkpoints * CHECKPOINT_INTERVAL;
                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
                }
            }
        }
    }
1883
1884 #[test]
1885 fn test_remove_peer() {
1886 let sync = sample_sync_at_height(0);
1887
1888 let peer_ip = sample_peer_ip(1);
1889 sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1890 assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1891
1892 sync.remove_peer(&peer_ip);
1893 assert_eq!(sync.get_peer_height(&peer_ip), None);
1894
1895 sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1896 assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1897
1898 sync.remove_peer(&peer_ip);
1899 assert_eq!(sync.get_peer_height(&peer_ip), None);
1900 }
1901
    #[test]
    fn test_locators_insert_remove_insert() {
        let sync = sample_sync_at_height(0);

        // Insert a peer, remove it, then insert it again at a new height;
        // the tracked height must follow each transition.
        let peer_ip = sample_peer_ip(1);
        sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
        assert_eq!(sync.get_peer_height(&peer_ip), Some(100));

        sync.remove_peer(&peer_ip);
        assert_eq!(sync.get_peer_height(&peer_ip), None);

        sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
        assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
    }
1916
    #[test]
    fn test_requests_insert_remove_insert() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // A single peer at height 10 yields requests for blocks 1..=10.
        let peer_ip = sample_peer_ip(1);
        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // Insert every request and verify it is retrievable.
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Removing the only peer cancels all of its outstanding requests.
        sync.remove_peer(&peer_ip);

        for (height, _) in requests {
            assert_eq!(sync.get_block_request(height), None);
            assert!(sync.get_block_request_timestamp(height).is_none());
        }

        // With no peers left, no requests can be prepared.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // Re-inserting the peer makes the same requests preparable again.
        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (height, (hash, previous_hash, num_sync_ips)) in requests {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }
    }
1972
    #[test]
    fn test_obsolete_block_requests() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Pick a random peer height and issue requests for every block up to it.
        let locator_height = rng.gen_range(0..50);

        let locators = sample_block_locators(locator_height);
        sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap();

        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), locator_height as usize);

        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Simulate the ledger advancing to a random height while the
        // requests are still pending.
        let ledger_height = rng.gen_range(0..=locator_height);
        let new_sync = duplicate_sync_at_new_height(&sync, ledger_height);

        assert_eq!(new_sync.requests.read().len(), requests.len());

        let c = DummyPeerPoolHandler::default();
        new_sync.handle_block_request_timeouts(&c).unwrap();

        // Requests at or below the new ledger height are obsolete and dropped;
        // only those above it remain outstanding.
        assert_eq!(new_sync.requests.read().len(), (locator_height - ledger_height) as usize);
    }
2016
    #[test]
    fn test_timed_out_block_request() {
        let sync = sample_sync_at_height(0);
        let peer_ip = sample_peer_ip(1);
        let locators = sample_block_locators(10);
        let block_hash = locators.get_hash(1);

        sync.update_peer_locators(peer_ip, &locators).unwrap();

        // Backdate the request so it is already past the timeout.
        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);

        sync.requests.write().insert(1, OutstandingRequest {
            request: (block_hash, None, [peer_ip].into()),
            timestamp,
            response: None,
        });

        assert_eq!(sync.requests.read().len(), 1);
        assert_eq!(sync.locators.read().len(), 1);

        let c = DummyPeerPoolHandler::default();
        sync.handle_block_request_timeouts(&c).unwrap();

        // The timed-out request is dropped, and the unresponsive peer (the
        // only one tracked) is removed along with its locators.
        assert!(sync.requests.read().is_empty());
        assert!(sync.locators.read().is_empty());
    }
2049
    #[test]
    fn test_reissue_timed_out_block_request() {
        let sync = sample_sync_at_height(0);
        let peer_ip1 = sample_peer_ip(1);
        let peer_ip2 = sample_peer_ip(2);
        let peer_ip3 = sample_peer_ip(3);

        let locators = sample_block_locators(10);
        let block_hash1 = locators.get_hash(1);
        let block_hash2 = locators.get_hash(2);

        // All three peers report the same locators.
        sync.update_peer_locators(peer_ip1, &locators).unwrap();
        sync.update_peer_locators(peer_ip2, &locators).unwrap();
        sync.update_peer_locators(peer_ip3, &locators).unwrap();

        assert_eq!(sync.locators.read().len(), 3);

        // Request 1 (to peer 1) is backdated past the timeout; request 2
        // (to peer 2) is fresh.
        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);

        sync.requests.write().insert(1, OutstandingRequest {
            request: (block_hash1, None, [peer_ip1].into()),
            timestamp,
            response: None,
        });

        sync.requests.write().insert(2, OutstandingRequest {
            request: (block_hash2, None, [peer_ip2].into()),
            timestamp: Instant::now(),
            response: None,
        });

        assert_eq!(sync.requests.read().len(), 2);

        let c = DummyPeerPoolHandler::default();

        let re_requests = sync.handle_block_request_timeouts(&c).unwrap();

        // The timed-out request is removed, and the unresponsive peer 1 is
        // dropped; the fresh request and the other two peers remain.
        assert_eq!(sync.requests.read().len(), 1);
        assert_eq!(sync.locators.read().len(), 2);

        // Request 1 is reissued against the remaining peers.
        let (new_requests, new_sync_ips) = re_requests.unwrap();
        assert_eq!(new_requests.len(), 1);

        let (height, (hash, _, _)) = new_requests.first().unwrap();
        assert_eq!(*height, 1);
        assert_eq!(*hash, block_hash1);
        assert_eq!(new_sync_ips.len(), 2);

        // Neither candidate sync peer may be the peer that timed out.
        let mut iter = new_sync_ips.iter();
        assert_ne!(iter.next().unwrap().0, &peer_ip1);
        assert_ne!(iter.next().unwrap().0, &peer_ip1);
    }
2110}