// snarkos_node_sync/block_sync.rs
1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
18    locators::BlockLocators,
19};
20use snarkos_node_bft_ledger_service::LedgerService;
21use snarkos_node_network::PeerPoolHandling;
22use snarkos_node_router::messages::DataBlocks;
23use snarkos_node_sync_communication_service::CommunicationService;
24use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
25
26use snarkvm::{
27    console::network::{ConsensusVersion, Network},
28    prelude::block::Block,
29    utilities::flatten_error,
30};
31
32use anyhow::{Result, bail, ensure};
33use indexmap::{IndexMap, IndexSet};
34use itertools::Itertools;
35#[cfg(feature = "locktick")]
36use locktick::parking_lot::RwLock;
37#[cfg(feature = "locktick")]
38use locktick::tokio::Mutex as TMutex;
39#[cfg(not(feature = "locktick"))]
40use parking_lot::RwLock;
41use rand::seq::{IteratorRandom, SliceRandom};
42use std::{
43    collections::{BTreeMap, HashMap, HashSet, hash_map},
44    net::{IpAddr, Ipv4Addr, SocketAddr},
45    sync::Arc,
46    time::{Duration, Instant},
47};
48#[cfg(not(feature = "locktick"))]
49use tokio::sync::Mutex as TMutex;
50use tokio::sync::Notify;
51
52mod helpers;
53use helpers::rangify_heights;
54
55mod sync_state;
56use sync_state::SyncState;
57
58mod metrics;
59use metrics::BlockSyncMetrics;
60
// The redundancy factor decreases the possibility of a malicious peer sending us an invalid block locator
// by requiring multiple peers to advertise the same (prefix of) block locators.
// However, we do not use this in production yet.
#[cfg(not(test))]
pub const REDUNDANCY_FACTOR: usize = 1;
#[cfg(test)]
pub const REDUNDANCY_FACTOR: usize = 3;

/// The time nodes wait between issuing batches of block requests to avoid triggering spam detection.
///
/// The current rate limit for all messages is around 160k per second (see [`Gateway::max_cache_events`]).
/// This constant limits the number of block requests to a much lower 100 per second.
///
// TODO(kaimast): base rate limits on how many requests were sent to each peer instead.
pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);

/// A more permissive redundancy bound: three times [`REDUNDANCY_FACTOR`].
const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
/// The number of candidate peers considered when selecting sync peers: five times [`REDUNDANCY_FACTOR`].
const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;

/// How long to wait before an outstanding block request is considered timed out.
const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(600);

/// The maximum number of outstanding block requests.
/// Once a node hits this limit, it will not issue any new requests until existing requests time out or receive responses.
const MAX_BLOCK_REQUESTS: usize = 50; // 50 requests

/// The maximum number of blocks tolerated before the primary is considered behind its peers.
pub const MAX_BLOCKS_BEHIND: u32 = 1; // blocks

/// This is a dummy IP address that is used to represent the local node.
/// Note: This here does not need to be a real IP address, but it must be unique/distinct from all other connections.
pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
92
/// Handle to an outstanding request, containing the request itself and its timestamp.
/// This does not contain the response so that checking for responses does not require iterating over all requests.
#[derive(Clone)]
struct OutstandingRequest<N: Network> {
    // The sync request: (block hash, previous block hash, pending sync IPs).
    request: SyncRequest<N>,
    // When the request was issued; used for timeout/elapsed reporting.
    timestamp: Instant,
    /// The corresponding response (if any).
    /// This is guaranteed to be Some if sync_ips for the given request are empty.
    response: Option<Block<N>>,
}
103
/// Information about a block request (used for the REST API).
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestInfo {
    /// Seconds since the request was created.
    elapsed: u64,
    /// Has the request been responded to? (true once no sync IPs remain pending)
    done: bool,
}
112
/// Summary of all completed and in-flight block requests.
/// Heights are rendered as compact range strings (see `rangify_heights`).
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestsSummary {
    /// Heights of requests that still await at least one peer response.
    outstanding: String,
    /// Heights of requests that have been fully answered but not yet processed.
    completed: String,
}
119
/// Errors returned by [`BlockSync::insert_block_responses`] when a peer's
/// block response cannot be accepted.
#[derive(thiserror::Error, Debug)]
pub enum InsertBlockResponseError<N: Network> {
    #[error("Empty block response")]
    EmptyBlockResponse,
    #[error("The peer did not send a consensus version")]
    NoConsensusVersion,
    #[error(
        "The peer's consensus version for height {last_height} does not match ours: expected {expected_version}, got {peer_version}"
    )]
    ConsensusVersionMismatch { peer_version: ConsensusVersion, expected_version: ConsensusVersion, last_height: u32 },
    #[error("Block Sync already advanced to block {height}")]
    BlockSyncAlreadyAdvanced { height: u32 },
    #[error("No such request for height {height}")]
    NoSuchRequest { height: u32 },
    #[error("Invalid block hash for height {height} from '{peer_ip}'")]
    InvalidBlockHash { height: u32, peer_ip: SocketAddr },
    #[error(
        "The previous block hash in candidate block {height} from '{peer_ip}' is incorrect: expected {expected}, but got {actual}"
    )]
    InvalidPreviousBlockHash { height: u32, peer_ip: SocketAddr, expected: N::BlockHash, actual: N::BlockHash },
    #[error("Candidate block {height} from '{peer_ip}' is malformed")]
    MalformedBlock { height: u32, peer_ip: SocketAddr },
    #[error("The sync pool did not request block {height} from '{peer_ip}'")]
    WrongSyncPeer { height: u32, peer_ip: SocketAddr },
    // Catch-all for wrapped `anyhow` errors; flattened into a single message.
    #[error("{}", flatten_error(.0))]
    Other(#[from] anyhow::Error),
}
147
148impl<N: Network> InsertBlockResponseError<N> {
149    /// Returns `true` if the error does not indicate malicious or faulty behavior.
150    pub fn is_benign(&self) -> bool {
151        matches!(self, Self::NoSuchRequest { .. } | Self::BlockSyncAlreadyAdvanced { .. })
152    }
153
154    // Returns true if the error is about an invalid consensus version.
155    pub fn is_invalid_consensus_version(&self) -> bool {
156        matches!(self, Self::ConsensusVersionMismatch { .. } | Self::NoConsensusVersion)
157    }
158}
159
160impl<N: Network> OutstandingRequest<N> {
161    /// Get a reference to the IPs of peers that have not responded to the request (yet).
162    fn sync_ips(&self) -> &IndexSet<SocketAddr> {
163        let (_, _, sync_ips) = &self.request;
164        sync_ips
165    }
166
167    /// Get a mutable reference to the IPs of peers that have not responded to the request (yet).
168    fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
169        let (_, _, sync_ips) = &mut self.request;
170        sync_ips
171    }
172}
173
/// A struct that keeps track of synchronizing blocks with other nodes.
///
/// It generates requests to send to other peers and processes responses to those requests.
/// The struct also keeps track of block locators, which indicate which peers it can fetch blocks from.
///
/// # Notes
/// - The actual network communication happens in `snarkos_node::Client` (for clients and provers) and in `snarkos_node_bft::Sync` (for validators).
///
/// - Validators only sync from other nodes using this struct if they fall behind, e.g.,
///   because they experience a network partition.
///   In the common case, validators will generate blocks from the DAG after an anchor certificate has been approved
///   by a supermajority of the committee.
pub struct BlockSync<N: Network> {
    /// The ledger.
    ledger: Arc<dyn LedgerService<N>>,

    /// The map of peer IP to their block locators.
    /// The block locators are consistent with the ledger and every other peer's block locators.
    locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,

    /// The map of peer-to-peer to their common ancestor.
    /// This map is used to determine which peers to request blocks from.
    ///
    /// Lock ordering: when locking both, `common_ancestors` and `locators`, `common_ancestors` must be locked first.
    common_ancestors: RwLock<IndexMap<PeerPair, u32>>,

    /// The block requests in progress and their responses, keyed by block height.
    requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,

    /// The boolean indicator of whether the node is synced up to the latest block (within the given tolerance).
    ///
    /// Lock ordering: if you lock `sync_state` and `requests`, you must lock `sync_state` first.
    sync_state: RwLock<SyncState>,

    /// The lock used to ensure that [`Self::advance_with_sync_blocks()`] is called by one task at a time.
    advance_with_sync_blocks_lock: TMutex<()>,

    /// Gets notified when there was an update to the locators or a peer disconnected.
    peer_notify: Notify,

    /// Gets notified when we received a new block response.
    response_notify: Notify,

    /// Tracks sync speed.
    metrics: BlockSyncMetrics,
}
220
221impl<N: Network> BlockSync<N> {
222    /// Initializes a new block sync module.
223    pub fn new(ledger: Arc<dyn LedgerService<N>>) -> Self {
224        // Make sync state aware of the blocks that already exist on disk at startup.
225        let sync_state = SyncState::new_with_height(ledger.latest_block_height());
226
227        Self {
228            ledger,
229            sync_state: RwLock::new(sync_state),
230            peer_notify: Default::default(),
231            response_notify: Default::default(),
232            locators: Default::default(),
233            requests: Default::default(),
234            common_ancestors: Default::default(),
235            advance_with_sync_blocks_lock: Default::default(),
236            metrics: Default::default(),
237        }
238    }
239
240    /// Blocks until something about a peer changes,
241    /// or block request has been fully processed (either successfully or unsuccessfully).
242    ///
243    /// Used by the outgoing task.
244    pub async fn wait_for_peer_update(&self) {
245        self.peer_notify.notified().await
246    }
247
248    /// Blocks until there is a new response to a block request.
249    ///
250    /// Used by the incoming task.
251    pub async fn wait_for_block_responses(&self) {
252        self.response_notify.notified().await
253    }
254
255    /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
256    #[inline]
257    pub fn is_block_synced(&self) -> bool {
258        self.sync_state.read().is_block_synced()
259    }
260
261    /// Returns `true` if there a blocks to fetch or responses to process.
262    ///
263    /// This will always return true if [`Self::is_block_synced`] returns false,
264    /// but it can return true when [`Self::is_block_synced`] returns true
265    /// (due to the latter having a tolerance of one block).
266    #[inline]
267    pub fn can_block_sync(&self) -> bool {
268        self.sync_state.read().can_block_sync() || self.has_pending_responses()
269    }
270
271    /// Returns the number of blocks the node is behind the greatest peer height,
272    /// or `None` if no peers are connected yet.
273    #[inline]
274    pub fn num_blocks_behind(&self) -> Option<u32> {
275        self.sync_state.read().num_blocks_behind()
276    }
277
278    /// Returns the greatest block height of any connected peer.
279    #[inline]
280    pub fn greatest_peer_block_height(&self) -> Option<u32> {
281        self.sync_state.read().get_greatest_peer_height()
282    }
283
284    /// Returns the current sync height of this node.
285    /// The sync height is always greater or equal to the ledger height.
286    #[inline]
287    pub fn get_sync_height(&self) -> u32 {
288        self.sync_state.read().get_sync_height()
289    }
290
291    /// Returns the number of blocks we requested from peers, but have not received yet.
292    #[inline]
293    pub fn num_outstanding_block_requests(&self) -> usize {
294        self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count()
295    }
296
297    /// The total number of block request, including the ones that have been answered already but not processed yet.
298    #[inline]
299    pub fn num_total_block_requests(&self) -> usize {
300        self.requests.read().len()
301    }
302
303    //// Returns the latest locator height for all known peers.
304    pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> {
305        self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect()
306    }
307
308    //// Returns information about all in-flight block requests.
309    pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> {
310        self.requests
311            .read()
312            .iter()
313            .map(|(height, request)| {
314                (*height, BlockRequestInfo {
315                    done: request.sync_ips().is_empty(),
316                    elapsed: request.timestamp.elapsed().as_secs(),
317                })
318            })
319            .collect()
320    }
321
322    /// Returns a summary of all in-flight requests.
323    pub fn get_block_requests_summary(&self) -> BlockRequestsSummary {
324        let completed = self
325            .requests
326            .read()
327            .iter()
328            .filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None })
329            .collect::<Vec<_>>();
330
331        let outstanding = self
332            .requests
333            .read()
334            .iter()
335            .filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None })
336            .collect::<Vec<_>>();
337
338        BlockRequestsSummary { completed: rangify_heights(&completed), outstanding: rangify_heights(&outstanding) }
339    }
340
341    pub fn get_sync_speed(&self) -> f64 {
342        self.metrics.get_sync_speed()
343    }
344}
345
// Helper functions needed for testing
#[cfg(test)]
impl<N: Network> BlockSync<N> {
    /// Returns the latest block height of the given peer IP.
    fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> {
        Some(self.locators.read().get(peer_ip)?.latest_locator_height())
    }

    /// Returns the common ancestor for the given peer pair, if it exists.
    fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> {
        let pair = PeerPair(peer_a, peer_b);
        self.common_ancestors.read().get(&pair).copied()
    }

    /// Returns the block request for the given height, if it exists.
    fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> {
        Some(self.requests.read().get(&height)?.request.clone())
    }

    /// Returns the timestamp of the last time the block was requested, if it exists.
    fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> {
        Some(self.requests.read().get(&height)?.timestamp)
    }
}
369
370impl<N: Network> BlockSync<N> {
371    /// Returns the block locators.
372    #[inline]
373    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
374        // Retrieve the latest block height.
375        let latest_height = self.ledger.latest_block_height();
376
377        // Initialize the recents map.
378        // TODO: generalize this for RECENT_INTERVAL > 1, or remove this comment if we hardwire that to 1
379        let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
380        // Retrieve the recent block hashes.
381        for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
382            recents.insert(height, self.ledger.get_block_hash(height)?);
383        }
384
385        // Initialize the checkpoints map.
386        let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
387        // Retrieve the checkpoint block hashes.
388        for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
389            checkpoints.insert(height, self.ledger.get_block_hash(height)?);
390        }
391
392        // Construct the block locators.
393        BlockLocators::new(recents, checkpoints)
394    }
395
396    /// Returns true if there are pending responses to block requests that need to be processed.
397    pub fn has_pending_responses(&self) -> bool {
398        self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0
399    }
400
    /// Send a batch of block requests.
    ///
    /// Registers every request of the batch in the sync pool, then sends a single
    /// height-range request message to each of the sampled sync peers.
    ///
    /// Returns `true` if all sends succeeded. On any failure, the requests inserted
    /// for this batch are removed again and `false` is returned.
    pub async fn send_block_requests<C: CommunicationService>(
        &self,
        communication: &C,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        requests: &[(u32, PrepareSyncRequest<N>)],
    ) -> bool {
        // The start height and the sync-IP cap are taken from the first request of the batch.
        let (start_height, max_num_sync_ips) = match requests.first() {
            Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
            None => {
                warn!("Block sync failed - no block requests");
                return false;
            }
        };

        debug!("Sending {len} block requests to peer(s) at {peers:?}", len = requests.len(), peers = sync_peers.keys());

        // Use a randomly sampled subset of the sync IPs.
        let sync_ips: IndexSet<_> =
            sync_peers.keys().copied().choose_multiple(&mut rand::thread_rng(), max_num_sync_ips).into_iter().collect();

        // Calculate the end height of the batch (start height + number of requests).
        let end_height = start_height.saturating_add(requests.len() as u32);

        // Insert the chunk of block requests.
        for (height, (hash, previous_hash, _)) in requests.iter() {
            // Insert the block request into the sync pool, using the same sampled sync IPs for every height.
            if let Err(err) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
                let err = err.context(format!("Failed to insert block request for height {height}"));
                warn!("{}", flatten_error(&err));
                return false;
            }
        }

        /* Send the block request to the peers */

        // Construct the message.
        let message = C::prepare_block_request(start_height, end_height);

        // Send the message to the peers.
        // One task per peer so all sends make progress concurrently.
        let mut tasks = Vec::with_capacity(sync_ips.len());
        for sync_ip in sync_ips {
            let sender = communication.send(sync_ip, message.clone()).await;
            let task = tokio::spawn(async move {
                // Ensure the request is sent successfully.
                match sender {
                    Some(sender) => {
                        if let Err(err) = sender.await {
                            warn!("Failed to send block request to peer '{sync_ip}': {err}");
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        warn!("Failed to send block request to peer '{sync_ip}': no such peer");
                        false
                    }
                }
            });

            tasks.push(task);
        }

        // Wait for all sends to finish at the same time.
        for result in futures::future::join_all(tasks).await {
            let success = match result {
                Ok(success) => success,
                Err(err) => {
                    error!("tokio join error: {err}");
                    false
                }
            };

            // If sending fails for any peer, remove the block request from the sync pool.
            if !success {
                // Remove the entire batch of block requests from the sync pool.
                let mut requests = self.requests.write();
                for height in start_height..end_height {
                    requests.remove(&height);
                }
                // Break out of the loop.
                return false;
            }
        }
        true
    }
488
    /// Inserts a new block response from the given peer IP.
    ///
    /// Returns an error if the block was malformed, or we already received a different block for this height.
    /// This function also removes all block requests from the given peer IP on failure.
    ///
    /// Note, that this only queues the response. After this, you most likely want to call `Self::try_advancing_block_synchronization`.
    ///
    /// # Errors
    /// See [`InsertBlockResponseError`]; benign variants can be filtered via
    /// [`InsertBlockResponseError::is_benign`].
    #[inline]
    pub fn insert_block_responses(
        &self,
        peer_ip: SocketAddr,
        blocks: Vec<Block<N>>,
        latest_consensus_version: Option<ConsensusVersion>,
    ) -> Result<(), InsertBlockResponseError<N>> {
        // Attempt to insert the block responses, and break if we encounter an error.
        // The labeled block lets every failure path fall through to the shared cleanup below.
        let result = 'outer: {
            // Reject an empty response outright; the last block's height anchors the version check.
            let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else {
                break 'outer Err(InsertBlockResponseError::EmptyBlockResponse);
            };

            let expected_consensus_version = N::CONSENSUS_VERSION(last_height)?;

            // Perform consensus version check, if possible.
            // This check is only enabled after nodes have reached V12.
            if expected_consensus_version >= ConsensusVersion::V12 {
                if let Some(peer_version) = latest_consensus_version {
                    if peer_version != expected_consensus_version {
                        break 'outer Err(InsertBlockResponseError::ConsensusVersionMismatch {
                            peer_version,
                            expected_version: expected_consensus_version,
                            last_height,
                        });
                    }
                } else {
                    break 'outer Err(InsertBlockResponseError::NoConsensusVersion);
                }
            }

            // Insert the candidate blocks into the sync pool.
            for block in blocks {
                self.insert_block_response(peer_ip, block)?;
            }

            Ok(())
        };

        // On failure, remove all block requests to the peer.
        if result.is_err() {
            self.remove_block_requests_to_peer(&peer_ip);
        }

        // Return the result.
        result
    }
543
544    /// Returns the next block for the given `next_height` if the request is complete,
545    /// or `None` otherwise. This does not remove the block from the `responses` map.
546    #[inline]
547    pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
548        // Determine if the request is complete:
549        // either there is no request for `next_height`, or the request has no peer socket addresses left.
550        if let Some(entry) = self.requests.read().get(&next_height) {
551            let is_complete = entry.sync_ips().is_empty();
552            if !is_complete {
553                return None;
554            }
555
556            // If the request is complete, return the block from the responses, if there is one.
557            if entry.response.is_none() {
558                warn!("Request for height {next_height} is complete but no response exists");
559            }
560            entry.response.clone()
561        } else {
562            None
563        }
564    }
565
    /// Attempts to advance synchronization by processing completed block responses.
    ///
    /// Returns true, if new blocks were added to the ledger.
    ///
    /// # Usage
    /// This is only called in [`Client::try_block_sync`] and should not be called concurrently by multiple tasks.
    /// Validators do not call this function, and instead invoke
    /// [`snarkos_node_bft::Sync::try_advancing_block_synchronization`] which also updates the BFT state.
    #[inline]
    pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
        // Acquire the lock to ensure this function is called only once at a time.
        // If the lock is already acquired, return early.
        //
        // Note: This lock should not be needed anymore as there is only one place we call it from,
        // but we keep it for now out of caution.
        // TODO(kaimast): remove this eventually.
        let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
            trace!("Skipping attempt to advance block synchronziation as it is already in progress");
            return Ok(false);
        };

        // Start with the current height.
        let mut current_height = self.ledger.latest_block_height();
        let start_height = current_height;
        trace!(
            "Try advancing with block responses (at block {current_height}, current sync speed is {})",
            self.get_sync_speed()
        );

        loop {
            let next_height = current_height + 1;

            // Stop once no complete response is queued for the next height.
            let Some(block) = self.peek_next_block(next_height) else {
                break;
            };

            // Ensure the block height matches.
            if block.height() != next_height {
                warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
                break;
            }

            // Validate and append the block on a blocking thread, so the
            // ledger work does not stall the async runtime.
            let ledger = self.ledger.clone();
            let advanced = tokio::task::spawn_blocking(move || {
                // Try to check the next block and advance to it.
                match ledger.check_next_block(&block) {
                    Ok(_) => match ledger.advance_to_next_block(&block) {
                        Ok(_) => true,
                        Err(err) => {
                            let err = err.context(format!(
                                "Failed to advance to next block (height: {}, hash: '{}')",
                                block.height(),
                                block.hash()
                            ));
                            warn!("{}", flatten_error(&err));
                            false
                        }
                    },
                    Err(err) => {
                        let err = err.context(format!(
                            "The next block (height: {}, hash: '{}') is invalid",
                            block.height(),
                            block.hash()
                        ));
                        warn!("{}", flatten_error(&err));
                        false
                    }
                }
            })
            .await?;

            // Only count successful requests.
            if advanced {
                self.count_request_completed();
            }

            // Remove the block response (whether advancing succeeded or not).
            self.remove_block_response(next_height);

            // If advancing failed, exit the loop.
            if !advanced {
                break;
            }

            // Update the latest height.
            current_height = next_height;
        }

        // Record the new sync height only if at least one block was applied.
        if current_height > start_height {
            self.set_sync_height(current_height);
            Ok(true)
        } else {
            Ok(false)
        }
    }
661}
662
663impl<N: Network> BlockSync<N> {
664    /// Returns the sync peers with their latest heights, and their minimum common ancestor, if the node can sync.
665    /// This function returns peers that are consistent with each other, and have a block height
666    /// that is greater than the ledger height of this node.
667    ///
668    /// # Locking
669    /// This will read-lock `common_ancestors` and `sync_state`, but not at the same time.
670    pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
671        // Retrieve the current sync height.
672        let current_height = self.get_sync_height();
673
674        if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
675            // Map the locators into the latest height.
676            let sync_peers =
677                sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
678            // Return the sync peers and their minimum common ancestor.
679            Some((sync_peers, min_common_ancestor))
680        } else {
681            None
682        }
683    }
684
685    /// Updates the block locators and common ancestors for the given peer IP.
686    ///
687    /// This function does not need to check that the block locators are well-formed,
688    /// because that is already done in [`BlockLocators::new()`], as noted in [`BlockLocators`].
689    ///
690    /// This function does **not** check
691    /// that the block locators are consistent with the peer's previous block locators or other peers' block locators.
    pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
        // -- First, update the locators entry for the given peer IP. --
        // We perform this update atomically, and drop the lock as soon as we are done with the update.
        match self.locators.write().entry(peer_ip) {
            hash_map::Entry::Occupied(mut e) => {
                // Return early if the block locators did not change.
                if e.get() == locators {
                    return Ok(());
                }

                let old_height = e.get().latest_locator_height();
                let new_height = locators.latest_locator_height();

                // A height decrease is unusual, so log it for diagnostics before overwriting.
                if old_height > new_height {
                    debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
                }
                e.insert(locators.clone());
            }
            hash_map::Entry::Vacant(e) => {
                e.insert(locators.clone());
            }
        }

        // -- Second, compute the common ancestor with this node. --
        let new_local_ancestor = {
            let mut ancestor = 0;
            // Attention: Please do not optimize this loop, as it performs fork-detection. In addition,
            // by iterating upwards, it also early-terminates malicious block locators at the *first* point
            // of bifurcation in their ledger history, which is a critical safety guarantee provided here.
            for (height, hash) in locators.clone().into_iter() {
                // Heights our ledger does not know are skipped; only a matching hash advances the ancestor.
                if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
                    match ledger_hash == hash {
                        true => ancestor = height,
                        false => {
                            warn!("Detected fork between this node and peer \"{peer_ip}\" at height {height}");
                            break;
                        }
                    }
                }
            }
            ancestor
        };

        // -- Third, compute the common ancestor with every other peer, and determine if this peer is forked from others. --
        // Do not hold write lock to `common_ancestors` here, because this can take a while with many peers.
        let ancestor_updates: Vec<_> = self
            .locators
            .read()
            .iter()
            .filter_map(|(other_ip, other_locators)| {
                // Skip if the other peer is the given peer.
                if other_ip == &peer_ip {
                    return None;
                }
                // Compute the common ancestor with the other peer, scanning upwards
                // and stopping at the first mismatching hash (same fork-detection pattern as above).
                let mut ancestor = 0;
                for (height, hash) in other_locators.clone().into_iter() {
                    if let Some(expected_hash) = locators.get_hash(height) {
                        match expected_hash == hash {
                            true => ancestor = height,
                            false => {
                                debug!(
                                    "Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
                                );
                                break;
                            }
                        }
                    }
                }

                Some((PeerPair(peer_ip, *other_ip), ancestor))
            })
            .collect();

        // -- Fourth, update the map of common ancestors. --
        // Scope the lock, so it is dropped before locking `sync_state`.
        {
            let mut common_ancestors = self.common_ancestors.write();
            common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);

            for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
                common_ancestors.insert(peer_pair, new_ancestor);
            }
        }

        // -- Finally, update sync state and notify the sync loop about the change. --
        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
        } else {
            // NOTE(review): likely unreachable since we inserted `peer_ip`'s locators above,
            // unless the peer was removed concurrently between the two lock acquisitions.
            error!("Got new block locators but greatest peer height is zero.");
        }
        // Even if the greatest peer height did not change, we still received new block locators
        // that the sync loop might need to proceed.
        self.peer_notify.notify_one();

        Ok(())
    }
789
790    /// TODO (howardwu): Remove the `common_ancestor` entry. But check that this is safe
791    ///  (that we don't rely upon it for safety when we re-connect with the same peer).
792    /// Removes the peer from the sync pool, if they exist.
793    pub fn remove_peer(&self, peer_ip: &SocketAddr) {
794        trace!("Removing peer {peer_ip} from block sync");
795
796        // Remove the locators entry for the given peer IP.
797        self.locators.write().remove(peer_ip);
798        // Remove all common ancestor entries for this peers.
799        self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
800        // Remove all block requests to the peer.
801        self.remove_block_requests_to_peer(peer_ip);
802
803        // Update sync state, because the greatest peer height may have decreased.
804        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
805            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
806        } else {
807            // There are no more peers left.
808            self.sync_state.write().clear_greatest_peer_height();
809        }
810
811        // Notify the sync loop that something changed.
812        self.peer_notify.notify_one();
813    }
814}
815
/// Helper type returned by `prepare_block_requests` (and `handle_block_request_timeouts`):
/// the list of `(height, request)` pairs to issue, together with the chosen sync peers
/// and their block locators.
pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);
818
819impl<N: Network> BlockSync<N> {
820    /// Returns a list of block requests and the sync peers, if the node needs to sync.
821    ///
822    /// You usually want to call `remove_timed_out_block_requests` before invoking this function.
823    ///
824    /// # Concurrency
825    /// This should be called by at most one task at a time.
826    ///
827    /// # Usage
828    ///  - For validators, the primary spawns exactly one task that periodically calls
829    ///    `bft::Sync::try_issuing_block_requests`. There is no possibility of concurrent calls to it.
830    ///  - For clients, `Client::initialize_sync` spawn exactly one task that periodically calls
831    ///    `Client::try_issuing_block_requests` which calls this function.
832    ///  - Provers do not call this function.
833    pub fn prepare_block_requests(&self) -> BlockRequestBatch<N> {
834        // Used to print more information when we max out on requests.
835        let print_requests = || {
836            if tracing::enabled!(tracing::Level::TRACE) {
837                let summary = self.get_block_requests_summary();
838
839                trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
840                trace!("The following requests are still outstanding: {:?}", summary.outstanding);
841            }
842        };
843
844        // Do not hold lock here as, currently, `find_sync_peers_inner` can take a while.
845        let current_height = self.get_sync_height();
846
847        // Ensure to not exceed the maximum number of outstanding block requests.
848        let max_outstanding_block_requests =
849            (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);
850
851        // Ensure there is a finite bound on the number of block respnoses we receive, that have not been processed yet.
852        let max_total_requests = 4 * max_outstanding_block_requests;
853
854        let max_new_blocks_to_request =
855            max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);
856
857        // Prepare the block requests and sync peers, or returns an empty result if there is nothing to request.
858        if self.num_total_block_requests() >= max_total_requests as usize {
859            trace!(
860                "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
861            );
862
863            print_requests();
864            Default::default()
865        } else if max_new_blocks_to_request == 0 {
866            trace!(
867                "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
868            );
869
870            print_requests();
871            Default::default()
872        } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
873            // Retrieve the greatest block height of any connected peer.
874            // We do not need to update the sync state here, as that already happens when the block locators are received.
875            let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);
876
877            // Construct the list of block requests.
878            let requests = self.construct_requests(
879                &sync_peers,
880                current_height,
881                min_common_ancestor,
882                max_new_blocks_to_request,
883                greatest_peer_height,
884            );
885
886            (requests, sync_peers)
887        } else if self.requests.read().is_empty() {
888            // This can happen during a race condition where the node just finished syncing.
889            // It does not make sense to log or change the sync status here.
890            // Checking the sync status here also does not make sense, as the node might as well have switched back
891            //  from `synced` to `syncing` between calling `find_sync_peers_inner` and this line.
892
893            Default::default()
894        } else {
895            // This happens if we already requested all advertised blocks.
896            trace!("No new blocks can be requested, but there are still outstanding requests.");
897
898            print_requests();
899            Default::default()
900        }
901    }
902
    /// Should only be called by validators when they successfully process a block request.
    /// (for other nodes this will be automatically called internally)
    ///
    /// TODO(kaimast): remove this public function once the sync logic is fully unified into `BlockSync`.
    pub fn count_request_completed(&self) {
        // Delegate to the sync metrics helper.
        self.metrics.count_request_completed();
    }
910
911    /// Set the sync height to a the given value.
912    /// This is a no-op if `new_height` is equal or less to the current sync height.
913    pub fn set_sync_height(&self, new_height: u32) {
914        // Scope state lock to avoid locking state and metrics at the same time.
915        let fully_synced = {
916            let mut state = self.sync_state.write();
917            state.set_sync_height(new_height);
918            !state.can_block_sync()
919        };
920
921        if fully_synced {
922            self.metrics.mark_fully_synced();
923        }
924    }
925
926    /// Inserts a block request for the given height.
927    fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
928        // Ensure the block request does not already exist.
929        self.check_block_request(height)?;
930        // Ensure the sync IPs are not empty.
931        ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
932        // Insert the block request.
933        self.requests.write().insert(height, OutstandingRequest {
934            request: (hash, previous_hash, sync_ips),
935            timestamp: Instant::now(),
936            response: None,
937        });
938        Ok(())
939    }
940
941    /// Inserts the given block response, after checking that the request exists and the response is well-formed.
942    /// On success, this function removes the peer IP from the request sync peers and inserts the response.
943    fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<(), InsertBlockResponseError<N>> {
944        // Retrieve the block height.
945        let height = block.height();
946        let mut requests = self.requests.write();
947
948        if self.ledger.contains_block_height(height) {
949            return Err(InsertBlockResponseError::BlockSyncAlreadyAdvanced { height });
950        }
951
952        let Some(entry) = requests.get_mut(&height) else {
953            return Err(InsertBlockResponseError::NoSuchRequest { height });
954        };
955
956        // Retrieve the request entry for the candidate block.
957        let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;
958
959        // Ensure the candidate block hash matches the expected hash.
960        if let Some(expected_hash) = expected_hash
961            && block.hash() != *expected_hash
962        {
963            return Err(InsertBlockResponseError::InvalidBlockHash { height, peer_ip });
964        }
965        // Ensure the previous block hash matches if it exists.
966        if let Some(expected_previous_hash) = expected_previous_hash
967            && block.previous_hash() != *expected_previous_hash
968        {
969            return Err(InsertBlockResponseError::InvalidPreviousBlockHash {
970                height,
971                peer_ip,
972                expected: *expected_previous_hash,
973                actual: block.previous_hash(),
974            });
975        }
976        // Ensure the sync pool requested this block from the given peer.
977        if !sync_ips.contains(&peer_ip) {
978            return Err(InsertBlockResponseError::WrongSyncPeer { height, peer_ip });
979        }
980
981        // Remove the peer IP from the request entry.
982        entry.sync_ips_mut().swap_remove(&peer_ip);
983
984        if let Some(existing_block) = &entry.response {
985            // If the candidate block was already present, ensure it is the same block.
986            if block != *existing_block {
987                return Err(InsertBlockResponseError::MalformedBlock { height, peer_ip });
988            }
989        } else {
990            entry.response = Some(block.clone());
991        }
992
993        trace!("Received a new and valid block response for height {height}");
994
995        // Notify the sync loop that something changed.
996        self.response_notify.notify_one();
997
998        Ok(())
999    }
1000
1001    /// Checks that a block request for the given height does not already exist.
1002    fn check_block_request(&self, height: u32) -> Result<()> {
1003        // Ensure the block height is not already in the ledger.
1004        if self.ledger.contains_block_height(height) {
1005            bail!("Failed to add block request, as block {height} exists in the ledger");
1006        }
1007        // Ensure the block height is not already requested.
1008        if self.requests.read().contains_key(&height) {
1009            bail!("Failed to add block request, as block {height} exists in the requests map");
1010        }
1011
1012        Ok(())
1013    }
1014
1015    /// Removes the block request and response for the given height
1016    /// This may only be called after `peek_next_block`, which checked if the request for the given height was complete.
1017    ///
1018    /// Precondition: This may only be called after `peek_next_block` has returned `Some`,
1019    /// which has checked if the request for the given height is complete
1020    /// and there is a block with the given `height` in the `responses` map.
1021    pub fn remove_block_response(&self, height: u32) {
1022        // Remove the request entry for the given height.
1023        if let Some(e) = self.requests.write().remove(&height) {
1024            trace!(
1025                "Block request for height {height} was completed in {}ms (sync speed is {})",
1026                e.timestamp.elapsed().as_millis(),
1027                self.get_sync_speed()
1028            );
1029
1030            // Notify the sending task that less requests are in-flight.
1031            self.peer_notify.notify_one();
1032        }
1033    }
1034
1035    /// Removes all block requests for the given peer IP.
1036    ///
1037    /// This is used when disconnecting from a peer or when a peer sends invalid block responses.
1038    fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
1039        trace!("Block sync is removing all block requests to peer {peer_ip}...");
1040
1041        // Remove the peer IP from the requests map. If any request entry is now empty,
1042        // and its corresponding response entry is also empty, then remove that request entry altogether.
1043        self.requests.write().retain(|height, e| {
1044            let had_peer = e.sync_ips_mut().swap_remove(peer_ip);
1045
1046            // Only remove requests that were sent to this peer, that have no other peer that can respond instead,
1047            // and that were not completed yet.
1048            let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
1049            if !retain {
1050                trace!("Removed block request timestamp for {peer_ip} at height {height}");
1051            }
1052            retain
1053        });
1054
1055        // No need to remove responses here, because requests with responses will be retained.
1056    }
1057
    /// Removes block requests that have timed out, i.e., requests we sent that did not receive a response in time.
    ///
    /// This removes the corresponding block responses and removes the peers/addresses that timed out from the sync pool.
    /// NOTE(review): banning of timed-out peers is currently disabled (see the TODO below);
    /// they are only removed, not banned.
    ///
    /// # Return Value
    /// On success it will return `None` if there is nothing to re-request, or a set of new block requests that replaced the timed-out requests.
    /// This set of new requests can also replace requests that timed out earlier, and which we were not able to re-request yet.
    ///
    /// This function will return an error if it cannot re-request blocks due to a lack of peers.
    /// In this case, the current iteration of block synchronization should not continue and the node should re-try later instead.
    pub fn handle_block_request_timeouts<P: PeerPoolHandling<N>>(
        &self,
        _peer_pool_handler: &P,
    ) -> Result<Option<BlockRequestBatch<N>>> {
        // Acquire the write lock on the requests map.
        let mut requests = self.requests.write();

        // Retrieve the current time.
        let now = Instant::now();

        // Retrieve the current block height.
        let current_height = self.ledger.latest_block_height();

        // Track the heights of timed out block requests (only used for logging).
        let mut timed_out_requests = vec![];

        // Track which peers should be banned due to unresponsiveness.
        let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();

        // Remove timed out block requests.
        requests.retain(|height, e| {
            // A request at or below the ledger height is obsolete; the block was obtained elsewhere.
            let is_obsolete = *height <= current_height;
            // Determine if the duration since the request timestamp has exceeded the request timeout.
            let timer_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
            // A request is complete once no sync IPs remain (every assigned peer responded or was removed).
            let is_complete = e.sync_ips().is_empty();

            // Determine if the request has timed out.
            let is_timeout = timer_elapsed && !is_complete;

            // Retain if this is not a timeout and is not obsolete.
            let retain = !is_timeout && !is_obsolete;

            if is_timeout {
                trace!("Block request at height {height} has timed out: timer_elapsed={timer_elapsed}, is_complete={is_complete}, is_obsolete={is_obsolete}");

                // Increment the number of timed out block requests.
                timed_out_requests.push(*height);
            } else if is_obsolete {
                trace!("Block request at height {height} became obsolete (current_height={current_height})");
            }

            // If the request timed out, mark every peer still assigned to it for removal.
            if is_timeout {
                for peer_ip in e.sync_ips().iter() {
                    peers_to_ban.insert(*peer_ip);
                }
            }

            retain
        });

        if !timed_out_requests.is_empty() {
            debug!("{num} block requests timed out", num = timed_out_requests.len());
        }

        // The height of the first remaining request, if any.
        let next_request_height = requests.iter().next().map(|(h, _)| *h);

        // Avoid locking `locators` and `requests` at the same time.
        drop(requests);

        // Now remove (and, eventually, ban) any unresponsive peers.
        for peer_ip in peers_to_ban {
            self.remove_peer(&peer_ip);
            // TODO: Uncomment this when we have a more rigorous analysis and testing of peer banning.
            // peer_pool_handler.ip_ban_peer(peer_ip, Some("timed out on block requests"));
        }

        // Determine if we need to re-issue any timed-out requests.
        // If there are no requests remaining or no gap at the beginning,
        // we do not need to re-issue requests and will just issue them regularly.
        //
        // This needs to be checked even if timed_out_requests is empty, because we might not be able to re-issue
        // requests immediately if there are no other peers at a given time.
        // Further, this only closes the first gap. So multiple calls to this might be needed.
        let sync_height = self.get_sync_height();
        let start_height = sync_height + 1;

        let end_height = if let Some(next_height) = next_request_height
            && next_height > start_height
        {
            // The end height is exclusive, so use the height of the first existing block requests as the end
            next_height
        } else {
            // Nothing to do.
            // Do not log here as this check happens frequently.
            return Ok(None);
        };

        // Set the maximum number of blocks, so that they do not exceed the end height.
        let max_new_blocks_to_request = end_height - start_height;

        let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start_height) else {
            // This generally shouldn't happen, because there cannot be outstanding requests when no peers are connected.
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        // Retrieve the greatest block height of any connected peer.
        let Some(greatest_peer_height) = sync_peers.values().map(|l| l.latest_locator_height()).max() else {
            // This should never happen because `sync_peers` is guaranteed to be non-empty.
            bail!("Cannot re-request blocks because no or not enough peers are connected");
        };

        // (Try to) construct the requests.
        let requests = self.construct_requests(
            &sync_peers,
            sync_height,
            min_common_ancestor,
            max_new_blocks_to_request,
            greatest_peer_height,
        );

        // If the ledger advanced concurrently, there may be no requests to issue after all.
        // The given height may also be greater than `start_height` due to concurrent block advancement.
        if let Some((height, _)) = requests.as_slice().first() {
            debug!("Re-requesting blocks starting at height {height}");
            Ok(Some((requests, sync_peers)))
        } else {
            // Do not log here as this constitutes a benign race condition.
            Ok(None)
        }
    }
1191
    /// Finds the peers to sync from and the shared common ancestor, starting at the given height.
    ///
    /// Unlike [`Self::find_sync_peers`] this does not only return the latest locators height, but the full BlockLocators for each peer.
    /// Returns `None` if there are no peers to sync from.
    ///
    /// # Locking
    /// This function will briefly read-lock `locators`, and repeatedly read-lock `common_ancestors`
    /// (never both at the same time).
    fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
        // Retrieve the latest ledger height.
        let latest_ledger_height = self.ledger.latest_block_height();

        // Pick a set of peers above the latest ledger height, and include their locators.
        // This will sort the peers by locator height in descending order.
        let candidate_locators: IndexMap<_, _> = self
            .locators
            .read()
            .iter()
            .filter(|(_, locators)| locators.latest_locator_height() > current_height)
            .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
            .take(NUM_SYNC_CANDIDATE_PEERS)
            .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
            .collect();

        // Case 0: If there are no candidate peers, return `None`.
        if candidate_locators.is_empty() {
            trace!("Found no sync peers with height greater {current_height}");
            return None;
        }

        // TODO (howardwu): Change this to the highest cumulative weight for Phase 3.
        // Case 1: If all of the candidate peers share a common ancestor below the latest ledger height,
        // then pick the peer with the highest height, and find peers (up to extra redundancy) with
        // a common ancestor above the block request range. Set the end height to their common ancestor.

        // Determine the threshold number of peers to sync from.
        let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);

        // Breaks the loop when the first threshold number of peers are found, biasing for the peer with the highest height
        // and a cohort of peers who share a common ancestor above this node's latest ledger height.
        for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
            // The height of the common ancestor shared by all selected peers.
            let mut min_common_ancestor = peer_locators.latest_locator_height();

            // The peers we will synchronize from.
            // If the previous iteration did not succeed, this restarts with the next candidate peer as anchor.
            let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];

            // Try adding other peers consistent with this one to the sync peer set.
            for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
                // Check if these two peers have a common ancestor above the latest ledger height.
                if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
                    // If so, then check that their block locators are consistent.
                    if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
                        // If their common ancestor is less than the minimum common ancestor, then update it.
                        min_common_ancestor = min_common_ancestor.min(*common_ancestor);

                        // Add the other peer to the list of sync peers.
                        sync_peers.push((*other_ip, other_locators.clone()));
                    }
                }
            }

            // If we have enough sync peers above the latest ledger height, finish and return them.
            if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
                // Shuffle the sync peers prior to returning. This ensures the rest of the stack
                // does not rely on the order of the sync peers, and that the sync peers are not biased.
                sync_peers.shuffle(&mut rand::thread_rng());

                // Collect into an IndexMap and return.
                return Some((sync_peers.into_iter().collect(), min_common_ancestor));
            }
        }

        // If there are not enough peers with a minimum common ancestor above the latest ledger height, return None.
        None
    }
1268
    /// Given the sync peers and their minimum common ancestor, return a list of block requests.
    ///
    /// - `sync_height`: requests begin at `max(ledger_height, sync_height + 1)`, skipping existing requests.
    /// - `min_common_ancestor`: the highest height all sync peers agree on; requests never exceed it.
    /// - `max_blocks_to_request`: upper bound on the number of requests constructed.
    /// - `greatest_peer_height`: only used for logging when no requests can be constructed.
    fn construct_requests(
        &self,
        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
        sync_height: u32,
        min_common_ancestor: u32,
        max_blocks_to_request: u32,
        greatest_peer_height: u32,
    ) -> Vec<(u32, PrepareSyncRequest<N>)> {
        // Compute the start height for the block requests.
        let start_height = {
            let requests = self.requests.read();
            let ledger_height = self.ledger.latest_block_height();

            // Do not issue requests for blocks already contained in the ledger.
            let mut start_height = ledger_height.max(sync_height + 1);

            // Do not issue requests that already exist.
            while requests.contains_key(&start_height) {
                start_height += 1;
            }

            start_height
        };

        // If the minimum common ancestor is below the start height, then return early.
        if min_common_ancestor < start_height {
            if start_height < greatest_peer_height {
                trace!(
                    "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
                );
            }
            return Default::default();
        }

        // Compute the end height (exclusive) for the block request.
        let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);

        // Construct the block hashes to request.
        let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
        // Track the largest number of sync IPs required for any block request in the sequence of requests.
        let mut max_num_sync_ips = 1;

        for height in start_height..end_height {
            // Ensure the current height is not in the ledger or already requested.
            if let Err(err) = self.check_block_request(height) {
                trace!("{err}");

                // If the sequence of block requests is interrupted, then return early.
                // Otherwise, continue until the first start height that is new.
                match request_hashes.is_empty() {
                    true => continue,
                    false => break,
                }
            }

            // Construct the block request.
            let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);

            // Handle the dishonest case.
            if !is_honest {
                // TODO (howardwu): Consider performing an integrity check on peers (to disconnect).
                warn!("Detected dishonest peer(s) when preparing block request");
                // If there are not enough peers in the dishonest case, then return early.
                if sync_peers.len() < num_sync_ips {
                    break;
                }
            }

            // Update the maximum number of sync IPs.
            max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);

            // Append the request.
            request_hashes.insert(height, (hash, previous_hash));
        }

        // Construct the requests, all sharing the same (maximum) number of sync IPs.
        request_hashes
            .into_iter()
            .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
            .collect()
    }
1351}
1352
1353/// If any peer is detected to be dishonest in this function, it will not set the hash or previous hash,
1354/// in order to allow the caller to determine what to do.
1355fn construct_request<N: Network>(
1356    height: u32,
1357    sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1358) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
1359    let mut hash = None;
1360    let mut hash_redundancy: usize = 0;
1361    let mut previous_hash = None;
1362    let mut is_honest = true;
1363
1364    for peer_locators in sync_peers.values() {
1365        if let Some(candidate_hash) = peer_locators.get_hash(height) {
1366            match hash {
1367                // Increment the redundancy count if the hash matches.
1368                Some(hash) if hash == candidate_hash => hash_redundancy += 1,
1369                // Some peer is dishonest.
1370                Some(_) => {
1371                    hash = None;
1372                    hash_redundancy = 0;
1373                    previous_hash = None;
1374                    is_honest = false;
1375                    break;
1376                }
1377                // Set the hash if it is not set.
1378                None => {
1379                    hash = Some(candidate_hash);
1380                    hash_redundancy = 1;
1381                }
1382            }
1383        }
1384        if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
1385            match previous_hash {
1386                // Increment the redundancy count if the previous hash matches.
1387                Some(previous_hash) if previous_hash == candidate_previous_hash => (),
1388                // Some peer is dishonest.
1389                Some(_) => {
1390                    hash = None;
1391                    hash_redundancy = 0;
1392                    previous_hash = None;
1393                    is_honest = false;
1394                    break;
1395                }
1396                // Set the previous hash if it is not set.
1397                None => previous_hash = Some(candidate_previous_hash),
1398            }
1399        }
1400    }
1401
1402    // Note that we intentionally do not just pick the peers that have the hash we have chosen,
1403    // to give stronger confidence that we are syncing during times when the network is consistent/stable.
1404    let num_sync_ips = {
1405        // Extra redundant peers - as the block hash was dishonest.
1406        if !is_honest {
1407            // Choose up to the extra redundancy factor in sync peers.
1408            EXTRA_REDUNDANCY_FACTOR
1409        }
1410        // No redundant peers - as we have redundancy on the block hash.
1411        else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
1412            // Choose one sync peer.
1413            1
1414        }
1415        // Redundant peers - as we do not have redundancy on the block hash.
1416        else {
1417            // Choose up to the redundancy factor in sync peers.
1418            REDUNDANCY_FACTOR
1419        }
1420    };
1421
1422    (hash, previous_hash, num_sync_ips, is_honest)
1423}
1424
1425#[cfg(test)]
1426mod tests {
1427    use super::*;
1428    use crate::locators::{
1429        CHECKPOINT_INTERVAL,
1430        NUM_RECENT_BLOCKS,
1431        test_helpers::{sample_block_locators, sample_block_locators_with_fork},
1432    };
1433
1434    use snarkos_node_bft_ledger_service::MockLedgerService;
1435    use snarkos_node_network::{NodeType, Peer, Resolver};
1436    use snarkos_node_tcp::{P2P, Tcp};
1437    use snarkvm::{
1438        ledger::committee::Committee,
1439        prelude::{Field, TestRng},
1440    };
1441
1442    use indexmap::{IndexSet, indexset};
1443    #[cfg(feature = "locktick")]
1444    use locktick::parking_lot::RwLock;
1445    #[cfg(not(feature = "locktick"))]
1446    use parking_lot::RwLock;
1447    use rand::Rng;
1448    use std::net::{IpAddr, Ipv4Addr};
1449
    // The network under test: all sync tests below use the mainnet (V0) parameters.
    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1451
    /// A minimal peer-pool handler for tests: it only records which peers were asked
    /// to be banned, and panics on any peer-pool operation the tests never exercise.
    #[derive(Default)]
    struct DummyPeerPoolHandler {
        // Listener addresses passed to `ip_ban_peer`, in call order.
        peers_to_ban: RwLock<Vec<SocketAddr>>,
    }
1456
    // `P2P` appears to be required in order to implement `PeerPoolHandling` below
    // (TODO confirm). The tests never touch the TCP stack, so reaching `tcp()`
    // would indicate a test bug.
    impl P2P for DummyPeerPoolHandler {
        fn tcp(&self) -> &Tcp {
            // Never called by the block-sync tests.
            unreachable!();
        }
    }
1462
    // Only `ip_ban_peer` is exercised by the tests below; the pool/resolver accessors
    // are unreachable stubs, and the remaining methods return fixed test values.
    impl<N: Network> PeerPoolHandling<N> for DummyPeerPoolHandler {
        const MAXIMUM_POOL_SIZE: usize = 10;
        const OWNER: &str = "[DummyPeerPoolHandler]";
        const PEER_SLASHING_COUNT: usize = 0;

        fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
            // Never called by the block-sync tests.
            unreachable!();
        }

        fn resolver(&self) -> &RwLock<Resolver<N>> {
            // Never called by the block-sync tests.
            unreachable!();
        }

        fn is_dev(&self) -> bool {
            true
        }

        fn trusted_peers_only(&self) -> bool {
            false
        }

        fn node_type(&self) -> NodeType {
            NodeType::Client
        }

        // Records the ban request so tests can assert on which peers would be banned.
        fn ip_ban_peer(&self, listener_addr: SocketAddr, _reason: Option<&str>) {
            self.peers_to_ban.write().push(listener_addr);
        }
    }
1492
1493    /// Returns the peer IP for the sync pool.
1494    fn sample_peer_ip(id: u16) -> SocketAddr {
1495        assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
1496        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
1497    }
1498
1499    /// Returns a sample committee.
1500    fn sample_committee() -> Committee<CurrentNetwork> {
1501        let rng = &mut TestRng::default();
1502        snarkvm::ledger::committee::test_helpers::sample_committee(rng)
1503    }
1504
    /// Returns a mock ledger service whose chain tip is at `height`,
    /// backed by the sample committee.
    fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
        MockLedgerService::new_at_height(sample_committee(), height)
    }
1509
    /// Returns a fresh sync pool, with its (mock) ledger initialized to the given height.
    fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)))
    }
1514
1515    /// Returns a vector of randomly sampled block heights in [0, max_height].
1516    ///
1517    /// The maximum value will always be included in the result.
1518    fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
1519        assert!(num_values > 0, "Cannot generate an empty vector");
1520        assert!((max_height as usize) >= num_values);
1521
1522        let mut rng = TestRng::default();
1523
1524        let mut heights: Vec<u32> = (0..(max_height - 1)).choose_multiple(&mut rng, num_values);
1525
1526        heights.push(max_height);
1527
1528        heights
1529    }
1530
    /// Returns a duplicate (deep copy) of the sync pool with a different ledger height.
    ///
    /// The peer locators, common ancestors, outstanding requests, and sync state are
    /// cloned from `sync`, while the ledger is replaced with a fresh mock at `height` —
    /// simulating the ledger having advanced while requests were outstanding.
    fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
        BlockSync::<CurrentNetwork> {
            // Notification primitives are not cloned; fresh ones suffice for tests.
            peer_notify: Notify::new(),
            response_notify: Default::default(),
            // A new mock ledger at the requested height.
            ledger: Arc::new(sample_ledger_service(height)),
            // Deep-copy the sync bookkeeping from the original pool.
            locators: RwLock::new(sync.locators.read().clone()),
            common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
            requests: RwLock::new(sync.requests.read().clone()),
            sync_state: RwLock::new(sync.sync_state.read().clone()),
            advance_with_sync_blocks_lock: Default::default(),
            metrics: Default::default(),
        }
    }
1545
    /// Checks that the sync pool (starting at genesis) returns the correct requests.
    ///
    /// `min_common_ancestor` is the height up to which all `peers` agree with each other;
    /// requests are expected for heights `1..=min(min_common_ancestor, MAX_BLOCK_REQUESTS)`.
    fn check_prepare_block_requests(
        sync: BlockSync<CurrentNetwork>,
        min_common_ancestor: u32,
        peers: IndexSet<SocketAddr>,
    ) {
        let rng = &mut TestRng::default();

        // Check test assumptions are met.
        assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");

        // Determine the number of peers within range of this sync pool.
        let num_peers_within_recent_range_of_ledger = {
            // If no peers are within range, then set to 0.
            if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
                0
            }
            // Otherwise, manually check the number of peers within range.
            else {
                peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
            }
        };

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();

        // If there are no peers, then there should be no requests.
        if peers.is_empty() {
            assert!(requests.is_empty());
            return;
        }

        // Otherwise, there should be requests.
        let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
        assert_eq!(requests.len(), expected_num_requests);

        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            // Construct the sync IPs, as the caller of `prepare_block_requests` would.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Requests must start at height 1 and be contiguous.
            assert_eq!(height, 1 + idx as u32);
            // The mock locators derive the hash for height `h` from `Field(h)`.
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));

            // With enough redundant in-range peers, a single sync IP per request suffices;
            // otherwise every in-range peer is selected.
            if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
                assert_eq!(sync_ips.len(), 1);
            } else {
                assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
                assert_eq!(sync_ips, peers);
            }
        }
    }
1598
1599    /// Tests that height and hash values are set correctly using many different maximum block heights.
1600    #[test]
1601    fn test_latest_block_height() {
1602        for height in generate_block_heights(100_001, 5000) {
1603            let sync = sample_sync_at_height(height);
1604            // Check that the latest block height is the maximum height.
1605            assert_eq!(sync.ledger.latest_block_height(), height);
1606
1607            // Check the hash to height mapping
1608            assert_eq!(sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(0)).into()).unwrap(), 0);
1609            assert_eq!(
1610                sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(height)).into()).unwrap(),
1611                height
1612            );
1613        }
1614    }
1615
1616    #[test]
1617    fn test_get_block_hash() {
1618        for height in generate_block_heights(100_001, 5000) {
1619            let sync = sample_sync_at_height(height);
1620
1621            // Check the height to hash mapping
1622            assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
1623            assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::<CurrentNetwork>::from_u32(height)).into());
1624        }
1625    }
1626
1627    #[test]
1628    fn test_prepare_block_requests() {
1629        for num_peers in 0..111 {
1630            println!("Testing with {num_peers} peers");
1631
1632            let sync = sample_sync_at_height(0);
1633
1634            let mut peers = indexset![];
1635
1636            for peer_id in 1..=num_peers {
1637                // Add a peer.
1638                sync.update_peer_locators(sample_peer_ip(peer_id), &sample_block_locators(10)).unwrap();
1639                // Add the peer to the set of peers.
1640                peers.insert(sample_peer_ip(peer_id));
1641            }
1642
1643            // If all peers are ahead, then requests should be prepared.
1644            check_prepare_block_requests(sync, 10, peers);
1645        }
1646    }
1647
1648    #[test]
1649    fn test_prepare_block_requests_with_leading_fork_at_11() {
1650        let sync = sample_sync_at_height(0);
1651
1652        // Intuitively, peer 1's fork is above peer 2 and peer 3's height.
1653        // So from peer 2 and peer 3's perspective, they don't even realize that peer 1 is on a fork.
1654        // Thus, you can sync up to block 10 from any of the 3 peers.
1655
1656        // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 11,
1657        // then the sync pool should request blocks 1..=10 from the NUM_REDUNDANCY peers.
1658        // This is safe because the leading fork is at 11, and the sync pool is at 0,
1659        // so all candidate peers are at least 10 blocks ahead of the sync pool.
1660
1661        // Add a peer (fork).
1662        let peer_1 = sample_peer_ip(1);
1663        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 11)).unwrap();
1664
1665        // Add a peer.
1666        let peer_2 = sample_peer_ip(2);
1667        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1668
1669        // Add a peer.
1670        let peer_3 = sample_peer_ip(3);
1671        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1672
1673        // Prepare the block requests.
1674        let (requests, _) = sync.prepare_block_requests();
1675        assert_eq!(requests.len(), 10);
1676
1677        // Check the requests.
1678        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1679            assert_eq!(height, 1 + idx as u32);
1680            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1681            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1682            assert_eq!(num_sync_ips, 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1683        }
1684    }
1685
    /// Tests that a fork at exactly the common height blocks syncing until enough
    /// mutually-consistent peers are available.
    #[test]
    fn test_prepare_block_requests_with_leading_fork_at_10() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Intuitively, peer 1's fork is at peer 2 and peer 3's height.
        // So from peer 2 and peer 3's perspective, they recognize that peer 1 has forked.
        // Thus, you don't have NUM_REDUNDANCY peers to sync to block 10.
        //
        // Now, while you could in theory sync up to block 9 from any of the 3 peers,
        // we choose not to do this as either side is likely to disconnect from us,
        // and we would rather wait for enough redundant peers before syncing.

        // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 10,
        // then the sync pool should not request blocks as 1 peer conflicts with the other NUM_REDUNDANCY-1 peers.
        // We choose to sync with a cohort of peers that are *consistent* with each other,
        // and prioritize from descending heights (so the highest peer gets priority).

        // Add a peer (fork).
        let peer_1 = sample_peer_ip(1);
        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();

        // Add a peer.
        let peer_2 = sample_peer_ip(2);
        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();

        // Add a peer.
        let peer_3 = sample_peer_ip(3);
        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();

        // Prepare the block requests. No requests are expected yet (see above).
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // When there are NUM_REDUNDANCY+1 peers ahead, and 1 is on a fork, then there should be block requests.

        // Add a peer.
        let peer_4 = sample_peer_ip(4);
        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // Check the requests.
        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
            // Construct the sync IPs, as the caller of `prepare_block_requests` would.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Requests start at height 1 and are contiguous.
            assert_eq!(height, 1 + idx as u32);
            // The mock locators derive the hash for height `h` from `Field(h)`.
            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
            assert_ne!(sync_ips[0], peer_1); // It should never be the forked peer.
        }
    }
1742
1743    #[test]
1744    fn test_prepare_block_requests_with_trailing_fork_at_9() {
1745        let rng = &mut TestRng::default();
1746        let sync = sample_sync_at_height(0);
1747
1748        // Peer 1 and 2 diverge from peer 3 at block 10. We only sync when there are NUM_REDUNDANCY peers
1749        // who are *consistent* with each other. So if you add a 4th peer that is consistent with peer 1 and 2,
1750        // then you should be able to sync up to block 10, thereby biasing away from peer 3.
1751
1752        // Add a peer (fork).
1753        let peer_1 = sample_peer_ip(1);
1754        sync.update_peer_locators(peer_1, &sample_block_locators(10)).unwrap();
1755
1756        // Add a peer.
1757        let peer_2 = sample_peer_ip(2);
1758        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1759
1760        // Add a peer.
1761        let peer_3 = sample_peer_ip(3);
1762        sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();
1763
1764        // Prepare the block requests.
1765        let (requests, _) = sync.prepare_block_requests();
1766        assert_eq!(requests.len(), 0);
1767
1768        // When there are NUM_REDUNDANCY+1 peers ahead, and peer 3 is on a fork, then there should be block requests.
1769
1770        // Add a peer.
1771        let peer_4 = sample_peer_ip(4);
1772        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
1773
1774        // Prepare the block requests.
1775        let (requests, sync_peers) = sync.prepare_block_requests();
1776        assert_eq!(requests.len(), 10);
1777
1778        // Check the requests.
1779        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1780            // Construct the sync IPs.
1781            let sync_ips: IndexSet<_> =
1782                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
1783            assert_eq!(height, 1 + idx as u32);
1784            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1785            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1786            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1787            assert_ne!(sync_ips[0], peer_3); // It should never be the forked peer.
1788        }
1789    }
1790
    /// Tests that block requests can be inserted exactly once and are retrievable afterwards.
    ///
    /// Note: with a single peer, `choose_multiple` always returns that one peer regardless
    /// of RNG state, so re-deriving `sync_ips` in the later passes yields the same set that
    /// was inserted in the first pass.
    #[test]
    fn test_insert_block_requests() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Add a peer.
        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        // First pass: insert every request and verify it is stored.
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Second pass: reading the requests back must not disturb them.
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Check that the block requests are still inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Third pass: duplicate insertion must fail and leave the stored request intact.
        for (height, (hash, previous_hash, num_sync_ips)) in requests {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Ensure that the block requests cannot be inserted twice.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err();
            // Check that the block requests are still inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }
    }
1834
1835    #[test]
1836    fn test_insert_block_requests_fails() {
1837        let sync = sample_sync_at_height(9);
1838
1839        // Add a peer.
1840        sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();
1841
1842        // Inserting a block height that is already in the ledger should fail.
1843        sync.insert_block_request(9, (None, None, indexset![sample_peer_ip(1)])).unwrap_err();
1844        // Inserting a block height that is not in the ledger should succeed.
1845        sync.insert_block_request(10, (None, None, indexset![sample_peer_ip(1)])).unwrap();
1846    }
1847
    /// Exhaustively tests peer-locator updates and common-ancestor computation for two
    /// peers across all height pairs in 0..500.
    #[test]
    fn test_update_peer_locators() {
        let sync = sample_sync_at_height(0);

        // Test 2 peers.
        let peer1_ip = sample_peer_ip(1);
        for peer1_height in 0..500u32 {
            sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap();
            assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height));

            let peer2_ip = sample_peer_ip(2);
            for peer2_height in 0..500u32 {
                println!("Testing peer 1 height at {peer1_height} and peer 2 height at {peer2_height}");

                sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap();
                assert_eq!(sync.get_peer_height(&peer2_ip), Some(peer2_height));

                // Compute the distance between the peers.
                let distance = peer1_height.abs_diff(peer2_height);

                // Check the common ancestor.
                if distance < NUM_RECENT_BLOCKS as u32 {
                    // Within the recent-locator window, the locators overlap block-by-block,
                    // so the common ancestor is exactly the lower of the two heights.
                    let expected_ancestor = core::cmp::min(peer1_height, peer2_height);
                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
                } else {
                    // Beyond the recent window, the locators only share checkpoint entries,
                    // so the common ancestor is the highest checkpoint both peers have reached.
                    let min_checkpoints =
                        core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL);
                    let expected_ancestor = min_checkpoints * CHECKPOINT_INTERVAL;
                    assert_eq!(sync.get_common_ancestor(peer1_ip, peer2_ip), Some(expected_ancestor));
                    assert_eq!(sync.get_common_ancestor(peer2_ip, peer1_ip), Some(expected_ancestor));
                }
            }
        }
    }
1883
1884    #[test]
1885    fn test_remove_peer() {
1886        let sync = sample_sync_at_height(0);
1887
1888        let peer_ip = sample_peer_ip(1);
1889        sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
1890        assert_eq!(sync.get_peer_height(&peer_ip), Some(100));
1891
1892        sync.remove_peer(&peer_ip);
1893        assert_eq!(sync.get_peer_height(&peer_ip), None);
1894
1895        sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
1896        assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
1897
1898        sync.remove_peer(&peer_ip);
1899        assert_eq!(sync.get_peer_height(&peer_ip), None);
1900    }
1901
    /// Tests that a peer's locators can be re-inserted at a new height after removal.
    #[test]
    fn test_locators_insert_remove_insert() {
        let sync = sample_sync_at_height(0);

        // Register the peer at height 100.
        let peer_ip = sample_peer_ip(1);
        sync.update_peer_locators(peer_ip, &sample_block_locators(100)).unwrap();
        assert_eq!(sync.get_peer_height(&peer_ip), Some(100));

        // Removal clears the peer's locators.
        sync.remove_peer(&peer_ip);
        assert_eq!(sync.get_peer_height(&peer_ip), None);

        // Re-insertion at a different height starts fresh.
        sync.update_peer_locators(peer_ip, &sample_block_locators(200)).unwrap();
        assert_eq!(sync.get_peer_height(&peer_ip), Some(200));
    }
1916
    /// Tests that removing a peer also removes its outstanding block requests,
    /// and that requests can be prepared and inserted again once the peer returns.
    #[test]
    fn test_requests_insert_remove_insert() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        // Add a peer.
        let peer_ip = sample_peer_ip(1);
        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs (with a single peer, this is always that peer).
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Remove the peer.
        sync.remove_peer(&peer_ip);

        for (height, _) in requests {
            // Check that the block requests were removed along with the peer.
            assert_eq!(sync.get_block_request(height), None);
            assert!(sync.get_block_request_timestamp(height).is_none());
        }

        // As there is no peer, it should not be possible to prepare block requests.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 0);

        // Add the peer again.
        sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

        // Prepare the block requests.
        let (requests, _) = sync.prepare_block_requests();
        assert_eq!(requests.len(), 10);

        for (height, (hash, previous_hash, num_sync_ips)) in requests {
            // Construct the sync IPs (note: reuses `sync_peers` from the first preparation).
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }
    }
1972
    /// Tests that requests for heights the ledger has since reached are pruned
    /// when handling block-request timeouts.
    #[test]
    fn test_obsolete_block_requests() {
        let rng = &mut TestRng::default();
        let sync = sample_sync_at_height(0);

        let locator_height = rng.gen_range(0..50);

        // Add a peer.
        let locators = sample_block_locators(locator_height);
        sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap();

        // Construct block requests
        let (requests, sync_peers) = sync.prepare_block_requests();
        assert_eq!(requests.len(), locator_height as usize);

        // Add the block requests to the sync module.
        for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
            // Construct the sync IPs.
            let sync_ips: IndexSet<_> =
                sync_peers.keys().choose_multiple(rng, num_sync_ips).into_iter().copied().collect();
            // Insert the block request.
            sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
            // Check that the block requests were inserted.
            assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
            assert!(sync.get_block_request_timestamp(height).is_some());
        }

        // Duplicate a new sync module with a different height to simulate block advancement.
        // This range needs to be inclusive, so that the range is never empty,
        // even with a locator height of 0.
        let ledger_height = rng.gen_range(0..=locator_height);
        let new_sync = duplicate_sync_at_new_height(&sync, ledger_height);

        // Check that the number of requests is the same.
        assert_eq!(new_sync.requests.read().len(), requests.len());

        // Remove timed out block requests.
        let c = DummyPeerPoolHandler::default();
        new_sync.handle_block_request_timeouts(&c).unwrap();

        // Only requests above the new ledger height should remain; the rest are obsolete.
        assert_eq!(new_sync.requests.read().len(), (locator_height - ledger_height) as usize);
    }
2016
    /// Tests that a request whose timestamp exceeds `BLOCK_REQUEST_TIMEOUT` is removed,
    /// along with the locators of its (sole) sync peer.
    #[test]
    fn test_timed_out_block_request() {
        let sync = sample_sync_at_height(0);
        let peer_ip = sample_peer_ip(1);
        let locators = sample_block_locators(10);
        let block_hash = locators.get_hash(1);

        sync.update_peer_locators(peer_ip, &locators).unwrap();

        // A timestamp just past the timeout threshold.
        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);

        // Add a timed-out request
        sync.requests.write().insert(1, OutstandingRequest {
            request: (block_hash, None, [peer_ip].into()),
            timestamp,
            response: None,
        });

        assert_eq!(sync.requests.read().len(), 1);
        assert_eq!(sync.locators.read().len(), 1);

        // Remove timed out block requests.
        let c = DummyPeerPoolHandler::default();
        sync.handle_block_request_timeouts(&c).unwrap();

        // NOTE(review): the ban-list assertions below are disabled — confirm whether
        // banning peers on request timeout is still intended behavior.
        // let ban_list = c.peers_to_ban.write();
        // assert_eq!(ban_list.len(), 1);
        // assert_eq!(ban_list.iter().next(), Some(&peer_ip));

        // The timed-out request and the peer's locators must both be gone.
        assert!(sync.requests.read().is_empty());
        assert!(sync.locators.read().is_empty());
    }
2049
2050    #[test]
2051    fn test_reissue_timed_out_block_request() {
2052        let sync = sample_sync_at_height(0);
2053        let peer_ip1 = sample_peer_ip(1);
2054        let peer_ip2 = sample_peer_ip(2);
2055        let peer_ip3 = sample_peer_ip(3);
2056
2057        let locators = sample_block_locators(10);
2058        let block_hash1 = locators.get_hash(1);
2059        let block_hash2 = locators.get_hash(2);
2060
2061        sync.update_peer_locators(peer_ip1, &locators).unwrap();
2062        sync.update_peer_locators(peer_ip2, &locators).unwrap();
2063        sync.update_peer_locators(peer_ip3, &locators).unwrap();
2064
2065        assert_eq!(sync.locators.read().len(), 3);
2066
2067        let timestamp = Instant::now() - BLOCK_REQUEST_TIMEOUT - Duration::from_secs(1);
2068
2069        // Add a timed-out request
2070        sync.requests.write().insert(1, OutstandingRequest {
2071            request: (block_hash1, None, [peer_ip1].into()),
2072            timestamp,
2073            response: None,
2074        });
2075
2076        // Add a timed-out request
2077        sync.requests.write().insert(2, OutstandingRequest {
2078            request: (block_hash2, None, [peer_ip2].into()),
2079            timestamp: Instant::now(),
2080            response: None,
2081        });
2082
2083        assert_eq!(sync.requests.read().len(), 2);
2084
2085        // Remove timed out block requests.
2086        let c = DummyPeerPoolHandler::default();
2087
2088        let re_requests = sync.handle_block_request_timeouts(&c).unwrap();
2089
2090        // let ban_list = c.peers_to_ban.write();
2091        // assert_eq!(ban_list.len(), 1);
2092        // assert_eq!(ban_list.iter().next(), Some(&peer_ip1));
2093
2094        assert_eq!(sync.requests.read().len(), 1);
2095        assert_eq!(sync.locators.read().len(), 2);
2096
2097        let (new_requests, new_sync_ips) = re_requests.unwrap();
2098        assert_eq!(new_requests.len(), 1);
2099
2100        let (height, (hash, _, _)) = new_requests.first().unwrap();
2101        assert_eq!(*height, 1);
2102        assert_eq!(*hash, block_hash1);
2103        assert_eq!(new_sync_ips.len(), 2);
2104
2105        // Make sure the removed peer is not in the sync_peer set.
2106        let mut iter = new_sync_ips.iter();
2107        assert_ne!(iter.next().unwrap().0, &peer_ip1);
2108        assert_ne!(iter.next().unwrap().0, &peer_ip1);
2109    }
2110}