Skip to main content

snarkos_node_sync/
block_sync.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
18    locators::BlockLocators,
19};
20use snarkos_node_bft_ledger_service::LedgerService;
21use snarkos_node_network::PeerPoolHandling;
22use snarkos_node_router::messages::DataBlocks;
23use snarkos_node_sync_communication_service::CommunicationService;
24use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
25
26use snarkvm::{
27    console::network::{ConsensusVersion, Network},
28    prelude::block::Block,
29    utilities::flatten_error,
30};
31
32use anyhow::{Result, bail, ensure};
33use indexmap::{IndexMap, IndexSet};
34use itertools::Itertools;
35#[cfg(feature = "locktick")]
36use locktick::parking_lot::RwLock;
37#[cfg(feature = "locktick")]
38use locktick::tokio::Mutex as TMutex;
39#[cfg(not(feature = "locktick"))]
40use parking_lot::RwLock;
41use rand::seq::{IteratorRandom, SliceRandom};
42use std::{
43    collections::{BTreeMap, HashMap, HashSet, hash_map},
44    net::{IpAddr, Ipv4Addr, SocketAddr},
45    sync::Arc,
46    time::{Duration, Instant},
47};
48#[cfg(not(feature = "locktick"))]
49use tokio::sync::Mutex as TMutex;
50use tokio::sync::Notify;
51
52mod helpers;
53use helpers::rangify_heights;
54
55mod sync_state;
56use sync_state::SyncState;
57
58mod metrics;
59use metrics::BlockSyncMetrics;
60
// The redundancy factor decreases the possibility of malicious peers sending us an invalid block locator
// by requiring multiple peers to advertise the same (prefix of) block locators.
// However, we do not use this in production yet.
#[cfg(not(test))]
pub const REDUNDANCY_FACTOR: usize = 1;
#[cfg(test)]
pub const REDUNDANCY_FACTOR: usize = 3;

/// The time nodes wait between issuing batches of block requests to avoid triggering spam detection.
///
/// The current rate limit for all messages is around 160k per second (see [`Gateway::max_cache_events`]).
/// This constant limits the number of block requests to a much lower 100 per second.
///
// TODO(kaimast): base rate limits on how many requests were sent to each peer instead.
pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);

/// A wider redundancy bound derived from [`REDUNDANCY_FACTOR`].
// NOTE(review): usage sites are outside this chunk — confirm semantics there.
const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
/// The number of candidate peers considered when selecting sync peers.
// NOTE(review): usage sites are outside this chunk — confirm semantics there.
const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;

/// The duration after which an outstanding block request is considered timed out.
const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(600);

/// The maximum number of outstanding block requests.
/// Once a node hits this limit, it will not issue any new requests until existing requests time out or receive responses.
const MAX_BLOCK_REQUESTS: usize = 50; // 50 requests

/// The maximum number of blocks tolerated before the primary is considered behind its peers.
pub const MAX_BLOCKS_BEHIND: u32 = 1; // blocks

/// This is a dummy IP address that is used to represent the local node.
/// Note: This here does not need to be a real IP address, but it must be unique/distinct from all other connections.
pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
92
/// Handle to an outstanding request, containing the request itself and its timestamp.
/// This does not contain the response so that checking for responses does not require iterating over all requests.
#[derive(Clone)]
struct OutstandingRequest<N: Network> {
    /// The request tuple: block hash, previous block hash, and the set of peer IPs
    /// that have not yet responded to this request.
    request: SyncRequest<N>,
    /// The time at which the request was issued.
    timestamp: Instant,
    /// The corresponding response (if any).
    /// This is guaranteed to be Some if sync_ips for the given request are empty.
    response: Option<Block<N>>,
}
103
/// Information about a block request (used for the REST API).
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestInfo {
    /// Seconds since the request was created.
    elapsed: u64,
    /// Whether all requested peers have responded (no sync IPs remaining).
    done: bool,
}
112
/// Summary of completed and in-flight block requests.
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestsSummary {
    /// Human-readable height ranges of requests still awaiting responses.
    outstanding: String,
    /// Human-readable height ranges of requests that have been fully answered.
    completed: String,
}
119
/// Errors returned by [`BlockSync::insert_block_responses`].
#[derive(thiserror::Error, Debug)]
pub enum InsertBlockResponseError {
    /// The peer's response contained no blocks.
    #[error("Empty block response")]
    EmptyBlockResponse,
    /// The peer omitted its latest consensus version (required once V12 is in effect).
    #[error("The peer did not send a consensus version")]
    NoConsensusVersion,
    /// The peer's advertised consensus version disagrees with ours for the response's last block height.
    #[error(
        "The peer's consensus version for height {last_height} does not match ours: expected {expected_version}, got {peer_version}"
    )]
    ConsensusVersionMismatch { peer_version: ConsensusVersion, expected_version: ConsensusVersion, last_height: u32 },
    /// Any other failure, wrapped from `anyhow`.
    #[error("{}", flatten_error(.0))]
    Other(#[from] anyhow::Error),
}
133
134impl<N: Network> OutstandingRequest<N> {
135    /// Get a reference to the IPs of peers that have not responded to the request (yet).
136    fn sync_ips(&self) -> &IndexSet<SocketAddr> {
137        let (_, _, sync_ips) = &self.request;
138        sync_ips
139    }
140
141    /// Get a mutable reference to the IPs of peers that have not responded to the request (yet).
142    fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
143        let (_, _, sync_ips) = &mut self.request;
144        sync_ips
145    }
146}
147
/// A struct that keeps track of synchronizing blocks with other nodes.
///
/// It generates requests to send to other peers and processes responses to those requests.
/// The struct also keeps track of block locators, which indicate which peers it can fetch blocks from.
///
/// # Notes
/// - The actual network communication happens in `snarkos_node::Client` (for clients and provers) and in `snarkos_node_bft::Sync` (for validators).
///
/// - Validators only sync from other nodes using this struct if they fall behind, e.g.,
///   because they experience a network partition.
///   In the common case, validators will generate blocks from the DAG after an anchor certificate has been approved
///   by a supermajority of the committee.
pub struct BlockSync<N: Network> {
    /// The ledger.
    ledger: Arc<dyn LedgerService<N>>,

    /// The map of peer IP to their block locators.
    /// The block locators are consistent with the ledger and every other peer's block locators.
    locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,

    /// The map of peer-to-peer to their common ancestor.
    /// This map is used to determine which peers to request blocks from.
    ///
    /// Lock ordering: when locking both, `common_ancestors` and `locators`, `common_ancestors` must be locked first.
    common_ancestors: RwLock<IndexMap<PeerPair, u32>>,

    /// The block requests in progress and their responses, keyed by block height.
    requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,

    /// The boolean indicator of whether the node is synced up to the latest block (within the given tolerance).
    ///
    /// Lock ordering: if you lock `sync_state` and `requests`, you must lock `sync_state` first.
    sync_state: RwLock<SyncState>,

    /// The lock used to ensure that [`Self::advance_with_sync_blocks()`] is called by one task at a time.
    advance_with_sync_blocks_lock: TMutex<()>,

    /// Gets notified when there was an update to the locators or a peer disconnected.
    peer_notify: Notify,

    /// Gets notified when we received a new block response.
    response_notify: Notify,

    /// Tracks the sync speed, exposed via [`Self::get_sync_speed`].
    metrics: BlockSyncMetrics,
}
194
impl<N: Network> BlockSync<N> {
    /// Initializes a new block sync module.
    pub fn new(ledger: Arc<dyn LedgerService<N>>) -> Self {
        // Make sync state aware of the blocks that already exist on disk at startup.
        let sync_state = SyncState::new_with_height(ledger.latest_block_height());

        Self {
            ledger,
            sync_state: RwLock::new(sync_state),
            peer_notify: Default::default(),
            response_notify: Default::default(),
            locators: Default::default(),
            requests: Default::default(),
            common_ancestors: Default::default(),
            advance_with_sync_blocks_lock: Default::default(),
            metrics: Default::default(),
        }
    }

    /// Blocks until something about a peer changes,
    /// or a block request has been fully processed (either successfully or unsuccessfully).
    ///
    /// Used by the outgoing task.
    pub async fn wait_for_peer_update(&self) {
        self.peer_notify.notified().await
    }

    /// Blocks until there is a new response to a block request.
    ///
    /// Used by the incoming task.
    pub async fn wait_for_block_responses(&self) {
        self.response_notify.notified().await
    }

    /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
    #[inline]
    pub fn is_block_synced(&self) -> bool {
        self.sync_state.read().is_block_synced()
    }

    /// Returns `true` if there are blocks to fetch or responses to process.
    ///
    /// This will always return true if [`Self::is_block_synced`] returns false,
    /// but it can return true when [`Self::is_block_synced`] returns true
    /// (due to the latter having a tolerance of one block).
    #[inline]
    pub fn can_block_sync(&self) -> bool {
        self.sync_state.read().can_block_sync() || self.has_pending_responses()
    }

    /// Returns the number of blocks the node is behind the greatest peer height,
    /// or `None` if no peers are connected yet.
    #[inline]
    pub fn num_blocks_behind(&self) -> Option<u32> {
        self.sync_state.read().num_blocks_behind()
    }

    /// Returns the greatest block height of any connected peer.
    #[inline]
    pub fn greatest_peer_block_height(&self) -> Option<u32> {
        self.sync_state.read().get_greatest_peer_height()
    }

    /// Returns the current sync height of this node.
    /// The sync height is always greater or equal to the ledger height.
    #[inline]
    pub fn get_sync_height(&self) -> u32 {
        self.sync_state.read().get_sync_height()
    }

    /// Returns the number of blocks we requested from peers, but have not received yet.
    #[inline]
    pub fn num_outstanding_block_requests(&self) -> usize {
        self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count()
    }

    /// The total number of block requests, including the ones that have been answered already but not processed yet.
    #[inline]
    pub fn num_total_block_requests(&self) -> usize {
        self.requests.read().len()
    }

    /// Returns the latest locator height for all known peers.
    pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> {
        self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect()
    }

    /// Returns information about all in-flight block requests.
    pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> {
        self.requests
            .read()
            .iter()
            .map(|(height, request)| {
                (*height, BlockRequestInfo {
                    done: request.sync_ips().is_empty(),
                    elapsed: request.timestamp.elapsed().as_secs(),
                })
            })
            .collect()
    }

    /// Returns a summary of all in-flight requests.
    pub fn get_block_requests_summary(&self) -> BlockRequestsSummary {
        // Heights whose requests have been fully answered (no sync IPs remaining).
        let completed = self
            .requests
            .read()
            .iter()
            .filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None })
            .collect::<Vec<_>>();

        // Heights still awaiting at least one peer response.
        let outstanding = self
            .requests
            .read()
            .iter()
            .filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None })
            .collect::<Vec<_>>();

        BlockRequestsSummary { completed: rangify_heights(&completed), outstanding: rangify_heights(&outstanding) }
    }

    /// Returns the current sync speed as tracked by the internal metrics.
    pub fn get_sync_speed(&self) -> f64 {
        self.metrics.get_sync_speed()
    }
}
319
// Helper functions needed for testing
#[cfg(test)]
impl<N: Network> BlockSync<N> {
    /// Returns the latest block height advertised by `peer_ip`, if known.
    fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> {
        let guard = self.locators.read();
        let locators = guard.get(peer_ip)?;
        Some(locators.latest_locator_height())
    }

    /// Returns the common ancestor for the given peer pair, if it exists.
    fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> {
        let key = PeerPair(peer_a, peer_b);
        self.common_ancestors.read().get(&key).copied()
    }

    /// Returns the block request for the given height, if it exists.
    fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> {
        self.requests.read().get(&height).map(|entry| entry.request.clone())
    }

    /// Returns the timestamp of the last time the block was requested, if it exists.
    fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> {
        self.requests.read().get(&height).map(|entry| entry.timestamp)
    }
}
343
impl<N: Network> BlockSync<N> {
    /// Returns the block locators for this node's ledger.
    ///
    /// The locators contain the hashes of the most recent `NUM_RECENT_BLOCKS` blocks,
    /// plus checkpoint hashes at every `CHECKPOINT_INTERVAL` heights from genesis to the tip.
    ///
    /// # Errors
    /// Returns an error if any required block hash cannot be fetched from the ledger.
    #[inline]
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        // Retrieve the latest block height.
        let latest_height = self.ledger.latest_block_height();

        // Initialize the recents map.
        // TODO: generalize this for RECENT_INTERVAL > 1, or remove this comment if we hardwire that to 1
        let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
        // Retrieve the recent block hashes.
        for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
            recents.insert(height, self.ledger.get_block_hash(height)?);
        }

        // Initialize the checkpoints map.
        let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
        // Retrieve the checkpoint block hashes.
        for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
            checkpoints.insert(height, self.ledger.get_block_hash(height)?);
        }

        // Construct the block locators.
        BlockLocators::new(recents, checkpoints)
    }

    /// Returns true if there are pending responses to block requests that need to be processed.
    pub fn has_pending_responses(&self) -> bool {
        // A response is pending when its request is fully answered (no sync IPs left) and a block is stored.
        self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0
    }

    /// Send a batch of block requests.
    ///
    /// Inserts the requests into the sync pool and sends them to a randomly-sampled subset of `sync_peers`.
    /// Returns `true` if all messages were sent successfully; on any failure the inserted
    /// requests are removed from the pool again and `false` is returned.
    pub async fn send_block_requests<C: CommunicationService>(
        &self,
        communication: &C,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        requests: &[(u32, PrepareSyncRequest<N>)],
    ) -> bool {
        // The batch starts at the first request's height; `max_num_sync_ips` bounds the peer sample below.
        let (start_height, max_num_sync_ips) = match requests.first() {
            Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
            None => {
                warn!("Block sync failed - no block requests");
                return false;
            }
        };

        debug!("Sending {len} block requests to peer(s) at {peers:?}", len = requests.len(), peers = sync_peers.keys());

        // Use a randomly sampled subset of the sync IPs.
        let sync_ips: IndexSet<_> =
            sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect();

        // Calculate the end height (exclusive).
        let end_height = start_height.saturating_add(requests.len() as u32);

        // Insert the chunk of block requests.
        for (height, (hash, previous_hash, _)) in requests.iter() {
            // Insert the block request into the sync pool, using the same sampled sync IPs for every request in the chunk.
            if let Err(err) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
                let err = err.context(format!("Failed to insert block request for height {height}"));
                warn!("{}", flatten_error(&err));
                return false;
            }
        }

        /* Send the block request to the peers */

        // Construct the message.
        let message = C::prepare_block_request(start_height, end_height);

        // Send the message to the peers.
        let mut tasks = Vec::with_capacity(sync_ips.len());
        for sync_ip in sync_ips {
            let sender = communication.send(sync_ip, message.clone()).await;
            let task = tokio::spawn(async move {
                // Ensure the request is sent successfully.
                match sender {
                    Some(sender) => {
                        if let Err(err) = sender.await {
                            warn!("Failed to send block request to peer '{sync_ip}': {err}");
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        warn!("Failed to send block request to peer '{sync_ip}': no such peer");
                        false
                    }
                }
            });

            tasks.push(task);
        }

        // Wait for all sends to finish at the same time.
        for result in futures::future::join_all(tasks).await {
            let success = match result {
                Ok(success) => success,
                Err(err) => {
                    error!("tokio join error: {err}");
                    false
                }
            };

            // If sending fails for any peer, remove the block request from the sync pool.
            if !success {
                // Remove the entire block request from the sync pool.
                let mut requests = self.requests.write();
                for height in start_height..end_height {
                    requests.remove(&height);
                }
                // Break out of the loop.
                return false;
            }
        }
        true
    }

    /// Inserts a new block response from the given peer IP.
    ///
    /// Returns an error if the block was malformed, or we already received a different block for this height.
    /// This function also removes all block requests from the given peer IP on failure.
    ///
    /// Note, that this only queues the response. After this, you most likely want to call `Self::try_advancing_block_synchronization`.
    #[inline]
    pub fn insert_block_responses(
        &self,
        peer_ip: SocketAddr,
        blocks: Vec<Block<N>>,
        latest_consensus_version: Option<ConsensusVersion>,
    ) -> Result<(), InsertBlockResponseError> {
        // Attempt to insert the block responses, and break if we encounter an error.
        let result = 'outer: {
            let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else {
                break 'outer Err(InsertBlockResponseError::EmptyBlockResponse);
            };

            let expected_consensus_version = N::CONSENSUS_VERSION(last_height)?;

            // Perform consensus version check, if possible.
            // This check is only enabled after nodes have reached V12.
            if expected_consensus_version >= ConsensusVersion::V12 {
                if let Some(peer_version) = latest_consensus_version {
                    if peer_version != expected_consensus_version {
                        break 'outer Err(InsertBlockResponseError::ConsensusVersionMismatch {
                            peer_version,
                            expected_version: expected_consensus_version,
                            last_height,
                        });
                    }
                } else {
                    break 'outer Err(InsertBlockResponseError::NoConsensusVersion);
                }
            }

            // Insert the candidate blocks into the sync pool.
            for block in blocks {
                if let Err(error) = self.insert_block_response(peer_ip, block) {
                    break 'outer Err(error.into());
                }
            }

            Ok(())
        };

        // On failure, remove all block requests to the peer.
        if result.is_err() {
            self.remove_block_requests_to_peer(&peer_ip);
        }

        // Return the result.
        result
    }

    /// Returns the next block for the given `next_height` if the request is complete,
    /// or `None` otherwise. This does not remove the block from the `responses` map.
    #[inline]
    pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
        // Determine if the request is complete:
        // either there is no request for `next_height`, or the request has no peer socket addresses left.
        if let Some(entry) = self.requests.read().get(&next_height) {
            let is_complete = entry.sync_ips().is_empty();
            if !is_complete {
                return None;
            }

            // If the request is complete, return the block from the responses, if there is one.
            // (A complete request without a response violates the invariant documented on `OutstandingRequest`.)
            if entry.response.is_none() {
                warn!("Request for height {next_height} is complete but no response exists");
            }
            entry.response.clone()
        } else {
            None
        }
    }

    /// Attempts to advance synchronization by processing completed block responses.
    ///
    /// Returns true, if new blocks were added to the ledger.
    ///
    /// # Usage
    /// This is only called in [`Client::try_block_sync`] and should not be called concurrently by multiple tasks.
    /// Validators do not call this function, and instead invoke
    /// [`snarkos_node_bft::Sync::try_advancing_block_synchronization`] which also updates the BFT state.
    #[inline]
    pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
        // Acquire the lock to ensure this function is called only once at a time.
        // If the lock is already acquired, return early.
        //
        // Note: This lock should not be needed anymore as there is only one place we call it from,
        // but we keep it for now out of caution.
        // TODO(kaimast): remove this eventually.
        let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
            trace!("Skipping attempt to advance block synchronziation as it is already in progress");
            return Ok(false);
        };

        // Start with the current height.
        let mut current_height = self.ledger.latest_block_height();
        let start_height = current_height;
        trace!(
            "Try advancing with block responses (at block {current_height}, current sync speed is {})",
            self.get_sync_speed()
        );

        loop {
            let next_height = current_height + 1;

            let Some(block) = self.peek_next_block(next_height) else {
                break;
            };

            // Ensure the block height matches.
            if block.height() != next_height {
                warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
                break;
            }

            // Validate and append the block on a blocking task, so the async executor is not stalled.
            let ledger = self.ledger.clone();
            let advanced = tokio::task::spawn_blocking(move || {
                // Try to check the next block and advance to it.
                match ledger.check_next_block(&block) {
                    Ok(_) => match ledger.advance_to_next_block(&block) {
                        Ok(_) => true,
                        Err(err) => {
                            let err = err.context(format!(
                                "Failed to advance to next block (height: {}, hash: '{}')",
                                block.height(),
                                block.hash()
                            ));
                            warn!("{}", flatten_error(&err));
                            false
                        }
                    },
                    Err(err) => {
                        let err = err.context(format!(
                            "The next block (height: {}, hash: '{}') is invalid",
                            block.height(),
                            block.hash()
                        ));
                        warn!("{}", flatten_error(&err));
                        false
                    }
                }
            })
            .await?;

            // Only count successful requests.
            if advanced {
                self.count_request_completed();
            }

            // Remove the block response.
            self.remove_block_response(next_height);

            // If advancing failed, exit the loop.
            if !advanced {
                break;
            }

            // Update the latest height.
            current_height = next_height;
        }

        if current_height > start_height {
            self.set_sync_height(current_height);
            Ok(true)
        } else {
            Ok(false)
        }
    }
}
638
639impl<N: Network> BlockSync<N> {
640    /// Returns the sync peers with their latest heights, and their minimum common ancestor, if the node can sync.
641    /// This function returns peers that are consistent with each other, and have a block height
642    /// that is greater than the ledger height of this node.
643    ///
644    /// # Locking
645    /// This will read-lock `common_ancestors` and `sync_state`, but not at the same time.
646    pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
647        // Retrieve the current sync height.
648        let current_height = self.get_sync_height();
649
650        if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
651            // Map the locators into the latest height.
652            let sync_peers =
653                sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
654            // Return the sync peers and their minimum common ancestor.
655            Some((sync_peers, min_common_ancestor))
656        } else {
657            None
658        }
659    }
660
    /// Updates the block locators and common ancestors for the given peer IP.
    ///
    /// This function does not need to check that the block locators are well-formed,
    /// because that is already done in [`BlockLocators::new()`], as noted in [`BlockLocators`].
    ///
    /// This function does **not** check
    /// that the block locators are consistent with the peer's previous block locators or other peers' block locators.
    pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
        // Update the locators entry for the given peer IP.
        // We perform this update atomically, and drop the lock as soon as we are done with the update.
        match self.locators.write().entry(peer_ip) {
            hash_map::Entry::Occupied(mut e) => {
                // Return early if the block locators did not change.
                if e.get() == locators {
                    return Ok(());
                }

                let old_height = e.get().latest_locator_height();
                let new_height = locators.latest_locator_height();

                // A decreasing peer height is unusual; log it for diagnostics.
                if old_height > new_height {
                    debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
                }
                e.insert(locators.clone());
            }
            hash_map::Entry::Vacant(e) => {
                e.insert(locators.clone());
            }
        }

        // Compute the common ancestor with this node.
        let new_local_ancestor = {
            let mut ancestor = 0;
            // Attention: Please do not optimize this loop, as it performs fork-detection. In addition,
            // by iterating upwards, it also early-terminates malicious block locators at the *first* point
            // of bifurcation in their ledger history, which is a critical safety guarantee provided here.
            for (height, hash) in locators.clone().into_iter() {
                if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
                    match ledger_hash == hash {
                        true => ancestor = height,
                        false => {
                            debug!("Detected fork with peer \"{peer_ip}\" at height {height}");
                            break;
                        }
                    }
                }
            }
            ancestor
        };

        // Compute the common ancestor with every other peer.
        // Do not hold write lock to `common_ancestors` here, because this can take a while with many peers.
        let ancestor_updates: Vec<_> = self
            .locators
            .read()
            .iter()
            .filter_map(|(other_ip, other_locators)| {
                // Skip if the other peer is the given peer.
                if other_ip == &peer_ip {
                    return None;
                }
                // Compute the common ancestor with the other peer.
                let mut ancestor = 0;
                for (height, hash) in other_locators.clone().into_iter() {
                    if let Some(expected_hash) = locators.get_hash(height) {
                        match expected_hash == hash {
                            true => ancestor = height,
                            false => {
                                debug!(
                                    "Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
                                );
                                break;
                            }
                        }
                    }
                }

                Some((PeerPair(peer_ip, *other_ip), ancestor))
            })
            .collect();

        // Update the map of common ancestors.
        // Scope the lock, so it is dropped before locking `sync_state`.
        {
            let mut common_ancestors = self.common_ancestors.write();
            common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);

            for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
                common_ancestors.insert(peer_pair, new_ancestor);
            }
        }

        // Update sync state, because the greatest peer height may have decreased.
        // Note: `max()` only returns `None` when no peer locators exist at all.
        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
        } else {
            error!("Got new block locators but greatest peer height is zero.");
        }

        // Notify the sync loop that something changed.
        self.peer_notify.notify_one();

        Ok(())
    }
765
766    /// TODO (howardwu): Remove the `common_ancestor` entry. But check that this is safe
767    ///  (that we don't rely upon it for safety when we re-connect with the same peer).
768    /// Removes the peer from the sync pool, if they exist.
769    pub fn remove_peer(&self, peer_ip: &SocketAddr) {
770        trace!("Removing peer {peer_ip} from block sync");
771
772        // Remove the locators entry for the given peer IP.
773        self.locators.write().remove(peer_ip);
774        // Remove all common ancestor entries for this peers.
775        self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
776        // Remove all block requests to the peer.
777        self.remove_block_requests_to_peer(peer_ip);
778
779        // Update sync state, because the greatest peer height may have decreased.
780        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
781            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
782        } else {
783            // There are no more peers left.
784            self.sync_state.write().clear_greatest_peer_height();
785        }
786
787        // Notify the sync loop that something changed.
788        self.peer_notify.notify_one();
789    }
790}
791
/// Helper type for [`BlockSync::prepare_block_requests`]: the `(height, request)` pairs to
/// issue, together with the block locators of the sync peers selected to serve them.
pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);
794
795impl<N: Network> BlockSync<N> {
796    /// Returns a list of block requests and the sync peers, if the node needs to sync.
797    ///
798    /// You usually want to call `remove_timed_out_block_requests` before invoking this function.
799    ///
800    /// # Concurrency
801    /// This should be called by at most one task at a time.
802    ///
803    /// # Usage
804    ///  - For validators, the primary spawns exactly one task that periodically calls
805    ///    `bft::Sync::try_issuing_block_requests`. There is no possibility of concurrent calls to it.
806    ///  - For clients, `Client::initialize_sync` spawn exactly one task that periodically calls
807    ///    `Client::try_issuing_block_requests` which calls this function.
808    ///  - Provers do not call this function.
809    pub fn prepare_block_requests(&self) -> BlockRequestBatch<N> {
810        // Used to print more information when we max out on requests.
811        let print_requests = || {
812            if tracing::enabled!(tracing::Level::TRACE) {
813                let summary = self.get_block_requests_summary();
814
815                trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
816                trace!("The following requests are still outstanding: {:?}", summary.outstanding);
817            }
818        };
819
820        // Do not hold lock here as, currently, `find_sync_peers_inner` can take a while.
821        let current_height = self.get_sync_height();
822
823        // Ensure to not exceed the maximum number of outstanding block requests.
824        let max_outstanding_block_requests =
825            (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);
826
827        // Ensure there is a finite bound on the number of block respnoses we receive, that have not been processed yet.
828        let max_total_requests = 4 * max_outstanding_block_requests;
829
830        let max_new_blocks_to_request =
831            max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);
832
833        // Prepare the block requests and sync peers, or returns an empty result if there is nothing to request.
834        if self.num_total_block_requests() >= max_total_requests as usize {
835            trace!(
836                "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
837            );
838
839            print_requests();
840            Default::default()
841        } else if max_new_blocks_to_request == 0 {
842            trace!(
843                "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
844            );
845
846            print_requests();
847            Default::default()
848        } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
849            // Retrieve the greatest block height of any connected peer.
850            // We do not need to update the sync state here, as that already happens when the block locators are received.
851            let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);
852
853            // Construct the list of block requests.
854            let requests = self.construct_requests(
855                &sync_peers,
856                current_height,
857                min_common_ancestor,
858                max_new_blocks_to_request,
859                greatest_peer_height,
860            );
861
862            (requests, sync_peers)
863        } else if self.requests.read().is_empty() {
864            // This can happen during a race condition where the node just finished syncing.
865            // It does not make sense to log or change the sync status here.
866            // Checking the sync status here also does not make sense, as the node might as well have switched back
867            //  from `synced` to `syncing` between calling `find_sync_peers_inner` and this line.
868
869            Default::default()
870        } else {
871            // This happens if we already requested all advertised blocks.
872            trace!("No new blocks can be requested, but there are still outstanding requests.");
873
874            print_requests();
875            Default::default()
876        }
877    }
878
879    /// Should only be called by validators when they successfully process a block request.
880    /// (for other nodes this will be automatically called internally)
881    ///
    /// TODO(kaimast): remove this public function once the sync logic is fully unified in `BlockSync`.
    pub fn count_request_completed(&self) {
        // Delegate to the metrics tracker; this only updates bookkeeping and does not touch sync state.
        self.metrics.count_request_completed();
    }
886
    /// Sets the sync height to the given value.
    /// This is a no-op if `new_height` is less than or equal to the current sync height.
889    pub fn set_sync_height(&self, new_height: u32) {
890        // Scope state lock to avoid locking state and metrics at the same time.
891        let fully_synced = {
892            let mut state = self.sync_state.write();
893            state.set_sync_height(new_height);
894            !state.can_block_sync()
895        };
896
897        if fully_synced {
898            self.metrics.mark_fully_synced();
899        }
900    }
901
902    /// Inserts a block request for the given height.
903    fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
904        // Ensure the block request does not already exist.
905        self.check_block_request(height)?;
906        // Ensure the sync IPs are not empty.
907        ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
908        // Insert the block request.
909        self.requests.write().insert(height, OutstandingRequest {
910            request: (hash, previous_hash, sync_ips),
911            timestamp: Instant::now(),
912            response: None,
913        });
914        Ok(())
915    }
916
917    /// Inserts the given block response, after checking that the request exists and the response is well-formed.
918    /// On success, this function removes the peer IP from the request sync peers and inserts the response.
919    fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<()> {
920        // Retrieve the block height.
921        let height = block.height();
922        let mut requests = self.requests.write();
923
924        if self.ledger.contains_block_height(height) {
925            bail!("The sync request was removed because we already advanced");
926        }
927
928        let Some(entry) = requests.get_mut(&height) else { bail!("The sync pool did not request block {height}") };
929
930        // Retrieve the request entry for the candidate block.
931        let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;
932
933        // Ensure the candidate block hash matches the expected hash.
934        if let Some(expected_hash) = expected_hash {
935            if block.hash() != *expected_hash {
936                bail!("The block hash for candidate block {height} from '{peer_ip}' is incorrect")
937            }
938        }
939        // Ensure the previous block hash matches if it exists.
940        if let Some(expected_previous_hash) = expected_previous_hash {
941            if block.previous_hash() != *expected_previous_hash {
942                bail!("The previous block hash in candidate block {height} from '{peer_ip}' is incorrect")
943            }
944        }
945        // Ensure the sync pool requested this block from the given peer.
946        if !sync_ips.contains(&peer_ip) {
947            bail!("The sync pool did not request block {height} from '{peer_ip}'")
948        }
949
950        // Remove the peer IP from the request entry.
951        entry.sync_ips_mut().swap_remove(&peer_ip);
952
953        if let Some(existing_block) = &entry.response {
954            // If the candidate block was already present, ensure it is the same block.
955            if block != *existing_block {
956                bail!("Candidate block {height} from '{peer_ip}' is malformed");
957            }
958        } else {
959            entry.response = Some(block.clone());
960        }
961
962        trace!("Received a new and valid block response for height {height}");
963
964        // Notify the sync loop that something changed.
965        self.response_notify.notify_one();
966
967        Ok(())
968    }
969
970    /// Checks that a block request for the given height does not already exist.
971    fn check_block_request(&self, height: u32) -> Result<()> {
972        // Ensure the block height is not already in the ledger.
973        if self.ledger.contains_block_height(height) {
974            bail!("Failed to add block request, as block {height} exists in the ledger");
975        }
976        // Ensure the block height is not already requested.
977        if self.requests.read().contains_key(&height) {
978            bail!("Failed to add block request, as block {height} exists in the requests map");
979        }
980
981        Ok(())
982    }
983
    /// Removes the block request and response for the given height.
    ///
    /// Precondition: This may only be called after `peek_next_block` has returned `Some`,
    /// which checked that the request for the given height is complete
    /// and that a block response for the given `height` is present.
990    pub fn remove_block_response(&self, height: u32) {
991        // Remove the request entry for the given height.
992        if let Some(e) = self.requests.write().remove(&height) {
993            trace!(
994                "Block request for height {height} was completed in {}ms (sync speed is {})",
995                e.timestamp.elapsed().as_millis(),
996                self.get_sync_speed()
997            );
998
999            // Notify the sending task that less requests are in-flight.
1000            self.peer_notify.notify_one();
1001        }
1002    }
1003
1004    /// Removes all block requests for the given peer IP.
1005    ///
1006    /// This is used when disconnecting from a peer or when a peer sends invalid block responses.
    fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
        trace!("Block sync is removing all block requests to peer {peer_ip}...");

        // Walk the requests map, stripping this peer from every entry's set of sync IPs.
        // Note: `swap_remove` inside the `retain` closure mutates the entry even when it is kept.
        self.requests.write().retain(|height, e| {
            // `true` if this request had been sent to the departing peer.
            let had_peer = e.sync_ips_mut().swap_remove(peer_ip);

            // Keep the entry unless it (a) was sent to this peer, AND (b) has no other peer left
            // that could still respond, AND (c) has not already received a response.
            let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
            if !retain {
                trace!("Removed block request timestamp for {peer_ip} at height {height}");
            }
            retain
        });

        // No need to remove responses here: any request that already has a response is retained above.
    }
1026
1027    /// Removes block requests that have timed out, i.e, requests we sent that did not receive a response in time.
1028    ///
1029    /// This removes the corresponding block responses and returns the set of peers/addresses that timed out.
1030    /// It will ask the peer pool handling service to ban any timed-out peers.
1031    ///
1032    /// # Return Value
    /// On success it will return `None` if there is nothing to re-request, or a set of new block requests that replaced the timed-out requests.
1034    /// This set of new requests can also replace requests that timed out earlier, and which we were not able to re-request yet.
1035    ///
1036    /// This function will return an error if it cannot re-request blocks due to a lack of peers.
1037    /// In this case, the current iteration of block synchronization should not continue and the node should re-try later instead.
    pub fn handle_block_request_timeouts<P: PeerPoolHandling<N>>(
        &self,
        _peer_pool_handler: &P,
    ) -> Result<Option<BlockRequestBatch<N>>> {
        // Acquire the write lock on the requests map.
        let mut requests = self.requests.write();

        // Retrieve the current time.
        let now = Instant::now();

        // Retrieve the current block height.
        let current_height = self.ledger.latest_block_height();

        // Track the heights of timed-out block requests (only used to print a log message).
        let mut timed_out_requests = vec![];

        // Track which peers should be banned due to unresponsiveness.
        let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();

        // Remove timed out block requests.
        requests.retain(|height, e| {
            // A request is obsolete once the ledger has already reached its height.
            let is_obsolete = *height <= current_height;
            // Determine if the duration since the request timestamp has exceeded the request timeout.
            let timer_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
            // Determine if the request is complete, i.e. no sync IPs are still awaiting a response.
            let is_complete = e.sync_ips().is_empty();

            // Determine if the request has timed out (the timer elapsed while responses were still pending).
            let is_timeout = timer_elapsed && !is_complete;

            // Retain if this is not a timeout and is not obsolete.
            let retain = !is_timeout && !is_obsolete;

            if is_timeout {
                trace!("Block request at height {height} has timed out: timer_elapsed={timer_elapsed}, is_complete={is_complete}, is_obsolete={is_obsolete}");

                // Record the height of the timed-out block request.
                timed_out_requests.push(*height);
            } else if is_obsolete {
                trace!("Block request at height {height} became obsolete (current_height={current_height})");
            }

            // If the request timed out, mark every peer that failed to respond for removal and banning.
            if is_timeout {
                for peer_ip in e.sync_ips().iter() {
                    peers_to_ban.insert(*peer_ip);
                }
            }

            retain
        });

        if !timed_out_requests.is_empty() {
            debug!("{num} block requests timed out", num = timed_out_requests.len());
        }

        // The lowest height still present in the requests map (used to detect a gap below it).
        let next_request_height = requests.iter().next().map(|(h, _)| *h);

        // Avoid locking `locators` and `requests` at the same time.
        drop(requests);

        // Now remove and ban any unresponsive peers.
        for peer_ip in peers_to_ban {
            self.remove_peer(&peer_ip);
            // TODO: Uncomment this when we have a more rigorous analysis and testing of peer banning.
            // peer_pool_handler.ip_ban_peer(peer_ip, Some("timed out on block requests"));
        }

        // Determine if we need to re-issue any timed-out requests.
        // If there are no requests remaining or no gap at the beginning,
        // we do not need to re-issue requests and will just issue them regularly.
        //
        // This needs to be checked even if timed_out_requests is empty, because we might not be able to re-issue
        // requests immediately if there are no other peers at a given time.
        // Further, this only closes the first gap. So multiple calls to this might be needed.
        let sync_height = self.get_sync_height();
        let start_height = sync_height + 1;

        let end_height = if let Some(next_height) = next_request_height
            && next_height > start_height
        {
            // The end height is exclusive, so use the height of the first existing block request as the end.
            next_height
        } else {
            // Nothing to do.
            // Do not log here as this check happens frequently.
            return Ok(None);
        };

        // Set the maximum number of blocks, so that they do not exceed the end height.
        let max_new_blocks_to_request = end_height - start_height;

        let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start_height) else {
            // This generally shouldn't happen, because there cannot be outstanding requests when no peers are connected.
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        // Retrieve the greatest block height of any connected peer.
        let Some(greatest_peer_height) = sync_peers.values().map(|l| l.latest_locator_height()).max() else {
            // This should never happen because `sync_peers` is guaranteed to be non-empty.
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        // (Try to) construct the requests.
        let requests = self.construct_requests(
            &sync_peers,
            sync_height,
            min_common_ancestor,
            max_new_blocks_to_request,
            greatest_peer_height,
        );

        // If the ledger advanced concurrently, there may be no requests to issue after all.
        // The given height may also be greater than `start_height` due to concurrent block advancement.
        if let Some((height, _)) = requests.as_slice().first() {
            debug!("Re-requesting blocks starting at height {height}");
            Ok(Some((requests, sync_peers)))
        } else {
            // Do not log here as this constitutes a benign race condition.
            Ok(None)
        }
    }
1160
    /// Finds the peers to sync from and the shared common ancestor, starting at the given height.
1162    ///
1163    /// Unlike [`Self::find_sync_peers`] this does not only return the latest locators height, but the full BlockLocators for each peer.
1164    /// Returns `None` if there are no peers to sync from.
1165    ///
1166    /// # Locking
1167    /// This function will read-lock `common_ancestors`.
    fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
        // Retrieve the latest ledger height.
        let latest_ledger_height = self.ledger.latest_block_height();

        // Pick a set of peers above the latest ledger height, and include their locators.
        // This will sort the peers by locator height in descending order and
        // keep at most `NUM_SYNC_CANDIDATE_PEERS` of them.
        let candidate_locators: IndexMap<_, _> = self
            .locators
            .read()
            .iter()
            .filter(|(_, locators)| locators.latest_locator_height() > current_height)
            .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
            .take(NUM_SYNC_CANDIDATE_PEERS)
            .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
            .collect();

        // Case 0: If there are no candidate peers, return `None`.
        if candidate_locators.is_empty() {
            trace!("Found no sync peers with height greater {current_height}");
            return None;
        }

        // TODO (howardwu): Change this to the highest cumulative weight for Phase 3.
        // Case 1: If all of the candidate peers share a common ancestor below the latest ledger height,
        // then pick the peer with the highest height, and find peers (up to extra redundancy) with
        // a common ancestor above the block request range. Set the end height to their common ancestor.

        // Determine the threshold number of peers to sync from.
        let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);

        // Breaks the loop when the first threshold number of peers are found, biasing for the peer with the highest height
        // and a cohort of peers who share a common ancestor above this node's latest ledger height.
        // Each outer iteration tries to build a cohort anchored on a different candidate peer.
        for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
            // The height of the common ancestor shared by all selected peers.
            // Starts at the anchor peer's own tip and only decreases as peers are added.
            let mut min_common_ancestor = peer_locators.latest_locator_height();

            // The peers we will synchronize from.
            // As the previous iteration did not succeed, restart with the next candidate peers.
            let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];

            // Try adding other peers consistent with this one to the sync peer set.
            // Note: the `common_ancestors` read lock is taken per candidate pair.
            for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
                // Check if these two peers have a common ancestor above the latest ledger height.
                if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
                    // If so, then check that their block locators are consistent.
                    if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
                        // If their common ancestor is less than the minimum common ancestor, then update it.
                        min_common_ancestor = min_common_ancestor.min(*common_ancestor);

                        // Add the other peer to the list of sync peers.
                        sync_peers.push((*other_ip, other_locators.clone()));
                    }
                }
            }

            // If we have enough sync peers above the latest ledger height, finish and return them.
            if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
                // Shuffle the sync peers prior to returning. This ensures the rest of the stack
                // does not rely on the order of the sync peers, and that the sync peers are not biased.
                sync_peers.shuffle(&mut rand::thread_rng());

                // Collect into an IndexMap and return.
                return Some((sync_peers.into_iter().collect(), min_common_ancestor));
            }
        }

        // If there are not enough peers with a minimum common ancestor above the latest ledger height, return None.
        None
    }
1237
1238    /// Given the sync peers and their minimum common ancestor, return a list of block requests.
    fn construct_requests(
        &self,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        sync_height: u32,
        min_common_ancestor: u32,
        max_blocks_to_request: u32,
        greatest_peer_height: u32,
    ) -> Vec<(u32, PrepareSyncRequest<N>)> {
        // Compute the start height for the block requests.
        let start_height = {
            let requests = self.requests.read();
            let ledger_height = self.ledger.latest_block_height();

            // Do not issue requests for blocks already contained in the ledger.
            // NOTE(review): when `ledger_height > sync_height + 1`, this starts at `ledger_height`,
            // which is itself in the ledger; `check_block_request` below skips such heights — confirm intended.
            let mut start_height = ledger_height.max(sync_height + 1);

            // Do not issue requests that already exist.
            while requests.contains_key(&start_height) {
                start_height += 1;
            }

            start_height
        };

        // If the minimum common ancestor is below the start height, then return early.
        if min_common_ancestor < start_height {
            if start_height < greatest_peer_height {
                trace!(
                    "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
                );
            }
            return Default::default();
        }

        // Compute the (exclusive) end height for the block request, bounded by `max_blocks_to_request`.
        let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);

        // Construct the block hashes to request.
        let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
        // Track the largest number of sync IPs required for any block request in the sequence of requests.
        let mut max_num_sync_ips = 1;

        for height in start_height..end_height {
            // Ensure the current height is not in the ledger or already requested.
            if let Err(err) = self.check_block_request(height) {
                trace!("{err}");

                // If the sequence of block requests is interrupted, then return early.
                // Otherwise, continue until the first start height that is new.
                match request_hashes.is_empty() {
                    true => continue,
                    false => break,
                }
            }

            // Construct the block request from the sync peers' locators.
            let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);

            // Handle the dishonest case (conflicting hashes among the sync peers).
            if !is_honest {
                // TODO (howardwu): Consider performing an integrity check on peers (to disconnect).
                warn!("Detected dishonest peer(s) when preparing block request");
                // If there are not enough peers in the dishonest case, then return early.
                if sync_peers.len() < num_sync_ips {
                    break;
                }
            }

            // Update the maximum number of sync IPs.
            max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);

            // Append the request.
            request_hashes.insert(height, (hash, previous_hash));
        }

        // Construct the requests, all sharing the same (maximum) number of sync IPs.
        request_hashes
            .into_iter()
            .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
            .collect()
    }
1320}
1321
/// Derives the expected hash and previous hash for a block request at `height` from the
/// sync peers' locators, and decides how many peers to request the block from.
///
/// If any peer is detected to be dishonest in this function, it will not set the hash or previous hash,
/// in order to allow the caller to determine what to do.
///
/// Returns `(hash, previous_hash, num_sync_ips, is_honest)`.
fn construct_request<N: Network>(
    height: u32,
    sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
    let mut hash = None;
    // How many peers agreed on `hash`.
    let mut hash_redundancy: usize = 0;
    let mut previous_hash = None;
    let mut is_honest = true;

    for peer_locators in sync_peers.values() {
        if let Some(candidate_hash) = peer_locators.get_hash(height) {
            match hash {
                // Increment the redundancy count if the hash matches.
                Some(hash) if hash == candidate_hash => hash_redundancy += 1,
                // Some peer is dishonest: conflicting hashes for the same height.
                // Reset everything and stop scanning.
                Some(_) => {
                    hash = None;
                    hash_redundancy = 0;
                    previous_hash = None;
                    is_honest = false;
                    break;
                }
                // Set the hash if it is not set.
                None => {
                    hash = Some(candidate_hash);
                    hash_redundancy = 1;
                }
            }
        }
        // `saturating_sub` guards the genesis case (height 0 has no predecessor).
        if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
            match previous_hash {
                // The previous hash matches; no redundancy count is kept for it — it is only validated.
                Some(previous_hash) if previous_hash == candidate_previous_hash => (),
                // Some peer is dishonest: conflicting previous hashes.
                Some(_) => {
                    hash = None;
                    hash_redundancy = 0;
                    previous_hash = None;
                    is_honest = false;
                    break;
                }
                // Set the previous hash if it is not set.
                None => previous_hash = Some(candidate_previous_hash),
            }
        }
    }

    // Note that we intentionally do not just pick the peers that have the hash we have chosen,
    // to give stronger confidence that we are syncing during times when the network is consistent/stable.
    let num_sync_ips = {
        // Extra redundant peers - as the block hash was dishonest.
        if !is_honest {
            // Choose up to the extra redundancy factor in sync peers.
            EXTRA_REDUNDANCY_FACTOR
        }
        // No redundant peers - as we have redundancy on the block hash.
        else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
            // Choose one sync peer.
            1
        }
        // Redundant peers - as we do not have redundancy on the block hash.
        else {
            // Choose up to the redundancy factor in sync peers.
            REDUNDANCY_FACTOR
        }
    };

    (hash, previous_hash, num_sync_ips, is_honest)
}
1393
1394#[cfg(test)]
1395mod tests {
1396    use super::*;
1397    use crate::locators::{
1398        CHECKPOINT_INTERVAL,
1399        NUM_RECENT_BLOCKS,
1400        test_helpers::{sample_block_locators, sample_block_locators_with_fork},
1401    };
1402
1403    use snarkos_node_bft_ledger_service::MockLedgerService;
1404    use snarkos_node_network::{NodeType, Peer, Resolver};
1405    use snarkos_node_tcp::{P2P, Tcp};
1406    use snarkvm::{
1407        ledger::committee::Committee,
1408        prelude::{Field, TestRng},
1409    };
1410
1411    use indexmap::{IndexSet, indexset};
1412    #[cfg(feature = "locktick")]
1413    use locktick::parking_lot::RwLock;
1414    #[cfg(not(feature = "locktick"))]
1415    use parking_lot::RwLock;
1416    use rand::Rng;
1417    use std::net::{IpAddr, Ipv4Addr};
1418
1419    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1420
    /// A minimal peer-pool stand-in for these tests: it only records which
    /// peers were asked to be banned; all other peer-pool accessors panic.
    #[derive(Default)]
    struct DummyPeerPoolHandler {
        // Listener addresses passed to `ip_ban_peer`, in call order.
        peers_to_ban: RwLock<Vec<SocketAddr>>,
    }
1425
    // NOTE(review): presumably required as a bound of `PeerPoolHandling` — the
    // TCP stack itself is never touched by these tests, hence `unreachable!`.
    impl P2P for DummyPeerPoolHandler {
        fn tcp(&self) -> &Tcp {
            unreachable!();
        }
    }
1431
    // Test stub: only `ip_ban_peer` is functional. The pool/resolver accessors
    // are never exercised by the sync tests and therefore panic if reached.
    impl<N: Network> PeerPoolHandling<N> for DummyPeerPoolHandler {
        const MAXIMUM_POOL_SIZE: usize = 10;
        const OWNER: &str = "[DummyPeerPoolHandler]";
        const PEER_SLASHING_COUNT: usize = 0;

        // Not used by the sync tests.
        fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
            unreachable!();
        }

        // Not used by the sync tests.
        fn resolver(&self) -> &RwLock<Resolver<N>> {
            unreachable!();
        }

        // Present as a dev-mode node.
        fn is_dev(&self) -> bool {
            true
        }

        fn trusted_peers_only(&self) -> bool {
            false
        }

        fn node_type(&self) -> NodeType {
            NodeType::Client
        }

        // Record the ban request instead of acting on it, so tests can inspect it.
        fn ip_ban_peer(&self, listener_addr: SocketAddr, _reason: Option<&str>) {
            self.peers_to_ban.write().push(listener_addr);
        }
    }
1461
1462    /// Returns the peer IP for the sync pool.
1463    fn sample_peer_ip(id: u16) -> SocketAddr {
1464        assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
1465        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
1466    }
1467
1468    /// Returns a sample committee.
1469    fn sample_committee() -> Committee<CurrentNetwork> {
1470        let rng = &mut TestRng::default();
1471        snarkvm::ledger::committee::test_helpers::sample_committee(rng)
1472    }
1473
    /// Returns the ledger service, initialized to the given height.
    ///
    /// NOTE(review): the tests below rely on the mock deriving block hashes
    /// deterministically from heights (`Field::from_u32(height)`) — confirm
    /// against `MockLedgerService` if that assumption changes.
    fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
        MockLedgerService::new_at_height(sample_committee(), height)
    }
1478
    /// Returns the sync pool, with the ledger initialized to the given height.
    fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)))
    }
1483
1484    /// Returns a vector of randomly sampled block heights in [0, max_height].
1485    ///
1486    /// The maximum value will always be included in the result.
1487    fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
1488        assert!(num_values > 0, "Cannot generate an empty vector");
1489        assert!((max_height as usize) >= num_values);
1490
1491        let mut rng = TestRng::default();
1492
1493        let mut heights: Vec<u32> = (0..(max_height - 1)).choose_multiple(&mut rng, num_values);
1494
1495        heights.push(max_height);
1496
1497        heights
1498    }
1499
    /// Returns a duplicate (deep copy) of the sync pool with a different ledger height.
    ///
    /// Shares no state with `sync`: every lock's contents are cloned and a fresh
    /// ledger is created at `height`. Used below to simulate block advancement
    /// while outstanding requests are preserved.
    fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork> {
            // Fresh, un-signalled notification primitives (not copied from `sync`).
            peer_notify: Notify::new(),
            response_notify: Default::default(),
            // New mock ledger at the requested height.
            ledger: Arc::new(sample_ledger_service(height)),
            // Deep-copy the per-peer locators, common ancestors, outstanding
            // requests, and sync state behind new locks.
            locators: RwLock::new(sync.locators.read().clone()),
            common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
            requests: RwLock::new(sync.requests.read().clone()),
            sync_state: RwLock::new(sync.sync_state.read().clone()),
            advance_with_sync_blocks_lock: Default::default(),
            metrics: Default::default(),
        }
    }
1514
    /// Checks that the sync pool (starting at genesis) returns the correct requests.
    ///
    /// `min_common_ancestor` is the height all peers are assumed to agree up to;
    /// `peers` is the full set of peer IPs registered with the pool.
    fn check_prepare_block_requests(
        sync: BlockSync<CurrentNetwork>,
        min_common_ancestor: u32,
        peers: IndexSet<SocketAddr>,
    ) {
        let rng = &mut TestRng::default();

        // Check test assumptions are met.
        assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");

        // Determine the number of peers within range of this sync pool.
        let num_peers_within_recent_range_of_ledger = {
            // If no peers are within range, then set to 0.
            if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
                0
            }
            // Otherwise, manually check the number of peers within range.
            else {
                peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
            }
        };

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();

        // If there are no peers, then there should be no requests.
        if peers.is_empty() {
            assert!(requests.is_empty());
            return;
        }

        // Otherwise, there should be requests.
        // The request count is capped by the outstanding-request limit.
        let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
        assert_eq!(requests.len(), expected_num_requests);

        // Requests cover heights 1..=expected_num_requests in order; the mock
        // ledger derives each block hash from its height (see tests above).
        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            assert_eq!(height, 1 + idx as u32);
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));

            // With enough in-range peers for redundancy, a single sync IP is
            // selected; otherwise every in-range peer is used.
            if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
                assert_eq!(sync_ips.len(), 1);
            } else {
                assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
                assert_eq!(sync_ips, peers);
            }
        }
    }
1567
1568    /// Tests that height and hash values are set correctly using many different maximum block heights.
1569    #[test]
1570    fn test_latest_block_height() {
1571        for height in generate_block_heights(100_001, 5000) {
1572            let sync = sample_sync_at_height(height);
1573            // Check that the latest block height is the maximum height.
1574            assert_eq!(sync.ledger.latest_block_height(), height);
1575
1576            // Check the hash to height mapping
1577            assert_eq!(sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(0)).into()).unwrap(), 0);
1578            assert_eq!(
1579                sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(height)).into()).unwrap(),
1580                height
1581            );
1582        }
1583    }
1584
1585    #[test]
1586    fn test_get_block_hash() {
1587        for height in generate_block_heights(100_001, 5000) {
1588            let sync = sample_sync_at_height(height);
1589
1590            // Check the height to hash mapping
1591            assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
1592            assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::<CurrentNetwork>::from_u32(height)).into());
1593        }
1594    }
1595
1596    #[test]
1597    fn test_prepare_block_requests() {
1598        for num_peers in 0..111 {
1599            println!("Testing with {num_peers} peers");
1600
1601            let sync = sample_sync_at_height(0);
1602
1603            let mut peers = indexset![];
1604
1605            for peer_id in 1..=num_peers {
1606                // Add a peer.
1607                sync.update_peer_locators(sample_peer_ip(peer_id), &sample_block_locators(10)).unwrap();
1608                // Add the peer to the set of peers.
1609                peers.insert(sample_peer_ip(peer_id));
1610            }
1611
1612            // If all peers are ahead, then requests should be prepared.
1613            check_prepare_block_requests(sync, 10, peers);
1614        }
1615    }
1616
1617    #[test]
1618    fn test_prepare_block_requests_with_leading_fork_at_11() {
1619        let sync = sample_sync_at_height(0);
1620
1621        // Intuitively, peer 1's fork is above peer 2 and peer 3's height.
1622        // So from peer 2 and peer 3's perspective, they don't even realize that peer 1 is on a fork.
1623        // Thus, you can sync up to block 10 from any of the 3 peers.
1624
1625        // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 11,
1626        // then the sync pool should request blocks 1..=10 from the NUM_REDUNDANCY peers.
1627        // This is safe because the leading fork is at 11, and the sync pool is at 0,
1628        // so all candidate peers are at least 10 blocks ahead of the sync pool.
1629
1630        // Add a peer (fork).
1631        let peer_1 = sample_peer_ip(1);
1632        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 11)).unwrap();
1633
1634        // Add a peer.
1635        let peer_2 = sample_peer_ip(2);
1636        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1637
1638        // Add a peer.
1639        let peer_3 = sample_peer_ip(3);
1640        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1641
1642        // Prepare the block requests.
1643        let (requests, _) = sync.prepare_block_requests();
1644        assert_eq!(requests.len(), 10);
1645
1646        // Check the requests.
1647        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1648            assert_eq!(height, 1 + idx as u32);
1649            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1650            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1651            assert_eq!(num_sync_ips, 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1652        }
1653    }
1654
1655    #[test]
1656    fn test_prepare_block_requests_with_leading_fork_at_10() {
1657        let rng = &mut TestRng::default();
1658        let sync = sample_sync_at_height(0);
1659
1660        // Intuitively, peer 1's fork is at peer 2 and peer 3's height.
1661        // So from peer 2 and peer 3's perspective, they recognize that peer 1 has forked.
1662        // Thus, you don't have NUM_REDUNDANCY peers to sync to block 10.
1663        //
1664        // Now, while you could in theory sync up to block 9 from any of the 3 peers,
1665        // we choose not to do this as either side is likely to disconnect from us,
1666        // and we would rather wait for enough redundant peers before syncing.
1667
1668        // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 10,
1669        // then the sync pool should not request blocks as 1 peer conflicts with the other NUM_REDUNDANCY-1 peers.
1670        // We choose to sync with a cohort of peers that are *consistent* with each other,
1671        // and prioritize from descending heights (so the highest peer gets priority).
1672
1673        // Add a peer (fork).
1674        let peer_1 = sample_peer_ip(1);
1675        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();
1676
1677        // Add a peer.
1678        let peer_2 = sample_peer_ip(2);
1679        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1680
1681        // Add a peer.
1682        let peer_3 = sample_peer_ip(3);
1683        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1684
1685        // Prepare the block requests.
1686        let (requests, _) = sync.prepare_block_requests();
1687        assert_eq!(requests.len(), 0);
1688
1689        // When there are NUM_REDUNDANCY+1 peers ahead, and 1 is on a fork, then there should be block requests.
1690
1691        // Add a peer.
1692        let peer_4 = sample_peer_ip(4);
1693        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
1694
1695        // Prepare the block requests.
1696        let (requests, sync_peers) = sync.prepare_block_requests();
1697        assert_eq!(requests.len(), 10);
1698
1699        // Check the requests.
1700        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1701            // Construct the sync IPs.
1702            let sync_ips: IndexSet<_> =
1703                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1704            assert_eq!(height, 1 + idx as u32);
1705            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1706            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1707            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1708            assert_ne!(sync_ips[0], peer_1); // It should never be the forked peer.
1709        }
1710    }
1711
1712    #[test]
1713    fn test_prepare_block_requests_with_trailing_fork_at_9() {
1714        let rng = &mut TestRng::default();
1715        let sync = sample_sync_at_height(0);
1716
1717        // Peer 1 and 2 diverge from peer 3 at block 10. We only sync when there are NUM_REDUNDANCY peers
1718        // who are *consistent* with each other. So if you add a 4th peer that is consistent with peer 1 and 2,
1719        // then you should be able to sync up to block 10, thereby biasing away from peer 3.
1720
1721        // Add a peer (fork).
1722        let peer_1 = sample_peer_ip(1);
1723        sync.update_peer_locators(peer_1, &sample_block_locators(10)).unwrap();
1724
1725        // Add a peer.
1726        let peer_2 = sample_peer_ip(2);
1727        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1728
1729        // Add a peer.
1730        let peer_3 = sample_peer_ip(3);
1731        sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();
1732
1733        // Prepare the block requests.
1734        let (requests, _) = sync.prepare_block_requests();
1735        assert_eq!(requests.len(), 0);
1736
1737        // When there are NUM_REDUNDANCY+1 peers ahead, and peer 3 is on a fork, then there should be block requests.
1738
1739        // Add a peer.
1740        let peer_4 = sample_peer_ip(4);
1741        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
1742
1743        // Prepare the block requests.
1744        let (requests, sync_peers) = sync.prepare_block_requests();
1745        assert_eq!(requests.len(), 10);
1746
1747        // Check the requests.
1748        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1749            // Construct the sync IPs.
1750            let sync_ips: IndexSet<_> =
1751                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1752            assert_eq!(height, 1 + idx as u32);
1753            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1754            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1755            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1756            assert_ne!(sync_ips[0], peer_3); // It should never be the forked peer.
1757        }
1758    }
1759
1760    #[test]
1761    fn test_insert_block_requests() {
1762        let rng = &mut TestRng::default();
1763        let sync = sample_sync_at_height(0);
1764
1765        // Add a peer.
1766        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
1767
1768        // Prepare the block requests.
1769        let (requests, sync_peers) = sync.prepare_block_requests();
1770        assert_eq!(requests.len(), 10);
1771
1772        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1773            // Construct the sync IPs.
1774            let sync_ips: IndexSet<_> =
1775                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1776            // Insert the block request.
1777            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1778            // Check that the block requests were inserted.
1779            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1780            assert!(sync.get_block_request_timestamp(height).is_some());
1781        }
1782
1783        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1784            // Construct the sync IPs.
1785            let sync_ips: IndexSet<_> =
1786                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1787            // Check that the block requests are still inserted.
1788            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1789            assert!(sync.get_block_request_timestamp(height).is_some());
1790        }
1791
1792        for (height, (hash, previous_hash, num_sync_ips)) in requests {
1793            // Construct the sync IPs.
1794            let sync_ips: IndexSet<_> =
1795                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1796            // Ensure that the block requests cannot be inserted twice.
1797            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err();
1798            // Check that the block requests are still inserted.
1799            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1800            assert!(sync.get_block_request_timestamp(height).is_some());
1801        }
1802    }
1803
1804    #[test]
1805    fn test_insert_block_requests_fails() {
1806        let sync = sample_sync_at_height(9);
1807
1808        // Add a peer.
1809        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
1810
1811        // Inserting a block height that is already in the ledger should fail.
1812        sync.insert_block_request(9, (None, None, indexset![sample_peer_ip(1)])).unwrap_err();
1813        // Inserting a block height that is not in the ledger should succeed.
1814        sync.insert_block_request(10, (None, None, indexset![sample_peer_ip(1)])).unwrap();
1815    }
1816
1817    #[test]
1818    fn test_update_peer_locators() {
1819        let sync = sample_sync_at_height(0);
1820
1821        // Test 2 peers.
1822        let peer1_ip = sample_peer_ip(1);
1823        for peer1_height in 0..500u32 {
1824            sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap();
1825            assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height));
1826
1827            let peer2_ip = sample_peer_ip(2);
1828            for peer2_height in 0..500u32 {
1829                println!("Testing peer 1 height at {peer1_height} and peer 2 height at {peer2_height}");
1830
1831                sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap();
1832                assert_eq!(sync.get_peer_height(&peer2_ip), Some(peer2_height));
1833
1834                // Compute the distance between the peers.
1835                let distance = peer1_height.abs_diff(peer2_height);
1836
1837                // Check the common ancestor.
1838                if distance < NUM_RECENT_BLOCKS as u32 {
1839                    let expected_ancestor = core::cmp::min(peer1_height, peer2_height);
1840                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
1841                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
1842                } else {
1843                    let min_checkpoints =
1844                        core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL);
1845                    let expected_ancestor = min_checkpoints * CHECKPOINT_INTERVAL;
1846                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
1847                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
1848                }
1849            }
1850        }
1851    }
1852
1853    #[test]
1854    fn test_remove_peer() {
1855        let sync = sample_sync_at_height(0);
1856
1857        let peer_ip = sample_peer_ip(1);
1858        sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1859        assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1860
1861        sync.remove_peer(&peer_ip);
1862        assert_eq!(sync.get_peer_height(&peer_ip), None);
1863
1864        sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1865        assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1866
1867        sync.remove_peer(&peer_ip);
1868        assert_eq!(sync.get_peer_height(&peer_ip), None);
1869    }
1870
1871    #[test]
1872    fn test_locators_insert_remove_insert() {
1873        let sync = sample_sync_at_height(0);
1874
1875        let peer_ip = sample_peer_ip(1);
1876        sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1877        assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1878
1879        sync.remove_peer(&peer_ip);
1880        assert_eq!(sync.get_peer_height(&peer_ip), None);
1881
1882        sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1883        assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1884    }
1885
    /// Removing a peer drops its outstanding requests; re-adding the peer
    /// allows requests to be prepared and inserted again.
    #[test]
    fn test_requests_insert_remove_insert() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Add a peer.
        let peer_ip = sample_peer_ip(1);
        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Remove the peer.
        sync.remove_peer(&peer_ip);

        for (height, _) in requests {
            // Check that the block requests were removed.
            assert_eq!(sync.get_block_request(height), None);
            assert!(sync.get_block_request_timestamp(height).is_none());
        }

        // As there is no peer, it should not be possible to prepare block requests.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // Add the peer again.
        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // NOTE(review): this loop reuses `sync_peers` from the *first* prepare
        // call; with a single peer the candidate set is identical, so the
        // behavior is unchanged — confirm if the test is extended to more peers.
        for (height, (hash, previous_hash, num_sync_ips)) in requests {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }
    }
1941
    /// Requests at or below the ledger height become obsolete once the ledger
    /// advances, and are removed by the timeout handler.
    #[test]
    fn test_obsolete_block_requests() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Random peer tip; the RNG call order here is part of the test's determinism.
        let locator_height = rng.gen_range(0..50);

        // Add a peer.
        let locators = sample_block_locators(locator_height);
        sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap();

        // Construct block requests
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), locator_height as usize);

        // Add the block requests to the sync module.
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Duplicate a new sync module with a different height to simulate block advancement.
        // This range needs to be inclusive, so that the range is never empty,
        // even with a locator height of 0.
        let ledger_height = rng.gen_range(0..=locator_height);
        let new_sync = duplicate_sync_at_new_height(&sync, ledger_height);

        // Check that the number of requests is the same.
        assert_eq!(new_sync.requests.read().len(), requests.len());

        // Remove timed out block requests.
        let c = DummyPeerPoolHandler::default();
        new_sync.handle_block_request_timeouts(&c).unwrap();

        // Check that the number of requests is reduced based on the ledger height:
        // only requests strictly above the new ledger height should remain.
        assert_eq!(new_sync.requests.read().len(), (locator_height - ledger_height) as usize);
    }
1985
1986    #[test]
1987    fn test_timed_out_block_request() {
1988        let sync = sample_sync_at_height(0);
1989        let peer_ip = sample_peer_ip(1);
1990        let locators = sample_block_locators(10);
1991        let block_hash = locators.get_hash(1);
1992
1993        sync.update_peer_locators(peer_ip, &locators).unwrap();
1994
1995        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
1996
1997        // Add a timed-out request
1998        sync.requests.write().insert(1, OutstandingRequest {
1999            request: (block_hash, None, [peer_ip].into()),
2000            timestamp,
2001            response: None,
2002        });
2003
2004        assert_eq!(sync.requests.read().len(), 1);
2005        assert_eq!(sync.locators.read().len(), 1);
2006
2007        // Remove timed out block requests.
2008        let c = DummyPeerPoolHandler::default();
2009        sync.handle_block_request_timeouts(&c).unwrap();
2010
2011        // let ban_list = c.peers_to_ban.write();
2012        // assert_eq!(ban_list.len(), 1);
2013        // assert_eq!(ban_list.iter().next(), Some(&peer_ip));
2014
2015        assert!(sync.requests.read().is_empty());
2016        assert!(sync.locators.read().is_empty());
2017    }
2018
2019    #[test]
2020    fn test_reissue_timed_out_block_request() {
2021        let sync = sample_sync_at_height(0);
2022        let peer_ip1 = sample_peer_ip(1);
2023        let peer_ip2 = sample_peer_ip(2);
2024        let peer_ip3 = sample_peer_ip(3);
2025
2026        let locators = sample_block_locators(10);
2027        let block_hash1 = locators.get_hash(1);
2028        let block_hash2 = locators.get_hash(2);
2029
2030        sync.update_peer_locators(peer_ip1, &locators).unwrap();
2031        sync.update_peer_locators(peer_ip2, &locators).unwrap();
2032        sync.update_peer_locators(peer_ip3, &locators).unwrap();
2033
2034        assert_eq!(sync.locators.read().len(), 3);
2035
2036        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
2037
2038        // Add a timed-out request
2039        sync.requests.write().insert(1, OutstandingRequest {
2040            request: (block_hash1, None, [peer_ip1].into()),
2041            timestamp,
2042            response: None,
2043        });
2044
2045        // Add a timed-out request
2046        sync.requests.write().insert(2, OutstandingRequest {
2047            request: (block_hash2, None, [peer_ip2].into()),
2048            timestamp: Instant::now(),
2049            response: None,
2050        });
2051
2052        assert_eq!(sync.requests.read().len(), 2);
2053
2054        // Remove timed out block requests.
2055        let c = DummyPeerPoolHandler::default();
2056
2057        let re_requests = sync.handle_block_request_timeouts(&c).unwrap();
2058
2059        // let ban_list = c.peers_to_ban.write();
2060        // assert_eq!(ban_list.len(), 1);
2061        // assert_eq!(ban_list.iter().next(), Some(&peer_ip1));
2062
2063        assert_eq!(sync.requests.read().len(), 1);
2064        assert_eq!(sync.locators.read().len(), 2);
2065
2066        let (new_requests, new_sync_ips) = re_requests.unwrap();
2067        assert_eq!(new_requests.len(), 1);
2068
2069        let (height, (hash, _, _)) = new_requests.first().unwrap();
2070        assert_eq!(*height, 1);
2071        assert_eq!(*hash, block_hash1);
2072        assert_eq!(new_sync_ips.len(), 2);
2073
2074        // Make sure the removed peer is not in the sync_peer set.
2075        let mut iter = new_sync_ips.iter();
2076        assert_ne!(iter.next().unwrap().0, &peer_ip1);
2077        assert_ne!(iter.next().unwrap().0, &peer_ip1);
2078    }
2079}