1use crate::{
17 helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
18 locators::BlockLocators,
19};
20use snarkos_node_bft_ledger_service::LedgerService;
21use snarkos_node_network::PeerPoolHandling;
22use snarkos_node_router::messages::DataBlocks;
23use snarkos_node_sync_communication_service::CommunicationService;
24use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
25
26use snarkvm::{
27 console::network::{ConsensusVersion, Network},
28 prelude::block::Block,
29 utilities::ensure_equals,
30};
31
32use anyhow::{Result, bail, ensure};
33use indexmap::{IndexMap, IndexSet};
34use itertools::Itertools;
35#[cfg(feature = "locktick")]
36use locktick::parking_lot::RwLock;
37#[cfg(feature = "locktick")]
38use locktick::tokio::Mutex as TMutex;
39#[cfg(not(feature = "locktick"))]
40use parking_lot::RwLock;
41use rand::seq::{IteratorRandom, SliceRandom};
42use std::{
43 collections::{BTreeMap, HashMap, HashSet, hash_map},
44 net::{IpAddr, Ipv4Addr, SocketAddr},
45 sync::Arc,
46 time::{Duration, Instant},
47};
48#[cfg(not(feature = "locktick"))]
49use tokio::sync::Mutex as TMutex;
50use tokio::sync::Notify;
51
52mod helpers;
53use helpers::rangify_heights;
54
55mod sync_state;
56use sync_state::SyncState;
57
58mod metrics;
59use metrics::BlockSyncMetrics;
60
/// How many peers a block request is sent to in parallel.
/// Production uses a single peer; tests use a higher factor to exercise redundancy paths.
#[cfg(not(test))]
pub const REDUNDANCY_FACTOR: usize = 1;
#[cfg(test)]
pub const REDUNDANCY_FACTOR: usize = 3;

/// Delay inserted between batches of outgoing block requests.
pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);

/// Redundancy applied when dishonest peers were detected while preparing a request.
const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
/// Maximum number of peers considered as synchronization candidates.
const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;

/// How long an unanswered block request may remain open before it is treated as timed out.
const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(600);

// NOTE: the three constants below were previously crammed onto a single line.
/// Maximum number of block-request batches kept in flight
/// (scaled by `DataBlocks::MAXIMUM_NUMBER_OF_BLOCKS` where used).
const MAX_BLOCK_REQUESTS: usize = 50;

/// Maximum number of blocks the node may lag behind while still counting as synced.
pub const MAX_BLOCKS_BEHIND: u32 = 1;

/// Placeholder address representing the local node in the common-ancestor map.
pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
92
/// A block request that has been sent out but not yet fully processed.
#[derive(Clone)]
struct OutstandingRequest<N: Network> {
    // The request payload: expected hash, expected previous hash, and the set of
    // peers that were queried and have not yet answered.
    request: SyncRequest<N>,
    // When the request was issued; used for timeout detection and latency logging.
    timestamp: Instant,
    // The block received for this request, once a peer has answered.
    response: Option<Block<N>>,
}
103
/// Serializable per-request diagnostics, keyed by height in `get_block_requests_info`.
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestInfo {
    // Seconds elapsed since the request was issued.
    elapsed: u64,
    // Whether every queried peer has answered (no outstanding sync IPs remain).
    done: bool,
}
112
/// Human-readable summary of tracked request heights, rendered as compact
/// range strings via `rangify_heights`.
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestsSummary {
    // Height ranges whose requests still await peer responses.
    outstanding: String,
    // Height ranges whose requests are complete but not yet processed.
    completed: String,
}
119
120impl<N: Network> OutstandingRequest<N> {
121 fn sync_ips(&self) -> &IndexSet<SocketAddr> {
123 let (_, _, sync_ips) = &self.request;
124 sync_ips
125 }
126
127 fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
129 let (_, _, sync_ips) = &mut self.request;
130 sync_ips
131 }
132}
133
/// Orchestrates block synchronization: tracks peer block locators, issues
/// block requests, collects responses, and advances the ledger with them.
pub struct BlockSync<N: Network> {
    // Handle to the ledger, used to read heights/hashes and append blocks.
    ledger: Arc<dyn LedgerService<N>>,
    // The latest block locators reported by each connected peer.
    locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,
    // Cached common-ancestor heights per peer pair; the local node participates
    // under `DUMMY_SELF_IP`.
    common_ancestors: RwLock<IndexMap<PeerPair, u32>>,
    // Outstanding and completed-but-unprocessed block requests, keyed by height.
    requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,
    // Tracks the sync height, the greatest peer height, and synced-ness.
    sync_state: RwLock<SyncState>,
    // Ensures only one task advances the ledger with sync blocks at a time.
    advance_with_sync_blocks_lock: TMutex<()>,
    // Notified when peer/locator/request state changes.
    peer_notify: Notify,
    // Notified when a new block response is recorded.
    response_notify: Notify,
    // Sync-speed and completion metrics.
    metrics: BlockSyncMetrics,
}
180
181impl<N: Network> BlockSync<N> {
182 pub fn new(ledger: Arc<dyn LedgerService<N>>) -> Self {
184 let sync_state = SyncState::new_with_height(ledger.latest_block_height());
186
187 Self {
188 ledger,
189 sync_state: RwLock::new(sync_state),
190 peer_notify: Default::default(),
191 response_notify: Default::default(),
192 locators: Default::default(),
193 requests: Default::default(),
194 common_ancestors: Default::default(),
195 advance_with_sync_blocks_lock: Default::default(),
196 metrics: Default::default(),
197 }
198 }
199
200 pub async fn wait_for_peer_update(&self) {
205 self.peer_notify.notified().await
206 }
207
208 pub async fn wait_for_block_responses(&self) {
212 self.response_notify.notified().await
213 }
214
215 #[inline]
217 pub fn is_block_synced(&self) -> bool {
218 self.sync_state.read().is_block_synced()
219 }
220
221 #[inline]
227 pub fn can_block_sync(&self) -> bool {
228 self.sync_state.read().can_block_sync() || self.has_pending_responses()
229 }
230
231 #[inline]
234 pub fn num_blocks_behind(&self) -> Option<u32> {
235 self.sync_state.read().num_blocks_behind()
236 }
237
238 #[inline]
240 pub fn greatest_peer_block_height(&self) -> Option<u32> {
241 self.sync_state.read().get_greatest_peer_height()
242 }
243
244 #[inline]
247 pub fn get_sync_height(&self) -> u32 {
248 self.sync_state.read().get_sync_height()
249 }
250
251 #[inline]
253 pub fn num_outstanding_block_requests(&self) -> usize {
254 self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count()
255 }
256
257 #[inline]
259 pub fn num_total_block_requests(&self) -> usize {
260 self.requests.read().len()
261 }
262
263 pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> {
265 self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect()
266 }
267
268 pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> {
270 self.requests
271 .read()
272 .iter()
273 .map(|(height, request)| {
274 (*height, BlockRequestInfo {
275 done: request.sync_ips().is_empty(),
276 elapsed: request.timestamp.elapsed().as_secs(),
277 })
278 })
279 .collect()
280 }
281
282 pub fn get_block_requests_summary(&self) -> BlockRequestsSummary {
284 let completed = self
285 .requests
286 .read()
287 .iter()
288 .filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None })
289 .collect::<Vec<_>>();
290
291 let outstanding = self
292 .requests
293 .read()
294 .iter()
295 .filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None })
296 .collect::<Vec<_>>();
297
298 BlockRequestsSummary { completed: rangify_heights(&completed), outstanding: rangify_heights(&outstanding) }
299 }
300
301 pub fn get_sync_speed(&self) -> f64 {
302 self.metrics.get_sync_speed()
303 }
304}
305
#[cfg(test)]
impl<N: Network> BlockSync<N> {
    /// Test helper: the latest height reported by `peer_ip`, if known.
    fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> {
        let locators = self.locators.read();
        locators.get(peer_ip).map(|l| l.latest_locator_height())
    }

    /// Test helper: the cached common ancestor between two peers, if any.
    fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> {
        let key = PeerPair(peer_a, peer_b);
        self.common_ancestors.read().get(&key).copied()
    }

    /// Test helper: a clone of the request recorded for `height`, if any.
    fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> {
        self.requests.read().get(&height).map(|entry| entry.request.clone())
    }

    /// Test helper: when the request for `height` was issued, if tracked.
    fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> {
        self.requests.read().get(&height).map(|entry| entry.timestamp)
    }
}
329
330impl<N: Network> BlockSync<N> {
331 #[inline]
333 pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
334 let latest_height = self.ledger.latest_block_height();
336
337 let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
340 for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
342 recents.insert(height, self.ledger.get_block_hash(height)?);
343 }
344
345 let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
347 for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
349 checkpoints.insert(height, self.ledger.get_block_hash(height)?);
350 }
351
352 BlockLocators::new(recents, checkpoints)
354 }
355
356 pub fn has_pending_responses(&self) -> bool {
358 self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0
359 }
360
361 pub async fn send_block_requests<C: CommunicationService>(
363 &self,
364 communication: &C,
365 sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
366 requests: &[(u32, PrepareSyncRequest<N>)],
367 ) -> bool {
368 let (start_height, max_num_sync_ips) = match requests.first() {
369 Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
370 None => {
371 warn!("Block sync failed - no block requests");
372 return false;
373 }
374 };
375
376 debug!("Sending {len} block requests to peer(s) at {peers:?}", len = requests.len(), peers = sync_peers.keys());
377
378 let sync_ips: IndexSet<_> =
380 sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect();
381
382 let end_height = start_height.saturating_add(requests.len() as u32);
384
385 for (height, (hash, previous_hash, _)) in requests.iter() {
387 if let Err(error) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
389 warn!("Block sync failed - {error}");
390 return false;
391 }
392 }
393
394 let message = C::prepare_block_request(start_height, end_height);
398
399 let mut tasks = Vec::with_capacity(sync_ips.len());
401 for sync_ip in sync_ips {
402 let sender = communication.send(sync_ip, message.clone()).await;
403 let task = tokio::spawn(async move {
404 match sender {
406 Some(sender) => {
407 if let Err(err) = sender.await {
408 warn!("Failed to send block request to peer '{sync_ip}': {err}");
409 false
410 } else {
411 true
412 }
413 }
414 None => {
415 warn!("Failed to send block request to peer '{sync_ip}': no such peer");
416 false
417 }
418 }
419 });
420
421 tasks.push(task);
422 }
423
424 for result in futures::future::join_all(tasks).await {
426 let success = match result {
427 Ok(success) => success,
428 Err(err) => {
429 error!("tokio join error: {err}");
430 false
431 }
432 };
433
434 if !success {
436 let mut requests = self.requests.write();
438 for height in start_height..end_height {
439 requests.remove(&height);
440 }
441 return false;
443 }
444 }
445 true
446 }
447
    /// Validates and records a batch of block responses from `peer_ip`.
    ///
    /// For blocks whose expected consensus version is V12 or later, the peer
    /// must have announced a consensus version matching ours for the last
    /// block height. On any invalid block, all requests to this peer are
    /// removed and an error is returned.
    #[inline]
    pub fn insert_block_responses(
        &self,
        peer_ip: SocketAddr,
        blocks: Vec<Block<N>>,
        latest_consensus_version: Option<ConsensusVersion>,
    ) -> Result<()> {
        // An empty response is treated as a protocol violation.
        let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else {
            bail!("Empty block response");
        };

        let expected_consensus_version = N::CONSENSUS_VERSION(last_height)?;

        // From V12 onward, peers are required to announce a consensus version.
        if expected_consensus_version >= ConsensusVersion::V12 {
            if let Some(latest_consensus_version) = latest_consensus_version {
                ensure_equals!(
                    expected_consensus_version,
                    latest_consensus_version,
                    "the peer's consensus version for height {last_height} does not match ours"
                );
            } else {
                bail!("The peer did not send a consensus version");
            }
        }

        // Record each block; one bad block disqualifies all requests to the peer.
        for block in blocks {
            if let Err(error) = self.insert_block_response(peer_ip, block) {
                self.remove_block_requests_to_peer(&peer_ip);
                bail!("{error}");
            }
        }
        Ok(())
    }
491
492 #[inline]
495 pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
496 if let Some(entry) = self.requests.read().get(&next_height) {
499 let is_complete = entry.sync_ips().is_empty();
500 if !is_complete {
501 return None;
502 }
503
504 if entry.response.is_none() {
506 warn!("Request for height {next_height} is complete but no response exists");
507 }
508 entry.response.clone()
509 } else {
510 None
511 }
512 }
513
    /// Attempts to extend the ledger with contiguous, fully-received sync
    /// blocks, starting just above the current ledger height.
    ///
    /// Only one task may run this at a time; if the lock is already held the
    /// call returns `Ok(false)` immediately. Returns `Ok(true)` when at least
    /// one block was applied. Block validation and appending run on a
    /// blocking thread.
    #[inline]
    pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
        // Non-blocking guard: skip if another task is already advancing.
        let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
            trace!("Skipping attempt to advance block synchronziation as it is already in progress");
            return Ok(false);
        };

        let mut current_height = self.ledger.latest_block_height();
        let start_height = current_height;
        trace!(
            "Try advancing with block responses (at block {current_height}, current sync speed is {})",
            self.get_sync_speed()
        );

        loop {
            let next_height = current_height + 1;

            // Stop at the first height whose response is not fully available.
            let Some(block) = self.peek_next_block(next_height) else {
                break;
            };

            // Defensive check: the stored response must match the expected height.
            if block.height() != next_height {
                warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
                break;
            }

            // Validate and append off the async runtime, as this is heavy work.
            let ledger = self.ledger.clone();
            let advanced = tokio::task::spawn_blocking(move || {
                match ledger.check_next_block(&block) {
                    Ok(_) => match ledger.advance_to_next_block(&block) {
                        Ok(_) => true,
                        Err(err) => {
                            warn!(
                                "Failed to advance to next block (height: {}, hash: '{}'): {err}",
                                block.height(),
                                block.hash()
                            );
                            false
                        }
                    },
                    Err(err) => {
                        warn!(
                            "The next block (height: {}, hash: '{}') is invalid - {err}",
                            block.height(),
                            block.hash()
                        );
                        false
                    }
                }
            })
            .await?;

            // Only successful appends count towards the sync-speed metric.
            if advanced {
                self.count_request_completed();
            }

            // Drop the response either way: it was consumed, or it is unusable.
            self.remove_block_response(next_height);

            if !advanced {
                break;
            }

            current_height = next_height;
        }

        // Publish the new sync height if progress was made.
        if current_height > start_height {
            self.set_sync_height(current_height);
            Ok(true)
        } else {
            Ok(false)
        }
    }
607}
608
609impl<N: Network> BlockSync<N> {
610 pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
617 let current_height = self.get_sync_height();
619
620 if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
621 let sync_peers =
623 sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
624 Some((sync_peers, min_common_ancestor))
626 } else {
627 None
628 }
629 }
630
    /// Stores the latest `locators` for `peer_ip` and refreshes derived state:
    /// the common ancestor between the local node and the peer, the common
    /// ancestors between this peer and every other known peer, and the
    /// greatest peer height. Wakes peer waiters when done.
    pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
        // Insert or update the peer's locators; unchanged reports are a no-op.
        match self.locators.write().entry(peer_ip) {
            hash_map::Entry::Occupied(mut e) => {
                if e.get() == locators {
                    return Ok(());
                }

                let old_height = e.get().latest_locator_height();
                let new_height = locators.latest_locator_height();

                // A decreasing peer height is unusual and worth logging.
                if old_height > new_height {
                    debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
                }
                e.insert(locators.clone());
            }
            hash_map::Entry::Vacant(e) => {
                e.insert(locators.clone());
            }
        }

        // Highest locator height whose hash matches our own ledger;
        // a mismatch marks the fork point and stops the scan.
        let new_local_ancestor = {
            let mut ancestor = 0;
            for (height, hash) in locators.clone().into_iter() {
                if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
                    match ledger_hash == hash {
                        true => ancestor = height,
                        false => {
                            debug!("Detected fork with peer \"{peer_ip}\" at height {height}");
                            break;
                        }
                    }
                }
            }
            ancestor
        };

        // Recompute the common ancestor between this peer and every other peer.
        let ancestor_updates: Vec<_> = self
            .locators
            .read()
            .iter()
            .filter_map(|(other_ip, other_locators)| {
                if other_ip == &peer_ip {
                    return None;
                }
                let mut ancestor = 0;
                for (height, hash) in other_locators.clone().into_iter() {
                    if let Some(expected_hash) = locators.get_hash(height) {
                        match expected_hash == hash {
                            true => ancestor = height,
                            false => {
                                debug!(
                                    "Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
                                );
                                break;
                            }
                        }
                    }
                }

                Some((PeerPair(peer_ip, *other_ip), ancestor))
            })
            .collect();

        // Apply all ancestor updates under a single write lock.
        {
            let mut common_ancestors = self.common_ancestors.write();
            common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);

            for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
                common_ancestors.insert(peer_pair, new_ancestor);
            }
        }

        // Refresh the greatest known peer height.
        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
        } else {
            error!("Got new block locators but greatest peer height is zero.");
        }

        self.peer_notify.notify_one();

        Ok(())
    }
735
736 pub fn remove_peer(&self, peer_ip: &SocketAddr) {
740 trace!("Removing peer {peer_ip} from block sync");
741
742 self.locators.write().remove(peer_ip);
744 self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
746 self.remove_block_requests_to_peer(peer_ip);
748
749 if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
751 self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
752 } else {
753 self.sync_state.write().clear_greatest_peer_height();
755 }
756
757 self.peer_notify.notify_one();
759 }
760}
761
/// A batch of prepared block requests together with the peers chosen to serve them.
pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);
764
765impl<N: Network> BlockSync<N> {
    /// Prepares the next batch of block requests, bounded by the number of
    /// requests already in flight, and returns them with the selected sync
    /// peers. Returns an empty batch when at capacity or when no suitable
    /// peers are available.
    pub fn prepare_block_requests(&self) -> BlockRequestBatch<N> {
        // Trace-level dump of the current request state, for diagnostics.
        let print_requests = || {
            if tracing::enabled!(tracing::Level::TRACE) {
                let summary = self.get_block_requests_summary();

                trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
                trace!("The following requests are still outstanding: {:?}", summary.outstanding);
            }
        };

        let current_height = self.get_sync_height();

        // Cap on blocks awaiting responses at any one time.
        let max_outstanding_block_requests =
            (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);

        // Cap on requested blocks overall, including completed-but-unprocessed ones.
        let max_total_requests = 4 * max_outstanding_block_requests;

        let max_new_blocks_to_request =
            max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);

        if self.num_total_block_requests() >= max_total_requests as usize {
            trace!(
                "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
            );

            print_requests();
            Default::default()
        } else if max_new_blocks_to_request == 0 {
            trace!(
                "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
            );

            print_requests();
            Default::default()
        } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
            let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);

            // Build concrete per-height requests against the chosen peers.
            let requests = self.construct_requests(
                &sync_peers,
                current_height,
                min_common_ancestor,
                max_new_blocks_to_request,
                greatest_peer_height,
            );

            (requests, sync_peers)
        } else if self.requests.read().is_empty() {
            Default::default()
        } else {
            trace!("No new blocks can be requested, but there are still outstanding requests.");

            print_requests();
            Default::default()
        }
    }
848
    /// Records one completed block request in the sync metrics (feeds the
    /// sync-speed measurement).
    pub fn count_request_completed(&self) {
        self.metrics.count_request_completed();
    }
856
857 pub fn set_sync_height(&self, new_height: u32) {
860 let fully_synced = {
862 let mut state = self.sync_state.write();
863 state.set_sync_height(new_height);
864 !state.can_block_sync()
865 };
866
867 if fully_synced {
868 self.metrics.mark_fully_synced();
869 }
870 }
871
872 fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
874 self.check_block_request(height)?;
876 ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
878 self.requests.write().insert(height, OutstandingRequest {
880 request: (hash, previous_hash, sync_ips),
881 timestamp: Instant::now(),
882 response: None,
883 });
884 Ok(())
885 }
886
    /// Records a single block received from `peer_ip` against its request.
    ///
    /// Validates the block hash and previous hash against the request's
    /// expectations, checks that this peer was actually queried for the
    /// height, and rejects responses that disagree with one already recorded
    /// from another peer. Notifies response waiters on success.
    fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<()> {
        let height = block.height();
        let mut requests = self.requests.write();

        // The ledger may have advanced past this height in the meantime.
        if self.ledger.contains_block_height(height) {
            bail!("The sync request was removed because we already advanced");
        }

        let Some(entry) = requests.get_mut(&height) else { bail!("The sync pool did not request block {height}") };

        let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;

        // Validate against the expected hashes when they are known.
        if let Some(expected_hash) = expected_hash {
            if block.hash() != *expected_hash {
                bail!("The block hash for candidate block {height} from '{peer_ip}' is incorrect")
            }
        }
        if let Some(expected_previous_hash) = expected_previous_hash {
            if block.previous_hash() != *expected_previous_hash {
                bail!("The previous block hash in candidate block {height} from '{peer_ip}' is incorrect")
            }
        }
        // Only peers this request was sent to may answer it.
        if !sync_ips.contains(&peer_ip) {
            bail!("The sync pool did not request block {height} from '{peer_ip}'")
        }

        // Mark this peer as having answered.
        entry.sync_ips_mut().swap_remove(&peer_ip);

        // All responses for a height must agree with the first one recorded.
        if let Some(existing_block) = &entry.response {
            if block != *existing_block {
                bail!("Candidate block {height} from '{peer_ip}' is malformed");
            }
        } else {
            entry.response = Some(block.clone());
        }

        self.response_notify.notify_one();

        Ok(())
    }
937
938 fn check_block_request(&self, height: u32) -> Result<()> {
940 if self.ledger.contains_block_height(height) {
942 bail!("Failed to add block request, as block {height} exists in the ledger");
943 }
944 if self.requests.read().contains_key(&height) {
946 bail!("Failed to add block request, as block {height} exists in the requests map");
947 }
948
949 Ok(())
950 }
951
952 pub fn remove_block_response(&self, height: u32) {
959 if let Some(e) = self.requests.write().remove(&height) {
961 trace!(
962 "Block request for height {height} was completed in {}ms (sync speed is {})",
963 e.timestamp.elapsed().as_millis(),
964 self.get_sync_speed()
965 );
966
967 self.peer_notify.notify_one();
969 }
970 }
971
972 fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
976 trace!("Block sync is removing all block requests to peer {peer_ip}...");
977
978 self.requests.write().retain(|height, e| {
981 let had_peer = e.sync_ips_mut().swap_remove(peer_ip);
982
983 let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
986 if !retain {
987 trace!("Removed block request timestamp for {peer_ip} at height {height}");
988 }
989 retain
990 });
991
992 }
994
    /// Prunes timed-out and obsolete block requests, bans peers that let a
    /// request time out, and prepares replacement requests for the gap
    /// between the sync height and the next still-tracked request height.
    ///
    /// Returns `Ok(None)` when nothing needs re-requesting, the replacement
    /// batch otherwise, and an error when no suitable peers are available.
    pub fn handle_block_request_timeouts<P: PeerPoolHandling<N>>(
        &self,
        peer_pool_handler: &P,
    ) -> Result<Option<BlockRequestBatch<N>>> {
        let mut requests = self.requests.write();

        let now = Instant::now();

        let current_height = self.ledger.latest_block_height();

        let mut timed_out_requests = vec![];

        // Peers that still owed a response on a timed-out request.
        let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();

        requests.retain(|height, e| {
            // Already in the ledger: the request is no longer needed.
            let is_obsolete = *height <= current_height;
            let timer_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
            let is_complete = e.sync_ips().is_empty();

            // Complete requests never time out; their responses await processing.
            let is_timeout = timer_elapsed && !is_complete;

            let retain = !is_timeout && !is_obsolete;

            if is_timeout {
                trace!("Block request at height {height} has timed out: timer_elapsed={timer_elapsed}, is_complete={is_complete}, is_obsolete={is_obsolete}");

                timed_out_requests.push(*height);
            } else if is_obsolete {
                trace!("Block request at height {height} became obsolete (current_height={current_height})");
            }

            if is_timeout {
                for peer_ip in e.sync_ips().iter() {
                    peers_to_ban.insert(*peer_ip);
                }
            }

            retain
        });

        if !timed_out_requests.is_empty() {
            debug!("{num} block requests timed out", num = timed_out_requests.len());
        }

        // Lowest height still tracked after pruning.
        let next_request_height = requests.iter().next().map(|(h, _)| *h);

        // Release the request lock before mutating peer state below.
        drop(requests);

        for peer_ip in peers_to_ban {
            self.remove_peer(&peer_ip);
            peer_pool_handler.ip_ban_peer(peer_ip, Some("timed out on block requests"));
        }

        // Only the gap between the sync height and the next tracked request
        // needs to be re-requested.
        let sync_height = self.get_sync_height();
        let start_height = sync_height + 1;

        let end_height = if let Some(next_height) = next_request_height
            && next_height > start_height
        {
            next_height
        } else {
            return Ok(None);
        };

        let max_new_blocks_to_request = end_height - start_height;

        let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start_height) else {
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        let Some(greatest_peer_height) = sync_peers.values().map(|l| l.latest_locator_height()).max() else {
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        let requests = self.construct_requests(
            &sync_peers,
            sync_height,
            min_common_ancestor,
            max_new_blocks_to_request,
            greatest_peer_height,
        );

        if let Some((height, _)) = requests.as_slice().first() {
            debug!("Re-requesting blocks starting at height {height}");
            Ok(Some((requests, sync_peers)))
        } else {
            Ok(None)
        }
    }
1127
    /// Searches for a mutually consistent group of peers ahead of
    /// `current_height`, returning the group and the minimum common ancestor
    /// height it shares.
    ///
    /// Considers up to `NUM_SYNC_CANDIDATE_PEERS` of the highest peers; a
    /// group is accepted once it holds at least `REDUNDANCY_FACTOR` members
    /// (or all candidates, if fewer) whose pairwise common ancestors exceed
    /// our latest ledger height.
    fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
        let latest_ledger_height = self.ledger.latest_block_height();

        // The highest peers strictly ahead of `current_height`, best first.
        let candidate_locators: IndexMap<_, _> = self
            .locators
            .read()
            .iter()
            .filter(|(_, locators)| locators.latest_locator_height() > current_height)
            .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
            .take(NUM_SYNC_CANDIDATE_PEERS)
            .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
            .collect();

        if candidate_locators.is_empty() {
            trace!("Found no sync peers with height greater {current_height}");
            return None;
        }

        // Accept a group once it reaches the redundancy factor (or all candidates).
        let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);

        for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
            let mut min_common_ancestor = peer_locators.latest_locator_height();

            // Seed the group with the anchor peer, then try to grow it.
            let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];

            for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
                if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
                    // Only group peers that agree beyond our own ledger height.
                    if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
                        min_common_ancestor = min_common_ancestor.min(*common_ancestor);

                        sync_peers.push((*other_ip, other_locators.clone()));
                    }
                }
            }

            if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
                // Randomize peer order so request load is spread evenly.
                sync_peers.shuffle(&mut rand::thread_rng());

                return Some((sync_peers.into_iter().collect(), min_common_ancestor));
            }
        }

        None
    }
1204
    /// Builds per-height block requests from `sync_height + 1` (or the first
    /// height not yet requested) up to the minimum common ancestor, bounded
    /// by `max_blocks_to_request`.
    ///
    /// Every returned request carries the widest peer fan-out required by any
    /// height in the batch, which is raised when dishonest peers were seen.
    fn construct_requests(
        &self,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        sync_height: u32,
        min_common_ancestor: u32,
        max_blocks_to_request: u32,
        greatest_peer_height: u32,
    ) -> Vec<(u32, PrepareSyncRequest<N>)> {
        // First height that is neither in the ledger nor already requested.
        let start_height = {
            let requests = self.requests.read();
            let ledger_height = self.ledger.latest_block_height();

            let mut start_height = ledger_height.max(sync_height + 1);

            while requests.contains_key(&start_height) {
                start_height += 1;
            }

            start_height
        };

        // Nothing to request when the agreed-upon chain ends below the start.
        if min_common_ancestor < start_height {
            if start_height < greatest_peer_height {
                trace!(
                    "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
                );
            }
            return Default::default();
        }

        let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);

        let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
        // The widest peer fan-out needed by any height in this batch.
        let mut max_num_sync_ips = 1;

        for height in start_height..end_height {
            if let Err(err) = self.check_block_request(height) {
                trace!("{err}");

                // Skip leading duplicates, but stop at the first gap after progress.
                match request_hashes.is_empty() {
                    true => continue,
                    false => break,
                }
            }

            let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);

            if !is_honest {
                warn!("Detected dishonest peer(s) when preparing block request");
                // Not enough peers to satisfy the extra redundancy; stop here.
                if sync_peers.len() < num_sync_ips {
                    break;
                }
            }

            max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);

            request_hashes.insert(height, (hash, previous_hash));
        }

        // Attach the common fan-out to every request in the batch.
        request_hashes
            .into_iter()
            .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
            .collect()
    }
1287}
1288
/// Derives a single block request for `height` from the peers' locators.
///
/// Returns the agreed-upon block hash (if any), the agreed-upon previous
/// hash (if any), how many peers to query, and whether the peers were
/// mutually consistent. Any disagreement clears both hashes, flags the peers
/// as dishonest, and raises the fan-out to `EXTRA_REDUNDANCY_FACTOR`.
fn construct_request<N: Network>(
    height: u32,
    sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
    let mut hash = None;
    // How many peers reported the currently agreed-upon hash.
    let mut hash_redundancy: usize = 0;
    let mut previous_hash = None;
    let mut is_honest = true;

    for peer_locators in sync_peers.values() {
        if let Some(candidate_hash) = peer_locators.get_hash(height) {
            match hash {
                Some(hash) if hash == candidate_hash => hash_redundancy += 1,
                // Conflicting hashes: distrust the peer set for this height.
                Some(_) => {
                    hash = None;
                    hash_redundancy = 0;
                    previous_hash = None;
                    is_honest = false;
                    break;
                }
                None => {
                    hash = Some(candidate_hash);
                    hash_redundancy = 1;
                }
            }
        }
        if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
            match previous_hash {
                Some(previous_hash) if previous_hash == candidate_previous_hash => (),
                // Conflicting previous hashes: same treatment as above.
                Some(_) => {
                    hash = None;
                    hash_redundancy = 0;
                    previous_hash = None;
                    is_honest = false;
                    break;
                }
                None => previous_hash = Some(candidate_previous_hash),
            }
        }
    }

    // Choose the fan-out: extra redundancy on dishonesty, a single peer when
    // the hash is sufficiently attested, the default redundancy otherwise.
    let num_sync_ips = {
        if !is_honest {
            EXTRA_REDUNDANCY_FACTOR
        } else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
            1
        } else {
            REDUNDANCY_FACTOR
        }
    };

    (hash, previous_hash, num_sync_ips, is_honest)
}
1360
1361#[cfg(test)]
1362mod tests {
1363 use super::*;
1364 use crate::locators::{
1365 CHECKPOINT_INTERVAL,
1366 NUM_RECENT_BLOCKS,
1367 test_helpers::{sample_block_locators, sample_block_locators_with_fork},
1368 };
1369
1370 use snarkos_node_bft_ledger_service::MockLedgerService;
1371 use snarkos_node_network::{NodeType, Peer, Resolver};
1372 use snarkos_node_tcp::{P2P, Tcp};
1373 use snarkvm::{
1374 ledger::committee::Committee,
1375 prelude::{Field, TestRng},
1376 };
1377
1378 use indexmap::{IndexSet, indexset};
1379 #[cfg(feature = "locktick")]
1380 use locktick::parking_lot::RwLock;
1381 #[cfg(not(feature = "locktick"))]
1382 use parking_lot::RwLock;
1383 use rand::Rng;
1384 use std::net::{IpAddr, Ipv4Addr};
1385
1386 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1387
    /// Minimal peer-pool handler for tests; it only records ban requests.
    #[derive(Default)]
    struct DummyPeerPoolHandler {
        // The addresses passed to `ip_ban_peer`, in call order.
        peers_to_ban: RwLock<Vec<SocketAddr>>,
    }
1392
    impl P2P for DummyPeerPoolHandler {
        // Tests never touch the TCP stack, so this must not be called.
        fn tcp(&self) -> &Tcp {
            unreachable!();
        }
    }
1398
    impl<N: Network> PeerPoolHandling<N> for DummyPeerPoolHandler {
        const MAXIMUM_POOL_SIZE: usize = 10;
        const OWNER: &str = "[DummyPeerPoolHandler]";
        const PEER_SLASHING_COUNT: usize = 0;

        // Tests never inspect the peer pool, so this must not be called.
        fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
            unreachable!();
        }

        // Tests never resolve peer addresses, so this must not be called.
        fn resolver(&self) -> &RwLock<Resolver<N>> {
            unreachable!();
        }

        fn is_dev(&self) -> bool {
            true
        }

        fn trusted_peers_only(&self) -> bool {
            false
        }

        fn node_type(&self) -> NodeType {
            NodeType::Client
        }

        // Record the ban so tests can assert which peers were banned.
        fn ip_ban_peer(&self, listener_addr: SocketAddr, _reason: Option<&str>) {
            self.peers_to_ban.write().push(listener_addr);
        }
    }
1428
1429 fn sample_peer_ip(id: u16) -> SocketAddr {
1431 assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
1432 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
1433 }
1434
    /// Samples a test committee using the default `TestRng`.
    fn sample_committee() -> Committee<CurrentNetwork> {
        let rng = &mut TestRng::default();
        snarkvm::ledger::committee::test_helpers::sample_committee(rng)
    }
1440
    /// Builds a mock ledger service pre-advanced to `height`.
    fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
        MockLedgerService::new_at_height(sample_committee(), height)
    }
1445
    /// Builds a `BlockSync` backed by a mock ledger at the given height.
    fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)))
    }
1450
    /// Generates `num_values` distinct random heights drawn from
    /// `0..max_height - 1`, then appends `max_height` itself — so the result
    /// holds `num_values + 1` entries and always ends with `max_height`.
    fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
        assert!(num_values > 0, "Cannot generate an empty vector");
        assert!((max_height as usize) >= num_values);

        let mut rng = TestRng::default();

        // NOTE(review): `choose_multiple` yields the sampled heights in an
        // arbitrary order, so the prefix of the result is unsorted.
        let mut heights: Vec<u32> = (0..(max_height - 1)).choose_multiple(&mut rng, num_values);

        heights.push(max_height);

        heights
    }
1466
/// Clones the peer-visible state (locators, common ancestors, outstanding
/// requests, sync state) of an existing `BlockSync` into a new instance whose
/// ledger sits at `height`, with fresh notification, lock, and metrics fields.
/// Used to simulate a node whose ledger advanced while requests were in flight.
fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
    BlockSync::<CurrentNetwork> {
        // Fresh notifiers/locks: only the data-bearing fields are copied.
        peer_notify: Notify::new(),
        response_notify: Default::default(),
        // The new ledger is rebuilt at the requested height.
        ledger: Arc::new(sample_ledger_service(height)),
        locators: RwLock::new(sync.locators.read().clone()),
        common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
        requests: RwLock::new(sync.requests.read().clone()),
        sync_state: RwLock::new(sync.sync_state.read().clone()),
        advance_with_sync_blocks_lock: Default::default(),
        metrics: Default::default(),
    }
}
1481
/// Asserts that `prepare_block_requests` produces the expected requests when
/// the sync pool is at genesis, all `peers` share a common ancestor at
/// `min_common_ancestor`, and requests are capped at `MAX_BLOCK_REQUESTS`.
fn check_prepare_block_requests(
    sync: BlockSync<CurrentNetwork>,
    min_common_ancestor: u32,
    peers: IndexSet<SocketAddr>,
) {
    let rng = &mut TestRng::default();

    assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");

    // Count peers whose height is within the recent-locator window of the
    // (genesis) ledger; this determines how many sync IPs each request gets.
    let num_peers_within_recent_range_of_ledger = {
        if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
            0
        }
        else {
            peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
        }
    };

    let (requests, sync_peers) = sync.prepare_block_requests();

    // With no peers there is nothing to request.
    if peers.is_empty() {
        assert!(requests.is_empty());
        return;
    }

    // Requests cover heights 1..=min_common_ancestor, capped by the batch limit.
    let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
    assert_eq!(requests.len(), expected_num_requests);

    for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
        // Re-draw the sync IPs the caller would select for this request.
        // NOTE(review): this relies on `TestRng::default()` matching the RNG
        // state used internally — confirm if `prepare_block_requests` changes.
        let sync_ips: IndexSet<_> =
            sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
        // Heights are consecutive starting at 1; the mock ledger derives each
        // block hash directly from its height.
        assert_eq!(height, 1 + idx as u32);
        assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
        assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));

        // With enough recent peers, one sync IP suffices; otherwise every
        // recent peer is selected.
        if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
            assert_eq!(sync_ips.len(), 1);
        } else {
            assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
            assert_eq!(sync_ips, peers);
        }
    }
}
1534
#[test]
fn test_latest_block_height() {
    // Exercise a deterministic sample of heights, always including the maximum.
    for height in generate_block_heights(100_001, 5000) {
        let sync = sample_sync_at_height(height);

        // The mock ledger reports the height it was constructed at.
        assert_eq!(sync.ledger.latest_block_height(), height);

        // The genesis hash resolves back to height 0.
        let genesis_hash = (Field::<CurrentNetwork>::from_u32(0)).into();
        assert_eq!(sync.ledger.get_block_height(&genesis_hash).unwrap(), 0);

        // The tip hash resolves back to the constructed height.
        let tip_hash = (Field::<CurrentNetwork>::from_u32(height)).into();
        assert_eq!(sync.ledger.get_block_height(&tip_hash).unwrap(), height);
    }
}
1551
#[test]
fn test_get_block_hash() {
    for height in generate_block_heights(100_001, 5000) {
        let sync = sample_sync_at_height(height);

        // The mock ledger derives each block hash directly from its height,
        // so genesis and the tip must round-trip through `get_block_hash`.
        assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
        assert_eq!(
            sync.ledger.get_block_hash(height).unwrap(),
            (Field::<CurrentNetwork>::from_u32(height)).into()
        );
    }
}
1562
#[test]
fn test_prepare_block_requests() {
    // Check request preparation across a range of connected-peer counts.
    for num_peers in 0..111 {
        println!("Testing with {num_peers} peers");

        let sync = sample_sync_at_height(0);

        // Register every peer at height 10 and remember its address.
        let mut peers = indexset![];
        for peer_id in 1..=num_peers {
            let peer_ip = sample_peer_ip(peer_id);
            sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();
            peers.insert(peer_ip);
        }

        // All peers agree up to height 10, so that is the common ancestor.
        check_prepare_block_requests(sync, 10, peers);
    }
}
1583
#[test]
fn test_prepare_block_requests_with_leading_fork_at_11() {
    let sync = sample_sync_at_height(0);

    // Peer 1 is ahead on a fork diverging at height 11; peers 2 and 3 agree
    // with the canonical chain up to height 10.
    sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators_with_fork(20, 11)).unwrap();
    sync.update_peer_locators(sample_peer_ip(2), &sample_block_locators(10)).unwrap();
    sync.update_peer_locators(sample_peer_ip(3), &sample_block_locators(10)).unwrap();

    // Blocks 1..=10 lie below the fork point, so all of them are requested.
    let (requests, _) = sync.prepare_block_requests();
    assert_eq!(requests.len(), 10);

    let mut expected_height = 1;
    for (height, (hash, previous_hash, num_sync_ips)) in requests {
        // Heights are consecutive from 1, with hashes derived from heights.
        assert_eq!(height, expected_height);
        assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
        assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
        // Every request below the fork needs just one sync peer.
        assert_eq!(num_sync_ips, 1);
        expected_height += 1;
    }
}
1621
#[test]
fn test_prepare_block_requests_with_leading_fork_at_10() {
    let rng = &mut TestRng::default();
    let sync = sample_sync_at_height(0);

    // Peer 1 forks at height 10, while peers 2 and 3 follow the canonical chain.
    let peer_1 = sample_peer_ip(1);
    sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();
    sync.update_peer_locators(sample_peer_ip(2), &sample_block_locators(10)).unwrap();
    sync.update_peer_locators(sample_peer_ip(3), &sample_block_locators(10)).unwrap();

    // With only two peers agreeing, no requests are issued yet.
    let (requests, _) = sync.prepare_block_requests();
    assert!(requests.is_empty());

    // A fourth canonical peer unlocks requests for blocks 1..=10.
    sync.update_peer_locators(sample_peer_ip(4), &sample_block_locators(10)).unwrap();
    let (requests, sync_peers) = sync.prepare_block_requests();
    assert_eq!(requests.len(), 10);

    for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
        let sync_ips: IndexSet<_> =
            sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
        assert_eq!(height, idx as u32 + 1);
        assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
        assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
        // Exactly one sync peer is selected, and it is never the forked peer.
        assert_eq!(sync_ips.len(), 1);
        assert_ne!(sync_ips[0], peer_1);
    }
}
1678
#[test]
fn test_prepare_block_requests_with_trailing_fork_at_9() {
    let rng = &mut TestRng::default();
    let sync = sample_sync_at_height(0);

    // Peers 1 and 2 follow the canonical chain to height 10; peer 3 sits on
    // a fork that diverged at height 10.
    sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
    sync.update_peer_locators(sample_peer_ip(2), &sample_block_locators(10)).unwrap();
    let peer_3 = sample_peer_ip(3);
    sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();

    // With only two peers agreeing, no requests are issued yet.
    let (requests, _) = sync.prepare_block_requests();
    assert!(requests.is_empty());

    // A fourth canonical peer unlocks requests for blocks 1..=10.
    sync.update_peer_locators(sample_peer_ip(4), &sample_block_locators(10)).unwrap();
    let (requests, sync_peers) = sync.prepare_block_requests();
    assert_eq!(requests.len(), 10);

    for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
        let sync_ips: IndexSet<_> =
            sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
        assert_eq!(height, idx as u32 + 1);
        assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
        assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
        // Exactly one sync peer is selected, and it is never the forked peer.
        assert_eq!(sync_ips.len(), 1);
        assert_ne!(sync_ips[0], peer_3);
    }
}
1726
#[test]
fn test_insert_block_requests() {
    let rng = &mut TestRng::default();
    let sync = sample_sync_at_height(0);

    // A single peer at height 10 yields requests for blocks 1..=10.
    sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();

    let (requests, sync_peers) = sync.prepare_block_requests();
    assert_eq!(requests.len(), 10);

    // Draws `count` sync IPs from the prepared peer set (deterministic here,
    // since there is only one candidate peer).
    let mut pick_sync_ips = |count: usize| -> IndexSet<SocketAddr> {
        sync_peers.keys().choose_multiple(rng, count).into_iter().copied().collect()
    };

    // Fresh inserts succeed and are immediately retrievable.
    for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
        let sync_ips = pick_sync_ips(num_sync_ips);
        sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }

    // Re-reading the same requests returns the stored entries unchanged.
    for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
        let sync_ips = pick_sync_ips(num_sync_ips);
        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }

    // Duplicate inserts are rejected and leave the stored entries intact.
    for (height, (hash, previous_hash, num_sync_ips)) in requests {
        let sync_ips = pick_sync_ips(num_sync_ips);
        sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err();
        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }
}
1770
#[test]
fn test_insert_block_requests_fails() {
    // The ledger already holds blocks up to height 9.
    let sync = sample_sync_at_height(9);

    let peer_ip = sample_peer_ip(1);
    sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

    // Requesting a block the ledger already has is rejected...
    sync.insert_block_request(9, (None, None, indexset![peer_ip])).unwrap_err();
    // ...while the next height above the ledger tip is accepted.
    sync.insert_block_request(10, (None, None, indexset![peer_ip])).unwrap();
}
1783
#[test]
fn test_update_peer_locators() {
    let sync = sample_sync_at_height(0);

    // Sweep peer 1 over all heights in 0..500, and for each, sweep peer 2
    // over the same range, checking the computed common ancestor every time.
    let peer1_ip = sample_peer_ip(1);
    for peer1_height in 0..500u32 {
        sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap();
        assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height));

        let peer2_ip = sample_peer_ip(2);
        for peer2_height in 0..500u32 {
            println!("Testing peer 1 height at {peer1_height} and peer 2 height at {peer2_height}");

            sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap();
            assert_eq!(sync.get_peer_height(&peer2_ip), Some(peer2_height));

            let distance = peer1_height.abs_diff(peer2_height);

            // Within the recent-locator window, the ancestor is the lower of
            // the two heights (both peers share all recent block locators).
            if distance < NUM_RECENT_BLOCKS as u32 {
                let expected_ancestor = core::cmp::min(peer1_height, peer2_height);
                assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
                assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
            } else {
                // Beyond the recent window, only checkpoint locators overlap,
                // so the ancestor snaps down to the last shared checkpoint.
                let min_checkpoints =
                    core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL);
                let expected_ancestor = min_checkpoints * CHECKPOINT_INTERVAL;
                assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
                assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
            }
        }
    }
}
1819
#[test]
fn test_remove_peer() {
    let sync = sample_sync_at_height(0);
    let peer_ip = sample_peer_ip(1);

    // Adding a peer records its height; removing it clears the height.
    // Run two rounds (at different heights) to confirm removal leaves no
    // stale state behind.
    for height in [100, 200] {
        sync.update_peer_locators(peer_ip, &sample_block_locators(height)).unwrap();
        assert_eq!(sync.get_peer_height(&peer_ip), Some(height));

        sync.remove_peer(&peer_ip);
        assert_eq!(sync.get_peer_height(&peer_ip), None);
    }
}
1837
#[test]
fn test_locators_insert_remove_insert() {
    let sync = sample_sync_at_height(0);
    let peer = sample_peer_ip(1);

    // Insert locators for the peer at height 100.
    sync.update_peer_locators(peer, &sample_block_locators(100)).unwrap();
    assert_eq!(sync.get_peer_height(&peer), Some(100));

    // Removing the peer drops its locators entirely.
    sync.remove_peer(&peer);
    assert_eq!(sync.get_peer_height(&peer), None);

    // A subsequent insert at a different height succeeds from a clean slate.
    sync.update_peer_locators(peer, &sample_block_locators(200)).unwrap();
    assert_eq!(sync.get_peer_height(&peer), Some(200));
}
1852
#[test]
fn test_requests_insert_remove_insert() {
    let rng = &mut TestRng::default();
    let sync = sample_sync_at_height(0);

    // One peer at height 10 yields requests for blocks 1..=10.
    let peer_ip = sample_peer_ip(1);
    sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

    let (requests, sync_peers) = sync.prepare_block_requests();
    assert_eq!(requests.len(), 10);

    // Insert every prepared request and confirm it is retrievable.
    for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
        let sync_ips: IndexSet<_> =
            sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
        sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }

    // Removing the only requested peer must drop all of its requests.
    sync.remove_peer(&peer_ip);

    for (height, _) in requests {
        assert_eq!(sync.get_block_request(height), None);
        assert!(sync.get_block_request_timestamp(height).is_none());
    }

    // With no peers left, nothing can be requested.
    let (requests, _) = sync.prepare_block_requests();
    assert_eq!(requests.len(), 0);

    // Re-adding the peer restores the full batch of requests.
    sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

    let (requests, _) = sync.prepare_block_requests();
    assert_eq!(requests.len(), 10);

    // NOTE(review): this loop reuses `sync_peers` captured before the peer was
    // removed; that works here because the sole candidate peer is the same
    // address — confirm this is intentional if the test grows more peers.
    for (height, (hash, previous_hash, num_sync_ips)) in requests {
        let sync_ips: IndexSet<_> =
            sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
        sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }
}
1908
#[test]
fn test_obsolete_block_requests() {
    let rng = &mut TestRng::default();
    let sync = sample_sync_at_height(0);

    // Pick a random peer height and issue requests for all blocks up to it.
    let locator_height = rng.gen_range(0..50);

    let locators = sample_block_locators(locator_height);
    sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap();

    let (requests, sync_peers) = sync.prepare_block_requests();
    assert_eq!(requests.len(), locator_height as usize);

    // Register every prepared request as outstanding.
    for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
        let sync_ips: IndexSet<_> =
            sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
        sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }

    // Simulate the ledger having advanced to a random height while the
    // requests were outstanding: the duplicated sync keeps the old requests
    // but sits on a taller ledger.
    let ledger_height = rng.gen_range(0..=locator_height);
    let new_sync = duplicate_sync_at_new_height(&sync, ledger_height);

    assert_eq!(new_sync.requests.read().len(), requests.len());

    let c = DummyPeerPoolHandler::default();
    new_sync.handle_block_request_timeouts(&c).unwrap();

    // Requests at or below the new ledger height are obsolete and purged;
    // only those above it remain outstanding.
    assert_eq!(new_sync.requests.read().len(), (locator_height - ledger_height) as usize);
}
1952
#[test]
fn test_timed_out_block_request() {
    let sync = sample_sync_at_height(0);
    let peer_ip = sample_peer_ip(1);
    let locators = sample_block_locators(10);
    let block_hash = locators.get_hash(1);

    sync.update_peer_locators(peer_ip, &locators).unwrap();

    // Plant a request whose timestamp is already past the timeout window.
    let expired = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
    sync.requests.write().insert(1, OutstandingRequest {
        request: (block_hash, None, [peer_ip].into()),
        timestamp: expired,
        response: None,
    });

    assert_eq!(sync.requests.read().len(), 1);
    assert_eq!(sync.locators.read().len(), 1);

    let handler = DummyPeerPoolHandler::default();
    sync.handle_block_request_timeouts(&handler).unwrap();

    // The unresponsive peer is banned...
    let banned = handler.peers_to_ban.read();
    assert_eq!(banned.len(), 1);
    assert_eq!(banned.first(), Some(&peer_ip));

    // ...and both its request and its locators are purged.
    assert!(sync.requests.read().is_empty());
    assert!(sync.locators.read().is_empty());
}
1985
#[test]
fn test_reissue_timed_out_block_request() {
    let sync = sample_sync_at_height(0);
    let peer_ip1 = sample_peer_ip(1);
    let peer_ip2 = sample_peer_ip(2);
    let peer_ip3 = sample_peer_ip(3);

    let locators = sample_block_locators(10);
    let block_hash1 = locators.get_hash(1);
    let block_hash2 = locators.get_hash(2);

    // Three peers share the same locators up to height 10.
    sync.update_peer_locators(peer_ip1, &locators).unwrap();
    sync.update_peer_locators(peer_ip2, &locators).unwrap();
    sync.update_peer_locators(peer_ip3, &locators).unwrap();

    assert_eq!(sync.locators.read().len(), 3);

    let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);

    // Request 1 (to peer 1) is already past the timeout window.
    sync.requests.write().insert(1, OutstandingRequest {
        request: (block_hash1, None, [peer_ip1].into()),
        timestamp,
        response: None,
    });

    // Request 2 (to peer 2) is fresh and must be left untouched.
    sync.requests.write().insert(2, OutstandingRequest {
        request: (block_hash2, None, [peer_ip2].into()),
        timestamp: Instant::now(),
        response: None,
    });

    assert_eq!(sync.requests.read().len(), 2);

    let c = DummyPeerPoolHandler::default();

    let re_requests = sync.handle_block_request_timeouts(&c).unwrap();

    // Only the timed-out peer is banned.
    let ban_list = c.peers_to_ban.write();
    assert_eq!(ban_list.len(), 1);
    assert_eq!(ban_list.iter().next(), Some(&peer_ip1));

    // The expired request and the banned peer's locators are removed; the
    // fresh request and the two remaining peers survive.
    assert_eq!(sync.requests.read().len(), 1);
    assert_eq!(sync.locators.read().len(), 2);

    // The expired request is reissued: same height and hash, but targeted at
    // the two peers that were not banned.
    let (new_requests, new_sync_ips) = re_requests.unwrap();
    assert_eq!(new_requests.len(), 1);

    let (height, (hash, _, _)) = new_requests.first().unwrap();
    assert_eq!(*height, 1);
    assert_eq!(*hash, block_hash1);
    assert_eq!(new_sync_ips.len(), 2);

    // Neither replacement sync peer may be the banned peer.
    let mut iter = new_sync_ips.iter();
    assert_ne!(iter.next().unwrap().0, &peer_ip1);
    assert_ne!(iter.next().unwrap().0, &peer_ip1);
}
2046}