Skip to main content

snarkos_node_sync/
block_sync.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
18    locators::BlockLocators,
19};
20use futures::future::BoxFuture;
21use snarkos_node_bft_ledger_service::{BeginLedgerUpdateError, LedgerService};
22use snarkos_node_network::ConnectionMode;
23use snarkos_node_router::messages::DataBlocks;
24use snarkos_node_sync_communication_service::CommunicationService;
25use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS};
26
27use snarkvm::{
28    console::network::{ConsensusVersion, Network},
29    ledger::{Block, CheckBlockError},
30    utilities::flatten_error,
31};
32
33use anyhow::{Context, Result, anyhow, bail, ensure};
34use futures::FutureExt;
35use indexmap::{IndexMap, IndexSet};
36use itertools::Itertools;
37#[cfg(feature = "locktick")]
38use locktick::parking_lot::{Mutex, RwLock};
39#[cfg(feature = "locktick")]
40use locktick::tokio::Mutex as TMutex;
41#[cfg(not(feature = "locktick"))]
42use parking_lot::Mutex;
43#[cfg(not(feature = "locktick"))]
44use parking_lot::RwLock;
45use rand::seq::{IteratorRandom, SliceRandom};
46use std::{
47    collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map},
48    net::{IpAddr, Ipv4Addr, SocketAddr},
49    sync::Arc,
50    time::{Duration, Instant},
51};
52#[cfg(not(feature = "locktick"))]
53use tokio::sync::Mutex as TMutex;
54use tokio::sync::Notify;
55use tracing::info;
56
57mod helpers;
58use helpers::rangify_heights;
59
60mod sync_state;
61pub use sync_state::BftSyncMode;
62use sync_state::SyncState;
63
64mod metrics;
65use metrics::BlockSyncMetrics;
66
/// The redundancy factor decreases the possibility of malicious peers sending us invalid block locators
/// by requiring multiple peers to advertise the same (prefix of) block locators.
/// However, we do not use this in production yet: outside of tests the factor is 1.
#[cfg(not(test))]
pub const REDUNDANCY_FACTOR: usize = 1;
/// The redundancy factor used in tests (see the non-test definition for its purpose).
#[cfg(test)]
pub const REDUNDANCY_FACTOR: usize = 3;

/// The time nodes wait between issuing batches of block requests to avoid triggering spam detection.
///
/// The current rate limit for all messages is around 160k per second (see [`Gateway::max_cache_events`]).
/// With a delay of 10 ms, at most about 100 batches of block requests are issued per second,
/// which is far below that limit.
// TODO(kaimast): base rate limits on how many requests were sent to each peer instead.
pub const BLOCK_REQUEST_BATCH_DELAY: Duration = Duration::from_millis(10);

// Multiples of the redundancy factor.
// NOTE(review): their uses are outside this part of the file; presumably they bound the number of
// peers a block is requested from and the number of candidate sync peers - confirm at the use sites.
const EXTRA_REDUNDANCY_FACTOR: usize = REDUNDANCY_FACTOR * 3;
const NUM_SYNC_CANDIDATE_PEERS: usize = REDUNDANCY_FACTOR * 5;

// NOTE(review): presumably the time after which an unanswered block request is considered timed out
// (see `handle_block_request_timeouts`) - confirm at the use site.
const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);

/// The maximum number of outstanding block requests.
/// Once a node hits this limit, it will not issue any new requests until existing requests time out or receive responses.
const MAX_BLOCK_REQUESTS: usize = 50; // 50 requests

/// The maximum number of blocks tolerated before the primary is considered behind its peers.
pub const MAX_BLOCKS_BEHIND: u32 = 1; // blocks

/// A dummy socket address (127.0.0.1, port 0) that is used to represent the local node.
/// Note: This does not need to be a real IP address, but it must be distinct from all other connections.
pub const DUMMY_SELF_IP: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);

/// The map of failed block requests, ordered by its `u32` key.
// NOTE(review): the key is presumably the block height and the value the (hash, previous hash)
// pair of the failed request - confirm where the map is filled.
type FailedRequests<H> = BTreeMap<u32, (Option<H>, Option<H>)>;
101
/// An entry for a block request tracked by the sync pool: the request itself, its timestamp,
/// and the response (once there is one).
// NOTE(review): an earlier version of this comment said the response is *not* stored here so that
// checking for responses does not require iterating over all requests. The `response` field below
// contradicts that, so the old sentence looks stale - confirm and drop this note.
#[derive(Clone)]
struct OutstandingRequest<N: Network> {
    /// The request. Its third element is the set of peers that have not responded (yet), see `sync_ips`.
    request: SyncRequest<N>,
    /// The time used to compute how long the request has been in flight.
    timestamp: Instant,
    /// The corresponding response (if any).
    /// This is guaranteed to be Some if sync_ips for the given request are empty.
    response: Option<Block<N>>,
}
112
/// Information about a single block request (used for the REST API).
///
/// Serialized with `serde`; the fields are emitted in declaration order.
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestInfo {
    /// Whole seconds since the request's timestamp was taken.
    elapsed: u64,
    /// Has the request been responded to, i.e., are there no peers left that still need to respond?
    done: bool,
}
121
/// Summary of all in-flight block requests, split into outstanding and completed ones.
///
/// Both fields are strings produced by `rangify_heights` from the respective block heights.
#[derive(Clone, serde::Serialize)]
pub struct BlockRequestsSummary {
    /// The heights of requests that are still waiting for at least one peer to respond.
    outstanding: String,
    /// The heights of requests for which no peer is left that needs to respond.
    completed: String,
}
128
/// The reasons why a block response from a peer could not be inserted into the sync pool.
///
/// Use [`Self::is_benign`] to tell harmless errors apart from those that indicate
/// malicious or faulty behavior.
#[derive(thiserror::Error, Debug)]
pub enum InsertBlockResponseError<N: Network> {
    /// The response did not contain any blocks.
    #[error("Empty block response")]
    EmptyBlockResponse,
    /// The consensus version check applies to the response, but the peer did not send a version.
    #[error("The peer did not send a consensus version")]
    NoConsensusVersion,
    /// The peer's consensus version differs from the one this node expects at `last_height`.
    #[error(
        "The peer's consensus version for height {last_height} does not match ours: expected {expected_version}, got {peer_version}"
    )]
    ConsensusVersionMismatch { peer_version: ConsensusVersion, expected_version: ConsensusVersion, last_height: u32 },
    /// Block sync has already moved past the given height (considered benign).
    #[error("Block Sync already advanced to block {height}")]
    BlockSyncAlreadyAdvanced { height: u32 },
    /// There is no request for the given height (considered benign).
    #[error("No such request for height {height}")]
    NoSuchRequest { height: u32 },
    /// The hash of the received block differs from the expected hash for that height.
    #[error("Invalid block hash for height {height} from '{peer_ip}': expected {expected_hash}, got {actual_hash}")]
    InvalidBlockHash { height: u32, peer_ip: SocketAddr, expected_hash: N::BlockHash, actual_hash: N::BlockHash },
    /// The previous block hash of the received block differs from the expected one.
    #[error(
        "The previous block hash in candidate block {height} from '{peer_ip}' is incorrect: expected {expected}, but got {actual}"
    )]
    InvalidPreviousBlockHash { height: u32, peer_ip: SocketAddr, expected: N::BlockHash, actual: N::BlockHash },
    /// The received block is malformed.
    #[error("Candidate block {height} from '{peer_ip}' is malformed")]
    MalformedBlock { height: u32, peer_ip: SocketAddr },
    /// The block was sent by a peer that it was not requested from.
    #[error("The sync pool did not request block {height} from '{peer_ip}'")]
    WrongSyncPeer { height: u32, peer_ip: SocketAddr },
    /// Any other error; displayed through `flatten_error`.
    #[error("{}", flatten_error(.0))]
    Other(#[from] anyhow::Error),
}
156
157impl<N: Network> InsertBlockResponseError<N> {
158    /// Returns `true` if the error does not indicate malicious or faulty behavior.
159    pub fn is_benign(&self) -> bool {
160        matches!(self, Self::NoSuchRequest { .. } | Self::BlockSyncAlreadyAdvanced { .. })
161    }
162
163    // Returns true if the error is about an invalid consensus version.
164    pub fn is_invalid_consensus_version(&self) -> bool {
165        matches!(self, Self::ConsensusVersionMismatch { .. } | Self::NoConsensusVersion)
166    }
167}
168
169impl<N: Network> OutstandingRequest<N> {
170    /// Get a reference to the IPs of peers that have not responded to the request (yet).
171    fn sync_ips(&self) -> &IndexSet<SocketAddr> {
172        let (_, _, sync_ips) = &self.request;
173        sync_ips
174    }
175
176    /// Get a mutable reference to the IPs of peers that have not responded to the request (yet).
177    fn sync_ips_mut(&mut self) -> &mut IndexSet<SocketAddr> {
178        let (_, _, sync_ips) = &mut self.request;
179        sync_ips
180    }
181}
182
/// A struct that keeps track of synchronizing blocks with other nodes.
///
/// It generates requests to send to other peers and processes responses to those requests.
/// The struct also keeps track of block locators, which indicate which peers it can fetch blocks from.
///
/// # Notes
/// - The actual network communication happens in `snarkos_node::Client` (for clients and provers) and in `snarkos_node_bft::Sync` (for validators).
///
/// - Validators only sync from other nodes using this struct if they fall behind, e.g.,
///   because they experience a network partition.
///   In the common case, validators will generate blocks from the DAG after an anchor certificate has been approved
///   by a supermajority of the committee.
pub struct BlockSync<N: Network> {
    /// The ledger.
    ledger: Arc<dyn LedgerService<N>>,

    /// The connection mode of this node (Gateway for validators, Router for clients/provers).
    connection_mode: ConnectionMode,

    /// The map of peer IP to their block locators.
    /// The block locators are consistent with the ledger and every other peer's block locators.
    locators: RwLock<HashMap<SocketAddr, BlockLocators<N>>>,

    /// The map from a pair of peers to the height of their common ancestor.
    /// This map is used to determine which peers to request blocks from.
    ///
    /// Lock ordering: when locking both `common_ancestors` and `locators`, `common_ancestors` must be locked first.
    common_ancestors: RwLock<IndexMap<PeerPair, u32>>,

    /// The block requests in progress and their responses, keyed by block height.
    requests: RwLock<BTreeMap<u32, OutstandingRequest<N>>>,

    /// The sync state of this node. Among other things, it tracks whether the node is synced up to
    /// the latest block (within the given tolerance), the sync height, and the greatest peer height.
    ///
    /// Lock ordering: if you lock `sync_state` and `requests`, you must lock `sync_state` first.
    sync_state: RwLock<SyncState>,

    /// The lock used to ensure that only one task at a time advances the ledger with sync blocks
    /// (see `try_advancing_block_synchronization`).
    advance_with_sync_blocks_lock: TMutex<()>,

    /// Gets notified when there was an update to the locators or a peer disconnected.
    peer_notify: Notify,

    /// Gets notified when we received a new block response.
    response_notify: Notify,

    /// Tracks the sync speed.
    metrics: BlockSyncMetrics,

    /// Meta lock that ensures no new block requests are created while a peer is removed.
    prepare_requests_lock: Mutex<()>,

    /// Tracks failed requests that need to be re-issued.
    failed_requests: Mutex<FailedRequests<N::BlockHash>>,

    /// Tracks the last time each peer delivered a successful block response.
    ///
    /// Used in `handle_block_request_timeouts` to avoid banning peers that are actively
    /// responding but cannot keep up with the request rate.
    last_response_at: Mutex<HashMap<SocketAddr, Instant>>,

    /// Wakes up the tasks waiting in `wait_for_synced` when the node is synced.
    synced_notify: Notify,
}
247
248impl<N: Network> BlockSync<N> {
249    /// Initializes a new block sync module.
250    pub fn new(ledger: Arc<dyn LedgerService<N>>, connection_mode: ConnectionMode) -> Self {
251        // Make sync state aware of the blocks that already exist on disk at startup.
252        let sync_state = SyncState::new_with_height(ledger.latest_block_height());
253
254        Self {
255            ledger,
256            connection_mode,
257            sync_state: RwLock::new(sync_state),
258            peer_notify: Default::default(),
259            response_notify: Default::default(),
260            locators: Default::default(),
261            requests: Default::default(),
262            common_ancestors: Default::default(),
263            advance_with_sync_blocks_lock: Default::default(),
264            metrics: Default::default(),
265            prepare_requests_lock: Default::default(),
266            failed_requests: Default::default(),
267            last_response_at: Default::default(),
268            synced_notify: Default::default(),
269        }
270    }
271
272    /// Blocks until something about a peer changes,
273    /// or block request has been fully processed (either successfully or unsuccessfully).
274    ///
275    /// Used by the outgoing task.
276    ///
277    /// # Concurrency
278    /// Only one task can wait on this at a time.
279    pub async fn wait_for_peer_update(&self) {
280        self.peer_notify.notified().await
281    }
282
283    /// Blocks until there is a new response to a block request.
284    ///
285    /// Used by the incoming task.
286    ///
287    /// # Concurrency
288    /// Only one task can wait on this at a time.
289    pub async fn wait_for_block_responses(&self) {
290        self.response_notify.notified().await
291    }
292
293    /// Returns `true` if the node is synced up to the latest block (within the given tolerance).
294    #[inline]
295    pub fn is_block_synced(&self) -> bool {
296        self.sync_state.read().is_block_synced()
297    }
298
299    /// This futures blocks until the node is synced.
300    ///
301    /// # Concurrency
302    /// Multiple tasks can wait on this at the same time safely.
303    pub async fn wait_for_synced(&self) {
304        loop {
305            let mut fut = std::pin::pin!(self.synced_notify.notified());
306
307            {
308                let sync_state = self.sync_state.read();
309                if sync_state.is_block_synced() {
310                    return;
311                }
312
313                // Register this task as waiting before dropping the lock.
314                fut.as_mut().enable();
315            }
316
317            fut.await;
318        }
319    }
320
321    /// Similar as [`Self::wait_for_synced`] but returns `None` if the node is already synced.
322    /// Otherwise, it will return a future that behaves like `wait_for_synced`.
323    ///
324    /// # Concurrency
325    /// * This method is atomic, unlike calling `is_synced` and `wait_for_synced` sequentially.
326    /// * Multiple tasks can wait on this at the same time safely.
327    pub fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<()>> {
328        let mut notified = Box::pin(self.synced_notify.notified());
329
330        {
331            let sync_state = self.sync_state.read();
332            if sync_state.is_block_synced() {
333                return None;
334            }
335
336            // Register this task as waiting before dropping the lock.
337            notified.as_mut().enable();
338        }
339
340        Some(
341            async move {
342                notified.await;
343                self.wait_for_synced().await;
344            }
345            .boxed(),
346        )
347    }
348
349    /// Returns the number of blocks the node is behind the greatest peer height,
350    /// or `None` if no peers are connected yet.
351    #[inline]
352    pub fn num_blocks_behind(&self) -> Option<u32> {
353        self.sync_state.read().num_blocks_behind()
354    }
355
356    /// Returns the greatest block height of any connected peer.
357    #[inline]
358    pub fn greatest_peer_block_height(&self) -> Option<u32> {
359        self.sync_state.read().get_greatest_peer_height()
360    }
361
362    /// Returns the current sync height of this node.
363    /// The sync height is always greater or equal to the ledger height.
364    #[inline]
365    pub fn get_sync_height(&self) -> u32 {
366        self.sync_state.read().get_sync_height()
367    }
368
369    /// Returns the BFT sync mode (fast or DAG), or `None` if no BFT layer is attached.
370    #[inline]
371    pub fn get_bft_sync_mode(&self) -> Option<BftSyncMode> {
372        self.sync_state.read().get_bft_sync_mode()
373    }
374
375    /// Sets the BFT sync mode. Should only be called by the BFT layer.
376    ///
377    /// # Returns
378    /// The previous BFT sync mode (if any).
379    #[inline]
380    pub fn set_bft_sync_mode(&self, mode: BftSyncMode) -> Option<BftSyncMode> {
381        self.sync_state.write().set_bft_sync_mode(mode)
382    }
383
384    /// Returns the number of blocks we requested from peers, but have not received yet.
385    #[inline]
386    pub fn num_outstanding_block_requests(&self) -> usize {
387        self.requests.read().iter().filter(|(_, e)| !e.sync_ips().is_empty()).count()
388    }
389
390    /// The total number of block request, including the ones that have been answered already but not processed yet.
391    #[inline]
392    pub fn num_total_block_requests(&self) -> usize {
393        self.requests.read().len()
394    }
395
396    //// Returns the latest locator height for all known peers.
397    pub fn get_peer_heights(&self) -> HashMap<SocketAddr, u32> {
398        self.locators.read().iter().map(|(addr, locators)| (*addr, locators.latest_locator_height())).collect()
399    }
400
401    //// Returns information about all in-flight block requests.
402    pub fn get_block_requests_info(&self) -> BTreeMap<u32, BlockRequestInfo> {
403        self.requests
404            .read()
405            .iter()
406            .map(|(height, request)| {
407                (*height, BlockRequestInfo {
408                    done: request.sync_ips().is_empty(),
409                    elapsed: request.timestamp.elapsed().as_secs(),
410                })
411            })
412            .collect()
413    }
414
415    /// Returns a summary of all in-flight requests.
416    pub fn get_block_requests_summary(&self) -> BlockRequestsSummary {
417        let requests = self.requests.read();
418        let completed = requests.iter().filter_map(|(h, e)| if e.sync_ips().is_empty() { Some(*h) } else { None });
419        let outstanding = requests.iter().filter_map(|(h, e)| if !e.sync_ips().is_empty() { Some(*h) } else { None });
420
421        BlockRequestsSummary { completed: rangify_heights(completed), outstanding: rangify_heights(outstanding) }
422    }
423
424    pub fn get_sync_speed(&self) -> f64 {
425        self.metrics.get_sync_speed()
426    }
427}
428
// Accessors that are only needed by the tests.
#[cfg(test)]
impl<N: Network> BlockSync<N> {
    /// Returns the latest locator height of the given peer IP, if the peer is known.
    fn get_peer_height(&self, peer_ip: &SocketAddr) -> Option<u32> {
        let locators = self.locators.read();
        let peer_locators = locators.get(peer_ip)?;
        Some(peer_locators.latest_locator_height())
    }

    /// Returns the common ancestor for the given peer pair, if it exists.
    fn get_common_ancestor(&self, peer_a: SocketAddr, peer_b: SocketAddr) -> Option<u32> {
        let ancestors = self.common_ancestors.read();
        ancestors.get(&PeerPair(peer_a, peer_b)).copied()
    }

    /// Returns a copy of the block request for the given height, if it exists.
    fn get_block_request(&self, height: u32) -> Option<SyncRequest<N>> {
        let requests = self.requests.read();
        let entry = requests.get(&height)?;
        Some(entry.request.clone())
    }

    /// Returns the timestamp stored with the block request for the given height, if it exists.
    fn get_block_request_timestamp(&self, height: u32) -> Option<Instant> {
        let requests = self.requests.read();
        requests.get(&height).map(|entry| entry.timestamp)
    }
}
452
453impl<N: Network> BlockSync<N> {
454    /// Returns the block locators.
455    #[inline]
456    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
457        // Retrieve the latest block height.
458        let latest_height = self.ledger.latest_block_height();
459
460        // Initialize the recents map.
461        // TODO: generalize this for RECENT_INTERVAL > 1, or remove this comment if we hardwire that to 1
462        let mut recents = IndexMap::with_capacity(NUM_RECENT_BLOCKS);
463        // Retrieve the recent block hashes.
464        for height in latest_height.saturating_sub((NUM_RECENT_BLOCKS - 1) as u32)..=latest_height {
465            recents.insert(height, self.ledger.get_block_hash(height)?);
466        }
467
468        // Initialize the checkpoints map.
469        let mut checkpoints = IndexMap::with_capacity((latest_height / CHECKPOINT_INTERVAL + 1).try_into()?);
470        // Retrieve the checkpoint block hashes.
471        for height in (0..=latest_height).step_by(CHECKPOINT_INTERVAL as usize) {
472            checkpoints.insert(height, self.ledger.get_block_hash(height)?);
473        }
474
475        // Construct the block locators.
476        BlockLocators::new(recents, checkpoints)
477    }
478
479    /// Returns true if there are pending responses to block requests that need to be processed.
480    pub fn has_pending_responses(&self) -> bool {
481        self.requests.read().iter().filter(|(_, req)| req.response.is_some() && req.sync_ips().is_empty()).count() > 0
482    }
483
484    /// Send a batch of block requests.
485    pub async fn send_block_requests<C: CommunicationService>(
486        &self,
487        communication: &C,
488        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
489        requests: &[(u32, PrepareSyncRequest<N>)],
490    ) -> bool {
491        let (start_height, max_num_sync_ips) = match requests.first() {
492            Some((height, (_, _, max_num_sync_ips))) => (*height, *max_num_sync_ips),
493            None => {
494                warn!("Block sync failed - no block requests");
495                return false;
496            }
497        };
498
499        // Use a randomly sampled subset of the sync IPs.
500        let sync_ips: IndexSet<_> =
501            sync_peers.keys().copied().sample(&mut rand::rng(), max_num_sync_ips).into_iter().collect();
502
503        // Calculate the end height.
504        let end_height = start_height.saturating_add(requests.len() as u32);
505
506        // A peer may have disconnected after we prepared this batch; inserting requests that
507        // reference them would leave those requests never cleaned by remove_peer. Hold the same
508        // lock that remove_peer uses so we're atomic: check that all selected peers are still
509        // connected, then insert, without a disconnect slipping in between.
510        {
511            let _prepare_requests_lock = self.prepare_requests_lock.lock();
512            let all_still_connected = {
513                let locators = self.locators.read();
514                sync_ips.iter().all(|ip| locators.contains_key(ip))
515            };
516
517            if !all_still_connected {
518                trace!(
519                    "Skipping block request batch for heights {start_height}-{inclusive_end}: at least one of the selected peer(s) has disconnected",
520                    inclusive_end = end_height.saturating_sub(1)
521                );
522                return false;
523            }
524
525            // Insert the chunk of block requests (still holding lock so remove_peer cannot run).
526            for (height, (hash, previous_hash, _)) in requests.iter() {
527                // Insert the block request into the sync pool using the sync IPs from the last block request in the chunk.
528                if let Err(err) = self.insert_block_request(*height, (*hash, *previous_hash, sync_ips.clone())) {
529                    let err = err.context(format!("Failed to insert block request for height {height}"));
530                    warn!("{}", flatten_error(&err));
531                    return false;
532                }
533            }
534        }
535
536        debug!("Sending {len} block requests to peer(s) at {peers:?}", len = requests.len(), peers = sync_ips);
537
538        /* Send the block request to the peers */
539
540        // Construct the message.
541        let message = C::prepare_block_request(start_height, end_height);
542
543        // Send the message to the peers.
544        let mut tasks = Vec::with_capacity(sync_ips.len());
545        for sync_ip in sync_ips {
546            let sender = communication.send(sync_ip, message.clone()).await;
547            let task = tokio::spawn(async move {
548                // Ensure the request is sent successfully.
549                match sender {
550                    Some(sender) => {
551                        if let Err(err) = sender.await {
552                            warn!("Failed to send block request to peer '{sync_ip}': {err}");
553                            false
554                        } else {
555                            true
556                        }
557                    }
558                    None => {
559                        warn!("Failed to send block request to peer '{sync_ip}': no such peer");
560                        false
561                    }
562                }
563            });
564
565            tasks.push(task);
566        }
567
568        // Wait for all sends to finish at the same time.
569        for result in futures::future::join_all(tasks).await {
570            let success = match result {
571                Ok(success) => success,
572                Err(err) => {
573                    error!("tokio join error: {err}");
574                    false
575                }
576            };
577
578            // If sending fails for any peer, remove the block request from the sync pool.
579            if !success {
580                // Remove the entire block request from the sync pool.
581                let mut requests = self.requests.write();
582                for height in start_height..end_height {
583                    requests.remove(&height);
584                }
585                // Break out of the loop.
586                return false;
587            }
588        }
589        true
590    }
591
592    /// Handles timeouts, checks if block sync is possible, prepares block requests,
593    /// and sends them via the given [`CommunicationService`].
594    ///
595    /// Callers typically call this in a loop after waiting for peer updates, e.g.
596    /// `timeout(MAX_SYNC_INTERVAL, self.wait_for_peer_update())`.
597    pub async fn try_issuing_block_requests<C: CommunicationService>(&self, communication: &C) {
598        self.handle_block_request_timeouts();
599
600        if self.is_block_synced() {
601            trace!("Node is already synced. Will not issue new block requests");
602            return;
603        }
604
605        if !self.sync_state.read().can_issue_new_block_requests() && self.failed_requests.lock().is_empty() {
606            trace!("Nothing to sync. Will not issue new block requests");
607            return;
608        }
609
610        let batches = self.prepare_block_requests();
611
612        if batches.is_empty() {
613            let total_requests = self.num_total_block_requests();
614            let num_outstanding = self.num_outstanding_block_requests();
615            if total_requests != 0 {
616                trace!(
617                    "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
618                );
619            } else {
620                debug!(
621                    "Not block synced yet, and there are no outstanding block requests or \
622                 new block requests to send"
623                );
624            }
625        } else {
626            for (block_requests, sync_peers) in batches {
627                for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
628                    if !self.send_block_requests(communication, &sync_peers, requests).await {
629                        break;
630                    }
631                    tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
632                }
633            }
634        }
635    }
636
637    /// Inserts a new block response from the given peer IP.
638    ///
639    /// Returns an error if the block was malformed, or we already received a different block for this height.
640    /// This function also removes all block requests from the given peer IP on failure.
641    ///
642    /// Note, that this only queues the response. After this, you most likely want to call `Self::try_advancing_block_synchronization`.
643    ///
644    #[inline]
645    pub fn insert_block_responses(
646        &self,
647        peer_ip: SocketAddr,
648        blocks: Vec<Block<N>>,
649        latest_consensus_version: Option<ConsensusVersion>,
650    ) -> Result<(), InsertBlockResponseError<N>> {
651        // Attempt to insert the block responses, and break if we encounter an error.
652        let result = 'outer: {
653            let Some(last_height) = blocks.as_slice().last().map(|b| b.height()) else {
654                break 'outer Err(InsertBlockResponseError::EmptyBlockResponse);
655            };
656
657            let expected_consensus_version =
658                N::CONSENSUS_VERSION(last_height).map_err(InsertBlockResponseError::Other)?;
659
660            // Perform consensus version check, if possible.
661            // This check is only enabled after nodes have reached V12.
662            if expected_consensus_version >= ConsensusVersion::V12 {
663                if let Some(peer_version) = latest_consensus_version {
664                    if peer_version != expected_consensus_version {
665                        break 'outer Err(InsertBlockResponseError::ConsensusVersionMismatch {
666                            peer_version,
667                            expected_version: expected_consensus_version,
668                            last_height,
669                        });
670                    }
671                } else {
672                    break 'outer Err(InsertBlockResponseError::NoConsensusVersion);
673                }
674            }
675
676            // Insert the candidate blocks into the sync pool.
677            for block in blocks {
678                if let Err(error) = self.insert_block_response(peer_ip, block) {
679                    break 'outer Err(error);
680                }
681            }
682
683            Ok(())
684        };
685
686        // On failure, remove all block requests to the peer.
687        if result.is_err() {
688            self.remove_block_requests_to_peer(&peer_ip);
689        }
690
691        // Return the result.
692        result
693    }
694
695    /// Returns the next block for the given `next_height` if the request is complete,
696    /// or `None` otherwise. This does not remove the block from the `responses` map.
697    #[inline]
698    pub fn peek_next_block(&self, next_height: u32) -> Option<Block<N>> {
699        // Determine if the request is complete:
700        // either there is no request for `next_height`, or the request has no peer socket addresses left.
701        if let Some(entry) = self.requests.read().get(&next_height) {
702            let is_complete = entry.sync_ips().is_empty();
703            if !is_complete {
704                return None;
705            }
706
707            // If the request is complete, return the block from the responses, if there is one.
708            if entry.response.is_none() {
709                warn!("Request for height {next_height} is complete but no response exists");
710            }
711            entry.response.clone()
712        } else {
713            None
714        }
715    }
716
    /// Attempts to advance synchronization by processing completed block responses.
    ///
    /// Starting at the ledger's latest block height, this repeatedly peeks at the response for the
    /// next height, checks it against the ledger, and advances the ledger to it. The loop stops at
    /// the first height without a completed response, on a height mismatch, or when the ledger
    /// reports that the node is shutting down.
    ///
    /// # Returns
    /// * `Ok(true)` if the loop moved past at least one height. This includes heights whose block
    ///   was skipped because the ledger rejected it with `InvalidHeight`, `BlockAlreadyExists`,
    ///   or `InvalidRound`.
    /// * `Ok(false)` if no height was processed, or if another call is already in progress.
    /// * `Err(_)` if beginning the ledger update, checking the block, or advancing to it fails,
    ///   or if the blocking task cannot be joined.
    ///
    /// NOTE(review): on an error return, `set_sync_height` is not called, even if earlier
    /// iterations of the loop advanced the ledger. Confirm that callers account for this.
    ///
    /// # Usage
    /// This is only called in [`Client::try_block_sync`] and should not be called concurrently by multiple tasks.
    /// Validators do not call this function, and instead invoke
    /// [`snarkos_node_bft::Sync::try_advancing_block_synchronization`] which also updates the BFT state.
    #[inline]
    pub async fn try_advancing_block_synchronization(&self) -> Result<bool> {
        // Acquire the lock to ensure this function is called only once at a time.
        // If the lock is already acquired, return early.
        //
        // Note: This lock should not be needed anymore as there is only one place we call it from,
        // but we keep it for now out of caution.
        // TODO(kaimast): remove this eventually.
        let Ok(_lock) = self.advance_with_sync_blocks_lock.try_lock() else {
            trace!("Skipping attempt to advance block synchronziation as it is already in progress");
            return Ok(false);
        };

        // Start with the current height.
        // `start_height` is kept to detect, after the loop, whether any progress was made.
        let mut current_height = self.ledger.latest_block_height();
        let start_height = current_height;
        trace!(
            "Try advancing with block responses (at block {current_height}, current sync speed is {})",
            self.get_sync_speed()
        );

        loop {
            let next_height = current_height + 1;

            // Stop as soon as the response for the next height is missing or incomplete.
            let Some(block) = self.peek_next_block(next_height) else {
                break;
            };

            // Ensure the block height matches.
            if block.height() != next_height {
                warn!("Block height mismatch: expected {}, found {}", current_height + 1, block.height());
                break;
            }

            // The closure below is `move`, so it needs its own handle to the ledger.
            let ledger = self.ledger.clone();

            // Check and apply the block on a blocking thread.
            // The closure returns `(advanced, stop)`:
            //  - `advanced` is true if the block was added to the ledger.
            //  - `stop` is true if the node is shutting down and the loop must end.
            let (advanced, stop) = tokio::task::spawn_blocking(move || {
                let ledger_update = match ledger.begin_ledger_update() {
                    Ok(update) => update,
                    Err(BeginLedgerUpdateError::ShuttingDown) => {
                        info!("BlockSync cannot advance the ledger any more. The node is shutting down.");
                        return Ok((false, true));
                    }
                    Err(err) => {
                        return Err(anyhow!("Unexpected error when beginning ledger update: {err}"));
                    }
                };

                // Try to check the next block and advance to it.
                let block = match ledger_update.check_next_block(block) {
                    Ok(block) => block,
                    // These errors are treated as "the ledger is already past this block",
                    // so the block is skipped without failing the whole call.
                    Err(CheckBlockError::InvalidHeight { .. })
                    | Err(CheckBlockError::BlockAlreadyExists { .. })
                    | Err(CheckBlockError::InvalidRound { .. }) => {
                        debug!("Skipping a block at height {next_height}. The ledger already advanced",);
                        return Ok((false, false));
                    }
                    Err(err) => {
                        warn!("{err}");
                        return Err(err.into_anyhow());
                    }
                };

                ledger_update.advance_to_next_block(&block).with_context(|| {
                    format!(
                        "Failed to advance to next block (height: {height}, hash: {hash})",
                        height = block.height(),
                        hash = block.hash(),
                    )
                })?;

                Ok((true, false))
            })
            // The first `?` propagates a failure to join the blocking task,
            // the second `?` propagates the error returned by the closure itself.
            .await??;

            // Only count successful advances.
            // We may not always advance, for example, if the block was already added to the ledger.
            if advanced {
                self.count_request_completed();
            }

            // Remove the block response.
            // Note that this also happens when the block was skipped or the node is shutting down.
            self.remove_block_response(next_height);

            // If the node is shutting down, exit the loop.
            if stop {
                break;
            }

            // Update the latest height.
            current_height = next_height;
        }

        // Publish the new sync height only if the loop moved past at least one height.
        if current_height > start_height {
            self.set_sync_height(current_height);
            Ok(true)
        } else {
            Ok(false)
        }
    }
825}
826
827impl<N: Network> BlockSync<N> {
828    /// Returns the sync peers with their latest heights, and their minimum common ancestor, if the node can sync.
829    /// This function returns peers that are consistent with each other, and have a block height
830    /// that is greater than the ledger height of this node.
831    ///
832    /// # Locking
833    /// This will read-lock `common_ancestors` and `sync_state`, but not at the same time.
834    pub fn find_sync_peers(&self) -> Option<(IndexMap<SocketAddr, u32>, u32)> {
835        // Retrieve the current sync height.
836        let current_height = self.get_sync_height();
837
838        if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
839            // Map the locators into the latest height.
840            let sync_peers =
841                sync_peers.into_iter().map(|(ip, locators)| (ip, locators.latest_locator_height())).collect();
842            // Return the sync peers and their minimum common ancestor.
843            Some((sync_peers, min_common_ancestor))
844        } else {
845            None
846        }
847    }
848
849    /// Updates the block locators and common ancestors for the given peer IP.
850    ///
851    /// This function does not need to check that the block locators are well-formed,
852    /// because that is already done in [`BlockLocators::new()`], as noted in [`BlockLocators`].
853    ///
854    /// This function does **not** check
855    /// that the block locators are consistent with the peer's previous block locators or other peers' block locators.
856    pub fn update_peer_locators(&self, peer_ip: SocketAddr, locators: &BlockLocators<N>) -> Result<()> {
857        let connection_mode = self.connection_mode;
858        // -- First, update the locators entry for the given peer IP. --
859        // We perform this update atomically, and drop the lock as soon as we are done with the update.
860        match self.locators.write().entry(peer_ip) {
861            hash_map::Entry::Occupied(mut e) => {
862                // Return early if the block locators did not change.
863                if e.get() == locators {
864                    return Ok(());
865                }
866
867                let old_height = e.get().latest_locator_height();
868                let new_height = locators.latest_locator_height();
869
870                if old_height > new_height {
871                    debug!("Block height for peer {peer_ip} decreased from {old_height} to {new_height}",);
872                }
873                e.insert(locators.clone());
874            }
875            hash_map::Entry::Vacant(e) => {
876                e.insert(locators.clone());
877            }
878        }
879
880        // -- Second, compute the common ancestor with this node. --
881        let new_local_ancestor = {
882            let mut ancestor = 0;
883            // Attention: Please do not optimize this loop, as it performs fork-detection. In addition,
884            // by iterating upwards, it also early-terminates malicious block locators at the *first* point
885            // of bifurcation in their ledger history, which is a critical safety guarantee provided here.
886            for (height, hash) in locators.clone().into_iter() {
887                if let Ok(ledger_hash) = self.ledger.get_block_hash(height) {
888                    match ledger_hash == hash {
889                        true => ancestor = height,
890                        false => {
891                            warn!(
892                                "[{connection_mode}] Detected fork between this node and peer \"{peer_ip}\" at height {height}"
893                            );
894                            break;
895                        }
896                    }
897                }
898            }
899            ancestor
900        };
901
902        // -- Third, compute the common ancestor with every other peer, and determine if this peer is forked from others. --
903        // Do not hold write lock to `common_ancestors` here, because this can take a while with many peers.
904        let ancestor_updates: Vec<_> = self
905            .locators
906            .read()
907            .iter()
908            .filter_map(|(other_ip, other_locators)| {
909                // Skip if the other peer is the given peer.
910                if other_ip == &peer_ip {
911                    return None;
912                }
913                // Compute the common ancestor with the other peer.
914                let mut ancestor = 0;
915                for (height, hash) in other_locators.clone().into_iter() {
916                    if let Some(expected_hash) = locators.get_hash(height) {
917                        match expected_hash == hash {
918                            true => ancestor = height,
919                            false => {
920                                debug!(
921                                    "[{connection_mode}] Detected fork between peers \"{other_ip}\" and \"{peer_ip}\" at height {height}"
922                                );
923                                break;
924                            }
925                        }
926                    }
927                }
928
929                Some((PeerPair(peer_ip, *other_ip), ancestor))
930            })
931            .collect();
932
933        // -- Forth, update the map of common ancestors. --
934        // Scope the lock, so it is dropped before locking `sync_state`.
935        {
936            let mut common_ancestors = self.common_ancestors.write();
937            common_ancestors.insert(PeerPair(DUMMY_SELF_IP, peer_ip), new_local_ancestor);
938
939            for (peer_pair, new_ancestor) in ancestor_updates.into_iter() {
940                common_ancestors.insert(peer_pair, new_ancestor);
941            }
942        }
943
944        // -- Finally, update sync state and notify the sync loop about the change. --
945        let is_synced = if let Some(greatest_peer_height) =
946            self.locators.read().values().map(|l| l.latest_locator_height()).max()
947        {
948            let mut sync_state = self.sync_state.write();
949            sync_state.set_greatest_peer_height(greatest_peer_height);
950            sync_state.is_block_synced()
951        } else {
952            error!("Got new block locators but greatest peer height is zero.");
953            false
954        };
955
956        // For the unlikely case a peer's height gets lowered.
957        if is_synced {
958            self.synced_notify.notify_waiters();
959        }
960
961        // Even if the greatest peer height did not change, we still received new block locators
962        // that the sync loop might need to proceed.
963        self.peer_notify.notify_one();
964
965        Ok(())
966    }
967
968    /// TODO (howardwu): Remove the `common_ancestor` entry. But check that this is safe
969    ///  (that we don't rely upon it for safety when we re-connect with the same peer).
970    /// Removes the peer from the sync pool, if they exist.
971    pub fn remove_peer(&self, peer_ip: &SocketAddr) {
972        trace!("Removing peer {peer_ip} from block sync");
973
974        // Ensure no new block requests are issued to this peer, while it is disconnecting.
975        let _prepare_requests_lock = self.prepare_requests_lock.lock();
976
977        // Remove the locators entry for the given peer IP.
978        self.locators.write().remove(peer_ip);
979        // Remove all common ancestor entries for this peers.
980        self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
981        // Drop the last-response timestamp so a reconnecting peer starts fresh.
982        self.last_response_at.lock().remove(peer_ip);
983        // Remove all block requests to the peer.
984        self.remove_block_requests_to_peer(peer_ip);
985
986        let synced = {
987            // Do not lock sync state and locators at the same time.
988            let max_height = self.locators.read().values().map(|l| l.latest_locator_height()).max();
989            let mut sync_state = self.sync_state.write();
990
991            // Update sync state, because the greatest peer height may have decreased.
992            if let Some(greatest_peer_height) = max_height {
993                sync_state.set_greatest_peer_height(greatest_peer_height);
994            } else {
995                // There are no more peers left.
996                sync_state.clear_greatest_peer_height();
997            }
998
999            sync_state.is_block_synced()
1000        };
1001
1002        // For the case where the maximum peer height gets lowered.
1003        if synced {
1004            self.synced_notify.notify_waiters();
1005        }
1006
1007        // Notify the sync loop that something changed.
1008        self.peer_notify.notify_one();
1009    }
1010}
1011
/// Helper type for `prepare_block_requests`.
///
/// A batch is a pair of:
/// * the block requests to issue, as `(height, request)` tuples, and
/// * the sync peers the batch was constructed for, mapped to their block locators.
pub type BlockRequestBatch<N> = (Vec<(u32, PrepareSyncRequest<N>)>, IndexMap<SocketAddr, BlockLocators<N>>);
1014
1015impl<N: Network> BlockSync<N> {
    /// Returns a list of block requests and the sync peers, if the node needs to sync.
    ///
    /// Failed requests take priority: if any failed request above the current sync height exists,
    /// only those are re-issued (grouped into contiguous height ranges) and no new requests are
    /// generated in the same call. Otherwise, new requests are generated from the current sync
    /// height, subject to the limits on outstanding and total block requests.
    ///
    /// You usually want to call `remove_timed_out_block_requests` before invoking this function.
    ///
    /// # Returns
    /// * An empty vector, if there is no work to be done.
    /// * Otherwise, a vector of block request batches, each with a contiguous range of heights.
    ///
    /// Note that a returned batch may contain an empty list of requests, because the result of
    /// `construct_requests` is returned without checking whether it is empty.
    ///
    /// # Concurrency
    /// This should be called by at most one task at a time.
    /// The `prepare_requests_lock` is held for the duration of the call.
    ///
    /// # Usage
    ///  - For validators, the primary spawns exactly one task that periodically calls
    ///    `bft::Sync::try_issuing_block_requests`. There is no possibility of concurrent calls to it.
    ///  - For clients, `Client::initialize_sync` spawn exactly one task that periodically calls
    ///    `Client::try_issuing_block_requests` which calls this function.
    ///  - Provers do not call this function.
    pub fn prepare_block_requests(&self) -> Vec<BlockRequestBatch<N>> {
        let _block_requests_lock = self.prepare_requests_lock.lock();

        // Used to print more information when we max out on requests.
        // The summary is only computed if trace logging is enabled.
        let print_requests = || {
            if tracing::enabled!(tracing::Level::TRACE) {
                let summary = self.get_block_requests_summary();

                if summary.completed.is_empty() {
                    trace!("There are no completed requests that have not been processed yet.");
                } else {
                    trace!("The following requests are complete but not processed yet: {:?}", summary.completed);
                }

                if summary.outstanding.is_empty() {
                    trace!("There are no outstanding requests.");
                } else {
                    trace!("The following requests are still outstanding: {:?}", summary.outstanding);
                }
            }
        };

        // Do not hold lock here as, currently, `find_sync_peers_inner` can take a while.
        let current_height = self.get_sync_height();

        // Determine if there are any failed requests that need to be re-issued.
        //
        // The entries are only removed once the requests are successfully re-issued.
        // NOTE(review): this guard is a local binding, so it stays alive until the function returns.
        let mut failed_requests = self.failed_requests.lock();

        // Ensure none of the failed requests are obsolete.
        // Failed requests at or below the current sync height are dropped, lowest height first.
        while let Some(height) = failed_requests.keys().next()
            && *height <= current_height
        {
            failed_requests.pop_first();
        }

        // Re-issue the remaining failed requests.
        if !failed_requests.is_empty() {
            trace!("There are {} failed requests that need to be re-issued.", failed_requests.len());

            // Convert the set of failed requests into one or multiple continuous ranges.
            // Each batch holds `(height, hash, previous_hash)` tuples with consecutive heights.
            let iter = failed_requests.iter();
            let mut batches: VecDeque<Vec<(u32, _, _)>> = VecDeque::new();

            for (height, (hash, previous_hash)) in iter {
                if let Some(prev_batch) = batches.back_mut() {
                    if let Some((last_height, _, _)) = prev_batch.last()
                        && *last_height + 1 != *height
                    {
                        // We need to start a new batch.
                        batches.push_back(vec![(*height, *hash, *previous_hash)]);
                    } else {
                        // We can add the request to the current batch.
                        prev_batch.push((*height, *hash, *previous_hash));
                    }
                } else {
                    // First batch.
                    batches.push_back(vec![(*height, *hash, *previous_hash)]);
                }
            }

            let mut result = vec![];
            while let Some(batch) = batches.pop_front() {
                // SAFETY: Batches are guaranteed to be non-empty.
                // `end_height` is exclusive, i.e., one past the last height in the batch.
                let start_height = batch.first().unwrap().0;
                let end_height = batch.last().unwrap().0 + 1;

                // Set the maximum number of blocks, so that they do not exceed the end height.
                let max_new_blocks_to_request = end_height - start_height;

                // Batches that were already constructed are still returned if this lookup fails.
                let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(start_height) else {
                    // This generally shouldn't happen, because there cannot be outstanding requests when no peers are connected.
                    error!("Cannot re-request blocks because no or not enough peers are connected");
                    return result;
                };

                // Retrieve the greatest block height of any connected peer.
                let Some(greatest_peer_height) = sync_peers.values().map(|l| l.latest_locator_height()).max() else {
                    // This should never happen because `sync_peers` is guaranteed to be non-empty.
                    error!(
                        "Cannot re-request blocks because no or not enough peers with sufficient height are connected"
                    );
                    return result;
                };

                // (Try to) construct the requests.
                // The height just below the batch is passed, so that the requests begin at `start_height`.
                let requests = self.construct_requests(
                    &sync_peers,
                    start_height.saturating_sub(1),
                    min_common_ancestor,
                    max_new_blocks_to_request,
                    greatest_peer_height,
                );

                // Only remove from failed_requests the heights we actually re-issued.
                // (If construct_requests returned empty we must not drop these failed requests.)
                for (height, _) in &requests {
                    failed_requests.remove(height);
                }

                result.push((requests, sync_peers));
            }

            return result;
        }

        // Ensure to not exceed the maximum number of outstanding block requests.
        let max_outstanding_block_requests =
            (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);

        // Ensure there is a finite bound on the number of block responses we receive, that have not been processed yet.
        let max_total_requests = 4 * max_outstanding_block_requests;

        // The number of new blocks that can be requested without exceeding the outstanding limit.
        let max_new_blocks_to_request =
            max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);

        // Prepare the block requests and sync peers, or returns an empty result if there is nothing to request.
        if self.num_total_block_requests() >= max_total_requests as usize {
            trace!(
                "We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
            );

            print_requests();
            vec![]
        } else if max_new_blocks_to_request == 0 {
            trace!(
                "Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
            );

            print_requests();
            vec![]
        } else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
            // Retrieve the greatest block height of any connected peer.
            // We do not need to update the sync state here, as that already happens when the block locators are received.
            let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);

            // Construct the list of block requests.
            let requests = self.construct_requests(
                &sync_peers,
                current_height,
                min_common_ancestor,
                max_new_blocks_to_request,
                greatest_peer_height,
            );

            if !requests.is_empty() {
                trace!(
                    "Generated new block requests for the following heights: {}",
                    rangify_heights(requests.iter().map(|(h, _)| *h))
                );
            }

            // The batch is returned even if `requests` is empty.
            vec![(requests, sync_peers)]
        } else if self.requests.read().is_empty() {
            // This can happen during a race condition where the node just finished syncing.
            // It does not make sense to log or change the sync status here.
            // Checking the sync status here also does not make sense, as the node might as well have switched back
            //  from `synced` to `syncing` between calling `find_sync_peers_inner` and this line.

            vec![]
        } else {
            // This happens if we already requested all advertised blocks.
            trace!("No new blocks can be requested, but there are still outstanding requests.");

            print_requests();
            vec![]
        }
    }
1202
    /// Forwards to the metrics' `count_request_completed`, to record a completed block request.
    ///
    /// Should only be called by validators when they successfully process a block request.
    /// (For other nodes, this is called automatically and internally.)
    ///
    /// TODO(kaimast): remove this public function once the sync logic is fully unified in `BlockSync`.
    pub fn count_request_completed(&self) {
        self.metrics.count_request_completed();
    }
1210
1211    /// Set the sync height to a the given value.
1212    /// This is a no-op if `new_height` is equal or less to the current sync height.
1213    pub fn set_sync_height(&self, new_height: u32) {
1214        // Scope state lock to avoid locking state and metrics at the same time.
1215        let (synced, fully_synced) = {
1216            let mut state = self.sync_state.write();
1217            state.set_sync_height(new_height);
1218            (state.is_block_synced(), !state.can_issue_new_block_requests())
1219        };
1220
1221        if fully_synced {
1222            self.metrics.mark_fully_synced();
1223        }
1224
1225        if synced {
1226            self.synced_notify.notify_waiters();
1227        }
1228    }
1229
1230    /// Inserts a block request for the given height.
1231    ///
1232    /// With a single task issuing block requests, a height should not already be in the requests
1233    /// map: heights in `failed_requests` were removed from `requests` when added there, and
1234    /// `construct_requests` skips heights already in `requests`. If this returns "already in
1235    /// requests map", logging the existing entry (e.g. has response? peers still in locators?)
1236    /// may help diagnose why the height was included in the batch.
1237    fn insert_block_request(&self, height: u32, (hash, previous_hash, sync_ips): SyncRequest<N>) -> Result<()> {
1238        // Ensure the block request does not already exist.
1239        self.check_block_request(height)?;
1240        // Ensure the sync IPs are not empty.
1241        ensure!(!sync_ips.is_empty(), "Cannot insert a block request with no sync IPs");
1242        // Insert the block request.
1243        self.requests.write().insert(height, OutstandingRequest {
1244            request: (hash, previous_hash, sync_ips),
1245            timestamp: Instant::now(),
1246            response: None,
1247        });
1248        Ok(())
1249    }
1250
1251    /// Inserts the given block response, after checking that the request exists and the response is well-formed.
1252    /// On success, this function removes the peer IP from the request sync peers and inserts the response.
1253    fn insert_block_response(&self, peer_ip: SocketAddr, block: Block<N>) -> Result<(), InsertBlockResponseError<N>> {
1254        // Retrieve the block height.
1255        let height = block.height();
1256        let mut requests = self.requests.write();
1257
1258        if self.ledger.contains_block_height(height) {
1259            return Err(InsertBlockResponseError::BlockSyncAlreadyAdvanced { height });
1260        }
1261
1262        let Some(entry) = requests.get_mut(&height) else {
1263            return Err(InsertBlockResponseError::NoSuchRequest { height });
1264        };
1265
1266        // Retrieve the request entry for the candidate block.
1267        let (expected_hash, expected_previous_hash, sync_ips) = &entry.request;
1268
1269        // Ensure the candidate block hash matches the expected hash.
1270        if let Some(expected_hash) = expected_hash
1271            && block.hash() != *expected_hash
1272        {
1273            return Err(InsertBlockResponseError::InvalidBlockHash {
1274                height,
1275                peer_ip,
1276                expected_hash: *expected_hash,
1277                actual_hash: block.hash(),
1278            });
1279        }
1280        // Ensure the previous block hash matches if it exists.
1281        if let Some(expected_previous_hash) = expected_previous_hash
1282            && block.previous_hash() != *expected_previous_hash
1283        {
1284            return Err(InsertBlockResponseError::InvalidPreviousBlockHash {
1285                height,
1286                peer_ip,
1287                expected: *expected_previous_hash,
1288                actual: block.previous_hash(),
1289            });
1290        }
1291        // Ensure the sync pool requested this block from the given peer.
1292        if !sync_ips.contains(&peer_ip) {
1293            return Err(InsertBlockResponseError::WrongSyncPeer { height, peer_ip });
1294        }
1295
1296        // Remove the peer IP from the request entry.
1297        entry.sync_ips_mut().swap_remove(&peer_ip);
1298
1299        if let Some(existing_block) = &entry.response {
1300            // If the candidate block was already present, ensure it is the same block.
1301            if block != *existing_block {
1302                return Err(InsertBlockResponseError::MalformedBlock { height, peer_ip });
1303            }
1304        } else {
1305            entry.response = Some(block.clone());
1306        }
1307
1308        trace!("Received a new and valid block response for height {height}");
1309
1310        // Record that this peer is actively responding. Used by `handle_block_request_timeouts`
1311        // to avoid banning peers that are slow but making progress.
1312        self.last_response_at.lock().insert(peer_ip, Instant::now());
1313
1314        // Notify the sync loop that something changed.
1315        self.response_notify.notify_one();
1316
1317        Ok(())
1318    }
1319
1320    /// Checks that a block request for the given height does not already exist.
1321    fn check_block_request(&self, height: u32) -> Result<()> {
1322        // Ensure the block height is not already in the ledger.
1323        if self.ledger.contains_block_height(height) {
1324            bail!("Failed to add block request, as block {height} exists in the ledger");
1325        }
1326        // Ensure the block height is not already requested.
1327        if self.requests.read().contains_key(&height) {
1328            bail!("Failed to add block request, as block {height} exists in the requests map");
1329        }
1330
1331        Ok(())
1332    }
1333
1334    /// Removes the block request and response for the given height
1335    /// This may only be called after `peek_next_block`, which checked if the request for the given height was complete.
1336    ///
1337    /// Precondition: This may only be called after `peek_next_block` has returned `Some`,
1338    /// which has checked if the request for the given height is complete
1339    /// and there is a block with the given `height` in the `responses` map.
1340    pub fn remove_block_response(&self, height: u32) {
1341        // Remove the request entry for the given height.
1342        if let Some(e) = self.requests.write().remove(&height) {
1343            trace!(
1344                "Block request for height {height} was completed in {}ms (sync speed is {})",
1345                e.timestamp.elapsed().as_millis(),
1346                self.get_sync_speed()
1347            );
1348
1349            // Notify the sending task that less requests are in-flight.
1350            self.peer_notify.notify_one();
1351        }
1352    }
1353
1354    /// Removes all block requests for the given peer IP.
1355    ///
1356    /// This is used when disconnecting from a peer or when a peer sends invalid block responses.
1357    fn remove_block_requests_to_peer(&self, peer_ip: &SocketAddr) {
1358        trace!("Block sync is removing all block requests to peer {peer_ip}...");
1359        let mut heights = vec![];
1360        let mut removed_requests = vec![];
1361
1362        // Remove the peer IP from the requests map. If any request entry is now empty,
1363        // and its corresponding response entry is also empty, then remove that request entry altogether.
1364        self.requests.write().retain(|height, e| {
1365            let had_peer = e.sync_ips_mut().swap_remove(peer_ip);
1366
1367            if had_peer && e.response.is_none() {
1368                trace!("Removed outstanding block request to peer {peer_ip} at height {height}");
1369                heights.push(*height);
1370            }
1371
1372            // Only remove requests that were sent to this peer, that have no other peer that can respond instead,
1373            // and that were not completed yet.
1374            let retain = !had_peer || !e.sync_ips().is_empty() || e.response.is_some();
1375            if !retain {
1376                // Record the request to be re-issued.
1377                let (hash, previous_hash, _) = &e.request;
1378                removed_requests.push((*height, (*hash, *previous_hash)));
1379            }
1380            retain
1381        });
1382
1383        if !heights.is_empty() {
1384            debug!(
1385                "Removed outstanding block requests to disconnecting peer '{peer_ip}' at heights: {}. {} were fully removed.",
1386                rangify_heights(heights),
1387                removed_requests.len(),
1388            );
1389        }
1390
1391        // Mark all requests that were removed as failed.
1392        if !removed_requests.is_empty() {
1393            let mut failed_requests = self.failed_requests.lock();
1394            for (height, e) in removed_requests.into_iter() {
1395                let prev = failed_requests.insert(height, e);
1396                if prev.is_some() {
1397                    warn!(
1398                        "Failed to mark block request at height {height} as failed, as it already exists in the failed requests map"
1399                    );
1400                }
1401            }
1402        }
1403
1404        // No need to remove responses here, because requests with responses will be retained.
1405    }
1406
1407    /// Removes block requests that have timed out, i.e, requests we sent that did not receive a response in time.
1408    ///
1409    /// Timed-out requests will be marked as "failed" and re-issued on the next call to `prepare_block_requests`.
1410    pub fn handle_block_request_timeouts(&self) {
1411        // Snapshot last-response times before locking `requests`. A request whose assigned peer
1412        // has responded within `BLOCK_REQUEST_TIMEOUT` is not timed out, even if its own timer
1413        // has elapsed — the peer is keeping up with a backlog and timing this request out would
1414        // just churn it through `failed_requests` and lose its place in the queue.
1415        let responsive_peers: HashSet<SocketAddr> = {
1416            let last_response_at = self.last_response_at.lock();
1417            let now = Instant::now();
1418            last_response_at
1419                .iter()
1420                .filter_map(|(peer, t)| (now.duration_since(*t) <= BLOCK_REQUEST_TIMEOUT).then_some(*peer))
1421                .collect()
1422        };
1423
1424        // Avoid locking `locators` and `requests` at the same time.
1425        let (timed_out_requests, peers_to_ban) = {
1426            // Acquire the write lock on the requests map.
1427            let mut requests = self.requests.write();
1428
1429            // Retrieve the current time.
1430            let now = Instant::now();
1431
1432            // Retrieve the current block height
1433            let current_height = self.ledger.latest_block_height();
1434
1435            // Track the number of timed out block requests (only used to print a log message).
1436            let mut timed_out_requests = vec![];
1437
1438            // Track which peers should be banned due to unresponsiveness.
1439            let mut peers_to_ban: HashSet<SocketAddr> = HashSet::new();
1440
1441            // Remove timed out block requests.
1442            requests.retain(|height, e| {
1443                let is_obsolete = *height <= current_height;
1444                // Determine if the duration since the request timestamp has exceeded the request timeout.
1445                let timer_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
1446                // Determine if the request is complete.
1447                let is_complete = e.sync_ips().is_empty() && e.response.is_some();
1448                // If any assigned peer is still actively responding, the request is not stuck.
1449                let has_responsive_peer = e.sync_ips().iter().any(|ip| responsive_peers.contains(ip));
1450
1451                // Determine if the request has timed out.
1452                let is_timeout = timer_elapsed && !is_complete && !has_responsive_peer;
1453
1454                // Retain if this is not a timeout and is not obsolete.
1455                let retain = !is_timeout && !is_obsolete;
1456
1457                if is_timeout {
1458                    trace!("Block request at height {height} has timed out: timer_elapsed={timer_elapsed}, is_complete={is_complete}, is_obsolete={is_obsolete}");
1459
1460                    // Increment the number of timed out block requests.
1461                    let (hash, previous_hash, _) = &e.request;
1462                    timed_out_requests.push((*height, (*hash, *previous_hash)));
1463                } else if is_obsolete {
1464                    trace!("Block request at height {height} became obsolete (current_height={current_height})");
1465                }
1466
1467                // If the request timed out, also remove and ban given peer.
1468                if is_timeout {
1469                    for peer_ip in e.sync_ips().iter() {
1470                        peers_to_ban.insert(*peer_ip);
1471                    }
1472                }
1473
1474                retain
1475            });
1476
1477            if !timed_out_requests.is_empty() {
1478                debug!(
1479                    "{num} block requests timed out: {list}",
1480                    num = timed_out_requests.len(),
1481                    list = rangify_heights(timed_out_requests.iter().map(|(height, _)| *height))
1482                );
1483            }
1484
1485            (timed_out_requests, peers_to_ban)
1486        };
1487
1488        // Mark the non-obsolete requests that timed out as failed.
1489        if !timed_out_requests.is_empty() {
1490            let mut failed_requests = self.failed_requests.lock();
1491            for (height, e) in timed_out_requests.into_iter() {
1492                let prev = failed_requests.insert(height, e);
1493                if prev.is_some() {
1494                    warn!(
1495                        "Failed to mark block request at height {height} as failed, as it already exists in the failed requests map"
1496                    );
1497                }
1498            }
1499        }
1500
1501        // Remove and ban the unresponsive peers. The `has_responsive_peer` check inside `retain`
1502        // above already guarantees that a request only counts as timed out when none of its
1503        // assigned peers have responded recently, so every peer in this set is unresponsive.
1504        for peer_ip in peers_to_ban {
1505            self.remove_peer(&peer_ip);
1506            // TODO: Uncomment this when we have a more rigorous analysis and testing of peer banning.
1507            // peer_pool_handler.ip_ban_peer(peer_ip, Some("timed out on block requests"));
1508        }
1509    }
1510
1511    /// Finds the peers to sync from and the shared common ancestor, starting at the give height.
1512    ///
1513    /// Unlike [`Self::find_sync_peers`] this does not only return the latest locators height, but the full BlockLocators for each peer.
1514    /// Returns `None` if there are no peers to sync from.
1515    ///
1516    /// # Locking
1517    /// This function will read-lock `common_ancestors`.
1518    fn find_sync_peers_inner(&self, current_height: u32) -> Option<(IndexMap<SocketAddr, BlockLocators<N>>, u32)> {
1519        // Retrieve the latest ledger height.
1520        let latest_ledger_height = self.ledger.latest_block_height();
1521
1522        // Pick a set of peers above the latest ledger height, and include their locators.
1523        // This will sort the peers by locator height in descending order.
1524        let candidate_locators: IndexMap<_, _> = self
1525            .locators
1526            .read()
1527            .iter()
1528            .filter(|(_, locators)| locators.latest_locator_height() > current_height)
1529            .sorted_by(|(_, a), (_, b)| b.latest_locator_height().cmp(&a.latest_locator_height()))
1530            .take(NUM_SYNC_CANDIDATE_PEERS)
1531            .map(|(peer_ip, locators)| (*peer_ip, locators.clone()))
1532            .collect();
1533
1534        // Case 0: If there are no candidate peers, return `None`.
1535        if candidate_locators.is_empty() {
1536            trace!("Found no sync peers with height greater {current_height}");
1537            return None;
1538        }
1539
1540        // TODO (howardwu): Change this to the highest cumulative weight for Phase 3.
1541        // Case 1: If all of the candidate peers share a common ancestor below the latest ledger height,
1542        // then pick the peer with the highest height, and find peers (up to extra redundancy) with
1543        // a common ancestor above the block request range. Set the end height to their common ancestor.
1544
1545        // Determine the threshold number of peers to sync from.
1546        let threshold_to_request = candidate_locators.len().min(REDUNDANCY_FACTOR);
1547
1548        // Breaks the loop when the first threshold number of peers are found, biasing for the peer with the highest height
1549        // and a cohort of peers who share a common ancestor above this node's latest ledger height.
1550        for (idx, (peer_ip, peer_locators)) in candidate_locators.iter().enumerate() {
1551            // The height of the common ancestor shared by all selected peers.
1552            let mut min_common_ancestor = peer_locators.latest_locator_height();
1553
1554            // The peers we will synchronize from.
1555            // As the previous iteration did not succeed, restart with the next candidate peers.
1556            let mut sync_peers = vec![(*peer_ip, peer_locators.clone())];
1557
1558            // Try adding other peers consistent with this one to the sync peer set.
1559            for (other_ip, other_locators) in candidate_locators.iter().skip(idx + 1) {
1560                // Check if these two peers have a common ancestor above the latest ledger height.
1561                if let Some(common_ancestor) = self.common_ancestors.read().get(&PeerPair(*peer_ip, *other_ip)) {
1562                    // If so, then check that their block locators are consistent.
1563                    if *common_ancestor > latest_ledger_height && peer_locators.is_consistent_with(other_locators) {
1564                        // If their common ancestor is less than the minimum common ancestor, then update it.
1565                        min_common_ancestor = min_common_ancestor.min(*common_ancestor);
1566
1567                        // Add the other peer to the list of sync peers.
1568                        sync_peers.push((*other_ip, other_locators.clone()));
1569                    }
1570                }
1571            }
1572
1573            // If we have enough sync peers above the latest ledger height, finish and return them.
1574            if min_common_ancestor > latest_ledger_height && sync_peers.len() >= threshold_to_request {
1575                // Shuffle the sync peers prior to returning. This ensures the rest of the stack
1576                // does not rely on the order of the sync peers, and that the sync peers are not biased.
1577                sync_peers.shuffle(&mut rand::rng());
1578
1579                // Collect into an IndexMap and return.
1580                return Some((sync_peers.into_iter().collect(), min_common_ancestor));
1581            }
1582        }
1583
1584        // If there is not enough peers with a minimum common ancestor above the latest ledger height, return None.
1585        None
1586    }
1587
1588    /// Given the sync peers and their minimum common ancestor, return a list of block requests.
1589    ///
1590    /// # Returns
1591    /// The list of block requests, ordered by height.
1592    fn construct_requests(
1593        &self,
1594        sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1595        sync_height: u32,
1596        min_common_ancestor: u32,
1597        max_blocks_to_request: u32,
1598        greatest_peer_height: u32,
1599    ) -> Vec<(u32, PrepareSyncRequest<N>)> {
1600        // Compute the start height for the block requests.
1601        let start_height = {
1602            let requests = self.requests.read();
1603            let ledger_height = self.ledger.latest_block_height();
1604
1605            // Do not issue requests for blocks already contained in the ledger.
1606            let mut start_height = ledger_height.max(sync_height + 1);
1607
1608            // Do not issue requests that already exist.
1609            while requests.contains_key(&start_height) {
1610                start_height += 1;
1611            }
1612
1613            start_height
1614        };
1615
1616        // If the minimum common ancestor is below the start height, then return early.
1617        if min_common_ancestor < start_height {
1618            if start_height < greatest_peer_height {
1619                trace!(
1620                    "No request to construct. Height for the next block request is {start_height}, but minimum common block locator ancestor is only {min_common_ancestor} (sync_height={sync_height} greatest_peer_height={greatest_peer_height})"
1621                );
1622            }
1623            return Default::default();
1624        }
1625
1626        // Compute the end height for the block request.
1627        let end_height = (min_common_ancestor + 1).min(start_height + max_blocks_to_request);
1628
1629        // Construct the block hashes to request.
1630        let mut request_hashes = IndexMap::with_capacity((start_height..end_height).len());
1631        // Track the largest number of sync IPs required for any block request in the sequence of requests.
1632        let mut max_num_sync_ips = 1;
1633
1634        for height in start_height..end_height {
1635            // Ensure the current height is not in the ledger or already requested.
1636            if self.check_block_request(height).is_err() {
1637                // If the sequence of block requests is interrupted, then return early.
1638                // Otherwise, continue until the first start height that is new.
1639                match request_hashes.is_empty() {
1640                    true => continue,
1641                    false => break,
1642                }
1643            }
1644
1645            // Construct the block request.
1646            let (hash, previous_hash, num_sync_ips, is_honest) = construct_request(height, sync_peers);
1647
1648            // Handle the dishonest case.
1649            if !is_honest {
1650                // TODO (howardwu): Consider performing an integrity check on peers (to disconnect).
1651                warn!("Detected dishonest peer(s) when preparing block request");
1652                // If there are not enough peers in the dishonest case, then return early.
1653                if sync_peers.len() < num_sync_ips {
1654                    break;
1655                }
1656            }
1657
1658            // Update the maximum number of sync IPs.
1659            max_num_sync_ips = max_num_sync_ips.max(num_sync_ips);
1660
1661            // Append the request.
1662            request_hashes.insert(height, (hash, previous_hash));
1663        }
1664
1665        // Construct the requests with the same sync ips.
1666        request_hashes
1667            .into_iter()
1668            .map(|(height, (hash, previous_hash))| (height, (hash, previous_hash, max_num_sync_ips)))
1669            .collect()
1670    }
1671}
1672
1673/// If any peer is detected to be dishonest in this function, it will not set the hash or previous hash,
1674/// in order to allow the caller to determine what to do.
1675fn construct_request<N: Network>(
1676    height: u32,
1677    sync_peers: &IndexMap<SocketAddr, BlockLocators<N>>,
1678) -> (Option<N::BlockHash>, Option<N::BlockHash>, usize, bool) {
1679    let mut hash = None;
1680    let mut hash_redundancy: usize = 0;
1681    let mut previous_hash = None;
1682    let mut is_honest = true;
1683
1684    for peer_locators in sync_peers.values() {
1685        if let Some(candidate_hash) = peer_locators.get_hash(height) {
1686            match hash {
1687                // Increment the redundancy count if the hash matches.
1688                Some(hash) if hash == candidate_hash => hash_redundancy += 1,
1689                // Some peer is dishonest.
1690                Some(_) => {
1691                    hash = None;
1692                    hash_redundancy = 0;
1693                    previous_hash = None;
1694                    is_honest = false;
1695                    break;
1696                }
1697                // Set the hash if it is not set.
1698                None => {
1699                    hash = Some(candidate_hash);
1700                    hash_redundancy = 1;
1701                }
1702            }
1703        }
1704        if let Some(candidate_previous_hash) = peer_locators.get_hash(height.saturating_sub(1)) {
1705            match previous_hash {
1706                // Increment the redundancy count if the previous hash matches.
1707                Some(previous_hash) if previous_hash == candidate_previous_hash => (),
1708                // Some peer is dishonest.
1709                Some(_) => {
1710                    hash = None;
1711                    hash_redundancy = 0;
1712                    previous_hash = None;
1713                    is_honest = false;
1714                    break;
1715                }
1716                // Set the previous hash if it is not set.
1717                None => previous_hash = Some(candidate_previous_hash),
1718            }
1719        }
1720    }
1721
1722    // Note that we intentionally do not just pick the peers that have the hash we have chosen,
1723    // to give stronger confidence that we are syncing during times when the network is consistent/stable.
1724    let num_sync_ips = {
1725        // Extra redundant peers - as the block hash was dishonest.
1726        if !is_honest {
1727            // Choose up to the extra redundancy factor in sync peers.
1728            EXTRA_REDUNDANCY_FACTOR
1729        }
1730        // No redundant peers - as we have redundancy on the block hash.
1731        else if hash.is_some() && hash_redundancy >= REDUNDANCY_FACTOR {
1732            // Choose one sync peer.
1733            1
1734        }
1735        // Redundant peers - as we do not have redundancy on the block hash.
1736        else {
1737            // Choose up to the redundancy factor in sync peers.
1738            REDUNDANCY_FACTOR
1739        }
1740    };
1741
1742    (hash, previous_hash, num_sync_ips, is_honest)
1743}
1744
1745#[cfg(test)]
1746mod tests {
1747    use super::*;
1748    use crate::locators::{
1749        CHECKPOINT_INTERVAL,
1750        NUM_RECENT_BLOCKS,
1751        test_helpers::{sample_block_locators, sample_block_locators_with_fork},
1752    };
1753
1754    use snarkos_node_bft_ledger_service::MockLedgerService;
1755    use snarkvm::{
1756        ledger::committee::Committee,
1757        prelude::{Field, TestRng},
1758    };
1759
1760    use indexmap::{IndexSet, indexset};
1761    #[cfg(feature = "locktick")]
1762    use locktick::parking_lot::RwLock;
1763    #[cfg(not(feature = "locktick"))]
1764    use parking_lot::RwLock;
1765    use rand::RngExt;
1766    use std::net::{IpAddr, Ipv4Addr};
1767
1768    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1769
1770    /// Returns the peer IP for the sync pool.
1771    fn sample_peer_ip(id: u16) -> SocketAddr {
1772        assert_ne!(id, 0, "The peer ID must not be 0 (reserved for local IP in testing)");
1773        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), id)
1774    }
1775
1776    /// Returns a sample committee.
1777    fn sample_committee() -> Committee<CurrentNetwork> {
1778        let rng = &mut TestRng::default();
1779        snarkvm::ledger::committee::test_helpers::sample_committee(rng)
1780    }
1781
1782    /// Returns the ledger service, initialized to the given height.
1783    fn sample_ledger_service(height: u32) -> MockLedgerService<CurrentNetwork> {
1784        MockLedgerService::new_at_height(sample_committee(), height)
1785    }
1786
1787    /// Returns the sync pool, with the ledger initialized to the given height.
1788    fn sample_sync_at_height(height: u32) -> BlockSync<CurrentNetwork> {
1789        BlockSync::<CurrentNetwork>::new(Arc::new(sample_ledger_service(height)), ConnectionMode::Router)
1790    }
1791
1792    /// Returns a vector of randomly sampled block heights in [0, max_height].
1793    ///
1794    /// The maximum value will always be included in the result.
1795    fn generate_block_heights(max_height: u32, num_values: usize) -> Vec<u32> {
1796        assert!(num_values > 0, "Cannot generate an empty vector");
1797        assert!((max_height as usize) >= num_values);
1798
1799        let mut rng = TestRng::default();
1800
1801        let mut heights: Vec<u32> = (0..(max_height - 1)).sample(&mut rng, num_values);
1802
1803        heights.push(max_height);
1804
1805        heights
1806    }
1807
1808    /// Returns a duplicate (deep copy) of the sync pool with a different ledger height.
1809    fn duplicate_sync_at_new_height(sync: &BlockSync<CurrentNetwork>, height: u32) -> BlockSync<CurrentNetwork> {
1810        BlockSync::<CurrentNetwork> {
1811            failed_requests: Default::default(),
1812            peer_notify: Notify::new(),
1813            response_notify: Default::default(),
1814            ledger: Arc::new(sample_ledger_service(height)),
1815            connection_mode: sync.connection_mode,
1816            locators: RwLock::new(sync.locators.read().clone()),
1817            common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
1818            requests: RwLock::new(sync.requests.read().clone()),
1819            sync_state: RwLock::new(sync.sync_state.read().clone()),
1820            synced_notify: Notify::new(),
1821            advance_with_sync_blocks_lock: Default::default(),
1822            metrics: Default::default(),
1823            prepare_requests_lock: Default::default(),
1824            last_response_at: Default::default(),
1825        }
1826    }
1827
1828    /// Checks that the sync pool (starting at genesis) returns the correct requests.
1829    fn check_prepare_block_requests(
1830        sync: BlockSync<CurrentNetwork>,
1831        min_common_ancestor: u32,
1832        peers: IndexSet<SocketAddr>,
1833    ) {
1834        let rng = &mut TestRng::default();
1835
1836        // Check test assumptions are met.
1837        assert_eq!(sync.ledger.latest_block_height(), 0, "This test assumes the sync pool is at genesis");
1838
1839        // Determine the number of peers within range of this sync pool.
1840        let num_peers_within_recent_range_of_ledger = {
1841            // If no peers are within range, then set to 0.
1842            if min_common_ancestor >= NUM_RECENT_BLOCKS as u32 {
1843                0
1844            }
1845            // Otherwise, manually check the number of peers within range.
1846            else {
1847                peers.iter().filter(|peer_ip| sync.get_peer_height(peer_ip).unwrap() < NUM_RECENT_BLOCKS as u32).count()
1848            }
1849        };
1850
1851        // Prepare the block requests.
1852        let mut batches = sync.prepare_block_requests();
1853
1854        // If there are no peers, then there should be no requests.
1855        if peers.is_empty() {
1856            assert!(batches.is_empty());
1857            return;
1858        }
1859
1860        let (requests, sync_peers) = batches.pop().unwrap();
1861
1862        // Otherwise, there should be requests.
1863        let expected_num_requests = core::cmp::min(min_common_ancestor as usize, MAX_BLOCK_REQUESTS);
1864        assert_eq!(requests.len(), expected_num_requests);
1865
1866        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1867            // Construct the sync IPs.
1868            let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
1869            assert_eq!(height, 1 + idx as u32);
1870            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1871            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1872
1873            if num_peers_within_recent_range_of_ledger >= REDUNDANCY_FACTOR {
1874                assert_eq!(sync_ips.len(), 1);
1875            } else {
1876                assert_eq!(sync_ips.len(), num_peers_within_recent_range_of_ledger);
1877                assert_eq!(sync_ips, peers);
1878            }
1879        }
1880    }
1881
1882    /// Tests that height and hash values are set correctly using many different maximum block heights.
1883    #[test]
1884    fn test_latest_block_height() {
1885        for height in generate_block_heights(100_001, 5000) {
1886            let sync = sample_sync_at_height(height);
1887            // Check that the latest block height is the maximum height.
1888            assert_eq!(sync.ledger.latest_block_height(), height);
1889
1890            // Check the hash to height mapping
1891            assert_eq!(sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(0)).into()).unwrap(), 0);
1892            assert_eq!(
1893                sync.ledger.get_block_height(&(Field::<CurrentNetwork>::from_u32(height)).into()).unwrap(),
1894                height
1895            );
1896        }
1897    }
1898
1899    #[test]
1900    fn test_get_block_hash() {
1901        for height in generate_block_heights(100_001, 5000) {
1902            let sync = sample_sync_at_height(height);
1903
1904            // Check the height to hash mapping
1905            assert_eq!(sync.ledger.get_block_hash(0).unwrap(), (Field::<CurrentNetwork>::from_u32(0)).into());
1906            assert_eq!(sync.ledger.get_block_hash(height).unwrap(), (Field::<CurrentNetwork>::from_u32(height)).into());
1907        }
1908    }
1909
1910    #[test]
1911    fn test_prepare_block_requests() {
1912        for num_peers in 0..111 {
1913            println!("Testing with {num_peers} peers");
1914
1915            let sync = sample_sync_at_height(0);
1916
1917            let mut peers = indexset![];
1918
1919            for peer_id in 1..=num_peers {
1920                // Add a peer.
1921                sync.update_peer_locators(sample_peer_ip(peer_id), &sample_block_locators(10)).unwrap();
1922                // Add the peer to the set of peers.
1923                peers.insert(sample_peer_ip(peer_id));
1924            }
1925
1926            // If all peers are ahead, then requests should be prepared.
1927            check_prepare_block_requests(sync, 10, peers);
1928        }
1929    }
1930
1931    #[test]
1932    fn test_prepare_block_requests_with_leading_fork_at_11() {
1933        let sync = sample_sync_at_height(0);
1934
1935        // Intuitively, peer 1's fork is above peer 2 and peer 3's height.
1936        // So from peer 2 and peer 3's perspective, they don't even realize that peer 1 is on a fork.
1937        // Thus, you can sync up to block 10 from any of the 3 peers.
1938
1939        // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 11,
1940        // then the sync pool should request blocks 1..=10 from the NUM_REDUNDANCY peers.
1941        // This is safe because the leading fork is at 11, and the sync pool is at 0,
1942        // so all candidate peers are at least 10 blocks ahead of the sync pool.
1943
1944        // Add a peer (fork).
1945        let peer_1 = sample_peer_ip(1);
1946        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 11)).unwrap();
1947
1948        // Add a peer.
1949        let peer_2 = sample_peer_ip(2);
1950        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1951
1952        // Add a peer.
1953        let peer_3 = sample_peer_ip(3);
1954        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1955
1956        // Prepare the block requests.
1957        let (requests, _) = sync.prepare_block_requests().pop().unwrap();
1958        assert_eq!(requests.len(), 10);
1959
1960        // Check the requests.
1961        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
1962            assert_eq!(height, 1 + idx as u32);
1963            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
1964            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
1965            assert_eq!(num_sync_ips, 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
1966        }
1967    }
1968
1969    #[test]
1970    fn test_prepare_block_requests_with_leading_fork_at_10() {
1971        let rng = &mut TestRng::default();
1972        let sync = sample_sync_at_height(0);
1973
1974        // Intuitively, peer 1's fork is at peer 2 and peer 3's height.
1975        // So from peer 2 and peer 3's perspective, they recognize that peer 1 has forked.
1976        // Thus, you don't have NUM_REDUNDANCY peers to sync to block 10.
1977        //
1978        // Now, while you could in theory sync up to block 9 from any of the 3 peers,
1979        // we choose not to do this as either side is likely to disconnect from us,
1980        // and we would rather wait for enough redundant peers before syncing.
1981
1982        // When there are NUM_REDUNDANCY peers ahead, and 1 peer is on a leading fork at 10,
1983        // then the sync pool should not request blocks as 1 peer conflicts with the other NUM_REDUNDANCY-1 peers.
1984        // We choose to sync with a cohort of peers that are *consistent* with each other,
1985        // and prioritize from descending heights (so the highest peer gets priority).
1986
1987        // Add a peer (fork).
1988        let peer_1 = sample_peer_ip(1);
1989        sync.update_peer_locators(peer_1, &sample_block_locators_with_fork(20, 10)).unwrap();
1990
1991        // Add a peer.
1992        let peer_2 = sample_peer_ip(2);
1993        sync.update_peer_locators(peer_2, &sample_block_locators(10)).unwrap();
1994
1995        // Add a peer.
1996        let peer_3 = sample_peer_ip(3);
1997        sync.update_peer_locators(peer_3, &sample_block_locators(10)).unwrap();
1998
1999        // Prepare the block requests.
2000        let batches = sync.prepare_block_requests();
2001        assert!(batches.is_empty());
2002
2003        // When there are NUM_REDUNDANCY+1 peers ahead, and 1 is on a fork, then there should be block requests.
2004
2005        // Add a peer.
2006        let peer_4 = sample_peer_ip(4);
2007        sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();
2008
2009        // Prepare the block requests.
2010        let (requests, sync_peers) = sync.prepare_block_requests().pop().unwrap();
2011        assert_eq!(requests.len(), 10);
2012
2013        // Check the requests.
2014        for (idx, (height, (hash, previous_hash, num_sync_ips))) in requests.into_iter().enumerate() {
2015            // Construct the sync IPs.
2016            let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
2017            assert_eq!(height, 1 + idx as u32);
2018            assert_eq!(hash, Some((Field::<CurrentNetwork>::from_u32(height)).into()));
2019            assert_eq!(previous_hash, Some((Field::<CurrentNetwork>::from_u32(height - 1)).into()));
2020            assert_eq!(sync_ips.len(), 1); // Only 1 needed since we have redundancy factor on this (recent locator) hash.
2021            assert_ne!(sync_ips[0], peer_1); // It should never be the forked peer.
2022        }
2023    }
2024
#[test]
fn test_prepare_block_requests_with_trailing_fork_at_9() {
    // Scenario: peers 1, 2 and (later) 4 share the same locators up to height 10, while peer 3
    // advertises forked locators. We only sync when there are NUM_REDUNDANCY peers that are
    // *consistent* with each other, so requests must only appear once peer 4 has joined,
    // and they must be biased away from peer 3.
    let rng = &mut TestRng::default();
    let sync = sample_sync_at_height(0);

    // Register the first two consistent peers.
    let peer_1 = sample_peer_ip(1);
    let peer_2 = sample_peer_ip(2);
    for consistent_peer in [peer_1, peer_2] {
        sync.update_peer_locators(consistent_peer, &sample_block_locators(10)).unwrap();
    }

    // Register the forked peer.
    let peer_3 = sample_peer_ip(3);
    sync.update_peer_locators(peer_3, &sample_block_locators_with_fork(20, 10)).unwrap();

    // With only these three peers, no block requests are produced.
    assert!(sync.prepare_block_requests().is_empty());

    // Register one more consistent peer; block requests should now be produced.
    let peer_4 = sample_peer_ip(4);
    sync.update_peer_locators(peer_4, &sample_block_locators(10)).unwrap();

    let (requests, sync_peers) = sync.prepare_block_requests().pop().unwrap();
    assert_eq!(requests.len(), 10);

    // The requests must cover heights 1..=10 in order.
    for (expected_height, (height, request)) in (1u32..).zip(requests) {
        let (hash, previous_hash, num_sync_ips) = request;
        // Draw the sync IPs for this request from the eligible peers.
        let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();

        assert_eq!(height, expected_height);
        assert_eq!(hash, Some(Field::<CurrentNetwork>::from_u32(height).into()));
        assert_eq!(previous_hash, Some(Field::<CurrentNetwork>::from_u32(height - 1).into()));
        // A single sync IP suffices, since the redundancy factor applies to this (recent locator) hash.
        assert_eq!(sync_ips.len(), 1);
        // The forked peer must never be selected.
        assert_ne!(sync_ips[0], peer_3);
    }
}
2071
#[test]
fn test_insert_block_requests() {
    // Inserts every prepared block request once, verifies it can be read back,
    // and verifies that a second insertion of the same height is rejected.
    let rng = &mut TestRng::default();
    let sync = sample_sync_at_height(0);

    // Register a single peer at height 10.
    sync.update_peer_locators(sample_peer_ip(1), &sample_block_locators(10)).unwrap();

    // Ask the sync module which blocks to request.
    let (requests, sync_peers) = sync.prepare_block_requests().pop().unwrap();
    assert_eq!(requests.len(), 10);

    // Pass 1: insert each request and read it back.
    for (height, request) in requests.clone() {
        let (hash, previous_hash, num_sync_ips) = request;
        let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();

        sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();

        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }

    // Pass 2: every request must still be present.
    for (height, request) in requests.clone() {
        let (hash, previous_hash, num_sync_ips) = request;
        let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();

        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }

    // Pass 3: inserting the same height again must fail and leave the stored request untouched.
    for (height, request) in requests {
        let (hash, previous_hash, num_sync_ips) = request;
        let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();

        sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap_err();

        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }
}
2112
#[test]
fn test_insert_block_requests_fails() {
    // The sync module starts at height 9, so a request for height 9 targets a block
    // that is already in the ledger, while height 10 is the first missing block.
    let sync = sample_sync_at_height(9);

    // Register a peer at height 10.
    let peer_ip = sample_peer_ip(1);
    sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

    // A request for a height that is already in the ledger is rejected.
    let in_ledger_request = (None, None, indexset![peer_ip]);
    sync.insert_block_request(9, in_ledger_request).unwrap_err();

    // A request for a height that is not yet in the ledger is accepted.
    let missing_block_request = (None, None, indexset![peer_ip]);
    sync.insert_block_request(10, missing_block_request).unwrap();
}
2125
#[test]
fn test_update_peer_locators() {
    // Exhaustively checks `update_peer_locators` and `get_common_ancestor` for every
    // combination of two peer heights in `0..500`.
    let sync = sample_sync_at_height(0);

    // Both peer addresses are loop-invariant, so they are created once up front.
    let peer1_ip = sample_peer_ip(1);
    let peer2_ip = sample_peer_ip(2);

    for peer1_height in 0..500u32 {
        sync.update_peer_locators(peer1_ip, &sample_block_locators(peer1_height)).unwrap();
        assert_eq!(sync.get_peer_height(&peer1_ip), Some(peer1_height));

        for peer2_height in 0..500u32 {
            sync.update_peer_locators(peer2_ip, &sample_block_locators(peer2_height)).unwrap();
            assert_eq!(
                sync.get_peer_height(&peer2_ip),
                Some(peer2_height),
                "peer 1 height at {peer1_height} and peer 2 height at {peer2_height}"
            );

            // Compute the distance between the peers.
            let distance = peer1_height.abs_diff(peer2_height);

            // Determine the expected common ancestor:
            // - if the heights are less than NUM_RECENT_BLOCKS apart, it is the lower of the two heights;
            // - otherwise, it is the lower of the two heights rounded down to a multiple of CHECKPOINT_INTERVAL.
            let expected_ancestor = if distance < NUM_RECENT_BLOCKS as u32 {
                core::cmp::min(peer1_height, peer2_height)
            } else {
                let min_checkpoints =
                    core::cmp::min(peer1_height / CHECKPOINT_INTERVAL, peer2_height / CHECKPOINT_INTERVAL);
                min_checkpoints * CHECKPOINT_INTERVAL
            };

            // The common ancestor must not depend on the argument order.
            // The heights are reported in the failure message instead of being printed on
            // every iteration, which keeps the 250,000 iterations free of output.
            assert_eq!(
                sync.get_common_ancestor(peer1_ip, peer2_ip),
                Some(expected_ancestor),
                "peer 1 height at {peer1_height} and peer 2 height at {peer2_height}"
            );
            assert_eq!(
                sync.get_common_ancestor(peer2_ip, peer1_ip),
                Some(expected_ancestor),
                "peer 2 height at {peer2_height} and peer 1 height at {peer1_height}"
            );
        }
    }
}
2161
#[test]
fn test_remove_peer() {
    // Adds and removes the same peer twice (at heights 100 and 200), checking that
    // the peer's height is reported after each update and is gone after each removal.
    let sync = sample_sync_at_height(0);
    let peer_ip = sample_peer_ip(1);

    for locator_height in [100u32, 200] {
        // After updating the locators, the peer's height is known.
        sync.update_peer_locators(peer_ip, &sample_block_locators(locator_height)).unwrap();
        assert_eq!(sync.get_peer_height(&peer_ip), Some(locator_height));

        // After removing the peer, its height is no longer known.
        sync.remove_peer(&peer_ip);
        assert_eq!(sync.get_peer_height(&peer_ip), None);
    }
}
2179
#[test]
fn test_locators_insert_remove_insert() {
    // Checks that a peer's locators can be inserted again after the peer was removed.
    let sync = sample_sync_at_height(0);
    let peer_ip = sample_peer_ip(1);

    // Updates the peer's locators to the given height and checks the reported peer height.
    let insert_and_check = |locator_height: u32| {
        sync.update_peer_locators(peer_ip, &sample_block_locators(locator_height)).unwrap();
        assert_eq!(sync.get_peer_height(&peer_ip), Some(locator_height));
    };

    // Insert.
    insert_and_check(100);

    // Remove.
    sync.remove_peer(&peer_ip);
    assert_eq!(sync.get_peer_height(&peer_ip), None);

    // Insert again.
    insert_and_check(200);
}
2194
#[test]
fn test_requests_insert_remove_insert() {
    // Checks the request lifecycle around a peer removal:
    // 1. block requests can be inserted while the peer is known,
    // 2. removing the peer drops the requests and prevents new ones from being prepared,
    // 3. after the peer is added again, requests can be prepared and inserted once more.
    let rng = &mut TestRng::default();
    let sync = sample_sync_at_height(0);

    // Add a peer.
    let peer_ip = sample_peer_ip(1);
    sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

    // Prepare the block requests.
    let (requests, sync_peers) = sync.prepare_block_requests().pop().unwrap();
    assert_eq!(requests.len(), 10);

    for (height, (hash, previous_hash, num_sync_ips)) in requests.clone() {
        // Construct the sync IPs.
        let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
        // Insert the block request.
        sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
        // Check that the block requests were inserted.
        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }

    // Remove the peer.
    sync.remove_peer(&peer_ip);

    for (height, _) in requests {
        // Check that the block requests were removed.
        assert_eq!(sync.get_block_request(height), None);
        assert!(sync.get_block_request_timestamp(height).is_none());
    }

    // As there is no peer, it should not be possible to prepare block requests.
    let batches = sync.prepare_block_requests();
    assert!(batches.is_empty());

    // Add the peer again.
    sync.update_peer_locators(peer_ip, &sample_block_locators(10)).unwrap();

    // Prepare the block requests again, this time keeping the freshly returned sync peers,
    // so the second round does not sample from the set captured before the peer was removed.
    let (requests, sync_peers) = sync.prepare_block_requests().pop().unwrap();
    assert_eq!(requests.len(), 10);

    for (height, (hash, previous_hash, num_sync_ips)) in requests {
        // Construct the sync IPs.
        let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();
        // Insert the block request.
        sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();
        // Check that the block requests were inserted.
        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }
}
2248
#[test]
fn test_obsolete_block_requests() {
    // Inserts one request per block up to a random locator height, then simulates the ledger
    // advancing to a random height and checks that only the requests above the new ledger
    // height remain after `handle_block_request_timeouts` runs.
    let rng = &mut TestRng::default();
    let sync = sample_sync_at_height(0);

    // The locator height starts at 1, so there is always at least one
    // non-genesis block to construct a request for.
    let locator_height = rng.random_range(1..50);

    // Register a peer that advertises the chosen height.
    let locators = sample_block_locators(locator_height);
    sync.update_peer_locators(sample_peer_ip(1), &locators).unwrap();

    // Prepare one request per block up to the locator height.
    let (requests, sync_peers) = sync.prepare_block_requests().pop().unwrap();
    assert_eq!(requests.len(), locator_height as usize);

    // Insert each request into the sync module and read it back.
    for (height, request) in requests.clone() {
        let (hash, previous_hash, num_sync_ips) = request;
        let sync_ips: IndexSet<_> = sync_peers.keys().sample(rng, num_sync_ips).into_iter().copied().collect();

        sync.insert_block_request(height, (hash, previous_hash, sync_ips.clone())).unwrap();

        assert_eq!(sync.get_block_request(height), Some((hash, previous_hash, sync_ips)));
        assert!(sync.get_block_request_timestamp(height).is_some());
    }

    // Simulate block advancement by duplicating the sync module at a new ledger height.
    // The range is inclusive, so the ledger may advance all the way to the locator height.
    let ledger_height = rng.random_range(0..=locator_height);
    let new_sync = duplicate_sync_at_new_height(&sync, ledger_height);

    // The duplicate starts out with the same number of requests.
    assert_eq!(new_sync.requests.read().len(), requests.len());

    // Let the sync module clean up its outstanding requests.
    new_sync.handle_block_request_timeouts();

    // Only the requests above the new ledger height remain.
    let expected_remaining = (locator_height - ledger_height) as usize;
    assert_eq!(new_sync.requests.read().len(), expected_remaining);
}
2293
#[test]
fn test_timed_out_block_request() {
    // Inserts a single request whose timestamp is older than BLOCK_REQUEST_TIMEOUT and checks
    // that `handle_block_request_timeouts` removes both the request and the peer's locators.
    let sync = sample_sync_at_height(0);
    let peer_ip = sample_peer_ip(1);
    let locators = sample_block_locators(10);
    let block_hash = locators.get_hash(1);

    sync.update_peer_locators(peer_ip, &locators).unwrap();

    // A timestamp one second past the timeout.
    let timestamp = Instant::now() - (BLOCK_REQUEST_TIMEOUT + Duration::from_secs(1));

    // Insert the timed-out request for height 1 directly into the request map.
    let timed_out_request =
        OutstandingRequest { request: (block_hash, None, [peer_ip].into()), timestamp, response: None };
    sync.requests.write().insert(1, timed_out_request);

    assert_eq!(sync.requests.read().len(), 1);
    assert_eq!(sync.locators.read().len(), 1);

    // Process the timeouts.
    sync.handle_block_request_timeouts();

    // NOTE(review): the ban-list assertions below are disabled; confirm whether they should be restored.
    // let ban_list = c.peers_to_ban.write();
    // assert_eq!(ban_list.len(), 1);
    // assert_eq!(ban_list.iter().next(), Some(&peer_ip));

    // Both the request and the peer's locators are gone.
    assert!(sync.requests.read().is_empty());
    assert!(sync.locators.read().is_empty());
}
2325
#[test]
fn test_reissue_timed_out_block_request() {
    // Inserts one timed-out request (height 1, peer 1) and one fresh request (height 2, peer 2),
    // then checks that `handle_block_request_timeouts` drops only the timed-out request,
    // leaves two sets of peer locators, and records height 1 in `failed_requests`.
    let sync = sample_sync_at_height(0);
    let peer_ip1 = sample_peer_ip(1);
    let peer_ip2 = sample_peer_ip(2);
    let peer_ip3 = sample_peer_ip(3);

    let locators = sample_block_locators(10);
    let block_hash1 = locators.get_hash(1);
    let block_hash2 = locators.get_hash(2);

    // All three peers advertise the same locators.
    for peer_ip in [peer_ip1, peer_ip2, peer_ip3] {
        sync.update_peer_locators(peer_ip, &locators).unwrap();
    }

    assert_eq!(sync.locators.read().len(), 3);

    // A timestamp one second past the timeout.
    let timestamp = Instant::now() - (BLOCK_REQUEST_TIMEOUT + Duration::from_secs(1));

    // Height 1: a request to peer 1 that has already timed out.
    let timed_out_request =
        OutstandingRequest { request: (block_hash1, None, [peer_ip1].into()), timestamp, response: None };
    sync.requests.write().insert(1, timed_out_request);

    // Height 2: a request to peer 2 that was issued just now, so it has not timed out.
    let fresh_request = OutstandingRequest {
        request: (block_hash2, None, [peer_ip2].into()),
        timestamp: Instant::now(),
        response: None,
    };
    sync.requests.write().insert(2, fresh_request);

    assert_eq!(sync.requests.read().len(), 2);

    // Process the timeouts.
    sync.handle_block_request_timeouts();

    // NOTE(review): the ban-list assertions below are disabled; confirm whether they should be restored.
    // let ban_list = c.peers_to_ban.write();
    // assert_eq!(ban_list.len(), 1);
    // assert_eq!(ban_list.iter().next(), Some(&peer_ip1));

    // Only the fresh request remains, and one set of peer locators was dropped.
    assert_eq!(sync.requests.read().len(), 1);
    assert_eq!(sync.locators.read().len(), 2);

    // The timed-out request is recorded as failed.
    let failed_requests = sync.failed_requests.lock();
    assert_eq!(failed_requests.len(), 1);

    let (height, (hash, _)) = failed_requests.iter().next().unwrap();
    assert_eq!(*height, 1);
    assert_eq!(*hash, block_hash1);

    // NOTE(review): the checks on the re-issued sync IPs below are disabled; confirm whether they should be restored.
    // assert_eq!(new_sync_ips.len(), 2);
    //
    // // Make sure the removed peer is not in the sync_peer set.
    // let mut iter = new_sync_ips.iter();
    // assert_ne!(iter.next().unwrap().0, &peer_ip1);
    // assert_ne!(iter.next().unwrap().0, &peer_ip1);
}
2385}