snarkos_node_sync/
block_sync.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
18    locators::BlockLocators,
19};
20use snarkos_node_bft_ledger_service::LedgerService;
21use snarkos_node_router::{PeerPoolHandling, messages::DataBlocks};
22use snarkos_node_sync_communication_service::CommunicationService;
23use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
24use snarkvm::prelude::{Network, block::Block};
25
26use anyhow::{Result, bail, ensure};
27use indexmap::{IndexMap, IndexSet};
28use itertools::Itertools;
29#[cfg(feature = "locktick")]
30use locktick::parking_lot::RwLock;
31#[cfg(feature = "locktick")]
32use locktick::tokio::Mutex as TMutex;
33#[cfg(not(feature = "locktick"))]
34use parking_lot::RwLock;
35use rand::seq::{IteratorRandom, SliceRandom};
36use std::{
37    collections::{BTreeMap, HashMap, HashSet, hash_map},
38    net::{IpAddr, Ipv4Addr, SocketAddr},
39    sync::Arc,
40    time::{Duration, Instant},
41};
42#[cfg(not(feature = "locktick"))]
43use tokio::sync::Mutex as TMutex;
44use tokio::sync::Notify;
45
46mod helpers;
47use helpers::rangify_heights;
48
49mod sync_state;
50use sync_state::SyncState;
51
52mod metrics;
53use metrics::BlockSyncMetrics;
54
55// The redundancy factor decreases the possibility of a malicious peers sending us an invalid block locator
56// by requiring multiple peers to advertise the same (prefix of) block locators.
57// However, we do not use this in production yet.
58#[cfg(not(test))]
59pub const REDUNDANCY_FACTOR: usize = 1;
60#[cfg(test)]
61pub const REDUNDANCY_FACTOR: usize = 3;
62
63/// The time nodes wait between issuing batches of block requests to avoid triggering spam detection.
64///
65/// The current rate limit for all messages is around 160k  per second (see [`Gateway::max_cache_events`]).
66/// This constant limits number of block requests to a much lower 100 per second.
67///
68// TODO(kaimast): base rate limits on how many requests were sent to each peer instead.
69pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);
70
71const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
72const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;
73
74const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(600);
75
76/// The maximum number of outstanding block requests.
77/// Once a node hits this limit, it will not issue any new requests until existing requests time out or receive responses.
78const MAX_BLOCK_REQUESTS: usize = 50; // 50 requests
79
80/// The maximum number of blocks tolerated before the primary is considered behind its peers.
81pub const MAX_BLOCKS_BEHIND: u32 = 1; // blocks
82
83/// This is a dummy IP address that is used to represent the local node.
84/// Note: This here does not need to be a real IP address, but it must be unique/distinct from all other connections.
85pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
86
87/// Handle to an outstanding requested, containing the request itself and its timestamp.
88/// This does not contain the response so that checking for responses does not require iterating over all requests.
89#[derive(Clone)]
90struct OutstandingRequest<N: Network> {
91    request: SyncRequest<N>,
92    timestamp: Instant,
93    /// The corresponding response (if any).
94    /// This is guaranteed to be Some if sync_ips for the given request are empty.
95    response: Option<Block<N>>,
96}
97
98/// Information about a block request (used for the REST API).
99#[derive(Clone, serde::Serialize)]
100pub struct BlockRequestInfo {
101    /// Seconds since the request was created
102    elapsed: u64,
103    /// Has the request been responded to?
104    done: bool,
105}
106
107/// Summary of completed all in-flight requests.
108#[derive(Clone, serde::Serialize)]
109pub struct BlockRequestsSummary {
110    outstanding: String,
111    completed: String,
112}
113
114impl<N: Network> OutstandingRequest<N> {
115    /// Get a reference to the IPs of peers that have not responded to the request (yet).
116    fn sync_ips(&self) -> &IndexSet<SocketAddr> {
117        let (_, _, sync_ips) = &self.request;
118        sync_ips
119    }
120
121    /// Get a mutable reference to the IPs of peers that have not responded to the request (yet).
122    fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
123        let (_, _, sync_ips) = &mut self.request;
124        sync_ips
125    }
126}
127
128/// A struct that keeps track of synchronizing blocks with other nodes.
129///
130/// It generates requests to send to other peers and processes responses to those requests.
131/// The struct also keeps track of block locators, which indicate which peers it can fetch blocks from.
132///
133/// # Notes
134/// - The actual network communication happens in `snarkos_node::Client` (for clients and provers) and in `snarkos_node_bft::Sync` (for validators).
135///
136/// - Validators only sync from other nodes using this struct if they fall behind, e.g.,
137///   because they experience a network partition.
138///   In the common case, validators will generate blocks from the DAG after an anchor certificate has been approved
139///   by a supermajority of the committee.
140pub struct BlockSync<N: Network> {
141    /// The ledger.
142    ledger: Arc<dyn LedgerService<N>>,
143
144    /// The map of peer IP to their block locators.
145    /// The block locators are consistent with the ledger and every other peer's block locators.
146    locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,
147
148    /// The map of peer-to-peer to their common ancestor.
149    /// This map is used to determine which peers to request blocks from.
150    ///
151    /// Lock ordering: when locking both, `common_ancestors` and `locators`, `common_ancestors` must be locked first.
152    common_ancestors: RwLock<IndexMap<PeerPair, u32>>,
153
154    /// The block requests in progress and their responses.
155    requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,
156
157    /// The boolean indicator of whether the node is synced up to the latest block (within the given tolerance).
158    ///
159    /// Lock ordering: if you lock `sync_state` and `requests`, you must lock `sync_state` first.
160    sync_state: RwLock<SyncState>,
161
162    /// The lock used to ensure that [`Self::advance_with_sync_blocks()`] is called by one task at a time.
163    advance_with_sync_blocks_lock: TMutex<()>,
164
165    /// Gets notified when there was an update to the locators or a peer disconnected.
166    peer_notify: Notify,
167
168    /// Gets notified when we received a new block response.
169    response_notify: Notify,
170
171    /// Tracks sync speed
172    metrics: BlockSyncMetrics,
173}
174
175impl<N: Network> BlockSync<N> {
176    /// Initializes a new block sync module.
177    pub fn new(ledger: Arc<dyn LedgerService<N>>) -> Self {
178        // Make sync state aware of the blocks that already exist on disk at startup.
179        let sync_state = SyncState::new_with_height(ledger.latest_block_height());
180
181        Self {
182            ledger,
183            sync_state: RwLock::new(sync_state),
184            peer_notify: Default::default(),
185            response_notify: Default::default(),
186            locators: Default::default(),
187            requests: Default::default(),
188            common_ancestors: Default::default(),
189            advance_with_sync_blocks_lock: Default::default(),
190            metrics: Default::default(),
191        }
192    }
193
194    /// Blocks until something about a peer changes,
195    /// or block request has been fully processed (either successfully or unsuccessfully).
196    ///
197    /// Used by the outgoing task.
198    pub async fn wait_for_peer_update(&self) {
199        self.peer_notify.notified().await
200    }
201
202    /// Blocks until there is a new response to a block request.
203    ///
204    /// Used by the incoming task.
205    pub async fn wait_for_block_responses(&self) {
206        self.response_notify.notified().await
207    }
208
209    /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
210    #[inline]
211    pub fn is_block_synced(&self) -> bool {
212        self.sync_state.read().is_block_synced()
213    }
214
215    /// Returns `true` if there a blocks to fetch or responses to process.
216    ///
217    /// This will always return true if [`Self::is_block_synced`] returns false,
218    /// but it can return true when [`Self::is_block_synced`] returns true
219    /// (due to the latter having a tolerance of one block).
220    #[inline]
221    pub fn can_block_sync(&self) -> bool {
222        self.sync_state.read().can_block_sync() || self.has_pending_responses()
223    }
224
225    /// Returns the number of blocks the node is behind the greatest peer height,
226    /// or `None` if no peers are connected yet.
227    #[inline]
228    pub fn num_blocks_behind(&self) -> Option<u32> {
229        self.sync_state.read().num_blocks_behind()
230    }
231
232    /// Returns the greatest block height of any connected peer.
233    #[inline]
234    pub fn greatest_peer_block_height(&self) -> Option<u32> {
235        self.sync_state.read().get_greatest_peer_height()
236    }
237
238    /// Returns the current sync height of this node.
239    /// The sync height is always greater or equal to the ledger height.
240    #[inline]
241    pub fn get_sync_height(&self) -> u32 {
242        self.sync_state.read().get_sync_height()
243    }
244
245    /// Returns the number of blocks we requested from peers, but have not received yet.
246    #[inline]
247    pub fn num_outstanding_block_requests(&self) -> usize {
248        self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count()
249    }
250
251    /// The total number of block request, including the ones that have been answered already but not processed yet.
252    #[inline]
253    pub fn num_total_block_requests(&self) -> usize {
254        self.requests.read().len()
255    }
256
257    //// Returns the latest locator height for all known peers.
258    pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> {
259        self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect()
260    }
261
262    //// Returns information about all in-flight block requests.
263    pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> {
264        self.requests
265            .read()
266            .iter()
267            .map(|(height, request)| {
268                (*height, BlockRequestInfo {
269                    done: request.sync_ips().is_empty(),
270                    elapsed: request.timestamp.elapsed().as_secs(),
271                })
272            })
273            .collect()
274    }
275
276    /// Returns a summary of all in-flight requests.
277    pub fn get_block_requests_summary(&self) -> BlockRequestsSummary {
278        let completed = self
279            .requests
280            .read()
281            .iter()
282            .filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None })
283            .collect::<Vec<_>>();
284
285        let outstanding = self
286            .requests
287            .read()
288            .iter()
289            .filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None })
290            .collect::<Vec<_>>();
291
292        BlockRequestsSummary { completed: rangify_heights(&completed), outstanding: rangify_heights(&outstanding) }
293    }
294
295    pub fn get_sync_speed(&self) -> f64 {
296        self.metrics.get_sync_speed()
297    }
298}
299
300// Helper functions needed for testing
301#[cfg(test)]
302impl<N: Network> BlockSync<N> {
303    /// Returns the latest block height of the given peer IP.
304    fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> {
305        self.locators.read().get(peer_ip).map(|locators| locators.latest_locator_height())
306    }
307
308    /// Returns the common ancestor for the given peer pair, if it exists.
309    fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> {
310        self.common_ancestors.read().get(&PeerPair(peer_a, peer_b)).copied()
311    }
312
313    /// Returns the block request for the given height, if it exists.
314    fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> {
315        self.requests.read().get(&height).map(|e| e.request.clone())
316    }
317
318    /// Returns the timestamp of the last time the block was requested, if it exists.
319    fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> {
320        self.requests.read().get(&height).map(|e| e.timestamp)
321    }
322}
323
324impl<N: Network> BlockSync<N> {
325    /// Returns the block locators.
326    #[inline]
327    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
328        // Retrieve the latest block height.
329        let latest_height = self.ledger.latest_block_height();
330
331        // Initialize the recents map.
332        // TODO: generalize this for RECENT_INTERVAL > 1, or remove this comment if we hardwire that to 1
333        let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
334        // Retrieve the recent block hashes.
335        for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
336            recents.insert(height, self.ledger.get_block_hash(height)?);
337        }
338
339        // Initialize the checkpoints map.
340        let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
341        // Retrieve the checkpoint block hashes.
342        for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
343            checkpoints.insert(height, self.ledger.get_block_hash(height)?);
344        }
345
346        // Construct the block locators.
347        BlockLocators::new(recents, checkpoints)
348    }
349
350    /// Returns true if there are pending responses to block requests that need to be processed.
351    pub fn has_pending_responses(&self) -> bool {
352        self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0
353    }
354
355    /// Send a batch of block requests.
356    pub async fn send_block_requests<C: CommunicationService>(
357        &self,
358        communication: &C,
359        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
360        requests: &[(u32, PrepareSyncRequest<N>)],
361    ) -> bool {
362        let (start_height, max_num_sync_ips) = match requests.first() {
363            Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
364            None => {
365                warn!("Block sync failed - no block requests");
366                return false;
367            }
368        };
369
370        debug!("Sending {len} block requests to peers {peers:?}", len = requests.len(), peers = sync_peers.keys());
371
372        // Use a randomly sampled subset of the sync IPs.
373        let sync_ips: IndexSet<_> =
374            sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect();
375
376        // Calculate the end height.
377        let end_height = start_height.saturating_add(requests.len() as u32);
378
379        // Insert the chunk of block requests.
380        for (height, (hash, previous_hash, _)) in requests.iter() {
381            // Insert the block request into the sync pool using the sync IPs from the last block request in the chunk.
382            if let Err(error) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
383                warn!("Block sync failed - {error}");
384                return false;
385            }
386        }
387
388        /* Send the block request to the peers */
389
390        // Construct the message.
391        let message = C::prepare_block_request(start_height, end_height);
392
393        // Send the message to the peers.
394        let mut tasks = Vec::with_capacity(sync_ips.len());
395        for sync_ip in sync_ips {
396            let sender = communication.send(sync_ip, message.clone()).await;
397            let task = tokio::spawn(async move {
398                // Ensure the request is sent successfully.
399                match sender {
400                    Some(sender) => {
401                        if let Err(err) = sender.await {
402                            warn!("Failed to send block request to peer '{sync_ip}': {err}");
403                            false
404                        } else {
405                            true
406                        }
407                    }
408                    None => {
409                        warn!("Failed to send block request to peer '{sync_ip}': no such peer");
410                        false
411                    }
412                }
413            });
414
415            tasks.push(task);
416        }
417
418        // Wait for all sends to finish at the same time.
419        for result in futures::future::join_all(tasks).await {
420            let success = match result {
421                Ok(success) => success,
422                Err(err) => {
423                    error!("tokio join error: {err}");
424                    false
425                }
426            };
427
428            // If sending fails for any peer, remove the block request from the sync pool.
429            if !success {
430                // Remove the entire block request from the sync pool.
431                let mut requests = self.requests.write();
432                for height in start_height..end_height {
433                    requests.remove(&height);
434                }
435                // Break out of the loop.
436                return false;
437            }
438        }
439        true
440    }
441
442    /// Inserts a new block response from the given peer IP.
443    ///
444    /// Returns an error if the block was malformed, or we already received a different block for this height.
445    /// This function also removes all block requests from the given peer IP on failure.
446    ///
447    /// Note, that this only queues the response. After this, you most likely want to call `Self::try_advancing_block_synchronization`.
448    ///
449    #[inline]
450    pub fn insert_block_responses(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> Result<()> {
451        // Insert the candidate blocks into the sync pool.
452        for block in blocks {
453            if let Err(error) = self.insert_block_response(peer_ip, block) {
454                self.remove_block_requests_to_peer(&peer_ip);
455                bail!("{error}");
456            }
457        }
458        Ok(())
459    }
460
461    /// Returns the next block for the given `next_height` if the request is complete,
462    /// or `None` otherwise. This does not remove the block from the `responses` map.
463    #[inline]
464    pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
465        // Determine if the request is complete:
466        // either there is no request for `next_height`, or the request has no peer socket addresses left.
467        if let Some(entry) = self.requests.read().get(&next_height) {
468            let is_complete = entry.sync_ips().is_empty();
469            if !is_complete {
470                return None;
471            }
472
473            // If the request is complete, return the block from the responses, if there is one.
474            if entry.response.is_none() {
475                warn!("Request for height {next_height} is complete but no response exists");
476            }
477            entry.response.clone()
478        } else {
479            None
480        }
481    }
482
483    /// Attempts to advance synchronization by processing completed block responses.
484    ///
485    /// Returns true, if new blocks were added to the ledger.
486    ///
487    /// # Usage
488    /// This is only called in [`Client::try_block_sync`] and should not be called concurrently by multiple tasks.
489    /// Validators do not call this function, and instead invoke
490    /// [`snarkos_node_bft::Sync::try_advancing_block_synchronization`] which also updates the BFT state.
491    #[inline]
492    pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
493        // Acquire the lock to ensure this function is called only once at a time.
494        // If the lock is already acquired, return early.
495        //
496        // Note: This lock should not be needed anymore as there is only one place we call it from,
497        // but we keep it for now out of caution.
498        // TODO(kaimast): remove this eventually.
499        let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
500            trace!("Skipping attempt to advance block synchronziation as it is already in progress");
501            return Ok(false);
502        };
503
504        // Start with the current height.
505        let mut current_height = self.ledger.latest_block_height();
506        let start_height = current_height;
507        trace!(
508            "Try advancing with block responses (at block {current_height}, current sync speed is {})",
509            self.get_sync_speed()
510        );
511
512        loop {
513            let next_height = current_height + 1;
514
515            let Some(block) = self.peek_next_block(next_height) else {
516                break;
517            };
518
519            // Ensure the block height matches.
520            if block.height() != next_height {
521                warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
522                break;
523            }
524
525            let ledger = self.ledger.clone();
526            let advanced = tokio::task::spawn_blocking(move || {
527                // Try to check the next block and advance to it.
528                match ledger.check_next_block(&block) {
529                    Ok(_) => match ledger.advance_to_next_block(&block) {
530                        Ok(_) => true,
531                        Err(err) => {
532                            warn!(
533                                "Failed to advance to next block (height: {}, hash: '{}'): {err}",
534                                block.height(),
535                                block.hash()
536                            );
537                            false
538                        }
539                    },
540                    Err(err) => {
541                        warn!(
542                            "The next block (height: {}, hash: '{}') is invalid - {err}",
543                            block.height(),
544                            block.hash()
545                        );
546                        false
547                    }
548                }
549            })
550            .await?;
551
552            // Only count successful requests.
553            if advanced {
554                self.count_request_completed();
555            }
556
557            // Remove the block response.
558            self.remove_block_response(next_height);
559
560            // If advancing failed, exit the loop.
561            if !advanced {
562                break;
563            }
564
565            // Update the latest height.
566            current_height = next_height;
567        }
568
569        if current_height > start_height {
570            self.set_sync_height(current_height);
571            Ok(true)
572        } else {
573            Ok(false)
574        }
575    }
576}
577
578impl<N: Network> BlockSync<N> {
579    /// Returns the sync peers with their latest heights, and their minimum common ancestor, if the node can sync.
580    /// This function returns peers that are consistent with each other, and have a block height
581    /// that is greater than the ledger height of this node.
582    ///
583    /// # Locking
584    /// This will read-lock `common_ancestors` and `sync_state`, but not at the same time.
585    pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
586        // Retrieve the current sync height.
587        let current_height = self.get_sync_height();
588
589        if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
590            // Map the locators into the latest height.
591            let sync_peers =
592                sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
593            // Return the sync peers and their minimum common ancestor.
594            Some((sync_peers, min_common_ancestor))
595        } else {
596            None
597        }
598    }
599
600    /// Updates the block locators and common ancestors for the given peer IP.
601    ///
602    /// This function does not need to check that the block locators are well-formed,
603    /// because that is already done in [`BlockLocators::new()`], as noted in [`BlockLocators`].
604    ///
605    /// This function does **not** check
606    /// that the block locators are consistent with the peer's previous block locators or other peers' block locators.
607    pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
608        // Update the locators entry for the given peer IP.
609        // We perform this update atomically, and drop the lock as soon as we are done with the update.
610        match self.locators.write().entry(peer_ip) {
611            hash_map::Entry::Occupied(mut e) => {
612                // Return early if the block locators did not change.
613                if e.get() == locators {
614                    return Ok(());
615                }
616
617                let old_height = e.get().latest_locator_height();
618                let new_height = locators.latest_locator_height();
619
620                if old_height > new_height {
621                    debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
622                }
623                e.insert(locators.clone());
624            }
625            hash_map::Entry::Vacant(e) => {
626                e.insert(locators.clone());
627            }
628        }
629
630        // Compute the common ancestor with this node.
631        let new_local_ancestor = {
632            let mut ancestor = 0;
633            // Attention: Please do not optimize this loop, as it performs fork-detection. In addition,
634            // by iterating upwards, it also early-terminates malicious block locators at the *first* point
635            // of bifurcation in their ledger history, which is a critical safety guarantee provided here.
636            for (height, hash) in locators.clone().into_iter() {
637                if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
638                    match ledger_hash == hash {
639                        true => ancestor = height,
640                        false => {
641                            debug!("Detected fork with peer \"{peer_ip}\" at height {height}");
642                            break;
643                        }
644                    }
645                }
646            }
647            ancestor
648        };
649
650        // Compute the common ancestor with every other peer.
651        // Do not hold write lock to `common_ancestors` here, because this can take a while with many peers.
652        let ancestor_updates: Vec<_> = self
653            .locators
654            .read()
655            .iter()
656            .filter_map(|(other_ip, other_locators)| {
657                // Skip if the other peer is the given peer.
658                if other_ip == &peer_ip {
659                    return None;
660                }
661                // Compute the common ancestor with the other peer.
662                let mut ancestor = 0;
663                for (height, hash) in other_locators.clone().into_iter() {
664                    if let Some(expected_hash) = locators.get_hash(height) {
665                        match expected_hash == hash {
666                            true => ancestor = height,
667                            false => {
668                                debug!(
669                                    "Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
670                                );
671                                break;
672                            }
673                        }
674                    }
675                }
676
677                Some((PeerPair(peer_ip, *other_ip), ancestor))
678            })
679            .collect();
680
681        // Update the map of common ancestors.
682        // Scope the lock, so it is dropped before locking `sync_state`.
683        {
684            let mut common_ancestors = self.common_ancestors.write();
685            common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);
686
687            for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
688                common_ancestors.insert(peer_pair, new_ancestor);
689            }
690        }
691
692        // Update `is_synced`.
693        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
694            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
695        }
696
697        // Notify the sync loop that something changed.
698        self.peer_notify.notify_one();
699
700        Ok(())
701    }
702
703    /// TODO (howardwu): Remove the `common_ancestor` entry. But check that this is safe
704    ///  (that we don't rely upon it for safety when we re-connect with the same peer).
705    /// Removes the peer from the sync pool, if they exist.
706    pub fn remove_peer(&self, peer_ip: &SocketAddr) {
707        trace!("Removing peer {peer_ip} from block sync");
708
709        // Remove the locators entry for the given peer IP.
710        self.locators.write().remove(peer_ip);
711        // Remove all common ancestor entries for this peers.
712        self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
713        // Remove all block requests to the peer.
714        self.remove_block_requests_to_peer(peer_ip);
715
716        // Notify the sync loop that something changed.
717        self.peer_notify.notify_one();
718    }
719}
720
721// Helper type for prepare_block_requests
722pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);
723
724impl<N: Network> BlockSync<N> {
725    /// Returns a list of block requests and the sync peers, if the node needs to sync.
726    ///
727    /// You usually want to call `remove_timed_out_block_requests` before invoking this function.
728    ///
729    /// # Concurrency
730    /// This should be called by at most one task at a time.
731    ///
732    /// # Usage
733    ///  - For validators, the primary spawns one task that periodically calls `bft::Sync::try_block_sync`. There is no possibility of multiple calls to it at a time.
734    ///  - For clients, `Client::initialize_sync` also spawns exactly one task that periodically calls this function.
735    ///  - Provers do not call this function.
736    pub fn prepare_block_requests(&self) -> BlockRequestBatch<N> {
737        // Used to print more information when we max out on requests.
738        let print_requests = || {
739            if tracing::enabled!(tracing::Level::TRACE) {
740                let summary = self.get_block_requests_summary();
741
742                trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
743                trace!("The following requests are still outstanding: {:?}", summary.outstanding);
744            }
745        };
746
747        // Do not hold lock here as, currently, `find_sync_peers_inner` can take a while.
748        let current_height = self.get_sync_height();
749
750        // Ensure to not exceed the maximum number of outstanding block requests.
751        let max_outstanding_block_requests =
752            (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);
753        let max_total_requests = 4 * max_outstanding_block_requests;
754        let max_new_blocks_to_request =
755            max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);
756
757        // Prepare the block requests.
758        if self.num_total_block_requests() >= max_total_requests as usize {
759            trace!(
760                "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
761            );
762
763            print_requests();
764
765            // Return an empty list of block requests.
766            (Default::default(), Default::default())
767        } else if max_new_blocks_to_request == 0 {
768            trace!(
769                "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
770            );
771            print_requests();
772
773            // Return an empty list of block requests.
774            (Default::default(), Default::default())
775        } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
776            // Retrieve the highest block height.
777            let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);
778            // Update the state of `is_block_synced` for the sync module.
779            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
780            // Return the list of block requests.
781            (
782                self.construct_requests(
783                    &sync_peers,
784                    current_height,
785                    min_common_ancestor,
786                    max_new_blocks_to_request,
787                    greatest_peer_height,
788                ),
789                sync_peers,
790            )
791        } else {
792            // Update `is_block_synced` if there are no pending requests or responses.
793            if self.requests.read().is_empty() {
794                trace!("All requests have been processed. Will set block synced to true.");
795                // Update the state of `is_block_synced` for the sync module.
796                // TODO(kaimast): remove this workaround
797                self.sync_state.write().set_greatest_peer_height(0);
798            } else {
799                trace!("No new blocks can be requests, but there are still outstanding requests.");
800            }
801
802            // Return an empty list of block requests.
803            (Default::default(), Default::default())
804        }
805    }
806
807    /// Should only be called by validators when they successfully process a block request.
808    /// (for other nodes this will be automatically called internally)
809    ///
810    /// TODO(kaimast): remove this public function once the sync logic is fully unified `BlockSync`.
811    pub fn count_request_completed(&self) {
812        self.metrics.count_request_completed();
813    }
814
815    /// Set the sync height to a the given value.
816    /// This is a no-op if `new_height` is equal or less to the current sync height.
817    pub fn set_sync_height(&self, new_height: u32) {
818        // Scope state lock to avoid locking state and metrics at the same time.
819        let fully_synced = {
820            let mut state = self.sync_state.write();
821            state.set_sync_height(new_height);
822            !state.can_block_sync()
823        };
824
825        if fully_synced {
826            self.metrics.mark_fully_synced();
827        }
828    }
829
830    /// Inserts a block request for the given height.
831    fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
832        // Ensure the block request does not already exist.
833        self.check_block_request(height)?;
834        // Ensure the sync IPs are not empty.
835        ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
836        // Insert the block request.
837        self.requests.write().insert(height, OutstandingRequest {
838            request: (hash, previous_hash, sync_ips),
839            timestamp: Instant::now(),
840            response: None,
841        });
842        Ok(())
843    }
844
845    /// Inserts the given block response, after checking that the request exists and the response is well-formed.
846    /// On success, this function removes the peer IP from the request sync peers and inserts the response.
847    fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<()> {
848        // Retrieve the block height.
849        let height = block.height();
850        let mut requests = self.requests.write();
851
852        if self.ledger.contains_block_height(height) {
853            bail!("The sync request was removed because we already advanced");
854        }
855
856        let Some(entry) = requests.get_mut(&height) else { bail!("The sync pool did not request block {height}") };
857
858        // Retrieve the request entry for the candidate block.
859        let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;
860
861        // Ensure the candidate block hash matches the expected hash.
862        if let Some(expected_hash) = expected_hash {
863            if block.hash() != *expected_hash {
864                bail!("The block hash for candidate block {height} from '{peer_ip}' is incorrect")
865            }
866        }
867        // Ensure the previous block hash matches if it exists.
868        if let Some(expected_previous_hash) = expected_previous_hash {
869            if block.previous_hash() != *expected_previous_hash {
870                bail!("The previous block hash in candidate block {height} from '{peer_ip}' is incorrect")
871            }
872        }
873        // Ensure the sync pool requested this block from the given peer.
874        if !sync_ips.contains(&peer_ip) {
875            bail!("The sync pool did not request block {height} from '{peer_ip}'")
876        }
877
878        // Remove the peer IP from the request entry.
879        entry.sync_ips_mut().swap_remove(&peer_ip);
880
881        if let Some(existing_block) = &entry.response {
882            // If the candidate block was already present, ensure it is the same block.
883            if block != *existing_block {
884                bail!("Candidate block {height} from '{peer_ip}' is malformed");
885            }
886        } else {
887            entry.response = Some(block.clone());
888        }
889
890        // Notify the sync loop that something changed.
891        self.response_notify.notify_one();
892
893        Ok(())
894    }
895
896    /// Checks that a block request for the given height does not already exist.
897    fn check_block_request(&self, height: u32) -> Result<()> {
898        // Ensure the block height is not already in the ledger.
899        if self.ledger.contains_block_height(height) {
900            bail!("Failed to add block request, as block {height} exists in the ledger");
901        }
902        // Ensure the block height is not already requested.
903        if self.requests.read().contains_key(&height) {
904            bail!("Failed to add block request, as block {height} exists in the requests map");
905        }
906
907        Ok(())
908    }
909
910    /// Removes the block request and response for the given height
911    /// This may only be called after `peek_next_block`, which checked if the request for the given height was complete.
912    ///
913    /// Precondition: This may only be called after `peek_next_block` has returned `Some`,
914    /// which has checked if the request for the given height is complete
915    /// and there is a block with the given `height` in the `responses` map.
916    pub fn remove_block_response(&self, height: u32) {
917        // Remove the request entry for the given height.
918        if let Some(e) = self.requests.write().remove(&height) {
919            trace!(
920                "Block request for height {height} was completed in {}ms (sync speed is {})",
921                e.timestamp.elapsed().as_millis(),
922                self.get_sync_speed()
923            );
924
925            // Notify the sending task that less requests are in-flight.
926            self.peer_notify.notify_one();
927        }
928    }
929
930    /// Removes all block requests for the given peer IP.
931    ///
932    /// This is used when disconnecting from a peer or when a peer sends invalid block responses.
933    fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
934        trace!("Block sync is removing all block requests to peer {peer_ip}...");
935
936        // Remove the peer IP from the requests map. If any request entry is now empty,
937        // and its corresponding response entry is also empty, then remove that request entry altogether.
938        self.requests.write().retain(|height, e| {
939            let had_peer = e.sync_ips_mut().swap_remove(peer_ip);
940
941            // Only remove requests that were sent to this peer, that have no other peer that can respond instead,
942            // and that were not completed yet.
943            let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
944            if !retain {
945                trace!("Removed block request timestamp for {peer_ip} at height {height}");
946            }
947            retain
948        });
949
950        // No need to remove responses here, because requests with responses will be retained.
951    }
952
953    /// Removes block requests that have timed out, i.e, requests we sent that did not receive a response in time.
954    ///
955    /// This removes the corresponding block responses and returns the set of peers/addresses that timed out.
956    /// It will ask the peer pool handling service to ban any timed-out peers.
957    ///
958    /// Finally, it will return a set of new of block requests that replaced the timed-out requests (if needed).
959    pub fn handle_block_request_timeouts<P: PeerPoolHandling<N>>(
960        &self,
961        peer_pool_handler: &P,
962    ) -> Option<BlockRequestBatch<N>> {
963        // Acquire the write lock on the requests map.
964        let mut requests = self.requests.write();
965
966        // Retrieve the current time.
967        let now = Instant::now();
968
969        // Retrieve the current block height
970        let current_height = self.ledger.latest_block_height();
971
972        // Track the number of timed out block requests (only used to print a log message).
973        let mut timed_out_requests = vec![];
974
975        // Track which peers should be banned due to unresponsiveness.
976        let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();
977
978        // Remove timed out block requests.
979        requests.retain(|height, e| {
980            let is_obsolete = *height <= current_height;
981            // Determine if the duration since the request timestamp has exceeded the request timeout.
982            let timer_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
983            // Determine if the request is incomplete.
984            let is_complete = e.sync_ips().is_empty();
985
986            // Determine if the request has timed out.
987            let is_timeout = timer_elapsed && !is_complete;
988
989            // Retain if this is not a timeout and is not obsolete.
990            let retain = !is_timeout && !is_obsolete;
991
992            if is_timeout {
993                trace!("Block request at height {height} has timed out: timer_elapsed={timer_elapsed}, is_complete={is_complete}, is_obsolete={is_obsolete}");
994
995                // Increment the number of timed out block requests.
996                timed_out_requests.push(*height);
997            } else if is_obsolete {
998                trace!("Block request at height {height} became obsolete (current_height={current_height})");
999            }
1000
1001            // If the request timed out, also remove and ban given peer.
1002            if is_timeout {
1003                for peer_ip in e.sync_ips().iter() {
1004                    peers_to_ban.insert(*peer_ip);
1005                }
1006            }
1007
1008            retain
1009        });
1010
1011        if !timed_out_requests.is_empty() {
1012            debug!("{num} block requests timed out", num = timed_out_requests.len());
1013        }
1014
1015        let next_request_height = requests.iter().next().map(|(h, _)| *h);
1016
1017        // Avoid locking `locators` and `requests` at the same time.
1018        drop(requests);
1019
1020        // Now remove and ban any unresponsive peers
1021        for peer_ip in peers_to_ban {
1022            self.remove_peer(&peer_ip);
1023            peer_pool_handler.ip_ban_peer(peer_ip, Some("timed out on block requests"));
1024        }
1025
1026        // Re-issue any timed-out requests.
1027        //
1028        // Do this even if timed_out_requests is empty, because we might not be able to re-issue
1029        // requests immediately if there are no other peers at a given time.
1030        // Further, this only closes the first gap. So multiple calls to this might be needed.
1031        let sync_height = self.get_sync_height();
1032        if let Some(next_height) = next_request_height {
1033            let start = sync_height + 1;
1034
1035            // Is there a gap?
1036            if next_height > start {
1037                // Only request the given range, so there are no overlaps with other requests.
1038                let end = next_height; // exclusive
1039                let max_new_blocks_to_request = end - start;
1040
1041                let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start) else {
1042                    warn!("Block requests timed out, but found no other peers to re-request from");
1043                    return None;
1044                };
1045
1046                // Retrieve the highest block height.
1047                let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);
1048
1049                debug!("Re-requesting blocks starting at height {start}");
1050
1051                return Some((
1052                    self.construct_requests(
1053                        &sync_peers,
1054                        sync_height,
1055                        min_common_ancestor,
1056                        max_new_blocks_to_request,
1057                        greatest_peer_height,
1058                    ),
1059                    sync_peers,
1060                ));
1061            }
1062        }
1063
1064        None
1065    }
1066
1067    /// Finds the peers to sync from and the shared common ancestor, starting at the give height.
1068    ///
1069    /// Unlike [`Self::find_sync_peers`] this does not only return the latest locators height, but the full BlockLocators for each peer.
1070    /// Returns `None` if there are no peers to sync from.
1071    ///
1072    /// # Locking
1073    /// This function will read-lock `common_ancstors`.
1074    fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
1075        // Retrieve the latest ledger height.
1076        let latest_ledger_height = self.ledger.latest_block_height();
1077
1078        // Pick a set of peers above the latest ledger height, and include their locators.
1079        // This will sort the peers by locator height in descending order.
1080        let candidate_locators: IndexMap<_, _> = self
1081            .locators
1082            .read()
1083            .iter()
1084            .filter(|(_, locators)| locators.latest_locator_height() > current_height)
1085            .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
1086            .take(NUM_SYNC_CANDIDATE_PEERS)
1087            .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
1088            .collect();
1089
1090        // Case 0: If there are no candidate peers, return `None`.
1091        if candidate_locators.is_empty() {
1092            trace!("Found no sync peers with height greater {current_height}");
1093            return None;
1094        }
1095
1096        // TODO (howardwu): Change this to the highest cumulative weight for Phase 3.
1097        // Case 1: If all of the candidate peers share a common ancestor below the latest ledger height,
1098        // then pick the peer with the highest height, and find peers (up to extra redundancy) with
1099        // a common ancestor above the block request range. Set the end height to their common ancestor.
1100
1101        // Determine the threshold number of peers to sync from.
1102        let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);
1103
1104        // Breaks the loop when the first threshold number of peers are found, biasing for the peer with the highest height
1105        // and a cohort of peers who share a common ancestor above this node's latest ledger height.
1106        for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
1107            // The height of the common ancestor shared by all selected peers.
1108            let mut min_common_ancestor = peer_locators.latest_locator_height();
1109
1110            // The peers we will synchronize from.
1111            // As the previous iteration did not succeed, restart with the next candidate peers.
1112            let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];
1113
1114            // Try adding other peers consistent with this one to the sync peer set.
1115            for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
1116                // Check if these two peers have a common ancestor above the latest ledger height.
1117                if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
1118                    // If so, then check that their block locators are consistent.
1119                    if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
1120                        // If their common ancestor is less than the minimum common ancestor, then update it.
1121                        min_common_ancestor = min_common_ancestor.min(*common_ancestor);
1122
1123                        // Add the other peer to the list of sync peers.
1124                        sync_peers.push((*other_ip, other_locators.clone()));
1125                    }
1126                }
1127            }
1128
1129            // If we have enough sync peers above the latest ledger height, finish and return them.
1130            if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
1131                // Shuffle the sync peers prior to returning. This ensures the rest of the stack
1132                // does not rely on the order of the sync peers, and that the sync peers are not biased.
1133                sync_peers.shuffle(&mut rand::thread_rng());
1134
1135                // Collect into an IndexMap and return.
1136                return Some((sync_peers.into_iter().collect(), min_common_ancestor));
1137            }
1138        }
1139
1140        // If there is not enough peers with a minimum common ancestor above the latest ledger height, return None.
1141        None
1142    }
1143
1144    /// Given the sync peers and their minimum common ancestor, return a list of block requests.
1145    fn construct_requests(
1146        &self,
1147        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1148        sync_height: u32,
1149        min_common_ancestor: u32,
1150        max_blocks_to_request: u32,
1151        greatest_peer_height: u32,
1152    ) -> Vec<(u32, PrepareSyncRequest<N>)> {
1153        // Compute the start height for the block requests.
1154        let start_height = {
1155            let requests = self.requests.read();
1156            let mut start_height = sync_height + 1;
1157
1158            loop {
1159                if requests.contains_key(&start_height) {
1160                    start_height += 1;
1161                } else {
1162                    break;
1163                }
1164            }
1165
1166            start_height
1167        };
1168
1169        // If the minimum common ancestor is below the start height, then return early.
1170        if min_common_ancestor < start_height {
1171            if start_height < greatest_peer_height {
1172                trace!(
1173                    "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
1174                );
1175            }
1176            return Default::default();
1177        }
1178
1179        // Compute the end height for the block request.
1180        let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);
1181
1182        // Construct the block hashes to request.
1183        let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
1184        // Track the largest number of sync IPs required for any block request in the sequence of requests.
1185        let mut max_num_sync_ips = 1;
1186
1187        for height in start_height..end_height {
1188            // Ensure the current height is not in the ledger or already requested.
1189            if let Err(err) = self.check_block_request(height) {
1190                trace!("Failed to issue new request for height {height}: {err}");
1191
1192                // If the sequence of block requests is interrupted, then return early.
1193                // Otherwise, continue until the first start height that is new.
1194                match request_hashes.is_empty() {
1195                    true => continue,
1196                    false => break,
1197                }
1198            }
1199
1200            // Construct the block request.
1201            let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);
1202
1203            // Handle the dishonest case.
1204            if !is_honest {
1205                // TODO (howardwu): Consider performing an integrity check on peers (to disconnect).
1206                warn!("Detected dishonest peer(s) when preparing block request");
1207                // If there are not enough peers in the dishonest case, then return early.
1208                if sync_peers.len() < num_sync_ips {
1209                    break;
1210                }
1211            }
1212
1213            // Update the maximum number of sync IPs.
1214            max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);
1215
1216            // Append the request.
1217            request_hashes.insert(height, (hash, previous_hash));
1218        }
1219
1220        // Construct the requests with the same sync ips.
1221        request_hashes
1222            .into_iter()
1223            .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
1224            .collect()
1225    }
1226}
1227
1228/// If any peer is detected to be dishonest in this function, it will not set the hash or previous hash,
1229/// in order to allow the caller to determine what to do.
1230fn construct_request<N: Network>(
1231    height: u32,
1232    sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1233) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
1234    let mut hash = None;
1235    let mut hash_redundancy: usize = 0;
1236    let mut previous_hash = None;
1237    let mut is_honest = true;
1238
1239    for peer_locators in sync_peers.values() {
1240        if let Some(candidate_hash) = peer_locators.get_hash(height) {
1241            match hash {
1242                // Increment the redundancy count if the hash matches.
1243                Some(hash) if hash == candidate_hash => hash_redundancy += 1,
1244                // Some peer is dishonest.
1245                Some(_) => {
1246                    hash = None;
1247                    hash_redundancy = 0;
1248                    previous_hash = None;
1249                    is_honest = false;
1250                    break;
1251                }
1252                // Set the hash if it is not set.
1253                None => {
1254                    hash = Some(candidate_hash);
1255                    hash_redundancy = 1;
1256                }
1257            }
1258        }
1259        if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
1260            match previous_hash {
1261                // Increment the redundancy count if the previous hash matches.
1262                Some(previous_hash) if previous_hash == candidate_previous_hash => (),
1263                // Some peer is dishonest.
1264                Some(_) => {
1265                    hash = None;
1266                    hash_redundancy = 0;
1267                    previous_hash = None;
1268                    is_honest = false;
1269                    break;
1270                }
1271                // Set the previous hash if it is not set.
1272                None => previous_hash = Some(candidate_previous_hash),
1273            }
1274        }
1275    }
1276
1277    // Note that we intentionally do not restrict the sync peers to those advertising the chosen hash;
1278    // this gives stronger confidence that we only sync while the network is consistent/stable.
1279    let num_sync_ips = {
1280        // Extra redundant peers - as the block hash was dishonest.
1281        if !is_honest {
1282            // Choose up to the extra redundancy factor in sync peers.
1283            EXTRA_REDUNDANCY_FACTOR
1284        }
1285        // No redundant peers - as we have redundancy on the block hash.
1286        else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
1287            // Choose one sync peer.
1288            1
1289        }
1290        // Redundant peers - as we do not have redundancy on the block hash.
1291        else {
1292            // Choose up to the redundancy factor in sync peers.
1293            REDUNDANCY_FACTOR
1294        }
1295    };
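    // For example, under the non-test constants (REDUNDANCY_FACTOR = 1, EXTRA_REDUNDANCY_FACTOR = 3):
    // enough peers agreeing on the hash at `height` means redundancy is met and a single sync IP suffices;
    // no hash (or too few agreeing peers) means up to REDUNDANCY_FACTOR sync IPs are chosen;
    // conflicting hashes mark the request as dishonest and up to EXTRA_REDUNDANCY_FACTOR sync IPs are chosen.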
1296
1297    (hash, previous_hash, num_sync_ips, is_honest)
1298}
1299
1300#[cfg(test)]
1301mod tests {
1302    use super::*;
1303    use crate::locators::{
1304        CHECKPOINT_INTERVAL,
1305        NUM_RECENT_BLOCKS,
1306        test_helpers::{sample_block_locators, sample_block_locators_with_fork},
1307    };
1308
1309    use snarkos_node_bft_ledger_service::MockLedgerService;
1310    use snarkos_node_router::{Peer, Resolver};
1311    use snarkos_node_tcp::{P2P, Tcp};
1312    use snarkvm::{
1313        ledger::committee::Committee,
1314        prelude::{Field, TestRng},
1315    };
1316
1317    use indexmap::{IndexSet, indexset};
1318    #[cfg(feature = "locktick")]
1319    use locktick::parking_lot::RwLock;
1320    #[cfg(not(feature = "locktick"))]
1321    use parking_lot::RwLock;
1322    use rand::Rng;
1323    use std::net::{IpAddr, Ipv4Addr};
1324
1325    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1326
1327    #[derive(Default)]
1328    struct DummyPeerPoolHandler {
1329        peers_to_ban: RwLock<Vec<SocketAddr>>,
1330    }
1331
1332    impl P2P for DummyPeerPoolHandler {
1333        fn tcp(&self) -> &Tcp {
1334            unreachable!();
1335        }
1336    }
1337
1338    impl<N: Network> PeerPoolHandling<N> for DummyPeerPoolHandler {
1339        const MAXIMUM_POOL_SIZE: usize = 10;
1340        const OWNER: &str = "[DummyPeerPoolHandler]";
1341        const PEER_SLASHING_COUNT: usize = 0;
1342
1343        fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
1344            unreachable!();
1345        }
1346
1347        fn resolver(&self) -> &RwLock<Resolver<N>> {
1348            unreachable!();
1349        }
1350
1351        fn is_dev(&self) -> bool {
1352            true
1353        }
1354
1355        fn ip_ban_peer(&self, listener_addr: SocketAddr, _reason: Option<&str>) {
1356            self.peers_to_ban.write().push(listener_addr);
1357        }
1358    }
1359
1360    /// Returns a sample peer IP for the sync pool, derived from the given ID.
1361    fn sample_peer_ip(id: u16) -> SocketAddr {
1362        assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
1363        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
1364    }
1365
1366    /// Returns a sample committee.
1367    fn sample_committee() -> Committee<CurrentNetwork> {
1368        let rng = &mut TestRng::default();
1369        snarkvm::ledger::committee::test_helpers::sample_committee(rng)
1370    }
1371
1372    /// Returns the ledger service, initialized to the given height.
1373    fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
1374        MockLedgerService::new_at_height(sample_committee(), height)
1375    }
1376
1377    /// Returns the sync pool, with the ledger initialized to the given height.
1378    fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
1379        BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)))
1380    }
1381
1382    /// Returns a vector of randomly sampled block heights in [0, max_height].
1383    ///
1384    /// The maximum value will always be included in the result.
1385    fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
1386        assert!(num_values > 0, "Cannot generate an empty vector");
1387        assert!((max_height as usize) >= num_values);
1388
1389        let mut rng = TestRng::default();
1390
1391        let mut heights: Vec<u32> = (0..(max_height - 1)).choose_multiple(&mut rng, num_values);
1392
1393        heights.push(max_height);
1394
1395        heights
1396    }
1397
1398    /// Returns a duplicate (deep copy) of the sync pool with a different ledger height.
1399    fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
1400        BlockSync::<CurrentNetwork> {
1401            peer_notify: Notify::new(),
1402            response_notify: Default::default(),
1403            ledger: Arc::new(sample_ledger_service(height)),
1404            locators: RwLock::new(sync.locators.read().clone()),
1405            common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
1406            requests: RwLock::new(sync.requests.read().clone()),
1407            sync_state: RwLock::new(sync.sync_state.read().clone()),
1408            advance_with_sync_blocks_lock: Default::default(),
1409            metrics: Default::default(),
1410        }
1411    }
1412
1413    /// Checks that the sync pool (starting at genesis) returns the correct requests.
1414    fn check_prepare_block_requests(
1415        sync: BlockSync<CurrentNetwork>,
1416        min_common_ancestor: u32,
1417        peers: IndexSet<SocketAddr>,
1418    ) {
1419        let rng = &mut TestRng::default();
1420
1421        // Check test assumptions are met.
1422        assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");
1423
1424        // Determine the number of peers within range of this sync pool.
1425        let num_peers_within_recent_range_of_ledger = {
1426            // If no peers are within range, then set to 0.
1427            if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
1428                0
1429            }
1430            // Otherwise, manually check the number of peers within range.
1431            else {
1432                peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
1433            }
1434        };
1435
1436        // Prepare the block requests.
1437        let (requests, sync_peers) = sync.prepare_block_requests();
1438
1439        // If there are no peers, then there should be no requests.
1440        if peers.is_empty() {
1441            assert!(requests.is_empty());
1442            return;
1443        }
1444
1445        // Otherwise, there should be requests.
1446        let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
1447        assert_eq!(requests.len(), expected_num_requests);
1448
1449        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1450            // Construct the sync IPs.
1451            let sync_ips: IndexSet<_> =
1452                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1453            assert_eq!(height, 1 + idx as u32);
1454            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1455            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1456
1457            if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
1458                assert_eq!(sync_ips.len(), 1);
1459            } else {
1460                assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
1461                assert_eq!(sync_ips, peers);
1462            }
1463        }
1464    }
1465
1466    /// Tests that height and hash values are set correctly using many different maximum block heights.
1467    #[test]
1468    fn test_latest_block_height() {
1469        for height in generate_block_heights(100_001, 5000) {
1470            let sync = sample_sync_at_height(height);
1471            // Check that the latest block height is the maximum height.
1472            assert_eq!(sync.ledger.latest_block_height(), height);
1473
1474            // Check the hash to height mapping
1475            assert_eq!(sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(0)).into()).unwrap(), 0);
1476            assert_eq!(
1477                sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(height)).into()).unwrap(),
1478                height
1479            );
1480        }
1481    }
1482
1483    #[test]
1484    fn test_get_block_hash() {
1485        for height in generate_block_heights(100_001, 5000) {
1486            let sync = sample_sync_at_height(height);
1487
1488            // Check the height to hash mapping
1489            assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
1490            assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::<CurrentNetwork>::from_u32(height)).into());
1491        }
1492    }
1493
1494    #[test]
1495    fn test_prepare_block_requests() {
1496        for num_peers in 0..111 {
1497            println!("Testing with {num_peers} peers");
1498
1499            let sync = sample_sync_at_height(0);
1500
1501            let mut peers = indexset![];
1502
1503            for peer_id in 1..=num_peers {
1504                // Add a peer.
1505                sync.update_peer_locators(sample_peer_ip(peer_id), &sample_block_locators(10)).unwrap();
1506                // Add the peer to the set of peers.
1507                peers.insert(sample_peer_ip(peer_id));
1508            }
1509
1510            // If all peers are ahead, then requests should be prepared.
1511            check_prepare_block_requests(sync, 10, peers);
1512        }
1513    }
1514
1515    #[test]
1516    fn test_prepare_block_requests_with_leading_fork_at_11() {
1517        let sync = sample_sync_at_height(0);
1518
1519        // Intuitively, peer 1's fork is above peer 2 and peer 3's height.
1520        // So from peer 2 and peer 3's perspective, they don't even realize that peer 1 is on a fork.
1521        // Thus, you can sync up to block 10 from any of the 3 peers.
1522
1523        // When there are REDUNDANCY_FACTOR peers ahead, and 1 peer is on a leading fork at 11,
1524        // then the sync pool should request blocks 1..=10 from the REDUNDANCY_FACTOR peers.
1525        // This is safe because the leading fork is at 11, and the sync pool is at 0,
1526        // so all candidate peers are at least 10 blocks ahead of the sync pool.
1527
1528        // Add a peer (fork).
1529        let peer_1 = sample_peer_ip(1);
1530        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 11)).unwrap();
1531
1532        // Add a peer.
1533        let peer_2 = sample_peer_ip(2);
1534        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1535
1536        // Add a peer.
1537        let peer_3 = sample_peer_ip(3);
1538        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1539
1540        // Prepare the block requests.
1541        let (requests, _) = sync.prepare_block_requests();
1542        assert_eq!(requests.len(), 10);
1543
1544        // Check the requests.
1545        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1546            assert_eq!(height, 1 + idx as u32);
1547            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1548            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1549            assert_eq!(num_sync_ips, 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1550        }
1551    }
1552
1553    #[test]
1554    fn test_prepare_block_requests_with_leading_fork_at_10() {
1555        let rng = &mut TestRng::default();
1556        let sync = sample_sync_at_height(0);
1557
1558        // Intuitively, peer 1's fork is at peer 2 and peer 3's height.
1559        // So from peer 2 and peer 3's perspective, they recognize that peer 1 has forked.
1560        // Thus, you don't have REDUNDANCY_FACTOR peers to sync to block 10.
1561        //
1562        // Now, while you could in theory sync up to block 9 from any of the 3 peers,
1563        // we choose not to do this as either side is likely to disconnect from us,
1564        // and we would rather wait for enough redundant peers before syncing.
1565
1566        // When there are REDUNDANCY_FACTOR peers ahead, and 1 peer is on a leading fork at 10,
1567        // then the sync pool should not request blocks, as 1 peer conflicts with the other REDUNDANCY_FACTOR-1 peers.
1568        // We choose to sync with a cohort of peers that are *consistent* with each other,
1569        // and prioritize from descending heights (so the highest peer gets priority).
1570
1571        // Add a peer (fork).
1572        let peer_1 = sample_peer_ip(1);
1573        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();
1574
1575        // Add a peer.
1576        let peer_2 = sample_peer_ip(2);
1577        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1578
1579        // Add a peer.
1580        let peer_3 = sample_peer_ip(3);
1581        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1582
1583        // Prepare the block requests.
1584        let (requests, _) = sync.prepare_block_requests();
1585        assert_eq!(requests.len(), 0);
1586
1587        // When there are REDUNDANCY_FACTOR+1 peers ahead, and 1 is on a fork, then there should be block requests.
1588
1589        // Add a peer.
1590        let peer_4 = sample_peer_ip(4);
1591        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
1592
1593        // Prepare the block requests.
1594        let (requests, sync_peers) = sync.prepare_block_requests();
1595        assert_eq!(requests.len(), 10);
1596
1597        // Check the requests.
1598        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1599            // Construct the sync IPs.
1600            let sync_ips: IndexSet<_> =
1601                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1602            assert_eq!(height, 1 + idx as u32);
1603            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1604            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1605            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1606            assert_ne!(sync_ips[0], peer_1); // It should never be the forked peer.
1607        }
1608    }
1609
1610    #[test]
1611    fn test_prepare_block_requests_with_trailing_fork_at_9() {
1612        let rng = &mut TestRng::default();
1613        let sync = sample_sync_at_height(0);
1614
1615        // Peers 1 and 2 diverge from peer 3 at block 10. We only sync when there are REDUNDANCY_FACTOR peers
1616        // who are *consistent* with each other. So if you add a 4th peer that is consistent with peers 1 and 2,
1617        // then you should be able to sync up to block 10, thereby biasing away from peer 3.
1618
1619        // Add a peer (fork).
1620        let peer_1 = sample_peer_ip(1);
1621        sync.update_peer_locators(peer_1, &sample_block_locators(10)).unwrap();
1622
1623        // Add a peer.
1624        let peer_2 = sample_peer_ip(2);
1625        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1626
1627        // Add a peer.
1628        let peer_3 = sample_peer_ip(3);
1629        sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();
1630
1631        // Prepare the block requests.
1632        let (requests, _) = sync.prepare_block_requests();
1633        assert_eq!(requests.len(), 0);
1634
1635        // When there are REDUNDANCY_FACTOR+1 peers ahead, and peer 3 is on a fork, then there should be block requests.
1636
1637        // Add a peer.
1638        let peer_4 = sample_peer_ip(4);
1639        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
1640
1641        // Prepare the block requests.
1642        let (requests, sync_peers) = sync.prepare_block_requests();
1643        assert_eq!(requests.len(), 10);
1644
1645        // Check the requests.
1646        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1647            // Construct the sync IPs.
1648            let sync_ips: IndexSet<_> =
1649                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1650            assert_eq!(height, 1 + idx as u32);
1651            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1652            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1653            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1654            assert_ne!(sync_ips[0], peer_3); // It should never be the forked peer.
1655        }
1656    }
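    // A minimal illustrative sketch (not part of the upstream test suite) of how `construct_request`
    // reacts to honest and conflicting locators. It assumes, as in the tests above, that the sample
    // locator helpers assign the hash `Field::from_u32(height)` to every recent height, and that
    // `sample_block_locators_with_fork` diverges from that hash at and above the fork height.
    #[test]
    fn test_construct_request_redundancy_sketch() {
        // Three peers agree on the hash at height 5, which meets the (test) redundancy factor,
        // so a single sync IP suffices.
        let mut sync_peers: IndexMap<SocketAddr, BlockLocators<CurrentNetwork>> = IndexMap::new();
        for id in 1..=3u16 {
            sync_peers.insert(sample_peer_ip(id), sample_block_locators(10));
        }
        let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(5, &sync_peers);
        assert!(is_honest);
        assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(5)).into()));
        assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(4)).into()));
        assert_eq!(num_sync_ips, 1);

        // A single peer cannot meet the (test) redundancy factor, so up to REDUNDANCY_FACTOR sync IPs are chosen.
        let mut sync_peers: IndexMap<SocketAddr, BlockLocators<CurrentNetwork>> = IndexMap::new();
        sync_peers.insert(sample_peer_ip(1), sample_block_locators(10));
        let (_, _, num_sync_ips, is_honest) = construct_request(5, &sync_peers);
        assert!(is_honest);
        assert_eq!(num_sync_ips, REDUNDANCY_FACTOR);

        // Conflicting hashes at height 15 mark the request as dishonest: the hashes are cleared
        // and up to EXTRA_REDUNDANCY_FACTOR sync IPs are chosen.
        let mut sync_peers: IndexMap<SocketAddr, BlockLocators<CurrentNetwork>> = IndexMap::new();
        sync_peers.insert(sample_peer_ip(1), sample_block_locators(20));
        sync_peers.insert(sample_peer_ip(2), sample_block_locators_with_fork(20, 11));
        let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(15, &sync_peers);
        assert!(!is_honest);
        assert_eq!(hash, None);
        assert_eq!(previous_hash, None);
        assert_eq!(num_sync_ips, EXTRA_REDUNDANCY_FACTOR);
    }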
1657
1658    #[test]
1659    fn test_insert_block_requests() {
1660        let rng = &mut TestRng::default();
1661        let sync = sample_sync_at_height(0);
1662
1663        // Add a peer.
1664        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
1665
1666        // Prepare the block requests.
1667        let (requests, sync_peers) = sync.prepare_block_requests();
1668        assert_eq!(requests.len(), 10);
1669
1670        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1671            // Construct the sync IPs.
1672            let sync_ips: IndexSet<_> =
1673                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1674            // Insert the block request.
1675            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1676            // Check that the block requests were inserted.
1677            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1678            assert!(sync.get_block_request_timestamp(height).is_some());
1679        }
1680
1681        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1682            // Construct the sync IPs.
1683            let sync_ips: IndexSet<_> =
1684                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1685            // Check that the block requests are still inserted.
1686            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1687            assert!(sync.get_block_request_timestamp(height).is_some());
1688        }
1689
1690        for (height, (hash, previous_hash, num_sync_ips)) in requests {
1691            // Construct the sync IPs.
1692            let sync_ips: IndexSet<_> =
1693                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1694            // Ensure that the block requests cannot be inserted twice.
1695            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err();
1696            // Check that the block requests are still inserted.
1697            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1698            assert!(sync.get_block_request_timestamp(height).is_some());
1699        }
1700    }
1701
1702    #[test]
1703    fn test_insert_block_requests_fails() {
1704        let sync = sample_sync_at_height(9);
1705
1706        // Add a peer.
1707        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
1708
1709        // Inserting a block height that is already in the ledger should fail.
1710        sync.insert_block_request(9, (None, None, indexset![sample_peer_ip(1)])).unwrap_err();
1711        // Inserting a block height that is not in the ledger should succeed.
1712        sync.insert_block_request(10, (None, None, indexset![sample_peer_ip(1)])).unwrap();
1713    }
1714
1715    #[test]
1716    fn test_update_peer_locators() {
1717        let sync = sample_sync_at_height(0);
1718
1719        // Test 2 peers.
1720        let peer1_ip = sample_peer_ip(1);
1721        for peer1_height in 0..500u32 {
1722            sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap();
1723            assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height));
1724
1725            let peer2_ip = sample_peer_ip(2);
1726            for peer2_height in 0..500u32 {
1727                println!("Testing peer 1 height at {peer1_height} and peer 2 height at {peer2_height}");
1728
1729                sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap();
1730                assert_eq!(sync.get_peer_height(&peer2_ip), Some(peer2_height));
1731
1732                // Compute the distance between the peers.
1733                let distance = peer1_height.abs_diff(peer2_height);
1734
1735                // Check the common ancestor.
1736                if distance < NUM_RECENT_BLOCKS as u32 {
1737                    let expected_ancestor = core::cmp::min(peer1_height, peer2_height);
1738                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
1739                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
1740                } else {
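                    // For example, once the peers' heights differ by at least NUM_RECENT_BLOCKS, their
                    // recent locators no longer overlap, so the common ancestor falls back to the highest
                    // checkpoint at or below the lower height (0 whenever that height is below CHECKPOINT_INTERVAL).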
1741                    let min_checkpoints =
1742                        core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL);
1743                    let expected_ancestor = min_checkpoints * CHECKPOINT_INTERVAL;
1744                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
1745                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
1746                }
1747            }
1748        }
1749    }
1750
1751    #[test]
1752    fn test_remove_peer() {
1753        let sync = sample_sync_at_height(0);
1754
1755        let peer_ip = sample_peer_ip(1);
1756        sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1757        assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1758
1759        sync.remove_peer(&peer_ip);
1760        assert_eq!(sync.get_peer_height(&peer_ip), None);
1761
1762        sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1763        assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1764
1765        sync.remove_peer(&peer_ip);
1766        assert_eq!(sync.get_peer_height(&peer_ip), None);
1767    }
1768
1769    #[test]
1770    fn test_locators_insert_remove_insert() {
1771        let sync = sample_sync_at_height(0);
1772
1773        let peer_ip = sample_peer_ip(1);
1774        sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1775        assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1776
1777        sync.remove_peer(&peer_ip);
1778        assert_eq!(sync.get_peer_height(&peer_ip), None);
1779
1780        sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1781        assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1782    }
1783
1784    #[test]
1785    fn test_requests_insert_remove_insert() {
1786        let rng = &mut TestRng::default();
1787        let sync = sample_sync_at_height(0);
1788
1789        // Add a peer.
1790        let peer_ip = sample_peer_ip(1);
1791        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();
1792
1793        // Prepare the block requests.
1794        let (requests, sync_peers) = sync.prepare_block_requests();
1795        assert_eq!(requests.len(), 10);
1796
1797        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1798            // Construct the sync IPs.
1799            let sync_ips: IndexSet<_> =
1800                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1801            // Insert the block request.
1802            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1803            // Check that the block requests were inserted.
1804            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1805            assert!(sync.get_block_request_timestamp(height).is_some());
1806        }
1807
1808        // Remove the peer.
1809        sync.remove_peer(&peer_ip);
1810
1811        for (height, _) in requests {
1812            // Check that the block requests were removed.
1813            assert_eq!(sync.get_block_request(height), None);
1814            assert!(sync.get_block_request_timestamp(height).is_none());
1815        }
1816
1817        // As there is no peer, it should not be possible to prepare block requests.
1818        let (requests, _) = sync.prepare_block_requests();
1819        assert_eq!(requests.len(), 0);
1820
1821        // Add the peer again.
1822        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();
1823
1824        // Prepare the block requests.
1825        let (requests, _) = sync.prepare_block_requests();
1826        assert_eq!(requests.len(), 10);
1827
1828        for (height, (hash, previous_hash, num_sync_ips)) in requests {
1829            // Construct the sync IPs.
1830            let sync_ips: IndexSet<_> =
1831                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1832            // Insert the block request.
1833            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1834            // Check that the block requests were inserted.
1835            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1836            assert!(sync.get_block_request_timestamp(height).is_some());
1837        }
1838    }
1839
1840    #[test]
1841    fn test_obsolete_block_requests() {
1842        let rng = &mut TestRng::default();
1843        let sync = sample_sync_at_height(0);
1844
1845        let locator_height = rng.gen_range(0..50);
1846
1847        // Add a peer.
1848        let locators = sample_block_locators(locator_height);
1849        sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap();
1850
1851        // Construct block requests
1852        let (requests, sync_peers) = sync.prepare_block_requests();
1853        assert_eq!(requests.len(), locator_height as usize);
1854
1855        // Add the block requests to the sync module.
1856        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
1857            // Construct the sync IPs.
1858            let sync_ips: IndexSet<_> =
1859                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1860            // Insert the block request.
1861            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
1862            // Check that the block requests were inserted.
1863            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
1864            assert!(sync.get_block_request_timestamp(height).is_some());
1865        }
1866
1867        // Duplicate the sync module at a different ledger height to simulate block advancement.
1868        // This range needs to be inclusive, so that the range is never empty,
1869        // even with a locator height of 0.
1870        let ledger_height = rng.gen_range(0..=locator_height);
1871        let new_sync = duplicate_sync_at_new_height(&sync, ledger_height);
1872
1873        // Check that the number of requests is the same.
1874        assert_eq!(new_sync.requests.read().len(), requests.len());
1875
1876        // Remove timed out block requests.
1877        let c = DummyPeerPoolHandler::default();
1878        new_sync.handle_block_request_timeouts(&c);
1879
1880        // Check that the number of requests is reduced based on the ledger height.
1881        assert_eq!(new_sync.requests.read().len(), (locator_height - ledger_height) as usize);
1882    }
1883
1884    #[test]
1885    fn test_timed_out_block_request() {
1886        let sync = sample_sync_at_height(0);
1887        let peer_ip = sample_peer_ip(1);
1888        let locators = sample_block_locators(10);
1889        let block_hash = locators.get_hash(1);
1890
1891        sync.update_peer_locators(peer_ip, &locators).unwrap();
1892
1893        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
1894
1895        // Add a timed-out request
1896        sync.requests.write().insert(1, OutstandingRequest {
1897            request: (block_hash, None, [peer_ip].into()),
1898            timestamp,
1899            response: None,
1900        });
1901
1902        assert_eq!(sync.requests.read().len(), 1);
1903        assert_eq!(sync.locators.read().len(), 1);
1904
1905        // Remove timed out block requests.
1906        let c = DummyPeerPoolHandler::default();
1907        sync.handle_block_request_timeouts(&c);
1908
1909        let ban_list = c.peers_to_ban.write();
1910        assert_eq!(ban_list.len(), 1);
1911        assert_eq!(ban_list.iter().next(), Some(&peer_ip));
1912
1913        assert!(sync.requests.read().is_empty());
1914        assert!(sync.locators.read().is_empty());
1915    }
1916
1917    #[test]
1918    fn test_reissue_timed_out_block_request() {
1919        let sync = sample_sync_at_height(0);
1920        let peer_ip1 = sample_peer_ip(1);
1921        let peer_ip2 = sample_peer_ip(2);
1922        let peer_ip3 = sample_peer_ip(3);
1923
1924        let locators = sample_block_locators(10);
1925        let block_hash1 = locators.get_hash(1);
1926        let block_hash2 = locators.get_hash(2);
1927
1928        sync.update_peer_locators(peer_ip1, &locators).unwrap();
1929        sync.update_peer_locators(peer_ip2, &locators).unwrap();
1930        sync.update_peer_locators(peer_ip3, &locators).unwrap();
1931
1932        assert_eq!(sync.locators.read().len(), 3);
1933
1934        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
1935
1936        // Add a timed-out request
1937        sync.requests.write().insert(1, OutstandingRequest {
1938            request: (block_hash1, None, [peer_ip1].into()),
1939            timestamp,
1940            response: None,
1941        });
1942
1943        // Add a request that has not timed out
1944        sync.requests.write().insert(2, OutstandingRequest {
1945            request: (block_hash2, None, [peer_ip2].into()),
1946            timestamp: Instant::now(),
1947            response: None,
1948        });
1949
1950        assert_eq!(sync.requests.read().len(), 2);
1951
1952        // Handle timed-out block requests: the stale request should be removed and reissued.
1953        let c = DummyPeerPoolHandler::default();
1954
1955        let re_requests = sync.handle_block_request_timeouts(&c);
1956
1957        let ban_list = c.peers_to_ban.write();
1958        assert_eq!(ban_list.len(), 1);
1959        assert_eq!(ban_list.iter().next(), Some(&peer_ip1));
1960
1961        assert_eq!(sync.requests.read().len(), 1);
1962        assert_eq!(sync.locators.read().len(), 2);
1963
1964        let (new_requests, new_sync_ips) = re_requests.unwrap();
1965        assert_eq!(new_requests.len(), 1);
1966
1967        let (height, (hash, _, _)) = new_requests.first().unwrap();
1968        assert_eq!(*height, 1);
1969        assert_eq!(*hash, block_hash1);
1970        assert_eq!(new_sync_ips.len(), 2);
1971
1972        // Make sure the removed peer is not in the sync_peer set.
1973        let mut iter = new_sync_ips.iter();
1974        assert_ne!(iter.next().unwrap().0, &peer_ip1);
1975        assert_ne!(iter.next().unwrap().0, &peer_ip1);
1976    }
1977}