1use crate::{
17 helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
18 locators::BlockLocators,
19};
20use futures::future::BoxFuture;
21use snarkos_node_bft_ledger_service::{BeginLedgerUpdateError, LedgerService};
22use snarkos_node_network::ConnectionMode;
23use snarkos_node_router::messages::DataBlocks;
24use snarkos_node_sync_communication_service::CommunicationService;
25use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
26
27use snarkvm::{
28 console::network::{ConsensusVersion, Network},
29 ledger::{Block, CheckBlockError},
30 utilities::flatten_error,
31};
32
33use anyhow::{Context, Result, anyhow, bail, ensure};
34use futures::FutureExt;
35use indexmap::{IndexMap, IndexSet};
36use itertools::Itertools;
37#[cfg(feature = "locktick")]
38use locktick::parking_lot::{Mutex, RwLock};
39#[cfg(feature = "locktick")]
40use locktick::tokio::Mutex as TMutex;
41#[cfg(not(feature = "locktick"))]
42use parking_lot::Mutex;
43#[cfg(not(feature = "locktick"))]
44use parking_lot::RwLock;
45use rand::seq::{IteratorRandom, SliceRandom};
46use std::{
47 collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map},
48 net::{IpAddr, Ipv4Addr, SocketAddr},
49 sync::Arc,
50 time::{Duration, Instant},
51};
52#[cfg(not(feature = "locktick"))]
53use tokio::sync::Mutex as TMutex;
54use tokio::sync::Notify;
55use tracing::info;
56
57mod helpers;
58use helpers::rangify_heights;
59
60mod sync_state;
61pub use sync_state::BftSyncMode;
62use sync_state::SyncState;
63
64mod metrics;
65use metrics::BlockSyncMetrics;
66
67#[cfg(not(test))]
71pub const REDUNDANCY_FACTOR: usize = 1;
72#[cfg(test)]
73pub const REDUNDANCY_FACTOR: usize = 3;
74
75pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);
82
83const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
84const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;
85
86const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
87
88const MAX_BLOCK_REQUESTS: usize = 50; pub const MAX_BLOCKS_BEHIND: u32 = 1; pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
98
99type FailedRequests<H> = BTreeMap<u32, (Option<H>, Option<H>)>;
101
102#[derive(Clone)]
105struct OutstandingRequest<N: Network> {
106 request: SyncRequest<N>,
107 timestamp: Instant,
108 response: Option<Block<N>>,
111}
112
113#[derive(Clone, serde::Serialize)]
115pub struct BlockRequestInfo {
116 elapsed: u64,
118 done: bool,
120}
121
122#[derive(Clone, serde::Serialize)]
124pub struct BlockRequestsSummary {
125 outstanding: String,
126 completed: String,
127}
128
129#[derive(thiserror::Error, Debug)]
130pub enum InsertBlockResponseError<N: Network> {
131 #[error("Empty block response")]
132 EmptyBlockResponse,
133 #[error("The peer did not send a consensus version")]
134 NoConsensusVersion,
135 #[error(
136 "The peer's consensus version for height {last_height} does not match ours: expected {expected_version}, got {peer_version}"
137 )]
138 ConsensusVersionMismatch { peer_version: ConsensusVersion, expected_version: ConsensusVersion, last_height: u32 },
139 #[error("Block Sync already advanced to block {height}")]
140 BlockSyncAlreadyAdvanced { height: u32 },
141 #[error("No such request for height {height}")]
142 NoSuchRequest { height: u32 },
143 #[error("Invalid block hash for height {height} from '{peer_ip}': expected {expected_hash}, got {actual_hash}")]
144 InvalidBlockHash { height: u32, peer_ip: SocketAddr, expected_hash: N::BlockHash, actual_hash: N::BlockHash },
145 #[error(
146 "The previous block hash in candidate block {height} from '{peer_ip}' is incorrect: expected {expected}, but got {actual}"
147 )]
148 InvalidPreviousBlockHash { height: u32, peer_ip: SocketAddr, expected: N::BlockHash, actual: N::BlockHash },
149 #[error("Candidate block {height} from '{peer_ip}' is malformed")]
150 MalformedBlock { height: u32, peer_ip: SocketAddr },
151 #[error("The sync pool did not request block {height} from '{peer_ip}'")]
152 WrongSyncPeer { height: u32, peer_ip: SocketAddr },
153 #[error("{}", flatten_error(.0))]
154 Other(#[from] anyhow::Error),
155}
156
157impl<N: Network> InsertBlockResponseError<N> {
158 pub fn is_benign(&self) -> bool {
160 matches!(self, Self::NoSuchRequest { .. } | Self::BlockSyncAlreadyAdvanced { .. })
161 }
162
163 pub fn is_invalid_consensus_version(&self) -> bool {
165 matches!(self, Self::ConsensusVersionMismatch { .. } | Self::NoConsensusVersion)
166 }
167}
168
169impl<N: Network> OutstandingRequest<N> {
170 fn sync_ips(&self) -> &IndexSet<SocketAddr> {
172 let (_, _, sync_ips) = &self.request;
173 sync_ips
174 }
175
176 fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
178 let (_, _, sync_ips) = &mut self.request;
179 sync_ips
180 }
181}
182
183pub struct BlockSync<N: Network> {
196 ledger: Arc<dyn LedgerService<N>>,
198
199 connection_mode: ConnectionMode,
201
202 locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,
205
206 common_ancestors: RwLock<IndexMap<PeerPair, u32>>,
211
212 requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,
214
215 sync_state: RwLock<SyncState>,
219
220 advance_with_sync_blocks_lock: TMutex<()>,
222
223 peer_notify: Notify,
225
226 response_notify: Notify,
228
229 metrics: BlockSyncMetrics,
231
232 prepare_requests_lock: Mutex<()>,
234
235 failed_requests: Mutex<FailedRequests<N::BlockHash>>,
237
238 last_response_at: Mutex<HashMap<SocketAddr, Instant>>,
243
244 synced_notify: Notify,
246}
247
248impl<N: Network> BlockSync<N> {
249 pub fn new(ledger: Arc<dyn LedgerService<N>>, connection_mode: ConnectionMode) -> Self {
251 let sync_state = SyncState::new_with_height(ledger.latest_block_height());
253
254 Self {
255 ledger,
256 connection_mode,
257 sync_state: RwLock::new(sync_state),
258 peer_notify: Default::default(),
259 response_notify: Default::default(),
260 locators: Default::default(),
261 requests: Default::default(),
262 common_ancestors: Default::default(),
263 advance_with_sync_blocks_lock: Default::default(),
264 metrics: Default::default(),
265 prepare_requests_lock: Default::default(),
266 failed_requests: Default::default(),
267 last_response_at: Default::default(),
268 synced_notify: Default::default(),
269 }
270 }
271
272 pub async fn wait_for_peer_update(&self) {
280 self.peer_notify.notified().await
281 }
282
283 pub async fn wait_for_block_responses(&self) {
290 self.response_notify.notified().await
291 }
292
293 #[inline]
295 pub fn is_block_synced(&self) -> bool {
296 self.sync_state.read().is_block_synced()
297 }
298
299 pub async fn wait_for_synced(&self) {
304 loop {
305 let mut fut = std::pin::pin!(self.synced_notify.notified());
306
307 {
308 let sync_state = self.sync_state.read();
309 if sync_state.is_block_synced() {
310 return;
311 }
312
313 fut.as_mut().enable();
315 }
316
317 fut.await;
318 }
319 }
320
321 pub fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<()>> {
328 let mut notified = Box::pin(self.synced_notify.notified());
329
330 {
331 let sync_state = self.sync_state.read();
332 if sync_state.is_block_synced() {
333 return None;
334 }
335
336 notified.as_mut().enable();
338 }
339
340 Some(
341 async move {
342 notified.await;
343 self.wait_for_synced().await;
344 }
345 .boxed(),
346 )
347 }
348
349 #[inline]
352 pub fn num_blocks_behind(&self) -> Option<u32> {
353 self.sync_state.read().num_blocks_behind()
354 }
355
356 #[inline]
358 pub fn greatest_peer_block_height(&self) -> Option<u32> {
359 self.sync_state.read().get_greatest_peer_height()
360 }
361
362 #[inline]
365 pub fn get_sync_height(&self) -> u32 {
366 self.sync_state.read().get_sync_height()
367 }
368
369 #[inline]
371 pub fn get_bft_sync_mode(&self) -> Option<BftSyncMode> {
372 self.sync_state.read().get_bft_sync_mode()
373 }
374
375 #[inline]
380 pub fn set_bft_sync_mode(&self, mode: BftSyncMode) -> Option<BftSyncMode> {
381 self.sync_state.write().set_bft_sync_mode(mode)
382 }
383
384 #[inline]
386 pub fn num_outstanding_block_requests(&self) -> usize {
387 self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count()
388 }
389
390 #[inline]
392 pub fn num_total_block_requests(&self) -> usize {
393 self.requests.read().len()
394 }
395
396 pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> {
398 self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect()
399 }
400
401 pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> {
403 self.requests
404 .read()
405 .iter()
406 .map(|(height, request)| {
407 (*height, BlockRequestInfo {
408 done: request.sync_ips().is_empty(),
409 elapsed: request.timestamp.elapsed().as_secs(),
410 })
411 })
412 .collect()
413 }
414
415 pub fn get_block_requests_summary(&self) -> BlockRequestsSummary {
417 let requests = self.requests.read();
418 let completed = requests.iter().filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None });
419 let outstanding = requests.iter().filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None });
420
421 BlockRequestsSummary { completed: rangify_heights(completed), outstanding: rangify_heights(outstanding) }
422 }
423
424 pub fn get_sync_speed(&self) -> f64 {
425 self.metrics.get_sync_speed()
426 }
427}
428
429#[cfg(test)]
431impl<N: Network> BlockSync<N> {
432 fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> {
434 self.locators.read().get(peer_ip).map(|locators| locators.latest_locator_height())
435 }
436
437 fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> {
439 self.common_ancestors.read().get(&PeerPair(peer_a, peer_b)).copied()
440 }
441
442 fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> {
444 self.requests.read().get(&height).map(|e| e.request.clone())
445 }
446
447 fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> {
449 self.requests.read().get(&height).map(|e| e.timestamp)
450 }
451}
452
453impl<N: Network> BlockSync<N> {
454 #[inline]
456 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
457 let latest_height = self.ledger.latest_block_height();
459
460 let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
463 for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
465 recents.insert(height, self.ledger.get_block_hash(height)?);
466 }
467
468 let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
470 for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
472 checkpoints.insert(height, self.ledger.get_block_hash(height)?);
473 }
474
475 BlockLocators::new(recents, checkpoints)
477 }
478
479 pub fn has_pending_responses(&self) -> bool {
481 self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0
482 }
483
484 pub async fn send_block_requests<C: CommunicationService>(
486 &self,
487 communication: &C,
488 sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
489 requests: &[(u32, PrepareSyncRequest<N>)],
490 ) -> bool {
491 let (start_height, max_num_sync_ips) = match requests.first() {
492 Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
493 None => {
494 warn!("Block sync failed - no block requests");
495 return false;
496 }
497 };
498
499 let sync_ips: IndexSet<_> =
501 sync_peers.keys().copied().sample(&mut rand::rng(), max_num_sync_ips).into_iter().collect();
502
503 let end_height = start_height.saturating_add(requests.len() as u32);
505
506 {
511 let _prepare_requests_lock = self.prepare_requests_lock.lock();
512 let all_still_connected = {
513 let locators = self.locators.read();
514 sync_ips.iter().all(|ip| locators.contains_key(ip))
515 };
516
517 if !all_still_connected {
518 trace!(
519 "Skipping block request batch for heights {start_height}-{inclusive_end}: at least one of the selected peer(s) has disconnected",
520 inclusive_end = end_height.saturating_sub(1)
521 );
522 return false;
523 }
524
525 for (height, (hash, previous_hash, _)) in requests.iter() {
527 if let Err(err) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
529 let err = err.context(format!("Failed to insert block request for height {height}"));
530 warn!("{}", flatten_error(&err));
531 return false;
532 }
533 }
534 }
535
536 debug!("Sending {len} block requests to peer(s) at {peers:?}", len = requests.len(), peers = sync_ips);
537
538 let message = C::prepare_block_request(start_height, end_height);
542
543 let mut tasks = Vec::with_capacity(sync_ips.len());
545 for sync_ip in sync_ips {
546 let sender = communication.send(sync_ip, message.clone()).await;
547 let task = tokio::spawn(async move {
548 match sender {
550 Some(sender) => {
551 if let Err(err) = sender.await {
552 warn!("Failed to send block request to peer '{sync_ip}': {err}");
553 false
554 } else {
555 true
556 }
557 }
558 None => {
559 warn!("Failed to send block request to peer '{sync_ip}': no such peer");
560 false
561 }
562 }
563 });
564
565 tasks.push(task);
566 }
567
568 for result in futures::future::join_all(tasks).await {
570 let success = match result {
571 Ok(success) => success,
572 Err(err) => {
573 error!("tokio join error: {err}");
574 false
575 }
576 };
577
578 if !success {
580 let mut requests = self.requests.write();
582 for height in start_height..end_height {
583 requests.remove(&height);
584 }
585 return false;
587 }
588 }
589 true
590 }
591
592 pub async fn try_issuing_block_requests<C: CommunicationService>(&self, communication: &C) {
598 self.handle_block_request_timeouts();
599
600 if self.is_block_synced() {
601 trace!("Node is already synced. Will not issue new block requests");
602 return;
603 }
604
605 if !self.sync_state.read().can_issue_new_block_requests() && self.failed_requests.lock().is_empty() {
606 trace!("Nothing to sync. Will not issue new block requests");
607 return;
608 }
609
610 let batches = self.prepare_block_requests();
611
612 if batches.is_empty() {
613 let total_requests = self.num_total_block_requests();
614 let num_outstanding = self.num_outstanding_block_requests();
615 if total_requests != 0 {
616 trace!(
617 "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
618 );
619 } else {
620 debug!(
621 "Not block synced yet, and there are no outstanding block requests or \
622 new block requests to send"
623 );
624 }
625 } else {
626 for (block_requests, sync_peers) in batches {
627 for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
628 if !self.send_block_requests(communication, &sync_peers, requests).await {
629 break;
630 }
631 tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
632 }
633 }
634 }
635 }
636
637 #[inline]
645 pub fn insert_block_responses(
646 &self,
647 peer_ip: SocketAddr,
648 blocks: Vec<Block<N>>,
649 latest_consensus_version: Option<ConsensusVersion>,
650 ) -> Result<(), InsertBlockResponseError<N>> {
651 let result = 'outer: {
653 let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else {
654 break 'outer Err(InsertBlockResponseError::EmptyBlockResponse);
655 };
656
657 let expected_consensus_version =
658 N::CONSENSUS_VERSION(last_height).map_err(InsertBlockResponseError::Other)?;
659
660 if expected_consensus_version >= ConsensusVersion::V12 {
663 if let Some(peer_version) = latest_consensus_version {
664 if peer_version != expected_consensus_version {
665 break 'outer Err(InsertBlockResponseError::ConsensusVersionMismatch {
666 peer_version,
667 expected_version: expected_consensus_version,
668 last_height,
669 });
670 }
671 } else {
672 break 'outer Err(InsertBlockResponseError::NoConsensusVersion);
673 }
674 }
675
676 for block in blocks {
678 if let Err(error) = self.insert_block_response(peer_ip, block) {
679 break 'outer Err(error);
680 }
681 }
682
683 Ok(())
684 };
685
686 if result.is_err() {
688 self.remove_block_requests_to_peer(&peer_ip);
689 }
690
691 result
693 }
694
695 #[inline]
698 pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
699 if let Some(entry) = self.requests.read().get(&next_height) {
702 let is_complete = entry.sync_ips().is_empty();
703 if !is_complete {
704 return None;
705 }
706
707 if entry.response.is_none() {
709 warn!("Request for height {next_height} is complete but no response exists");
710 }
711 entry.response.clone()
712 } else {
713 None
714 }
715 }
716
717 #[inline]
726 pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
727 let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
734 trace!("Skipping attempt to advance block synchronziation as it is already in progress");
735 return Ok(false);
736 };
737
738 let mut current_height = self.ledger.latest_block_height();
740 let start_height = current_height;
741 trace!(
742 "Try advancing with block responses (at block {current_height}, current sync speed is {})",
743 self.get_sync_speed()
744 );
745
746 loop {
747 let next_height = current_height + 1;
748
749 let Some(block) = self.peek_next_block(next_height) else {
750 break;
751 };
752
753 if block.height() != next_height {
755 warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
756 break;
757 }
758
759 let ledger = self.ledger.clone();
760
761 let (advanced, stop) = tokio::task::spawn_blocking(move || {
762 let ledger_update = match ledger.begin_ledger_update() {
763 Ok(update) => update,
764 Err(BeginLedgerUpdateError::ShuttingDown) => {
765 info!("BlockSync cannot advance the ledger any more. The node is shutting down.");
766 return Ok((false, true));
767 }
768 Err(err) => {
769 return Err(anyhow!("Unexpected error when beginning ledger update: {err}"));
770 }
771 };
772
773 let block = match ledger_update.check_next_block(block) {
775 Ok(block) => block,
776 Err(CheckBlockError::InvalidHeight { .. })
777 | Err(CheckBlockError::BlockAlreadyExists { .. })
778 | Err(CheckBlockError::InvalidRound { .. }) => {
779 debug!("Skipping a block at height {next_height}. The ledger already advanced",);
780 return Ok((false, false));
781 }
782 Err(err) => {
783 warn!("{err}");
784 return Err(err.into_anyhow());
785 }
786 };
787
788 ledger_update.advance_to_next_block(&block).with_context(|| {
789 format!(
790 "Failed to advance to next block (height: {height}, hash: {hash})",
791 height = block.height(),
792 hash = block.hash(),
793 )
794 })?;
795
796 Ok((true, false))
797 })
798 .await??;
799
800 if advanced {
803 self.count_request_completed();
804 }
805
806 self.remove_block_response(next_height);
808
809 if stop {
811 break;
812 }
813
814 current_height = next_height;
816 }
817
818 if current_height > start_height {
819 self.set_sync_height(current_height);
820 Ok(true)
821 } else {
822 Ok(false)
823 }
824 }
825}
826
827impl<N: Network> BlockSync<N> {
828 pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
835 let current_height = self.get_sync_height();
837
838 if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
839 let sync_peers =
841 sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
842 Some((sync_peers, min_common_ancestor))
844 } else {
845 None
846 }
847 }
848
849 pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
857 let connection_mode = self.connection_mode;
858 match self.locators.write().entry(peer_ip) {
861 hash_map::Entry::Occupied(mut e) => {
862 if e.get() == locators {
864 return Ok(());
865 }
866
867 let old_height = e.get().latest_locator_height();
868 let new_height = locators.latest_locator_height();
869
870 if old_height > new_height {
871 debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
872 }
873 e.insert(locators.clone());
874 }
875 hash_map::Entry::Vacant(e) => {
876 e.insert(locators.clone());
877 }
878 }
879
880 let new_local_ancestor = {
882 let mut ancestor = 0;
883 for (height, hash) in locators.clone().into_iter() {
887 if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
888 match ledger_hash == hash {
889 true => ancestor = height,
890 false => {
891 warn!(
892 "[{connection_mode}] Detected fork between this node and peer \"{peer_ip}\" at height {height}"
893 );
894 break;
895 }
896 }
897 }
898 }
899 ancestor
900 };
901
902 let ancestor_updates: Vec<_> = self
905 .locators
906 .read()
907 .iter()
908 .filter_map(|(other_ip, other_locators)| {
909 if other_ip == &peer_ip {
911 return None;
912 }
913 let mut ancestor = 0;
915 for (height, hash) in other_locators.clone().into_iter() {
916 if let Some(expected_hash) = locators.get_hash(height) {
917 match expected_hash == hash {
918 true => ancestor = height,
919 false => {
920 debug!(
921 "[{connection_mode}] Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
922 );
923 break;
924 }
925 }
926 }
927 }
928
929 Some((PeerPair(peer_ip, *other_ip), ancestor))
930 })
931 .collect();
932
933 {
936 let mut common_ancestors = self.common_ancestors.write();
937 common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);
938
939 for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
940 common_ancestors.insert(peer_pair, new_ancestor);
941 }
942 }
943
944 let is_synced = if let Some(greatest_peer_height) =
946 self.locators.read().values().map(|l| l.latest_locator_height()).max()
947 {
948 let mut sync_state = self.sync_state.write();
949 sync_state.set_greatest_peer_height(greatest_peer_height);
950 sync_state.is_block_synced()
951 } else {
952 error!("Got new block locators but greatest peer height is zero.");
953 false
954 };
955
956 if is_synced {
958 self.synced_notify.notify_waiters();
959 }
960
961 self.peer_notify.notify_one();
964
965 Ok(())
966 }
967
968 pub fn remove_peer(&self, peer_ip: &SocketAddr) {
972 trace!("Removing peer {peer_ip} from block sync");
973
974 let _prepare_requests_lock = self.prepare_requests_lock.lock();
976
977 self.locators.write().remove(peer_ip);
979 self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
981 self.last_response_at.lock().remove(peer_ip);
983 self.remove_block_requests_to_peer(peer_ip);
985
986 let synced = {
987 let max_height = self.locators.read().values().map(|l| l.latest_locator_height()).max();
989 let mut sync_state = self.sync_state.write();
990
991 if let Some(greatest_peer_height) = max_height {
993 sync_state.set_greatest_peer_height(greatest_peer_height);
994 } else {
995 sync_state.clear_greatest_peer_height();
997 }
998
999 sync_state.is_block_synced()
1000 };
1001
1002 if synced {
1004 self.synced_notify.notify_waiters();
1005 }
1006
1007 self.peer_notify.notify_one();
1009 }
1010}
1011
1012pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);
1014
1015impl<N: Network> BlockSync<N> {
1016 pub fn prepare_block_requests(&self) -> Vec<BlockRequestBatch<N>> {
1034 let _block_requests_lock = self.prepare_requests_lock.lock();
1035
1036 let print_requests = || {
1038 if tracing::enabled!(tracing::Level::TRACE) {
1039 let summary = self.get_block_requests_summary();
1040
1041 if summary.completed.is_empty() {
1042 trace!("There are no completed requests that have not been processed yet.");
1043 } else {
1044 trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
1045 }
1046
1047 if summary.outstanding.is_empty() {
1048 trace!("There are no outstanding requests.");
1049 } else {
1050 trace!("The following requests are still outstanding: {:?}", summary.outstanding);
1051 }
1052 }
1053 };
1054
1055 let current_height = self.get_sync_height();
1057
1058 let mut failed_requests = self.failed_requests.lock();
1062
1063 while let Some(height) = failed_requests.keys().next()
1065 && *height <= current_height
1066 {
1067 failed_requests.pop_first();
1068 }
1069
1070 if !failed_requests.is_empty() {
1072 trace!("There are {} failed requests that need to be re-issued.", failed_requests.len());
1073
1074 let iter = failed_requests.iter();
1076 let mut batches: VecDeque<Vec<(u32, _, _)>> = VecDeque::new();
1077
1078 for (height, (hash, previous_hash)) in iter {
1079 if let Some(prev_batch) = batches.back_mut() {
1080 if let Some((last_height, _, _)) = prev_batch.last()
1081 && *last_height + 1 != *height
1082 {
1083 batches.push_back(vec![(*height, *hash, *previous_hash)]);
1085 } else {
1086 prev_batch.push((*height, *hash, *previous_hash));
1088 }
1089 } else {
1090 batches.push_back(vec![(*height, *hash, *previous_hash)]);
1092 }
1093 }
1094
1095 let mut result = vec![];
1096 while let Some(batch) = batches.pop_front() {
1097 let start_height = batch.first().unwrap().0;
1099 let end_height = batch.last().unwrap().0 + 1;
1100
1101 let max_new_blocks_to_request = end_height - start_height;
1103
1104 let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start_height) else {
1105 error!("Cannot re-request blocks because no or not enough peers are connected");
1107 return result;
1108 };
1109
1110 let Some(greatest_peer_height) = sync_peers.values().map(|l| l.latest_locator_height()).max() else {
1112 error!(
1114 "Cannot re-request blocks because no or not enough peers with sufficient height are connected"
1115 );
1116 return result;
1117 };
1118
1119 let requests = self.construct_requests(
1121 &sync_peers,
1122 start_height.saturating_sub(1),
1123 min_common_ancestor,
1124 max_new_blocks_to_request,
1125 greatest_peer_height,
1126 );
1127
1128 for (height, _) in &requests {
1131 failed_requests.remove(height);
1132 }
1133
1134 result.push((requests, sync_peers));
1135 }
1136
1137 return result;
1138 }
1139
1140 let max_outstanding_block_requests =
1142 (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);
1143
1144 let max_total_requests = 4 * max_outstanding_block_requests;
1146
1147 let max_new_blocks_to_request =
1148 max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);
1149
1150 if self.num_total_block_requests() >= max_total_requests as usize {
1152 trace!(
1153 "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
1154 );
1155
1156 print_requests();
1157 vec![]
1158 } else if max_new_blocks_to_request == 0 {
1159 trace!(
1160 "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
1161 );
1162
1163 print_requests();
1164 vec![]
1165 } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
1166 let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);
1169
1170 let requests = self.construct_requests(
1172 &sync_peers,
1173 current_height,
1174 min_common_ancestor,
1175 max_new_blocks_to_request,
1176 greatest_peer_height,
1177 );
1178
1179 if !requests.is_empty() {
1180 trace!(
1181 "Generated new block requests for the following heights: {}",
1182 rangify_heights(requests.iter().map(|(h, _)| *h))
1183 );
1184 }
1185
1186 vec![(requests, sync_peers)]
1187 } else if self.requests.read().is_empty() {
1188 vec![]
1194 } else {
1195 trace!("No new blocks can be requested, but there are still outstanding requests.");
1197
1198 print_requests();
1199 vec![]
1200 }
1201 }
1202
1203 pub fn count_request_completed(&self) {
1208 self.metrics.count_request_completed();
1209 }
1210
1211 pub fn set_sync_height(&self, new_height: u32) {
1214 let (synced, fully_synced) = {
1216 let mut state = self.sync_state.write();
1217 state.set_sync_height(new_height);
1218 (state.is_block_synced(), !state.can_issue_new_block_requests())
1219 };
1220
1221 if fully_synced {
1222 self.metrics.mark_fully_synced();
1223 }
1224
1225 if synced {
1226 self.synced_notify.notify_waiters();
1227 }
1228 }
1229
1230 fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
1238 self.check_block_request(height)?;
1240 ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
1242 self.requests.write().insert(height, OutstandingRequest {
1244 request: (hash, previous_hash, sync_ips),
1245 timestamp: Instant::now(),
1246 response: None,
1247 });
1248 Ok(())
1249 }
1250
1251 fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<(), InsertBlockResponseError<N>> {
1254 let height = block.height();
1256 let mut requests = self.requests.write();
1257
1258 if self.ledger.contains_block_height(height) {
1259 return Err(InsertBlockResponseError::BlockSyncAlreadyAdvanced { height });
1260 }
1261
1262 let Some(entry) = requests.get_mut(&height) else {
1263 return Err(InsertBlockResponseError::NoSuchRequest { height });
1264 };
1265
1266 let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;
1268
1269 if let Some(expected_hash) = expected_hash
1271 && block.hash() != *expected_hash
1272 {
1273 return Err(InsertBlockResponseError::InvalidBlockHash {
1274 height,
1275 peer_ip,
1276 expected_hash: *expected_hash,
1277 actual_hash: block.hash(),
1278 });
1279 }
1280 if let Some(expected_previous_hash) = expected_previous_hash
1282 && block.previous_hash() != *expected_previous_hash
1283 {
1284 return Err(InsertBlockResponseError::InvalidPreviousBlockHash {
1285 height,
1286 peer_ip,
1287 expected: *expected_previous_hash,
1288 actual: block.previous_hash(),
1289 });
1290 }
1291 if !sync_ips.contains(&peer_ip) {
1293 return Err(InsertBlockResponseError::WrongSyncPeer { height, peer_ip });
1294 }
1295
1296 entry.sync_ips_mut().swap_remove(&peer_ip);
1298
1299 if let Some(existing_block) = &entry.response {
1300 if block != *existing_block {
1302 return Err(InsertBlockResponseError::MalformedBlock { height, peer_ip });
1303 }
1304 } else {
1305 entry.response = Some(block.clone());
1306 }
1307
1308 trace!("Received a new and valid block response for height {height}");
1309
1310 self.last_response_at.lock().insert(peer_ip, Instant::now());
1313
1314 self.response_notify.notify_one();
1316
1317 Ok(())
1318 }
1319
1320 fn check_block_request(&self, height: u32) -> Result<()> {
1322 if self.ledger.contains_block_height(height) {
1324 bail!("Failed to add block request, as block {height} exists in the ledger");
1325 }
1326 if self.requests.read().contains_key(&height) {
1328 bail!("Failed to add block request, as block {height} exists in the requests map");
1329 }
1330
1331 Ok(())
1332 }
1333
1334 pub fn remove_block_response(&self, height: u32) {
1341 if let Some(e) = self.requests.write().remove(&height) {
1343 trace!(
1344 "Block request for height {height} was completed in {}ms (sync speed is {})",
1345 e.timestamp.elapsed().as_millis(),
1346 self.get_sync_speed()
1347 );
1348
1349 self.peer_notify.notify_one();
1351 }
1352 }
1353
1354 fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
1358 trace!("Block sync is removing all block requests to peer {peer_ip}...");
1359 let mut heights = vec![];
1360 let mut removed_requests = vec![];
1361
1362 self.requests.write().retain(|height, e| {
1365 let had_peer = e.sync_ips_mut().swap_remove(peer_ip);
1366
1367 if had_peer && e.response.is_none() {
1368 trace!("Removed outstanding block request to peer {peer_ip} at height {height}");
1369 heights.push(*height);
1370 }
1371
1372 let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
1375 if !retain {
1376 let (hash, previous_hash, _) = &e.request;
1378 removed_requests.push((*height, (*hash, *previous_hash)));
1379 }
1380 retain
1381 });
1382
1383 if !heights.is_empty() {
1384 debug!(
1385 "Removed outstanding block requests to disconnecting peer '{peer_ip}' at heights: {}. {} were fully removed.",
1386 rangify_heights(heights),
1387 removed_requests.len(),
1388 );
1389 }
1390
1391 if !removed_requests.is_empty() {
1393 let mut failed_requests = self.failed_requests.lock();
1394 for (height, e) in removed_requests.into_iter() {
1395 let prev = failed_requests.insert(height, e);
1396 if prev.is_some() {
1397 warn!(
1398 "Failed to mark block request at height {height} as failed, as it already exists in the failed requests map"
1399 );
1400 }
1401 }
1402 }
1403
1404 }
1406
1407 pub fn handle_block_request_timeouts(&self) {
1411 let responsive_peers: HashSet<SocketAddr> = {
1416 let last_response_at = self.last_response_at.lock();
1417 let now = Instant::now();
1418 last_response_at
1419 .iter()
1420 .filter_map(|(peer, t)| (now.duration_since(*t) <= BLOCK_REQUEST_TIMEOUT).then_some(*peer))
1421 .collect()
1422 };
1423
1424 let (timed_out_requests, peers_to_ban) = {
1426 let mut requests = self.requests.write();
1428
1429 let now = Instant::now();
1431
1432 let current_height = self.ledger.latest_block_height();
1434
1435 let mut timed_out_requests = vec![];
1437
1438 let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();
1440
1441 requests.retain(|height, e| {
1443 let is_obsolete = *height <= current_height;
1444 let timer_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
1446 let is_complete = e.sync_ips().is_empty() && e.response.is_some();
1448 let has_responsive_peer = e.sync_ips().iter().any(|ip| responsive_peers.contains(ip));
1450
1451 let is_timeout = timer_elapsed && !is_complete && !has_responsive_peer;
1453
1454 let retain = !is_timeout && !is_obsolete;
1456
1457 if is_timeout {
1458 trace!("Block request at height {height} has timed out: timer_elapsed={timer_elapsed}, is_complete={is_complete}, is_obsolete={is_obsolete}");
1459
1460 let (hash, previous_hash, _) = &e.request;
1462 timed_out_requests.push((*height, (*hash, *previous_hash)));
1463 } else if is_obsolete {
1464 trace!("Block request at height {height} became obsolete (current_height={current_height})");
1465 }
1466
1467 if is_timeout {
1469 for peer_ip in e.sync_ips().iter() {
1470 peers_to_ban.insert(*peer_ip);
1471 }
1472 }
1473
1474 retain
1475 });
1476
1477 if !timed_out_requests.is_empty() {
1478 debug!(
1479 "{num} block requests timed out: {list}",
1480 num = timed_out_requests.len(),
1481 list = rangify_heights(timed_out_requests.iter().map(|(height, _)| *height))
1482 );
1483 }
1484
1485 (timed_out_requests, peers_to_ban)
1486 };
1487
1488 if !timed_out_requests.is_empty() {
1490 let mut failed_requests = self.failed_requests.lock();
1491 for (height, e) in timed_out_requests.into_iter() {
1492 let prev = failed_requests.insert(height, e);
1493 if prev.is_some() {
1494 warn!(
1495 "Failed to mark block request at height {height} as failed, as it already exists in the failed requests map"
1496 );
1497 }
1498 }
1499 }
1500
1501 for peer_ip in peers_to_ban {
1505 self.remove_peer(&peer_ip);
1506 }
1509 }
1510
1511 fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
1519 let latest_ledger_height = self.ledger.latest_block_height();
1521
1522 let candidate_locators: IndexMap<_, _> = self
1525 .locators
1526 .read()
1527 .iter()
1528 .filter(|(_, locators)| locators.latest_locator_height() > current_height)
1529 .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
1530 .take(NUM_SYNC_CANDIDATE_PEERS)
1531 .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
1532 .collect();
1533
1534 if candidate_locators.is_empty() {
1536 trace!("Found no sync peers with height greater {current_height}");
1537 return None;
1538 }
1539
1540 let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);
1547
1548 for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
1551 let mut min_common_ancestor = peer_locators.latest_locator_height();
1553
1554 let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];
1557
1558 for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
1560 if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
1562 if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
1564 min_common_ancestor = min_common_ancestor.min(*common_ancestor);
1566
1567 sync_peers.push((*other_ip, other_locators.clone()));
1569 }
1570 }
1571 }
1572
1573 if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
1575 sync_peers.shuffle(&mut rand::rng());
1578
1579 return Some((sync_peers.into_iter().collect(), min_common_ancestor));
1581 }
1582 }
1583
1584 None
1586 }
1587
1588 fn construct_requests(
1593 &self,
1594 sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1595 sync_height: u32,
1596 min_common_ancestor: u32,
1597 max_blocks_to_request: u32,
1598 greatest_peer_height: u32,
1599 ) -> Vec<(u32, PrepareSyncRequest<N>)> {
1600 let start_height = {
1602 let requests = self.requests.read();
1603 let ledger_height = self.ledger.latest_block_height();
1604
1605 let mut start_height = ledger_height.max(sync_height + 1);
1607
1608 while requests.contains_key(&start_height) {
1610 start_height += 1;
1611 }
1612
1613 start_height
1614 };
1615
1616 if min_common_ancestor < start_height {
1618 if start_height < greatest_peer_height {
1619 trace!(
1620 "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
1621 );
1622 }
1623 return Default::default();
1624 }
1625
1626 let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);
1628
1629 let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
1631 let mut max_num_sync_ips = 1;
1633
1634 for height in start_height..end_height {
1635 if self.check_block_request(height).is_err() {
1637 match request_hashes.is_empty() {
1640 true => continue,
1641 false => break,
1642 }
1643 }
1644
1645 let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);
1647
1648 if !is_honest {
1650 warn!("Detected dishonest peer(s) when preparing block request");
1652 if sync_peers.len() < num_sync_ips {
1654 break;
1655 }
1656 }
1657
1658 max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);
1660
1661 request_hashes.insert(height, (hash, previous_hash));
1663 }
1664
1665 request_hashes
1667 .into_iter()
1668 .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
1669 .collect()
1670 }
1671}
1672
1673fn construct_request<N: Network>(
1676 height: u32,
1677 sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1678) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
1679 let mut hash = None;
1680 let mut hash_redundancy: usize = 0;
1681 let mut previous_hash = None;
1682 let mut is_honest = true;
1683
1684 for peer_locators in sync_peers.values() {
1685 if let Some(candidate_hash) = peer_locators.get_hash(height) {
1686 match hash {
1687 Some(hash) if hash == candidate_hash => hash_redundancy += 1,
1689 Some(_) => {
1691 hash = None;
1692 hash_redundancy = 0;
1693 previous_hash = None;
1694 is_honest = false;
1695 break;
1696 }
1697 None => {
1699 hash = Some(candidate_hash);
1700 hash_redundancy = 1;
1701 }
1702 }
1703 }
1704 if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
1705 match previous_hash {
1706 Some(previous_hash) if previous_hash == candidate_previous_hash => (),
1708 Some(_) => {
1710 hash = None;
1711 hash_redundancy = 0;
1712 previous_hash = None;
1713 is_honest = false;
1714 break;
1715 }
1716 None => previous_hash = Some(candidate_previous_hash),
1718 }
1719 }
1720 }
1721
1722 let num_sync_ips = {
1725 if !is_honest {
1727 EXTRA_REDUNDANCY_FACTOR
1729 }
1730 else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
1732 1
1734 }
1735 else {
1737 REDUNDANCY_FACTOR
1739 }
1740 };
1741
1742 (hash, previous_hash, num_sync_ips, is_honest)
1743}
1744
1745#[cfg(test)]
1746mod tests {
1747 use super::*;
1748 use crate::locators::{
1749 CHECKPOINT_INTERVAL,
1750 NUM_RECENT_BLOCKS,
1751 test_helpers::{sample_block_locators, sample_block_locators_with_fork},
1752 };
1753
1754 use snarkos_node_bft_ledger_service::MockLedgerService;
1755 use snarkvm::{
1756 ledger::committee::Committee,
1757 prelude::{Field, TestRng},
1758 };
1759
1760 use indexmap::{IndexSet, indexset};
1761 #[cfg(feature = "locktick")]
1762 use locktick::parking_lot::RwLock;
1763 #[cfg(not(feature = "locktick"))]
1764 use parking_lot::RwLock;
1765 use rand::RngExt;
1766 use std::net::{IpAddr, Ipv4Addr};
1767
1768 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1769
1770 fn sample_peer_ip(id: u16) -> SocketAddr {
1772 assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
1773 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
1774 }
1775
1776 fn sample_committee() -> Committee<CurrentNetwork> {
1778 let rng = &mut TestRng::default();
1779 snarkvm::ledger::committee::test_helpers::sample_committee(rng)
1780 }
1781
1782 fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
1784 MockLedgerService::new_at_height(sample_committee(), height)
1785 }
1786
1787 fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
1789 BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)), ConnectionMode::Router)
1790 }
1791
1792 fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
1796 assert!(num_values > 0, "Cannot generate an empty vector");
1797 assert!((max_height as usize) >= num_values);
1798
1799 let mut rng = TestRng::default();
1800
1801 let mut heights: Vec<u32> = (0..(max_height - 1)).sample(&mut rng, num_values);
1802
1803 heights.push(max_height);
1804
1805 heights
1806 }
1807
1808 fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
1810 BlockSync::<CurrentNetwork> {
1811 failed_requests: Default::default(),
1812 peer_notify: Notify::new(),
1813 response_notify: Default::default(),
1814 ledger: Arc::new(sample_ledger_service(height)),
1815 connection_mode: sync.connection_mode,
1816 locators: RwLock::new(sync.locators.read().clone()),
1817 common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
1818 requests: RwLock::new(sync.requests.read().clone()),
1819 sync_state: RwLock::new(sync.sync_state.read().clone()),
1820 synced_notify: Notify::new(),
1821 advance_with_sync_blocks_lock: Default::default(),
1822 metrics: Default::default(),
1823 prepare_requests_lock: Default::default(),
1824 last_response_at: Default::default(),
1825 }
1826 }
1827
1828 fn check_prepare_block_requests(
1830 sync: BlockSync<CurrentNetwork>,
1831 min_common_ancestor: u32,
1832 peers: IndexSet<SocketAddr>,
1833 ) {
1834 let rng = &mut TestRng::default();
1835
1836 assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");
1838
1839 let num_peers_within_recent_range_of_ledger = {
1841 if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
1843 0
1844 }
1845 else {
1847 peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
1848 }
1849 };
1850
1851 let mut batches = sync.prepare_block_requests();
1853
1854 if peers.is_empty() {
1856 assert!(batches.is_empty());
1857 return;
1858 }
1859
1860 let (requests, sync_peers) = batches.pop().unwrap();
1861
1862 let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
1864 assert_eq!(requests.len(), expected_num_requests);
1865
1866 for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1867 let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
1869 assert_eq!(height, 1 + idx as u32);
1870 assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1871 assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1872
1873 if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
1874 assert_eq!(sync_ips.len(), 1);
1875 } else {
1876 assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
1877 assert_eq!(sync_ips, peers);
1878 }
1879 }
1880 }
1881
1882 #[test]
1884 fn test_latest_block_height() {
1885 for height in generate_block_heights(100_001, 5000) {
1886 let sync = sample_sync_at_height(height);
1887 assert_eq!(sync.ledger.latest_block_height(), height);
1889
1890 assert_eq!(sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(0)).into()).unwrap(), 0);
1892 assert_eq!(
1893 sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(height)).into()).unwrap(),
1894 height
1895 );
1896 }
1897 }
1898
1899 #[test]
1900 fn test_get_block_hash() {
1901 for height in generate_block_heights(100_001, 5000) {
1902 let sync = sample_sync_at_height(height);
1903
1904 assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
1906 assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::<CurrentNetwork>::from_u32(height)).into());
1907 }
1908 }
1909
1910 #[test]
1911 fn test_prepare_block_requests() {
1912 for num_peers in 0..111 {
1913 println!("Testing with {num_peers} peers");
1914
1915 let sync = sample_sync_at_height(0);
1916
1917 let mut peers = indexset![];
1918
1919 for peer_id in 1..=num_peers {
1920 sync.update_peer_locators(sample_peer_ip(peer_id), &sample_block_locators(10)).unwrap();
1922 peers.insert(sample_peer_ip(peer_id));
1924 }
1925
1926 check_prepare_block_requests(sync, 10, peers);
1928 }
1929 }
1930
1931 #[test]
1932 fn test_prepare_block_requests_with_leading_fork_at_11() {
1933 let sync = sample_sync_at_height(0);
1934
1935 let peer_1 = sample_peer_ip(1);
1946 sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 11)).unwrap();
1947
1948 let peer_2 = sample_peer_ip(2);
1950 sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1951
1952 let peer_3 = sample_peer_ip(3);
1954 sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1955
1956 let (requests, _) = sync.prepare_block_requests().pop().unwrap();
1958 assert_eq!(requests.len(), 10);
1959
1960 for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1962 assert_eq!(height, 1 + idx as u32);
1963 assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1964 assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1965 assert_eq!(num_sync_ips, 1); }
1967 }
1968
1969 #[test]
1970 fn test_prepare_block_requests_with_leading_fork_at_10() {
1971 let rng = &mut TestRng::default();
1972 let sync = sample_sync_at_height(0);
1973
1974 let peer_1 = sample_peer_ip(1);
1989 sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();
1990
1991 let peer_2 = sample_peer_ip(2);
1993 sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1994
1995 let peer_3 = sample_peer_ip(3);
1997 sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1998
1999 let batches = sync.prepare_block_requests();
2001 assert!(batches.is_empty());
2002
2003 let peer_4 = sample_peer_ip(4);
2007 sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
2008
2009 let (requests, sync_peers) = sync.prepare_block_requests().pop().unwrap();
2011 assert_eq!(requests.len(), 10);
2012
2013 for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
2015 let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
2017 assert_eq!(height, 1 + idx as u32);
2018 assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
2019 assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
2020 assert_eq!(sync_ips.len(), 1); assert_ne!(sync_ips[0], peer_1); }
2023 }
2024
2025 #[test]
2026 fn test_prepare_block_requests_with_trailing_fork_at_9() {
2027 let rng = &mut TestRng::default();
2028 let sync = sample_sync_at_height(0);
2029
2030 let peer_1 = sample_peer_ip(1);
2036 sync.update_peer_locators(peer_1, &sample_block_locators(10)).unwrap();
2037
2038 let peer_2 = sample_peer_ip(2);
2040 sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
2041
2042 let peer_3 = sample_peer_ip(3);
2044 sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();
2045
2046 let batches = sync.prepare_block_requests();
2048 assert!(batches.is_empty());
2049
2050 let peer_4 = sample_peer_ip(4);
2054 sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
2055
2056 let (requests, sync_peers) = sync.prepare_block_requests().pop().unwrap();
2058 assert_eq!(requests.len(), 10);
2059
2060 for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
2062 let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
2064 assert_eq!(height, 1 + idx as u32);
2065 assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
2066 assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
2067 assert_eq!(sync_ips.len(), 1); assert_ne!(sync_ips[0], peer_3); }
2070 }
2071
2072 #[test]
2073 fn test_insert_block_requests() {
2074 let rng = &mut TestRng::default();
2075 let sync = sample_sync_at_height(0);
2076
2077 sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
2079
2080 let (requests, sync_peers) = sync.prepare_block_requests().pop().unwrap();
2082 assert_eq!(requests.len(), 10);
2083
2084 for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
2085 let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
2087 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
2089 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
2091 assert!(sync.get_block_request_timestamp(height).is_some());
2092 }
2093
2094 for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
2095 let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
2097 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
2099 assert!(sync.get_block_request_timestamp(height).is_some());
2100 }
2101
2102 for (height, (hash, previous_hash, num_sync_ips)) in requests {
2103 let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
2105 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err();
2107 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
2109 assert!(sync.get_block_request_timestamp(height).is_some());
2110 }
2111 }
2112
2113 #[test]
2114 fn test_insert_block_requests_fails() {
2115 let sync = sample_sync_at_height(9);
2116
2117 sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
2119
2120 sync.insert_block_request(9, (None, None, indexset![sample_peer_ip(1)])).unwrap_err();
2122 sync.insert_block_request(10, (None, None, indexset![sample_peer_ip(1)])).unwrap();
2124 }
2125
2126 #[test]
2127 fn test_update_peer_locators() {
2128 let sync = sample_sync_at_height(0);
2129
2130 let peer1_ip = sample_peer_ip(1);
2132 for peer1_height in 0..500u32 {
2133 sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap();
2134 assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height));
2135
2136 let peer2_ip = sample_peer_ip(2);
2137 for peer2_height in 0..500u32 {
2138 println!("Testing peer 1 height at {peer1_height} and peer 2 height at {peer2_height}");
2139
2140 sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap();
2141 assert_eq!(sync.get_peer_height(&peer2_ip), Some(peer2_height));
2142
2143 let distance = peer1_height.abs_diff(peer2_height);
2145
2146 if distance < NUM_RECENT_BLOCKS as u32 {
2148 let expected_ancestor = core::cmp::min(peer1_height, peer2_height);
2149 assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
2150 assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
2151 } else {
2152 let min_checkpoints =
2153 core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL);
2154 let expected_ancestor = min_checkpoints * CHECKPOINT_INTERVAL;
2155 assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
2156 assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
2157 }
2158 }
2159 }
2160 }
2161
2162 #[test]
2163 fn test_remove_peer() {
2164 let sync = sample_sync_at_height(0);
2165
2166 let peer_ip = sample_peer_ip(1);
2167 sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
2168 assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
2169
2170 sync.remove_peer(&peer_ip);
2171 assert_eq!(sync.get_peer_height(&peer_ip), None);
2172
2173 sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
2174 assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
2175
2176 sync.remove_peer(&peer_ip);
2177 assert_eq!(sync.get_peer_height(&peer_ip), None);
2178 }
2179
2180 #[test]
2181 fn test_locators_insert_remove_insert() {
2182 let sync = sample_sync_at_height(0);
2183
2184 let peer_ip = sample_peer_ip(1);
2185 sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
2186 assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
2187
2188 sync.remove_peer(&peer_ip);
2189 assert_eq!(sync.get_peer_height(&peer_ip), None);
2190
2191 sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
2192 assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
2193 }
2194
2195 #[test]
2196 fn test_requests_insert_remove_insert() {
2197 let rng = &mut TestRng::default();
2198 let sync = sample_sync_at_height(0);
2199
2200 let peer_ip = sample_peer_ip(1);
2202 sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();
2203
2204 let (requests, sync_peers) = sync.prepare_block_requests().pop().unwrap();
2206 assert_eq!(requests.len(), 10);
2207
2208 for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
2209 let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
2211 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
2213 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
2215 assert!(sync.get_block_request_timestamp(height).is_some());
2216 }
2217
2218 sync.remove_peer(&peer_ip);
2220
2221 for (height, _) in requests {
2222 assert_eq!(sync.get_block_request(height), None);
2224 assert!(sync.get_block_request_timestamp(height).is_none());
2225 }
2226
2227 let batches = sync.prepare_block_requests();
2229 assert!(batches.is_empty());
2230
2231 sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();
2233
2234 let (requests, _) = sync.prepare_block_requests().pop().unwrap();
2236 assert_eq!(requests.len(), 10);
2237
2238 for (height, (hash, previous_hash, num_sync_ips)) in requests {
2239 let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
2241 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
2243 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
2245 assert!(sync.get_block_request_timestamp(height).is_some());
2246 }
2247 }
2248
2249 #[test]
2250 fn test_obsolete_block_requests() {
2251 let rng = &mut TestRng::default();
2252 let sync = sample_sync_at_height(0);
2253
2254 let locator_height = rng.random_range(1..50);
2258
2259 let locators = sample_block_locators(locator_height);
2261 sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap();
2262
2263 let (requests, sync_peers) = sync.prepare_block_requests().pop().unwrap();
2265 assert_eq!(requests.len(), locator_height as usize);
2266
2267 for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
2269 let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
2271 sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
2273 assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
2275 assert!(sync.get_block_request_timestamp(height).is_some());
2276 }
2277
2278 let ledger_height = rng.random_range(0..=locator_height);
2282 let new_sync = duplicate_sync_at_new_height(&sync, ledger_height);
2283
2284 assert_eq!(new_sync.requests.read().len(), requests.len());
2286
2287 new_sync.handle_block_request_timeouts();
2289
2290 assert_eq!(new_sync.requests.read().len(), (locator_height - ledger_height) as usize);
2292 }
2293
2294 #[test]
2295 fn test_timed_out_block_request() {
2296 let sync = sample_sync_at_height(0);
2297 let peer_ip = sample_peer_ip(1);
2298 let locators = sample_block_locators(10);
2299 let block_hash = locators.get_hash(1);
2300
2301 sync.update_peer_locators(peer_ip, &locators).unwrap();
2302
2303 let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
2304
2305 sync.requests.write().insert(1, OutstandingRequest {
2307 request: (block_hash, None, [peer_ip].into()),
2308 timestamp,
2309 response: None,
2310 });
2311
2312 assert_eq!(sync.requests.read().len(), 1);
2313 assert_eq!(sync.locators.read().len(), 1);
2314
2315 sync.handle_block_request_timeouts();
2317
2318 assert!(sync.requests.read().is_empty());
2323 assert!(sync.locators.read().is_empty());
2324 }
2325
2326 #[test]
2327 fn test_reissue_timed_out_block_request() {
2328 let sync = sample_sync_at_height(0);
2329 let peer_ip1 = sample_peer_ip(1);
2330 let peer_ip2 = sample_peer_ip(2);
2331 let peer_ip3 = sample_peer_ip(3);
2332
2333 let locators = sample_block_locators(10);
2334 let block_hash1 = locators.get_hash(1);
2335 let block_hash2 = locators.get_hash(2);
2336
2337 sync.update_peer_locators(peer_ip1, &locators).unwrap();
2338 sync.update_peer_locators(peer_ip2, &locators).unwrap();
2339 sync.update_peer_locators(peer_ip3, &locators).unwrap();
2340
2341 assert_eq!(sync.locators.read().len(), 3);
2342
2343 let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
2344
2345 sync.requests.write().insert(1, OutstandingRequest {
2347 request: (block_hash1, None, [peer_ip1].into()),
2348 timestamp,
2349 response: None,
2350 });
2351
2352 sync.requests.write().insert(2, OutstandingRequest {
2354 request: (block_hash2, None, [peer_ip2].into()),
2355 timestamp: Instant::now(),
2356 response: None,
2357 });
2358
2359 assert_eq!(sync.requests.read().len(), 2);
2360
2361 sync.handle_block_request_timeouts();
2363
2364 assert_eq!(sync.requests.read().len(), 1);
2369 assert_eq!(sync.locators.read().len(), 2);
2370
2371 let failed_requests = sync.failed_requests.lock();
2372 assert_eq!(failed_requests.len(), 1);
2373
2374 let (height, (hash, _)) = failed_requests.iter().next().unwrap();
2375 assert_eq!(*height, 1);
2376 assert_eq!(*hash, block_hash1);
2377 }
2385}