snarkos_node_sync/
block_sync.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
18    locators::BlockLocators,
19};
20use snarkos_node_bft_ledger_service::LedgerService;
21use snarkos_node_network::PeerPoolHandling;
22use snarkos_node_router::messages::DataBlocks;
23use snarkos_node_sync_communication_service::CommunicationService;
24use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
25
26use snarkvm::{
27    console::network::{ConsensusVersion, Network},
28    prelude::block::Block,
29    utilities::ensure_equals,
30};
31
32use anyhow::{Result, bail, ensure};
33use indexmap::{IndexMap, IndexSet};
34use itertools::Itertools;
35#[cfg(feature = "locktick")]
36use locktick::parking_lot::RwLock;
37#[cfg(feature = "locktick")]
38use locktick::tokio::Mutex as TMutex;
39#[cfg(not(feature = "locktick"))]
40use parking_lot::RwLock;
41use rand::seq::{IteratorRandom, SliceRandom};
42use std::{
43    collections::{BTreeMap, HashMap, HashSet, hash_map},
44    net::{IpAddr, Ipv4Addr, SocketAddr},
45    sync::Arc,
46    time::{Duration, Instant},
47};
48#[cfg(not(feature = "locktick"))]
49use tokio::sync::Mutex as TMutex;
50use tokio::sync::Notify;
51
52mod helpers;
53use helpers::rangify_heights;
54
55mod sync_state;
56use sync_state::SyncState;
57
58mod metrics;
59use metrics::BlockSyncMetrics;
60
/// The redundancy factor decreases the possibility of malicious peers sending us invalid block locators
/// by requiring multiple peers to advertise the same (prefix of) block locators.
/// However, we do not use this in production yet (the non-test value is 1).
#[cfg(not(test))]
pub const REDUNDANCY_FACTOR: usize = 1;
/// The redundancy factor used when compiling tests.
#[cfg(test)]
pub const REDUNDANCY_FACTOR: usize = 3;

/// The time nodes wait between issuing batches of block requests to avoid triggering spam detection.
///
/// The current rate limit for all messages is around 160k per second (see [`Gateway::max_cache_events`]).
/// With a delay of 10 ms, at most 100 batches of block requests are issued per second, which is much lower.
///
// TODO(kaimast): base rate limits on how many requests were sent to each peer instead.
pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);

// Multiples of the redundancy factor.
// NOTE(review): their use sites are outside this section of the file; presumably they bound
// how many peers are considered when selecting sync peers - confirm.
const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;

// 600 seconds (10 minutes).
// NOTE(review): presumably the age after which an unanswered block request is considered timed out - confirm.
const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(600);

/// The maximum number of outstanding block requests.
/// Once a node hits this limit, it will not issue any new requests until existing requests time out or receive responses.
const MAX_BLOCK_REQUESTS: usize = 50; // 50 requests

/// The maximum number of blocks tolerated before the primary is considered behind its peers.
pub const MAX_BLOCKS_BEHIND: u32 = 1; // blocks

/// This is a dummy IP address that is used to represent the local node.
/// Note: This does not need to be a real IP address, but it must be unique/distinct from all other connections.
pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
92
/// Handle to an outstanding request, containing the request itself, its timestamp,
/// and the response to it once one has been received.
#[derive(Clone)]
struct OutstandingRequest<N: Network> {
    /// The request tuple. Its third element is the set of peer IPs that have not responded yet
    /// (see [`Self::sync_ips`]).
    request: SyncRequest<N>,
    /// The time from which the age of the request is measured.
    timestamp: Instant,
    /// The corresponding response (if any).
    /// This is guaranteed to be Some if sync_ips for the given request are empty.
    response: Option<Block<N>>,
}
103
/// Information about a single block request (used for the REST API).
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestInfo {
    /// Whole seconds elapsed since the request's timestamp.
    elapsed: u64,
    /// Has the request been responded to? (`true` once no peer is left to respond.)
    done: bool,
}
112
/// Summary of all in-flight block requests, split into outstanding and completed ones.
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestsSummary {
    /// Heights of requests that are still waiting for at least one peer, formatted by `rangify_heights`.
    outstanding: String,
    /// Heights of requests that no peer is left to respond to, formatted by `rangify_heights`.
    completed: String,
}
119
120impl<N: Network> OutstandingRequest<N> {
121    /// Get a reference to the IPs of peers that have not responded to the request (yet).
122    fn sync_ips(&self) -> &IndexSet<SocketAddr> {
123        let (_, _, sync_ips) = &self.request;
124        sync_ips
125    }
126
127    /// Get a mutable reference to the IPs of peers that have not responded to the request (yet).
128    fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
129        let (_, _, sync_ips) = &mut self.request;
130        sync_ips
131    }
132}
133
/// A struct that keeps track of synchronizing blocks with other nodes.
///
/// It generates requests to send to other peers and processes responses to those requests.
/// The struct also keeps track of block locators, which indicate which peers it can fetch blocks from.
///
/// # Notes
/// - The actual network communication happens in `snarkos_node::Client` (for clients and provers) and in `snarkos_node_bft::Sync` (for validators).
///
/// - Validators only sync from other nodes using this struct if they fall behind, e.g.,
///   because they experience a network partition.
///   In the common case, validators will generate blocks from the DAG after an anchor certificate has been approved
///   by a supermajority of the committee.
pub struct BlockSync<N: Network> {
    /// The ledger.
    ledger: Arc<dyn LedgerService<N>>,

    /// The map of peer IP to their block locators.
    /// The block locators are consistent with the ledger and every other peer's block locators.
    ///
    // NOTE(review): the documentation of `update_peer_locators` states that it does *not* check
    // consistency with other peers' locators, so the sentence above may be outdated - confirm.
    locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,

    /// The map of peer pairs to the height of their common ancestor.
    /// The local node is represented by [`DUMMY_SELF_IP`].
    /// This map is used to determine which peers to request blocks from.
    ///
    /// Lock ordering: when locking both, `common_ancestors` and `locators`, `common_ancestors` must be locked first.
    common_ancestors: RwLock<IndexMap<PeerPair, u32>>,

    /// The block requests in progress and their responses, keyed by block height.
    requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,

    /// The sync state, which tracks the sync height of this node and the greatest peer height,
    /// and determines whether the node is synced up to the latest block (within the given tolerance).
    ///
    /// Lock ordering: if you lock `sync_state` and `requests`, you must lock `sync_state` first.
    sync_state: RwLock<SyncState>,

    /// The lock used to ensure that [`Self::advance_with_sync_blocks()`] is called by one task at a time.
    advance_with_sync_blocks_lock: TMutex<()>,

    /// Gets notified when there was an update to the locators or a peer disconnected.
    peer_notify: Notify,

    /// Gets notified when we received a new block response.
    response_notify: Notify,

    /// Tracks sync speed
    metrics: BlockSyncMetrics,
}
180
181impl<N: Network> BlockSync<N> {
182    /// Initializes a new block sync module.
183    pub fn new(ledger: Arc<dyn LedgerService<N>>) -> Self {
184        // Make sync state aware of the blocks that already exist on disk at startup.
185        let sync_state = SyncState::new_with_height(ledger.latest_block_height());
186
187        Self {
188            ledger,
189            sync_state: RwLock::new(sync_state),
190            peer_notify: Default::default(),
191            response_notify: Default::default(),
192            locators: Default::default(),
193            requests: Default::default(),
194            common_ancestors: Default::default(),
195            advance_with_sync_blocks_lock: Default::default(),
196            metrics: Default::default(),
197        }
198    }
199
200    /// Blocks until something about a peer changes,
201    /// or block request has been fully processed (either successfully or unsuccessfully).
202    ///
203    /// Used by the outgoing task.
204    pub async fn wait_for_peer_update(&self) {
205        self.peer_notify.notified().await
206    }
207
208    /// Blocks until there is a new response to a block request.
209    ///
210    /// Used by the incoming task.
211    pub async fn wait_for_block_responses(&self) {
212        self.response_notify.notified().await
213    }
214
215    /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
216    #[inline]
217    pub fn is_block_synced(&self) -> bool {
218        self.sync_state.read().is_block_synced()
219    }
220
221    /// Returns `true` if there a blocks to fetch or responses to process.
222    ///
223    /// This will always return true if [`Self::is_block_synced`] returns false,
224    /// but it can return true when [`Self::is_block_synced`] returns true
225    /// (due to the latter having a tolerance of one block).
226    #[inline]
227    pub fn can_block_sync(&self) -> bool {
228        self.sync_state.read().can_block_sync() || self.has_pending_responses()
229    }
230
231    /// Returns the number of blocks the node is behind the greatest peer height,
232    /// or `None` if no peers are connected yet.
233    #[inline]
234    pub fn num_blocks_behind(&self) -> Option<u32> {
235        self.sync_state.read().num_blocks_behind()
236    }
237
238    /// Returns the greatest block height of any connected peer.
239    #[inline]
240    pub fn greatest_peer_block_height(&self) -> Option<u32> {
241        self.sync_state.read().get_greatest_peer_height()
242    }
243
244    /// Returns the current sync height of this node.
245    /// The sync height is always greater or equal to the ledger height.
246    #[inline]
247    pub fn get_sync_height(&self) -> u32 {
248        self.sync_state.read().get_sync_height()
249    }
250
251    /// Returns the number of blocks we requested from peers, but have not received yet.
252    #[inline]
253    pub fn num_outstanding_block_requests(&self) -> usize {
254        self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count()
255    }
256
257    /// The total number of block request, including the ones that have been answered already but not processed yet.
258    #[inline]
259    pub fn num_total_block_requests(&self) -> usize {
260        self.requests.read().len()
261    }
262
263    //// Returns the latest locator height for all known peers.
264    pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> {
265        self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect()
266    }
267
268    //// Returns information about all in-flight block requests.
269    pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> {
270        self.requests
271            .read()
272            .iter()
273            .map(|(height, request)| {
274                (*height, BlockRequestInfo {
275                    done: request.sync_ips().is_empty(),
276                    elapsed: request.timestamp.elapsed().as_secs(),
277                })
278            })
279            .collect()
280    }
281
282    /// Returns a summary of all in-flight requests.
283    pub fn get_block_requests_summary(&self) -> BlockRequestsSummary {
284        let completed = self
285            .requests
286            .read()
287            .iter()
288            .filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None })
289            .collect::<Vec<_>>();
290
291        let outstanding = self
292            .requests
293            .read()
294            .iter()
295            .filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None })
296            .collect::<Vec<_>>();
297
298        BlockRequestsSummary { completed: rangify_heights(&completed), outstanding: rangify_heights(&outstanding) }
299    }
300
301    pub fn get_sync_speed(&self) -> f64 {
302        self.metrics.get_sync_speed()
303    }
304}
305
// Accessors that are only compiled for (and used by) the unit tests.
#[cfg(test)]
impl<N: Network> BlockSync<N> {
    /// Looks up the latest locator height advertised by `peer_ip`, if the peer is known.
    fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> {
        let locators = self.locators.read();
        let peer_locators = locators.get(peer_ip)?;
        Some(peer_locators.latest_locator_height())
    }

    /// Looks up the common ancestor recorded for the pair (`peer_a`, `peer_b`), if any.
    fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> {
        let pair = PeerPair(peer_a, peer_b);
        self.common_ancestors.read().get(&pair).copied()
    }

    /// Returns a copy of the block request registered for `height`, if any.
    fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> {
        let requests = self.requests.read();
        let entry = requests.get(&height)?;
        Some(entry.request.clone())
    }

    /// Returns the timestamp stored with the block request for `height`, if any.
    fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> {
        let requests = self.requests.read();
        let entry = requests.get(&height)?;
        Some(entry.timestamp)
    }
}
329
330impl<N: Network> BlockSync<N> {
331    /// Returns the block locators.
332    #[inline]
333    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
334        // Retrieve the latest block height.
335        let latest_height = self.ledger.latest_block_height();
336
337        // Initialize the recents map.
338        // TODO: generalize this for RECENT_INTERVAL > 1, or remove this comment if we hardwire that to 1
339        let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
340        // Retrieve the recent block hashes.
341        for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
342            recents.insert(height, self.ledger.get_block_hash(height)?);
343        }
344
345        // Initialize the checkpoints map.
346        let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
347        // Retrieve the checkpoint block hashes.
348        for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
349            checkpoints.insert(height, self.ledger.get_block_hash(height)?);
350        }
351
352        // Construct the block locators.
353        BlockLocators::new(recents, checkpoints)
354    }
355
356    /// Returns true if there are pending responses to block requests that need to be processed.
357    pub fn has_pending_responses(&self) -> bool {
358        self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0
359    }
360
361    /// Send a batch of block requests.
362    pub async fn send_block_requests<C: CommunicationService>(
363        &self,
364        communication: &C,
365        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
366        requests: &[(u32, PrepareSyncRequest<N>)],
367    ) -> bool {
368        let (start_height, max_num_sync_ips) = match requests.first() {
369            Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
370            None => {
371                warn!("Block sync failed - no block requests");
372                return false;
373            }
374        };
375
376        debug!("Sending {len} block requests to peer(s) at {peers:?}", len = requests.len(), peers = sync_peers.keys());
377
378        // Use a randomly sampled subset of the sync IPs.
379        let sync_ips: IndexSet<_> =
380            sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect();
381
382        // Calculate the end height.
383        let end_height = start_height.saturating_add(requests.len() as u32);
384
385        // Insert the chunk of block requests.
386        for (height, (hash, previous_hash, _)) in requests.iter() {
387            // Insert the block request into the sync pool using the sync IPs from the last block request in the chunk.
388            if let Err(error) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
389                warn!("Block sync failed - {error}");
390                return false;
391            }
392        }
393
394        /* Send the block request to the peers */
395
396        // Construct the message.
397        let message = C::prepare_block_request(start_height, end_height);
398
399        // Send the message to the peers.
400        let mut tasks = Vec::with_capacity(sync_ips.len());
401        for sync_ip in sync_ips {
402            let sender = communication.send(sync_ip, message.clone()).await;
403            let task = tokio::spawn(async move {
404                // Ensure the request is sent successfully.
405                match sender {
406                    Some(sender) => {
407                        if let Err(err) = sender.await {
408                            warn!("Failed to send block request to peer '{sync_ip}': {err}");
409                            false
410                        } else {
411                            true
412                        }
413                    }
414                    None => {
415                        warn!("Failed to send block request to peer '{sync_ip}': no such peer");
416                        false
417                    }
418                }
419            });
420
421            tasks.push(task);
422        }
423
424        // Wait for all sends to finish at the same time.
425        for result in futures::future::join_all(tasks).await {
426            let success = match result {
427                Ok(success) => success,
428                Err(err) => {
429                    error!("tokio join error: {err}");
430                    false
431                }
432            };
433
434            // If sending fails for any peer, remove the block request from the sync pool.
435            if !success {
436                // Remove the entire block request from the sync pool.
437                let mut requests = self.requests.write();
438                for height in start_height..end_height {
439                    requests.remove(&height);
440                }
441                // Break out of the loop.
442                return false;
443            }
444        }
445        true
446    }
447
448    /// Inserts a new block response from the given peer IP.
449    ///
450    /// Returns an error if the block was malformed, or we already received a different block for this height.
451    /// This function also removes all block requests from the given peer IP on failure.
452    ///
453    /// Note, that this only queues the response. After this, you most likely want to call `Self::try_advancing_block_synchronization`.
454    ///
455    #[inline]
456    pub fn insert_block_responses(
457        &self,
458        peer_ip: SocketAddr,
459        blocks: Vec<Block<N>>,
460        latest_consensus_version: Option<ConsensusVersion>,
461    ) -> Result<()> {
462        let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else {
463            bail!("Empty block response");
464        };
465
466        let expected_consensus_version = N::CONSENSUS_VERSION(last_height)?;
467
468        // Perform consensus version check, if possible.
469        // This check is only enabled after nodes have reached V12.
470        if expected_consensus_version >= ConsensusVersion::V12 {
471            if let Some(latest_consensus_version) = latest_consensus_version {
472                ensure_equals!(
473                    expected_consensus_version,
474                    latest_consensus_version,
475                    "the peer's consensus version for height {last_height} does not match ours"
476                );
477            } else {
478                bail!("The peer did not send a consensus version");
479            }
480        }
481
482        // Insert the candidate blocks into the sync pool.
483        for block in blocks {
484            if let Err(error) = self.insert_block_response(peer_ip, block) {
485                self.remove_block_requests_to_peer(&peer_ip);
486                bail!("{error}");
487            }
488        }
489        Ok(())
490    }
491
492    /// Returns the next block for the given `next_height` if the request is complete,
493    /// or `None` otherwise. This does not remove the block from the `responses` map.
494    #[inline]
495    pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
496        // Determine if the request is complete:
497        // either there is no request for `next_height`, or the request has no peer socket addresses left.
498        if let Some(entry) = self.requests.read().get(&next_height) {
499            let is_complete = entry.sync_ips().is_empty();
500            if !is_complete {
501                return None;
502            }
503
504            // If the request is complete, return the block from the responses, if there is one.
505            if entry.response.is_none() {
506                warn!("Request for height {next_height} is complete but no response exists");
507            }
508            entry.response.clone()
509        } else {
510            None
511        }
512    }
513
514    /// Attempts to advance synchronization by processing completed block responses.
515    ///
516    /// Returns true, if new blocks were added to the ledger.
517    ///
518    /// # Usage
519    /// This is only called in [`Client::try_block_sync`] and should not be called concurrently by multiple tasks.
520    /// Validators do not call this function, and instead invoke
521    /// [`snarkos_node_bft::Sync::try_advancing_block_synchronization`] which also updates the BFT state.
522    #[inline]
523    pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
524        // Acquire the lock to ensure this function is called only once at a time.
525        // If the lock is already acquired, return early.
526        //
527        // Note: This lock should not be needed anymore as there is only one place we call it from,
528        // but we keep it for now out of caution.
529        // TODO(kaimast): remove this eventually.
530        let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
531            trace!("Skipping attempt to advance block synchronziation as it is already in progress");
532            return Ok(false);
533        };
534
535        // Start with the current height.
536        let mut current_height = self.ledger.latest_block_height();
537        let start_height = current_height;
538        trace!(
539            "Try advancing with block responses (at block {current_height}, current sync speed is {})",
540            self.get_sync_speed()
541        );
542
543        loop {
544            let next_height = current_height + 1;
545
546            let Some(block) = self.peek_next_block(next_height) else {
547                break;
548            };
549
550            // Ensure the block height matches.
551            if block.height() != next_height {
552                warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
553                break;
554            }
555
556            let ledger = self.ledger.clone();
557            let advanced = tokio::task::spawn_blocking(move || {
558                // Try to check the next block and advance to it.
559                match ledger.check_next_block(&block) {
560                    Ok(_) => match ledger.advance_to_next_block(&block) {
561                        Ok(_) => true,
562                        Err(err) => {
563                            warn!(
564                                "Failed to advance to next block (height: {}, hash: '{}'): {err}",
565                                block.height(),
566                                block.hash()
567                            );
568                            false
569                        }
570                    },
571                    Err(err) => {
572                        warn!(
573                            "The next block (height: {}, hash: '{}') is invalid - {err}",
574                            block.height(),
575                            block.hash()
576                        );
577                        false
578                    }
579                }
580            })
581            .await?;
582
583            // Only count successful requests.
584            if advanced {
585                self.count_request_completed();
586            }
587
588            // Remove the block response.
589            self.remove_block_response(next_height);
590
591            // If advancing failed, exit the loop.
592            if !advanced {
593                break;
594            }
595
596            // Update the latest height.
597            current_height = next_height;
598        }
599
600        if current_height > start_height {
601            self.set_sync_height(current_height);
602            Ok(true)
603        } else {
604            Ok(false)
605        }
606    }
607}
608
609impl<N: Network> BlockSync<N> {
610    /// Returns the sync peers with their latest heights, and their minimum common ancestor, if the node can sync.
611    /// This function returns peers that are consistent with each other, and have a block height
612    /// that is greater than the ledger height of this node.
613    ///
614    /// # Locking
615    /// This will read-lock `common_ancestors` and `sync_state`, but not at the same time.
616    pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
617        // Retrieve the current sync height.
618        let current_height = self.get_sync_height();
619
620        if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
621            // Map the locators into the latest height.
622            let sync_peers =
623                sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
624            // Return the sync peers and their minimum common ancestor.
625            Some((sync_peers, min_common_ancestor))
626        } else {
627            None
628        }
629    }
630
    /// Updates the block locators and common ancestors for the given peer IP.
    ///
    /// The steps are:
    /// 1. Store the new locators for the peer (returning early if they did not change).
    /// 2. Recompute the common ancestor between the peer and the local ledger.
    /// 3. Recompute the common ancestor between the peer and every other known peer.
    /// 4. Update the greatest peer height in the sync state and notify the sync loop.
    ///
    /// This function does not need to check that the block locators are well-formed,
    /// because that is already done in [`BlockLocators::new()`], as noted in [`BlockLocators`].
    ///
    /// This function does **not** check
    /// that the block locators are consistent with the peer's previous block locators or other peers' block locators.
    ///
    /// As written, this function always returns `Ok(())`.
    pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
        // Update the locators entry for the given peer IP.
        // We perform this update atomically, and drop the lock as soon as we are done with the update.
        // (The write guard is a temporary of the `match` scrutinee, so it is released when the `match` ends.)
        match self.locators.write().entry(peer_ip) {
            hash_map::Entry::Occupied(mut e) => {
                // Return early if the block locators did not change.
                // Note that this skips the ancestor updates and the notification below.
                if e.get() == locators {
                    return Ok(());
                }

                let old_height = e.get().latest_locator_height();
                let new_height = locators.latest_locator_height();

                // A decreasing height is only logged; the new locators are stored regardless.
                if old_height > new_height {
                    debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
                }
                e.insert(locators.clone());
            }
            hash_map::Entry::Vacant(e) => {
                e.insert(locators.clone());
            }
        }

        // Compute the common ancestor with this node.
        let new_local_ancestor = {
            let mut ancestor = 0;
            // Attention: Please do not optimize this loop, as it performs fork-detection. In addition,
            // by iterating upwards, it also early-terminates malicious block locators at the *first* point
            // of bifurcation in their ledger history, which is a critical safety guarantee provided here.
            for (height, hash) in locators.clone().into_iter() {
                // Heights for which the ledger returns no hash are skipped.
                if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
                    match ledger_hash == hash {
                        true => ancestor = height,
                        false => {
                            debug!("Detected fork with peer \"{peer_ip}\" at height {height}");
                            break;
                        }
                    }
                }
            }
            ancestor
        };

        // Compute the common ancestor with every other peer.
        // Do not hold write lock to `common_ancestors` here, because this can take a while with many peers.
        let ancestor_updates: Vec<_> = self
            .locators
            .read()
            .iter()
            .filter_map(|(other_ip, other_locators)| {
                // Skip if the other peer is the given peer.
                if other_ip == &peer_ip {
                    return None;
                }
                // Compute the common ancestor with the other peer.
                // Heights that the given peer's locators do not contain are skipped;
                // the search stops at the first height where the hashes differ.
                let mut ancestor = 0;
                for (height, hash) in other_locators.clone().into_iter() {
                    if let Some(expected_hash) = locators.get_hash(height) {
                        match expected_hash == hash {
                            true => ancestor = height,
                            false => {
                                debug!(
                                    "Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
                                );
                                break;
                            }
                        }
                    }
                }

                Some((PeerPair(peer_ip, *other_ip), ancestor))
            })
            .collect();

        // Update the map of common ancestors.
        // Scope the lock, so it is dropped before locking `sync_state`.
        {
            let mut common_ancestors = self.common_ancestors.write();
            // The local node is represented by the dummy IP.
            common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);

            for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
                common_ancestors.insert(peer_pair, new_ancestor);
            }
        }

        // Update sync state, because the greatest peer height may have decreased.
        // The read guard on `locators` is held until the end of the `if let`/`else`,
        // i.e., while `sync_state` is write-locked.
        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
        } else {
            // `max()` only yields `None` if the locators map is empty at this point
            // (presumably because the peer was removed concurrently - TODO confirm).
            error!("Got new block locators but greatest peer height is zero.");
        }

        // Notify the sync loop that something changed.
        self.peer_notify.notify_one();

        Ok(())
    }
735
    /// Removes the peer from the sync pool, if they exist.
    ///
    /// This drops the peer's block locators, every common-ancestor entry that involves the peer,
    /// and the peer's pending block requests (see `remove_block_requests_to_peer`). Afterwards the
    /// greatest peer height in the sync state is recomputed from the remaining locators (or cleared
    /// if no locators remain), and the sync loop is notified.
    ///
    /// TODO (howardwu): The original note asked to remove the `common_ancestor` entry, after checking
    ///  that this is safe (that we don't rely upon it for safety when we re-connect with the same peer).
    ///  NOTE(review): the entries are already removed below, so this TODO looks stale - confirm and delete.
    pub fn remove_peer(&self, peer_ip: &SocketAddr) {
        trace!("Removing peer {peer_ip} from block sync");

        // Remove the locators entry for the given peer IP.
        self.locators.write().remove(peer_ip);
        // Remove all common ancestor entries that involve this peer.
        self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
        // Remove all block requests to the peer.
        self.remove_block_requests_to_peer(peer_ip);

        // Update sync state, because the greatest peer height may have decreased.
        // The maximum is recomputed over the locators of the peers that are still tracked.
        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
        } else {
            // There are no more peers left.
            self.sync_state.write().clear_greatest_peer_height();
        }

        // Notify the sync loop that something changed.
        self.peer_notify.notify_one();
    }
760}
761
/// Helper type for `prepare_block_requests` (also returned by `handle_block_request_timeouts`).
///
/// The first element is the list of `(height, request)` pairs to issue; the second element maps each
/// selected sync peer to the block locators it advertised.
pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);
764
765impl<N: Network> BlockSync<N> {
766    /// Returns a list of block requests and the sync peers, if the node needs to sync.
767    ///
768    /// You usually want to call `remove_timed_out_block_requests` before invoking this function.
769    ///
770    /// # Concurrency
771    /// This should be called by at most one task at a time.
772    ///
773    /// # Usage
774    ///  - For validators, the primary spawns exactly one task that periodically calls
775    ///    `bft::Sync::try_issuing_block_requests`. There is no possibility of concurrent calls to it.
776    ///  - For clients, `Client::initialize_sync` spawn exactly one task that periodically calls
777    ///    `Client::try_issuing_block_requests` which calls this function.
778    ///  - Provers do not call this function.
779    pub fn prepare_block_requests(&self) -> BlockRequestBatch<N> {
780        // Used to print more information when we max out on requests.
781        let print_requests = || {
782            if tracing::enabled!(tracing::Level::TRACE) {
783                let summary = self.get_block_requests_summary();
784
785                trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
786                trace!("The following requests are still outstanding: {:?}", summary.outstanding);
787            }
788        };
789
790        // Do not hold lock here as, currently, `find_sync_peers_inner` can take a while.
791        let current_height = self.get_sync_height();
792
793        // Ensure to not exceed the maximum number of outstanding block requests.
794        let max_outstanding_block_requests =
795            (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);
796
797        // Ensure there is a finite bound on the number of block respnoses we receive, that have not been processed yet.
798        let max_total_requests = 4 * max_outstanding_block_requests;
799
800        let max_new_blocks_to_request =
801            max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);
802
803        // Prepare the block requests and sync peers, or returns an empty result if there is nothing to request.
804        if self.num_total_block_requests() >= max_total_requests as usize {
805            trace!(
806                "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
807            );
808
809            print_requests();
810            Default::default()
811        } else if max_new_blocks_to_request == 0 {
812            trace!(
813                "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
814            );
815
816            print_requests();
817            Default::default()
818        } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
819            // Retrieve the greatest block height of any connected peer.
820            // We do not need to update the sync state here, as that already happens when the block locators are received.
821            let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);
822
823            // Construct the list of block requests.
824            let requests = self.construct_requests(
825                &sync_peers,
826                current_height,
827                min_common_ancestor,
828                max_new_blocks_to_request,
829                greatest_peer_height,
830            );
831
832            (requests, sync_peers)
833        } else if self.requests.read().is_empty() {
834            // This can happen during a race condition where the node just finished syncing.
835            // It does not make sense to log or change the sync status here.
836            // Checking the sync status here also does not make sense, as the node might as well have switched back
837            //  from `synced` to `syncing` between calling `find_sync_peers_inner` and this line.
838
839            Default::default()
840        } else {
841            // This happens if we already requested all advertised blocks.
842            trace!("No new blocks can be requested, but there are still outstanding requests.");
843
844            print_requests();
845            Default::default()
846        }
847    }
848
    /// Records in the sync metrics that one block request was completed.
    ///
    /// Should only be called by validators when they successfully process a block request.
    /// (For other nodes this is called automatically internally.)
    ///
    /// TODO(kaimast): remove this public function once the sync logic is fully unified in `BlockSync`.
    pub fn count_request_completed(&self) {
        self.metrics.count_request_completed();
    }
856
857    /// Set the sync height to a the given value.
858    /// This is a no-op if `new_height` is equal or less to the current sync height.
859    pub fn set_sync_height(&self, new_height: u32) {
860        // Scope state lock to avoid locking state and metrics at the same time.
861        let fully_synced = {
862            let mut state = self.sync_state.write();
863            state.set_sync_height(new_height);
864            !state.can_block_sync()
865        };
866
867        if fully_synced {
868            self.metrics.mark_fully_synced();
869        }
870    }
871
872    /// Inserts a block request for the given height.
873    fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
874        // Ensure the block request does not already exist.
875        self.check_block_request(height)?;
876        // Ensure the sync IPs are not empty.
877        ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
878        // Insert the block request.
879        self.requests.write().insert(height, OutstandingRequest {
880            request: (hash, previous_hash, sync_ips),
881            timestamp: Instant::now(),
882            response: None,
883        });
884        Ok(())
885    }
886
887    /// Inserts the given block response, after checking that the request exists and the response is well-formed.
888    /// On success, this function removes the peer IP from the request sync peers and inserts the response.
889    fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<()> {
890        // Retrieve the block height.
891        let height = block.height();
892        let mut requests = self.requests.write();
893
894        if self.ledger.contains_block_height(height) {
895            bail!("The sync request was removed because we already advanced");
896        }
897
898        let Some(entry) = requests.get_mut(&height) else { bail!("The sync pool did not request block {height}") };
899
900        // Retrieve the request entry for the candidate block.
901        let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;
902
903        // Ensure the candidate block hash matches the expected hash.
904        if let Some(expected_hash) = expected_hash {
905            if block.hash() != *expected_hash {
906                bail!("The block hash for candidate block {height} from '{peer_ip}' is incorrect")
907            }
908        }
909        // Ensure the previous block hash matches if it exists.
910        if let Some(expected_previous_hash) = expected_previous_hash {
911            if block.previous_hash() != *expected_previous_hash {
912                bail!("The previous block hash in candidate block {height} from '{peer_ip}' is incorrect")
913            }
914        }
915        // Ensure the sync pool requested this block from the given peer.
916        if !sync_ips.contains(&peer_ip) {
917            bail!("The sync pool did not request block {height} from '{peer_ip}'")
918        }
919
920        // Remove the peer IP from the request entry.
921        entry.sync_ips_mut().swap_remove(&peer_ip);
922
923        if let Some(existing_block) = &entry.response {
924            // If the candidate block was already present, ensure it is the same block.
925            if block != *existing_block {
926                bail!("Candidate block {height} from '{peer_ip}' is malformed");
927            }
928        } else {
929            entry.response = Some(block.clone());
930        }
931
932        // Notify the sync loop that something changed.
933        self.response_notify.notify_one();
934
935        Ok(())
936    }
937
938    /// Checks that a block request for the given height does not already exist.
939    fn check_block_request(&self, height: u32) -> Result<()> {
940        // Ensure the block height is not already in the ledger.
941        if self.ledger.contains_block_height(height) {
942            bail!("Failed to add block request, as block {height} exists in the ledger");
943        }
944        // Ensure the block height is not already requested.
945        if self.requests.read().contains_key(&height) {
946            bail!("Failed to add block request, as block {height} exists in the requests map");
947        }
948
949        Ok(())
950    }
951
952    /// Removes the block request and response for the given height
953    /// This may only be called after `peek_next_block`, which checked if the request for the given height was complete.
954    ///
955    /// Precondition: This may only be called after `peek_next_block` has returned `Some`,
956    /// which has checked if the request for the given height is complete
957    /// and there is a block with the given `height` in the `responses` map.
958    pub fn remove_block_response(&self, height: u32) {
959        // Remove the request entry for the given height.
960        if let Some(e) = self.requests.write().remove(&height) {
961            trace!(
962                "Block request for height {height} was completed in {}ms (sync speed is {})",
963                e.timestamp.elapsed().as_millis(),
964                self.get_sync_speed()
965            );
966
967            // Notify the sending task that less requests are in-flight.
968            self.peer_notify.notify_one();
969        }
970    }
971
972    /// Removes all block requests for the given peer IP.
973    ///
974    /// This is used when disconnecting from a peer or when a peer sends invalid block responses.
975    fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
976        trace!("Block sync is removing all block requests to peer {peer_ip}...");
977
978        // Remove the peer IP from the requests map. If any request entry is now empty,
979        // and its corresponding response entry is also empty, then remove that request entry altogether.
980        self.requests.write().retain(|height, e| {
981            let had_peer = e.sync_ips_mut().swap_remove(peer_ip);
982
983            // Only remove requests that were sent to this peer, that have no other peer that can respond instead,
984            // and that were not completed yet.
985            let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
986            if !retain {
987                trace!("Removed block request timestamp for {peer_ip} at height {height}");
988            }
989            retain
990        });
991
992        // No need to remove responses here, because requests with responses will be retained.
993    }
994
    /// Removes block requests that have timed out, i.e., requests we sent that did not receive a response in time,
    /// as well as requests that became obsolete because their height is not above the latest ledger height.
    ///
    /// Every peer that is still listed on a timed-out request is removed from block sync (via `remove_peer`)
    /// and the peer pool handler is asked to IP-ban it.
    ///
    /// Afterwards, if there is a gap between the next height to sync (`sync_height + 1`) and the first remaining
    /// request, this tries to construct new requests that fill that gap.
    ///
    /// # Return Value
    /// On success it will return `None` if there is nothing to re-request, or a batch of new block requests
    /// (plus the selected sync peers) that replace the timed-out requests.
    /// This batch can also replace requests that timed out earlier, and which we were not able to re-request yet.
    ///
    /// This function will return an error if it cannot re-request blocks due to a lack of peers.
    /// In this case, the current iteration of block synchronization should not continue and the node should re-try later instead.
    pub fn handle_block_request_timeouts<P: PeerPoolHandling<N>>(
        &self,
        peer_pool_handler: &P,
    ) -> Result<Option<BlockRequestBatch<N>>> {
        // Acquire the write lock on the requests map.
        let mut requests = self.requests.write();

        // Retrieve the current time.
        let now = Instant::now();

        // Retrieve the current block height of the ledger.
        let current_height = self.ledger.latest_block_height();

        // Track the heights of timed out block requests (only used to print a log message).
        let mut timed_out_requests = vec![];

        // Track which peers should be banned due to unresponsiveness.
        let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();

        // Remove timed out and obsolete block requests.
        requests.retain(|height, e| {
            // A request is obsolete once the ledger has reached (or passed) its height.
            let is_obsolete = *height <= current_height;
            // Determine if the duration since the request timestamp has exceeded the request timeout.
            let timer_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
            // Determine if the request is complete, i.e., no peer is left that still has to respond.
            let is_complete = e.sync_ips().is_empty();

            // Determine if the request has timed out.
            let is_timeout = timer_elapsed && !is_complete;

            // Retain if this is not a timeout and is not obsolete.
            let retain = !is_timeout && !is_obsolete;

            if is_timeout {
                trace!("Block request at height {height} has timed out: timer_elapsed={timer_elapsed}, is_complete={is_complete}, is_obsolete={is_obsolete}");

                // Record the height of the timed out block request.
                timed_out_requests.push(*height);
            } else if is_obsolete {
                trace!("Block request at height {height} became obsolete (current_height={current_height})");
            }

            // If the request timed out, also mark every peer that did not respond for removal and banning.
            if is_timeout {
                for peer_ip in e.sync_ips().iter() {
                    peers_to_ban.insert(*peer_ip);
                }
            }

            retain
        });

        if !timed_out_requests.is_empty() {
            debug!("{num} block requests timed out", num = timed_out_requests.len());
        }

        // Height of the first remaining request, if any.
        // NOTE(review): presumably `requests` is an ordered map, so this is the lowest height - confirm.
        let next_request_height = requests.iter().next().map(|(h, _)| *h);

        // Avoid locking `locators` and `requests` at the same time.
        drop(requests);

        // Now remove and ban any unresponsive peers.
        for peer_ip in peers_to_ban {
            self.remove_peer(&peer_ip);
            peer_pool_handler.ip_ban_peer(peer_ip, Some("timed out on block requests"));
        }

        // Determine if we need to re-issue any timed-out requests.
        // If there are no requests remaining or no gap at the beginning,
        // we do not need to re-issue requests and will just issue them regularly.
        //
        // This needs to be checked even if timed_out_requests is empty, because we might not be able to re-issue
        // requests immediately if there are no other peers at a given time.
        // Further, this only closes the first gap. So multiple calls to this might be needed.
        let sync_height = self.get_sync_height();
        let start_height = sync_height + 1;

        let end_height = if let Some(next_height) = next_request_height
            && next_height > start_height
        {
            // The end height is exclusive, so use the height of the first existing block request as the end.
            next_height
        } else {
            // Nothing to do.
            // Do not log here as this check happens frequently.
            return Ok(None);
        };

        // Set the maximum number of blocks, so that they do not exceed the end height.
        // This cannot underflow, because `end_height > start_height` was checked above.
        let max_new_blocks_to_request = end_height - start_height;

        // NOTE(review): `prepare_block_requests` passes the sync height to `find_sync_peers_inner`, whereas
        // `sync_height + 1` is passed here; confirm that the stricter peer filter is intended.
        let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start_height) else {
            // This generally shouldn't happen, because there cannot be outstanding requests when no peers are connected.
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        // Retrieve the greatest block height of any connected peer.
        let Some(greatest_peer_height) = sync_peers.values().map(|l| l.latest_locator_height()).max() else {
            // This should never happen because `sync_peers` is guaranteed to be non-empty.
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        // (Try to) construct the requests.
        let requests = self.construct_requests(
            &sync_peers,
            sync_height,
            min_common_ancestor,
            max_new_blocks_to_request,
            greatest_peer_height,
        );

        // If the ledger advanced concurrently, there may be no requests to issue after all.
        // The given height may also be greater than `start_height` due to concurrent block advancement.
        if let Some((height, _)) = requests.as_slice().first() {
            debug!("Re-requesting blocks starting at height {height}");
            Ok(Some((requests, sync_peers)))
        } else {
            // Do not log here as this constitutes a benign race condition.
            Ok(None)
        }
    }
1127
1128    /// Finds the peers to sync from and the shared common ancestor, starting at the give height.
1129    ///
1130    /// Unlike [`Self::find_sync_peers`] this does not only return the latest locators height, but the full BlockLocators for each peer.
1131    /// Returns `None` if there are no peers to sync from.
1132    ///
1133    /// # Locking
1134    /// This function will read-lock `common_ancstors`.
1135    fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
1136        // Retrieve the latest ledger height.
1137        let latest_ledger_height = self.ledger.latest_block_height();
1138
1139        // Pick a set of peers above the latest ledger height, and include their locators.
1140        // This will sort the peers by locator height in descending order.
1141        let candidate_locators: IndexMap<_, _> = self
1142            .locators
1143            .read()
1144            .iter()
1145            .filter(|(_, locators)| locators.latest_locator_height() > current_height)
1146            .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
1147            .take(NUM_SYNC_CANDIDATE_PEERS)
1148            .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
1149            .collect();
1150
1151        // Case 0: If there are no candidate peers, return `None`.
1152        if candidate_locators.is_empty() {
1153            trace!("Found no sync peers with height greater {current_height}");
1154            return None;
1155        }
1156
1157        // TODO (howardwu): Change this to the highest cumulative weight for Phase 3.
1158        // Case 1: If all of the candidate peers share a common ancestor below the latest ledger height,
1159        // then pick the peer with the highest height, and find peers (up to extra redundancy) with
1160        // a common ancestor above the block request range. Set the end height to their common ancestor.
1161
1162        // Determine the threshold number of peers to sync from.
1163        let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);
1164
1165        // Breaks the loop when the first threshold number of peers are found, biasing for the peer with the highest height
1166        // and a cohort of peers who share a common ancestor above this node's latest ledger height.
1167        for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
1168            // The height of the common ancestor shared by all selected peers.
1169            let mut min_common_ancestor = peer_locators.latest_locator_height();
1170
1171            // The peers we will synchronize from.
1172            // As the previous iteration did not succeed, restart with the next candidate peers.
1173            let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];
1174
1175            // Try adding other peers consistent with this one to the sync peer set.
1176            for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
1177                // Check if these two peers have a common ancestor above the latest ledger height.
1178                if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
1179                    // If so, then check that their block locators are consistent.
1180                    if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
1181                        // If their common ancestor is less than the minimum common ancestor, then update it.
1182                        min_common_ancestor = min_common_ancestor.min(*common_ancestor);
1183
1184                        // Add the other peer to the list of sync peers.
1185                        sync_peers.push((*other_ip, other_locators.clone()));
1186                    }
1187                }
1188            }
1189
1190            // If we have enough sync peers above the latest ledger height, finish and return them.
1191            if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
1192                // Shuffle the sync peers prior to returning. This ensures the rest of the stack
1193                // does not rely on the order of the sync peers, and that the sync peers are not biased.
1194                sync_peers.shuffle(&mut rand::thread_rng());
1195
1196                // Collect into an IndexMap and return.
1197                return Some((sync_peers.into_iter().collect(), min_common_ancestor));
1198            }
1199        }
1200
1201        // If there is not enough peers with a minimum common ancestor above the latest ledger height, return None.
1202        None
1203    }
1204
    /// Given the sync peers and their minimum common ancestor, return a list of block requests.
    ///
    /// # Parameters
    /// - `sync_peers`: the selected peers and their block locators, used to derive the expected hashes.
    /// - `sync_height`: the current sync height; requests start no lower than `sync_height + 1`.
    /// - `min_common_ancestor`: the highest height that may be requested (inclusive).
    /// - `max_blocks_to_request`: upper bound on the size of the height range that is considered.
    /// - `greatest_peer_height`: only used to decide whether the early return below is worth logging.
    ///
    /// # Returns
    /// A list of `(height, (hash, previous_hash, num_sync_ips))` entries for a contiguous run of heights
    /// that are neither in the ledger nor already requested. Every entry carries the same `num_sync_ips`,
    /// namely the largest value computed for any height in the run. The list is empty if there is
    /// nothing to request.
    fn construct_requests(
        &self,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        sync_height: u32,
        min_common_ancestor: u32,
        max_blocks_to_request: u32,
        greatest_peer_height: u32,
    ) -> Vec<(u32, PrepareSyncRequest<N>)> {
        // Compute the start height for the block requests.
        let start_height = {
            let requests = self.requests.read();
            let ledger_height = self.ledger.latest_block_height();

            // Do not issue requests for blocks already contained in the ledger.
            // NOTE(review): if `ledger_height > sync_height`, this starts at `ledger_height` itself, which is
            // already in the ledger; the loop below skips it via `check_block_request`, but the range is then
            // one block shorter than `max_blocks_to_request` allows - confirm whether `ledger_height + 1` was meant.
            let mut start_height = ledger_height.max(sync_height + 1);

            // Do not issue requests that already exist.
            while requests.contains_key(&start_height) {
                start_height += 1;
            }

            start_height
        };

        // If the minimum common ancestor is below the start height, then return early.
        if min_common_ancestor < start_height {
            // Only log when some peer is actually ahead of the start height.
            if start_height < greatest_peer_height {
                trace!(
                    "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
                );
            }
            return Default::default();
        }

        // Compute the (exclusive) end height for the block requests.
        let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);

        // Construct the block hashes to request.
        let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
        // Track the largest number of sync IPs required for any block request in the sequence of requests.
        let mut max_num_sync_ips = 1;

        for height in start_height..end_height {
            // Ensure the current height is not in the ledger or already requested.
            if let Err(err) = self.check_block_request(height) {
                trace!("{err}");

                // If the sequence of block requests is interrupted, then stop collecting.
                // Otherwise, continue until the first start height that is new.
                match request_hashes.is_empty() {
                    true => continue,
                    false => break,
                }
            }

            // Construct the block request.
            let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);

            // Handle the dishonest case.
            if !is_honest {
                // TODO (howardwu): Consider performing an integrity check on peers (to disconnect).
                warn!("Detected dishonest peer(s) when preparing block request");
                // If there are not enough peers in the dishonest case, then stop and return what was collected so far.
                if sync_peers.len() < num_sync_ips {
                    break;
                }
            }

            // Update the maximum number of sync IPs.
            max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);

            // Append the request.
            request_hashes.insert(height, (hash, previous_hash));
        }

        // Construct the requests with the same number of sync IPs.
        request_hashes
            .into_iter()
            .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
            .collect()
    }
1287}
1288
1289/// If any peer is detected to be dishonest in this function, it will not set the hash or previous hash,
1290/// in order to allow the caller to determine what to do.
1291fn construct_request<N: Network>(
1292    height: u32,
1293    sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1294) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
1295    let mut hash = None;
1296    let mut hash_redundancy: usize = 0;
1297    let mut previous_hash = None;
1298    let mut is_honest = true;
1299
1300    for peer_locators in sync_peers.values() {
1301        if let Some(candidate_hash) = peer_locators.get_hash(height) {
1302            match hash {
1303                // Increment the redundancy count if the hash matches.
1304                Some(hash) if hash == candidate_hash => hash_redundancy += 1,
1305                // Some peer is dishonest.
1306                Some(_) => {
1307                    hash = None;
1308                    hash_redundancy = 0;
1309                    previous_hash = None;
1310                    is_honest = false;
1311                    break;
1312                }
1313                // Set the hash if it is not set.
1314                None => {
1315                    hash = Some(candidate_hash);
1316                    hash_redundancy = 1;
1317                }
1318            }
1319        }
1320        if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
1321            match previous_hash {
1322                // Increment the redundancy count if the previous hash matches.
1323                Some(previous_hash) if previous_hash == candidate_previous_hash => (),
1324                // Some peer is dishonest.
1325                Some(_) => {
1326                    hash = None;
1327                    hash_redundancy = 0;
1328                    previous_hash = None;
1329                    is_honest = false;
1330                    break;
1331                }
1332                // Set the previous hash if it is not set.
1333                None => previous_hash = Some(candidate_previous_hash),
1334            }
1335        }
1336    }
1337
1338    // Note that we intentionally do not just pick the peers that have the hash we have chosen,
1339    // to give stronger confidence that we are syncing during times when the network is consistent/stable.
1340    let num_sync_ips = {
1341        // Extra redundant peers - as the block hash was dishonest.
1342        if !is_honest {
1343            // Choose up to the extra redundancy factor in sync peers.
1344            EXTRA_REDUNDANCY_FACTOR
1345        }
1346        // No redundant peers - as we have redundancy on the block hash.
1347        else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
1348            // Choose one sync peer.
1349            1
1350        }
1351        // Redundant peers - as we do not have redundancy on the block hash.
1352        else {
1353            // Choose up to the redundancy factor in sync peers.
1354            REDUNDANCY_FACTOR
1355        }
1356    };
1357
1358    (hash, previous_hash, num_sync_ips, is_honest)
1359}
1360
1361#[cfg(test)]
1362mod tests {
1363    use super::*;
1364    use crate::locators::{
1365        CHECKPOINT_INTERVAL,
1366        NUM_RECENT_BLOCKS,
1367        test_helpers::{sample_block_locators, sample_block_locators_with_fork},
1368    };
1369
1370    use snarkos_node_bft_ledger_service::MockLedgerService;
1371    use snarkos_node_network::{NodeType, Peer, Resolver};
1372    use snarkos_node_tcp::{P2P, Tcp};
1373    use snarkvm::{
1374        ledger::committee::Committee,
1375        prelude::{Field, TestRng},
1376    };
1377
1378    use indexmap::{IndexSet, indexset};
1379    #[cfg(feature = "locktick")]
1380    use locktick::parking_lot::RwLock;
1381    #[cfg(not(feature = "locktick"))]
1382    use parking_lot::RwLock;
1383    use rand::Rng;
1384    use std::net::{IpAddr, Ipv4Addr};
1385
1386    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1387
    /// A minimal stand-in for a peer pool handler that only records ban requests.
    ///
    /// Every address passed to `ip_ban_peer` is appended to `peers_to_ban`,
    /// so that tests can inspect which peers would have been banned.
    #[derive(Default)]
    struct DummyPeerPoolHandler {
        /// The listener addresses passed to `ip_ban_peer`, in call order.
        peers_to_ban: RwLock<Vec<SocketAddr>>,
    }
1392
    impl P2P for DummyPeerPoolHandler {
        /// Not available on the dummy handler, which holds no `Tcp` instance; panics if called.
        fn tcp(&self) -> &Tcp {
            unreachable!();
        }
    }
1398
    /// Peer pool handling stub: only `ip_ban_peer` has an observable effect (it records the
    /// address it is given). The peer pool and resolver accessors panic if they are called.
    impl<N: Network> PeerPoolHandling<N> for DummyPeerPoolHandler {
        // NOTE(review): these constants look like placeholder values for testing;
        // confirm their meaning against the `PeerPoolHandling` trait documentation.
        const MAXIMUM_POOL_SIZE: usize = 10;
        const OWNER: &str = "[DummyPeerPoolHandler]";
        const PEER_SLASHING_COUNT: usize = 0;

        /// Not available on the dummy handler, which stores no peer pool; panics if called.
        fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
            unreachable!();
        }

        /// Not available on the dummy handler, which stores no resolver; panics if called.
        fn resolver(&self) -> &RwLock<Resolver<N>> {
            unreachable!();
        }

        /// Always reports `true`.
        fn is_dev(&self) -> bool {
            true
        }

        /// Always reports `false`.
        fn trusted_peers_only(&self) -> bool {
            false
        }

        /// Always reports `NodeType::Client`.
        fn node_type(&self) -> NodeType {
            NodeType::Client
        }

        /// Records `listener_addr` in `peers_to_ban`; the reason is ignored.
        fn ip_ban_peer(&self, listener_addr: SocketAddr, _reason: Option<&str>) {
            self.peers_to_ban.write().push(listener_addr);
        }
    }
1428
1429    /// Returns the peer IP for the sync pool.
1430    fn sample_peer_ip(id: u16) -> SocketAddr {
1431        assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
1432        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
1433    }
1434
1435    /// Returns a sample committee.
1436    fn sample_committee() -> Committee<CurrentNetwork> {
1437        let rng = &mut TestRng::default();
1438        snarkvm::ledger::committee::test_helpers::sample_committee(rng)
1439    }
1440
1441    /// Returns the ledger service, initialized to the given height.
1442    fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
1443        MockLedgerService::new_at_height(sample_committee(), height)
1444    }
1445
1446    /// Returns the sync pool, with the ledger initialized to the given height.
1447    fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
1448        BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)))
1449    }
1450
1451    /// Returns a vector of randomly sampled block heights in [0, max_height].
1452    ///
1453    /// The maximum value will always be included in the result.
1454    fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
1455        assert!(num_values > 0, "Cannot generate an empty vector");
1456        assert!((max_height as usize) >= num_values);
1457
1458        let mut rng = TestRng::default();
1459
1460        let mut heights: Vec<u32> = (0..(max_height - 1)).choose_multiple(&mut rng, num_values);
1461
1462        heights.push(max_height);
1463
1464        heights
1465    }
1466
1467    /// Returns a duplicate (deep copy) of the sync pool with a different ledger height.
1468    fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
1469        BlockSync::<CurrentNetwork> {
1470            peer_notify: Notify::new(),
1471            response_notify: Default::default(),
1472            ledger: Arc::new(sample_ledger_service(height)),
1473            locators: RwLock::new(sync.locators.read().clone()),
1474            common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
1475            requests: RwLock::new(sync.requests.read().clone()),
1476            sync_state: RwLock::new(sync.sync_state.read().clone()),
1477            advance_with_sync_blocks_lock: Default::default(),
1478            metrics: Default::default(),
1479        }
1480    }
1481
1482    /// Checks that the sync pool (starting at genesis) returns the correct requests.
1483    fn check_prepare_block_requests(
1484        sync: BlockSync<CurrentNetwork>,
1485        min_common_ancestor: u32,
1486        peers: IndexSet<SocketAddr>,
1487    ) {
1488        let rng = &mut TestRng::default();
1489
1490        // Check test assumptions are met.
1491        assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");
1492
1493        // Determine the number of peers within range of this sync pool.
1494        let num_peers_within_recent_range_of_ledger = {
1495            // If no peers are within range, then set to 0.
1496            if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
1497                0
1498            }
1499            // Otherwise, manually check the number of peers within range.
1500            else {
1501                peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
1502            }
1503        };
1504
1505        // Prepare the block requests.
1506        let (requests, sync_peers) = sync.prepare_block_requests();
1507
1508        // If there are no peers, then there should be no requests.
1509        if peers.is_empty() {
1510            assert!(requests.is_empty());
1511            return;
1512        }
1513
1514        // Otherwise, there should be requests.
1515        let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
1516        assert_eq!(requests.len(), expected_num_requests);
1517
1518        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1519            // Construct the sync IPs.
1520            let sync_ips: IndexSet<_> =
1521                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1522            assert_eq!(height, 1 + idx as u32);
1523            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1524            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1525
1526            if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
1527                assert_eq!(sync_ips.len(), 1);
1528            } else {
1529                assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
1530                assert_eq!(sync_ips, peers);
1531            }
1532        }
1533    }
1534
1535    /// Tests that height and hash values are set correctly using many different maximum block heights.
1536    #[test]
1537    fn test_latest_block_height() {
1538        for height in generate_block_heights(100_001, 5000) {
1539            let sync = sample_sync_at_height(height);
1540            // Check that the latest block height is the maximum height.
1541            assert_eq!(sync.ledger.latest_block_height(), height);
1542
1543            // Check the hash to height mapping
1544            assert_eq!(sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(0)).into()).unwrap(), 0);
1545            assert_eq!(
1546                sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(height)).into()).unwrap(),
1547                height
1548            );
1549        }
1550    }
1551
1552    #[test]
1553    fn test_get_block_hash() {
1554        for height in generate_block_heights(100_001, 5000) {
1555            let sync = sample_sync_at_height(height);
1556
1557            // Check the height to hash mapping
1558            assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
1559            assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::<CurrentNetwork>::from_u32(height)).into());
1560        }
1561    }
1562
1563    #[test]
1564    fn test_prepare_block_requests() {
1565        for num_peers in 0..111 {
1566            println!("Testing with {num_peers} peers");
1567
1568            let sync = sample_sync_at_height(0);
1569
1570            let mut peers = indexset![];
1571
1572            for peer_id in 1..=num_peers {
1573                // Add a peer.
1574                sync.update_peer_locators(sample_peer_ip(peer_id), &sample_block_locators(10)).unwrap();
1575                // Add the peer to the set of peers.
1576                peers.insert(sample_peer_ip(peer_id));
1577            }
1578
1579            // If all peers are ahead, then requests should be prepared.
1580            check_prepare_block_requests(sync, 10, peers);
1581        }
1582    }
1583
1584    #[test]
1585    fn test_prepare_block_requests_with_leading_fork_at_11() {
1586        let sync = sample_sync_at_height(0);
1587
1588        // Intuitively, peer 1's fork is above peer 2 and peer 3's height.
1589        // So from peer 2 and peer 3's perspective, they don't even realize that peer 1 is on a fork.
1590        // Thus, you can sync up to block 10 from any of the 3 peers.
1591
1592        // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 11,
1593        // then the sync pool should request blocks 1..=10 from the NUM_REDUNDANCY peers.
1594        // This is safe because the leading fork is at 11, and the sync pool is at 0,
1595        // so all candidate peers are at least 10 blocks ahead of the sync pool.
1596
1597        // Add a peer (fork).
1598        let peer_1 = sample_peer_ip(1);
1599        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 11)).unwrap();
1600
1601        // Add a peer.
1602        let peer_2 = sample_peer_ip(2);
1603        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1604
1605        // Add a peer.
1606        let peer_3 = sample_peer_ip(3);
1607        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1608
1609        // Prepare the block requests.
1610        let (requests, _) = sync.prepare_block_requests();
1611        assert_eq!(requests.len(), 10);
1612
1613        // Check the requests.
1614        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1615            assert_eq!(height, 1 + idx as u32);
1616            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1617            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1618            assert_eq!(num_sync_ips, 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1619        }
1620    }
1621
1622    #[test]
1623    fn test_prepare_block_requests_with_leading_fork_at_10() {
1624        let rng = &mut TestRng::default();
1625        let sync = sample_sync_at_height(0);
1626
1627        // Intuitively, peer 1's fork is at peer 2 and peer 3's height.
1628        // So from peer 2 and peer 3's perspective, they recognize that peer 1 has forked.
1629        // Thus, you don't have NUM_REDUNDANCY peers to sync to block 10.
1630        //
1631        // Now, while you could in theory sync up to block 9 from any of the 3 peers,
1632        // we choose not to do this as either side is likely to disconnect from us,
1633        // and we would rather wait for enough redundant peers before syncing.
1634
1635        // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 10,
1636        // then the sync pool should not request blocks as 1 peer conflicts with the other NUM_REDUNDANCY-1 peers.
1637        // We choose to sync with a cohort of peers that are *consistent* with each other,
1638        // and prioritize from descending heights (so the highest peer gets priority).
1639
1640        // Add a peer (fork).
1641        let peer_1 = sample_peer_ip(1);
1642        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();
1643
1644        // Add a peer.
1645        let peer_2 = sample_peer_ip(2);
1646        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1647
1648        // Add a peer.
1649        let peer_3 = sample_peer_ip(3);
1650        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1651
1652        // Prepare the block requests.
1653        let (requests, _) = sync.prepare_block_requests();
1654        assert_eq!(requests.len(), 0);
1655
1656        // When there are NUM_REDUNDANCY+1 peers ahead, and 1 is on a fork, then there should be block requests.
1657
1658        // Add a peer.
1659        let peer_4 = sample_peer_ip(4);
1660        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
1661
1662        // Prepare the block requests.
1663        let (requests, sync_peers) = sync.prepare_block_requests();
1664        assert_eq!(requests.len(), 10);
1665
1666        // Check the requests.
1667        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1668            // Construct the sync IPs.
1669            let sync_ips: IndexSet<_> =
1670                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1671            assert_eq!(height, 1 + idx as u32);
1672            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1673            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1674            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1675            assert_ne!(sync_ips[0], peer_1); // It should never be the forked peer.
1676        }
1677    }
1678
1679    #[test]
1680    fn test_prepare_block_requests_with_trailing_fork_at_9() {
1681        let rng = &mut TestRng::default();
1682        let sync = sample_sync_at_height(0);
1683
1684        // Peer 1 and 2 diverge from peer 3 at block 10. We only sync when there are NUM_REDUNDANCY peers
1685        // who are *consistent* with each other. So if you add a 4th peer that is consistent with peer 1 and 2,
1686        // then you should be able to sync up to block 10, thereby biasing away from peer 3.
1687
1688        // Add a peer (fork).
1689        let peer_1 = sample_peer_ip(1);
1690        sync.update_peer_locators(peer_1, &sample_block_locators(10)).unwrap();
1691
1692        // Add a peer.
1693        let peer_2 = sample_peer_ip(2);
1694        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1695
1696        // Add a peer.
1697        let peer_3 = sample_peer_ip(3);
1698        sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();
1699
1700        // Prepare the block requests.
1701        let (requests, _) = sync.prepare_block_requests();
1702        assert_eq!(requests.len(), 0);
1703
1704        // When there are NUM_REDUNDANCY+1 peers ahead, and peer 3 is on a fork, then there should be block requests.
1705
1706        // Add a peer.
1707        let peer_4 = sample_peer_ip(4);
1708        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
1709
1710        // Prepare the block requests.
1711        let (requests, sync_peers) = sync.prepare_block_requests();
1712        assert_eq!(requests.len(), 10);
1713
1714        // Check the requests.
1715        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1716            // Construct the sync IPs.
1717            let sync_ips: IndexSet<_> =
1718                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1719            assert_eq!(height, 1 + idx as u32);
1720            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1721            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1722            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1723            assert_ne!(sync_ips[0], peer_3); // It should never be the forked peer.
1724        }
1725    }
1726
1727    #[test]
1728    fn test_insert_block_requests() {
1729        let rng = &mut TestRng::default();
1730        let sync = sample_sync_at_height(0);
1731
1732        // Add a peer.
1733        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
1734
1735        // Prepare the block requests.
1736        let (requests, sync_peers) = sync.prepare_block_requests();
1737        assert_eq!(requests.len(), 10);
1738
1739        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1740            // Construct the sync IPs.
1741            let sync_ips: IndexSet<_> =
1742                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1743            // Insert the block request.
1744            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1745            // Check that the block requests were inserted.
1746            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1747            assert!(sync.get_block_request_timestamp(height).is_some());
1748        }
1749
1750        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1751            // Construct the sync IPs.
1752            let sync_ips: IndexSet<_> =
1753                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1754            // Check that the block requests are still inserted.
1755            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1756            assert!(sync.get_block_request_timestamp(height).is_some());
1757        }
1758
1759        for (height, (hash, previous_hash, num_sync_ips)) in requests {
1760            // Construct the sync IPs.
1761            let sync_ips: IndexSet<_> =
1762                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1763            // Ensure that the block requests cannot be inserted twice.
1764            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err();
1765            // Check that the block requests are still inserted.
1766            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1767            assert!(sync.get_block_request_timestamp(height).is_some());
1768        }
1769    }
1770
1771    #[test]
1772    fn test_insert_block_requests_fails() {
1773        let sync = sample_sync_at_height(9);
1774
1775        // Add a peer.
1776        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
1777
1778        // Inserting a block height that is already in the ledger should fail.
1779        sync.insert_block_request(9, (None, None, indexset![sample_peer_ip(1)])).unwrap_err();
1780        // Inserting a block height that is not in the ledger should succeed.
1781        sync.insert_block_request(10, (None, None, indexset![sample_peer_ip(1)])).unwrap();
1782    }
1783
1784    #[test]
1785    fn test_update_peer_locators() {
1786        let sync = sample_sync_at_height(0);
1787
1788        // Test 2 peers.
1789        let peer1_ip = sample_peer_ip(1);
1790        for peer1_height in 0..500u32 {
1791            sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap();
1792            assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height));
1793
1794            let peer2_ip = sample_peer_ip(2);
1795            for peer2_height in 0..500u32 {
1796                println!("Testing peer 1 height at {peer1_height} and peer 2 height at {peer2_height}");
1797
1798                sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap();
1799                assert_eq!(sync.get_peer_height(&peer2_ip), Some(peer2_height));
1800
1801                // Compute the distance between the peers.
1802                let distance = peer1_height.abs_diff(peer2_height);
1803
1804                // Check the common ancestor.
1805                if distance < NUM_RECENT_BLOCKS as u32 {
1806                    let expected_ancestor = core::cmp::min(peer1_height, peer2_height);
1807                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
1808                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
1809                } else {
1810                    let min_checkpoints =
1811                        core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL);
1812                    let expected_ancestor = min_checkpoints * CHECKPOINT_INTERVAL;
1813                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
1814                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
1815                }
1816            }
1817        }
1818    }
1819
1820    #[test]
1821    fn test_remove_peer() {
1822        let sync = sample_sync_at_height(0);
1823
1824        let peer_ip = sample_peer_ip(1);
1825        sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1826        assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1827
1828        sync.remove_peer(&peer_ip);
1829        assert_eq!(sync.get_peer_height(&peer_ip), None);
1830
1831        sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1832        assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1833
1834        sync.remove_peer(&peer_ip);
1835        assert_eq!(sync.get_peer_height(&peer_ip), None);
1836    }
1837
1838    #[test]
1839    fn test_locators_insert_remove_insert() {
1840        let sync = sample_sync_at_height(0);
1841
1842        let peer_ip = sample_peer_ip(1);
1843        sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1844        assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1845
1846        sync.remove_peer(&peer_ip);
1847        assert_eq!(sync.get_peer_height(&peer_ip), None);
1848
1849        sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1850        assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1851    }
1852
1853    #[test]
1854    fn test_requests_insert_remove_insert() {
1855        let rng = &mut TestRng::default();
1856        let sync = sample_sync_at_height(0);
1857
1858        // Add a peer.
1859        let peer_ip = sample_peer_ip(1);
1860        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();
1861
1862        // Prepare the block requests.
1863        let (requests, sync_peers) = sync.prepare_block_requests();
1864        assert_eq!(requests.len(), 10);
1865
1866        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1867            // Construct the sync IPs.
1868            let sync_ips: IndexSet<_> =
1869                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1870            // Insert the block request.
1871            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1872            // Check that the block requests were inserted.
1873            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1874            assert!(sync.get_block_request_timestamp(height).is_some());
1875        }
1876
1877        // Remove the peer.
1878        sync.remove_peer(&peer_ip);
1879
1880        for (height, _) in requests {
1881            // Check that the block requests were removed.
1882            assert_eq!(sync.get_block_request(height), None);
1883            assert!(sync.get_block_request_timestamp(height).is_none());
1884        }
1885
1886        // As there is no peer, it should not be possible to prepare block requests.
1887        let (requests, _) = sync.prepare_block_requests();
1888        assert_eq!(requests.len(), 0);
1889
1890        // Add the peer again.
1891        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();
1892
1893        // Prepare the block requests.
1894        let (requests, _) = sync.prepare_block_requests();
1895        assert_eq!(requests.len(), 10);
1896
1897        for (height, (hash, previous_hash, num_sync_ips)) in requests {
1898            // Construct the sync IPs.
1899            let sync_ips: IndexSet<_> =
1900                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1901            // Insert the block request.
1902            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1903            // Check that the block requests were inserted.
1904            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1905            assert!(sync.get_block_request_timestamp(height).is_some());
1906        }
1907    }
1908
1909    #[test]
1910    fn test_obsolete_block_requests() {
1911        let rng = &mut TestRng::default();
1912        let sync = sample_sync_at_height(0);
1913
1914        let locator_height = rng.gen_range(0..50);
1915
1916        // Add a peer.
1917        let locators = sample_block_locators(locator_height);
1918        sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap();
1919
1920        // Construct block requests
1921        let (requests, sync_peers) = sync.prepare_block_requests();
1922        assert_eq!(requests.len(), locator_height as usize);
1923
1924        // Add the block requests to the sync module.
1925        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1926            // Construct the sync IPs.
1927            let sync_ips: IndexSet<_> =
1928                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1929            // Insert the block request.
1930            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1931            // Check that the block requests were inserted.
1932            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1933            assert!(sync.get_block_request_timestamp(height).is_some());
1934        }
1935
1936        // Duplicate a new sync module with a different height to simulate block advancement.
1937        // This range needs to be inclusive, so that the range is never empty,
1938        // even with a locator height of 0.
1939        let ledger_height = rng.gen_range(0..=locator_height);
1940        let new_sync = duplicate_sync_at_new_height(&sync, ledger_height);
1941
1942        // Check that the number of requests is the same.
1943        assert_eq!(new_sync.requests.read().len(), requests.len());
1944
1945        // Remove timed out block requests.
1946        let c = DummyPeerPoolHandler::default();
1947        new_sync.handle_block_request_timeouts(&c).unwrap();
1948
1949        // Check that the number of requests is reduced based on the ledger height.
1950        assert_eq!(new_sync.requests.read().len(), (locator_height - ledger_height) as usize);
1951    }
1952
1953    #[test]
1954    fn test_timed_out_block_request() {
1955        let sync = sample_sync_at_height(0);
1956        let peer_ip = sample_peer_ip(1);
1957        let locators = sample_block_locators(10);
1958        let block_hash = locators.get_hash(1);
1959
1960        sync.update_peer_locators(peer_ip, &locators).unwrap();
1961
1962        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
1963
1964        // Add a timed-out request
1965        sync.requests.write().insert(1, OutstandingRequest {
1966            request: (block_hash, None, [peer_ip].into()),
1967            timestamp,
1968            response: None,
1969        });
1970
1971        assert_eq!(sync.requests.read().len(), 1);
1972        assert_eq!(sync.locators.read().len(), 1);
1973
1974        // Remove timed out block requests.
1975        let c = DummyPeerPoolHandler::default();
1976        sync.handle_block_request_timeouts(&c).unwrap();
1977
1978        let ban_list = c.peers_to_ban.write();
1979        assert_eq!(ban_list.len(), 1);
1980        assert_eq!(ban_list.iter().next(), Some(&peer_ip));
1981
1982        assert!(sync.requests.read().is_empty());
1983        assert!(sync.locators.read().is_empty());
1984    }
1985
1986    #[test]
1987    fn test_reissue_timed_out_block_request() {
1988        let sync = sample_sync_at_height(0);
1989        let peer_ip1 = sample_peer_ip(1);
1990        let peer_ip2 = sample_peer_ip(2);
1991        let peer_ip3 = sample_peer_ip(3);
1992
1993        let locators = sample_block_locators(10);
1994        let block_hash1 = locators.get_hash(1);
1995        let block_hash2 = locators.get_hash(2);
1996
1997        sync.update_peer_locators(peer_ip1, &locators).unwrap();
1998        sync.update_peer_locators(peer_ip2, &locators).unwrap();
1999        sync.update_peer_locators(peer_ip3, &locators).unwrap();
2000
2001        assert_eq!(sync.locators.read().len(), 3);
2002
2003        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
2004
2005        // Add a timed-out request
2006        sync.requests.write().insert(1, OutstandingRequest {
2007            request: (block_hash1, None, [peer_ip1].into()),
2008            timestamp,
2009            response: None,
2010        });
2011
2012        // Add a timed-out request
2013        sync.requests.write().insert(2, OutstandingRequest {
2014            request: (block_hash2, None, [peer_ip2].into()),
2015            timestamp: Instant::now(),
2016            response: None,
2017        });
2018
2019        assert_eq!(sync.requests.read().len(), 2);
2020
2021        // Remove timed out block requests.
2022        let c = DummyPeerPoolHandler::default();
2023
2024        let re_requests = sync.handle_block_request_timeouts(&c).unwrap();
2025
2026        let ban_list = c.peers_to_ban.write();
2027        assert_eq!(ban_list.len(), 1);
2028        assert_eq!(ban_list.iter().next(), Some(&peer_ip1));
2029
2030        assert_eq!(sync.requests.read().len(), 1);
2031        assert_eq!(sync.locators.read().len(), 2);
2032
2033        let (new_requests, new_sync_ips) = re_requests.unwrap();
2034        assert_eq!(new_requests.len(), 1);
2035
2036        let (height, (hash, _, _)) = new_requests.first().unwrap();
2037        assert_eq!(*height, 1);
2038        assert_eq!(*hash, block_hash1);
2039        assert_eq!(new_sync_ips.len(), 2);
2040
2041        // Make sure the removed peer is not in the sync_peer set.
2042        let mut iter = new_sync_ips.iter();
2043        assert_ne!(iter.next().unwrap().0, &peer_ip1);
2044        assert_ne!(iter.next().unwrap().0, &peer_ip1);
2045    }
2046}