// Copyright (C) 2019-2022 Aleo Systems Inc.
// This file is part of the snarkOS library.
// The snarkOS library is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// The snarkOS library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with the snarkOS library. If not, see <https://www.gnu.org/licenses/>.
use crate::{
helpers::{block_requests::*, BlockRequest, CircularMap, NodeType, State},
Data,
DisconnectReason,
Environment,
LedgerReader,
Message,
PeersRequest,
PeersRouter,
ProverRequest,
ProverRouter,
};
use snarkos_storage::{storage::Storage, BlockLocators, LedgerState, MAXIMUM_LINEAR_BLOCK_LOCATORS};
use snarkvm::dpc::prelude::*;
use anyhow::Result;
use std::{
collections::HashMap,
net::SocketAddr,
path::Path,
sync::{atomic::Ordering, Arc},
time::{Duration, Instant},
};
use time::OffsetDateTime;
use tokio::{
sync::{mpsc, oneshot, Mutex, RwLock},
task,
};
/// The maximum number of unconfirmed blocks that can be held by the ledger.
const MAXIMUM_UNCONFIRMED_BLOCKS: u32 = 250;
/// Shorthand for the parent half of the `Ledger` message channel.
pub(crate) type LedgerRouter<N> = mpsc::Sender<LedgerRequest<N>>;
#[allow(unused)]
/// Shorthand for the child half of the `Ledger` message channel.
type LedgerHandler<N> = mpsc::Receiver<LedgerRequest<N>>;
///
/// An enum of requests that the `Ledger` struct processes.
///
#[derive(Debug)]
pub enum LedgerRequest<N: Network> {
/// BlockResponse := (peer_ip, block, prover_router)
BlockResponse(SocketAddr, Block<N>, ProverRouter<N>),
/// Disconnect := (peer_ip, reason)
Disconnect(SocketAddr, DisconnectReason),
/// Failure := (peer_ip, failure)
Failure(SocketAddr, String),
/// Heartbeat := (prover_router)
Heartbeat(ProverRouter<N>),
/// Pong := (peer_ip, node_type, status, is_fork, block_locators)
Pong(SocketAddr, NodeType, State, Option<bool>, BlockLocators<N>),
/// UnconfirmedBlock := (peer_ip, block, prover_router)
UnconfirmedBlock(SocketAddr, Block<N>, ProverRouter<N>),
}
pub type PeersState<N> = HashMap<SocketAddr, Option<(NodeType, State, Option<bool>, u32, BlockLocators<N>)>>;
///
/// A ledger for a specific network on the node server.
///
#[derive(Debug)]
#[allow(clippy::type_complexity)]
pub struct Ledger<N: Network, E: Environment> {
/// The ledger router of the node.
ledger_router: LedgerRouter<N>,
/// The canonical chain of blocks.
canon: Arc<LedgerState<N>>,
/// The canonical chain of blocks in read-only mode.
#[allow(unused)]
canon_reader: Arc<LedgerState<N>>,
/// A lock to ensure methods that need to be mutually-exclusive are enforced.
/// In this context, `add_block`, and `revert_to_block_height` must be mutually-exclusive.
canon_lock: Arc<Mutex<()>>,
/// A map of previous block hashes to unconfirmed blocks.
unconfirmed_blocks: RwLock<CircularMap<N::BlockHash, Block<N>, { MAXIMUM_UNCONFIRMED_BLOCKS }>>,
/// The map of each peer to their ledger state := (node_type, status, is_fork, latest_block_height, block_locators).
peers_state: RwLock<PeersState<N>>,
/// The map of each peer to their block requests := HashMap<(block_height, block_hash), timestamp>
block_requests: RwLock<HashMap<SocketAddr, HashMap<BlockRequest<N>, i64>>>,
/// A lock to ensure methods that need to be mutually-exclusive are enforced.
/// In this context, `update_ledger`, `add_block`, and `update_block_requests` must be mutually-exclusive.
block_requests_lock: Arc<Mutex<()>>,
/// The timestamp of the last successful block update.
last_block_update_timestamp: RwLock<Instant>,
/// The map of each peer to their failure messages := (failure_message, timestamp).
failures: RwLock<HashMap<SocketAddr, Vec<(String, i64)>>>,
/// The peers router of the node.
peers_router: PeersRouter<N, E>,
}
impl<N: Network, E: Environment> Ledger<N, E> {
/// Initializes a new instance of the ledger.
pub async fn open<S: Storage, P: AsRef<Path> + Copy>(path: P, peers_router: PeersRouter<N, E>) -> Result<Arc<Self>> {
// Initialize an mpsc channel for sending requests to the `Ledger` struct.
let (ledger_router, mut ledger_handler) = mpsc::channel(1024);
// Initialize the ledger.
let ledger = Arc::new(Self {
ledger_router,
canon: Arc::new(LedgerState::open_writer::<S, P>(path)?),
canon_reader: LedgerState::open_reader::<S, P>(path)?,
canon_lock: Arc::new(Mutex::new(())),
unconfirmed_blocks: Default::default(),
peers_state: Default::default(),
block_requests: Default::default(),
block_requests_lock: Arc::new(Mutex::new(())),
last_block_update_timestamp: RwLock::new(Instant::now()),
failures: Default::default(),
peers_router,
});
// Initialize the handler for the ledger.
{
let ledger = ledger.clone();
let (router, handler) = oneshot::channel();
E::tasks().append(task::spawn(async move {
// Notify the outer function that the task is ready.
let _ = router.send(());
// Asynchronously wait for a ledger request.
while let Some(request) = ledger_handler.recv().await {
// Hold the ledger write lock briefly, to update the state of the ledger.
// Note: Do not wrap this call in a `task::spawn` as `BlockResponse` messages
// will end up being processed out of order.
ledger.update(request).await;
}
}));
// Wait until the ledger handler is ready.
let _ = handler.await;
}
Ok(ledger)
}
/// Returns an instance of the ledger reader.
pub fn reader(&self) -> LedgerReader<N> {
// TODO (howardwu): Switch this from `canon` to `canon_reader`.
// RocksDB at v6.22 has a rollback error with its sequence numbers.
// Currently, v6.25 has this issue patched, however rust-rocksdb has not released it.
self.canon.clone()
// self.canon_reader.clone()
}
/// Returns an instance of the ledger router.
pub fn router(&self) -> LedgerRouter<N> {
self.ledger_router.clone()
}
pub(super) async fn shut_down(&self) -> (Arc<Mutex<()>>, Arc<Mutex<()>>, Arc<parking_lot::RwLock<()>>) {
debug!("Ledger is shutting down...");
// Set the terminator bit to `true` to ensure it stops mining.
E::terminator().store(true, Ordering::SeqCst);
trace!("[ShuttingDown] Terminator bit has been enabled");
// Clear the unconfirmed blocks.
self.unconfirmed_blocks.write().await.clear();
trace!("[ShuttingDown] Pending queue has been cleared");
// Disconnect all connected peers.
let connected_peers = self.peers_state.read().await.keys().copied().collect::<Vec<_>>();
for peer_ip in connected_peers {
self.disconnect(peer_ip, DisconnectReason::ShuttingDown).await;
}
trace!("[ShuttingDown] Disconnect message has been sent to all connected peers");
// Return the lock for the canon chain and block requests.
let canon_lock = self.canon_lock.clone();
let block_requests_lock = self.block_requests_lock.clone();
let storage_map_lock = self.canon.shut_down();
trace!("[ShuttingDown] Block requests lock has been cloned");
(canon_lock, block_requests_lock, storage_map_lock)
}
///
/// Performs the given `request` to the ledger.
/// All requests must go through this `update`, so that a unified view is preserved.
///
pub(super) async fn update(&self, request: LedgerRequest<N>) {
match request {
LedgerRequest::BlockResponse(peer_ip, block, prover_router) => {
// Remove the block request from the ledger.
if self.remove_block_request(peer_ip, block.height()).await {
// On success, process the block response.
self.add_block(block, &prover_router).await;
// Check if syncing with this peer is complete.
if self
.block_requests
.read()
.await
.get(&peer_ip)
.map(|requests| requests.is_empty())
.unwrap_or(false)
{
trace!("All block requests with {} have been processed", peer_ip);
self.update_block_requests().await;
}
}
}
LedgerRequest::Disconnect(peer_ip, reason) => {
self.disconnect(peer_ip, reason).await;
}
LedgerRequest::Failure(peer_ip, failure) => {
self.add_failure(peer_ip, failure).await;
}
LedgerRequest::Heartbeat(prover_router) => {
// Update for sync nodes.
self.update_sync_nodes().await;
// Update the ledger.
self.update_ledger(&prover_router).await;
// Update the status of the ledger.
self.update_status().await;
// Remove expired block requests.
self.remove_expired_block_requests().await;
// Remove expired failures.
self.remove_expired_failures().await;
// Disconnect from peers with frequent failures.
self.disconnect_from_failing_peers().await;
// Update the block requests.
self.update_block_requests().await;
let block_requests = self.number_of_block_requests().await;
let connected_peers = self.peers_state.read().await.len();
debug!(
"Status Report (type = {}, status = {}, block_height = {}, cumulative_weight = {}, block_requests = {}, connected_peers = {})",
E::NODE_TYPE,
E::status(),
self.canon.latest_block_height(),
self.canon.latest_cumulative_weight(),
block_requests,
connected_peers,
);
}
LedgerRequest::Pong(peer_ip, node_type, status, is_fork, block_locators) => {
// Ensure the peer has been initialized in the ledger.
self.initialize_peer(peer_ip).await;
// Process the pong.
self.update_peer(peer_ip, node_type, status, is_fork, block_locators).await;
}
LedgerRequest::UnconfirmedBlock(peer_ip, block, prover_router) => {
// Ensure the node is not peering.
if !E::status().is_peering() {
// Process the unconfirmed block.
self.add_block(block.clone(), &prover_router).await;
// Propagate the unconfirmed block to the connected peers.
let message = Message::UnconfirmedBlock(block.height(), block.hash(), Data::Object(block));
let request = PeersRequest::MessagePropagate(peer_ip, message);
if let Err(error) = self.peers_router.send(request).await {
warn!("[UnconfirmedBlock] {}", error);
}
}
}
}
}
///
/// Disconnects the given peer from the ledger.
///
async fn disconnect(&self, peer_ip: SocketAddr, reason: DisconnectReason) {
info!("Disconnecting from {} ({:?})", peer_ip, reason);
// Remove all entries of the peer from the ledger.
self.remove_peer(&peer_ip).await;
// Update the status of the ledger.
self.update_status().await;
// Send a `Disconnect` message to the peer.
if let Err(error) = self
.peers_router
.send(PeersRequest::MessageSend(peer_ip, Message::Disconnect(reason)))
.await
{
warn!("[Disconnect] {}", error);
}
// Route a `PeerDisconnected` to the peers.
if let Err(error) = self.peers_router.send(PeersRequest::PeerDisconnected(peer_ip)).await {
warn!("[PeerDisconnected] {}", error);
}
}
///
/// Disconnects and restricts the given peer from the ledger.
///
async fn disconnect_and_restrict(&self, peer_ip: SocketAddr, reason: DisconnectReason) {
info!("Disconnecting and restricting {} ({:?})", peer_ip, reason);
// Remove all entries of the peer from the ledger.
self.remove_peer(&peer_ip).await;
// Update the status of the ledger.
self.update_status().await;
// Send a `Disconnect` message to the peer.
if let Err(error) = self
.peers_router
.send(PeersRequest::MessageSend(peer_ip, Message::Disconnect(reason)))
.await
{
warn!("[Disconnect] {}", error);
}
// Route a `PeerRestricted` to the peers.
if let Err(error) = self.peers_router.send(PeersRequest::PeerRestricted(peer_ip)).await {
warn!("[PeerRestricted] {}", error);
}
}
///
/// Performs a heartbeat update for the sync nodes.
///
async fn update_sync_nodes(&self) {
if E::NODE_TYPE == NodeType::Sync {
// Lock peers_state for further processing.
let peers_state = self.peers_state.read().await;
// Retrieve the latest cumulative weight of this ledger.
let latest_cumulative_weight = self.canon.latest_cumulative_weight();
// Initialize a list of peers to disconnect from.
let mut peer_ips_to_disconnect = Vec::with_capacity(peers_state.len());
// Check if any of the peers are ahead and have a larger block height.
for (peer_ip, peer_state) in peers_state.iter() {
if let Some((node_type, status, Some(_), block_height, block_locators)) = peer_state {
// Retrieve the cumulative weight, defaulting to the block height if it does not exist.
let cumulative_weight = match block_locators.get_cumulative_weight(*block_height) {
Some(cumulative_weight) => cumulative_weight,
None => *block_height as u128,
};
// If the peer is not a sync node and is syncing, and the peer is ahead, proceed to disconnect.
if *node_type != NodeType::Sync && *status == State::Syncing && cumulative_weight > latest_cumulative_weight {
// Append the peer to the list of disconnects.
peer_ips_to_disconnect.push(*peer_ip);
}
}
}
// Release the lock over peers_state.
drop(peers_state);
trace!("Found {} peers to disconnect", peer_ips_to_disconnect.len());
// Proceed to disconnect and restrict these peers.
for peer_ip in peer_ips_to_disconnect {
self.disconnect_and_restrict(peer_ip, DisconnectReason::SyncComplete).await;
}
}
}
///
/// Attempt to fast-forward the ledger with unconfirmed blocks.
///
async fn update_ledger(&self, prover_router: &ProverRouter<N>) {
// Check for candidate blocks to fast forward the ledger.
let mut block_hash = self.canon.latest_block_hash();
let unconfirmed_blocks_snapshot = self.unconfirmed_blocks.read().await.clone();
while let Some(unconfirmed_block) = unconfirmed_blocks_snapshot.get(&block_hash) {
// Attempt to add the unconfirmed block.
match self.add_block(unconfirmed_block.clone(), prover_router).await {
// Upon success, update the block hash iterator.
true => block_hash = unconfirmed_block.hash(),
false => break,
}
}
// If the timestamp of the last block increment has surpassed the preset limit,
// the ledger is likely syncing from invalid state, and should revert by one block.
if E::status().is_syncing()
&& self.last_block_update_timestamp.read().await.elapsed() > 2 * Duration::from_secs(E::RADIO_SILENCE_IN_SECS)
{
// Acquire the lock for block requests.
let _block_request_lock = self.block_requests_lock.lock().await;
trace!("Ledger state has become stale, clearing queue and reverting by one block");
self.unconfirmed_blocks.write().await.clear();
// Reset the memory pool of its transactions.
if let Err(error) = prover_router.send(ProverRequest::MemoryPoolClear(None)).await {
error!("[MemoryPoolClear]: {}", error);
}
self.block_requests
.write()
.await
.values_mut()
.for_each(|requests| *requests = Default::default());
self.revert_to_block_height(self.canon.latest_block_height().saturating_sub(1))
.await;
}
}
///
/// Updates the status of the ledger.
///
async fn update_status(&self) {
// Retrieve the status variable.
let mut status = E::status().get();
// If the node is shutting down, skip the update.
if status == State::ShuttingDown {
trace!("Ledger is shutting down");
// Set the terminator bit to `true` to ensure it stops mining.
E::terminator().store(true, Ordering::SeqCst);
return;
}
// If there is an insufficient number of connected peers, set the status to `Peering`.
else if self.peers_state.read().await.len() < E::MINIMUM_NUMBER_OF_PEERS {
status = State::Peering;
}
// If the ledger is out of date, set the status to `Syncing`.
else {
// Update the status to `Ready` or `Mining`.
status = match status {
State::Mining => State::Mining,
_ => State::Ready,
};
// Retrieve the latest cumulative weight of this node.
let latest_cumulative_weight = self.canon.latest_cumulative_weight();
// Iterate through the connected peers, to determine if the ledger state is out of date.
for (_, peer_state) in self.peers_state.read().await.iter() {
if let Some((_, _, Some(_), block_height, block_locators)) = peer_state {
// Retrieve the cumulative weight, defaulting to the block height if it does not exist.
let cumulative_weight = match block_locators.get_cumulative_weight(*block_height) {
Some(cumulative_weight) => cumulative_weight,
None => *block_height as u128,
};
// If the cumulative weight is greater than MAXIMUM_LINEAR_BLOCK_LOCATORS, set the status to `Syncing`.
if cumulative_weight.saturating_sub(latest_cumulative_weight) > MAXIMUM_LINEAR_BLOCK_LOCATORS as u128 {
// Set the status to `Syncing`.
status = State::Syncing;
break;
}
}
}
}
// If the node is `Peering` or `Syncing`, it should not be mining.
if status == State::Peering || status == State::Syncing {
// Set the terminator bit to `true` to ensure it does not mine.
E::terminator().store(true, Ordering::SeqCst);
} else {
// Set the terminator bit to `false` to ensure it is allowed to mine.
E::terminator().store(false, Ordering::SeqCst);
}
// Update the ledger to the determined status.
E::status().update(status);
}
///
/// Adds the given block:
/// 1) as the next block in the ledger if the block height increments by one, or
/// 2) to the pending queue for later use.
///
/// Returns `true` if the given block is successfully added to the *canon* chain.
///
async fn add_block(&self, unconfirmed_block: Block<N>, prover_router: &ProverRouter<N>) -> bool {
// Retrieve the unconfirmed block height.
let unconfirmed_block_height = unconfirmed_block.height();
// Retrieve the unconfirmed block hash.
let unconfirmed_block_hash = unconfirmed_block.hash();
// Retrieve the unconfirmed previous block hash.
let unconfirmed_previous_block_hash = unconfirmed_block.previous_block_hash();
// Ensure the given block is new.
if let Ok(true) = self.canon.contains_block_hash(&unconfirmed_block_hash) {
trace!(
"Canonical chain already contains block {} ({})",
unconfirmed_block_height,
unconfirmed_block_hash
);
} else if unconfirmed_block_height == self.canon.latest_block_height() + 1
&& unconfirmed_previous_block_hash == self.canon.latest_block_hash()
{
// Acquire the lock for block requests.
let _block_requests_lock = self.block_requests_lock.lock().await;
// Acquire the lock for the canon chain.
let _canon_lock = self.canon_lock.lock().await;
// Ensure the block height is not part of a block request on a fork.
let mut is_block_on_fork = false;
'outer: for requests in self.block_requests.read().await.values() {
for request in requests.keys() {
// If the unconfirmed block conflicts with a requested block on a fork, skip.
if request.block_height() == unconfirmed_block_height {
if let Some(requested_block_hash) = request.block_hash() {
if unconfirmed_block.hash() != requested_block_hash {
is_block_on_fork = true;
break 'outer;
}
}
}
}
}
// If the unconfirmed block is not on a fork, attempt to add it as the next block.
match is_block_on_fork {
// Filter out the undesirable unconfirmed blocks, if it exists.
true => self.unconfirmed_blocks.write().await.remove(&unconfirmed_previous_block_hash),
// Attempt to add the unconfirmed block as the next block in the canonical chain.
false => match self.canon.add_next_block(&unconfirmed_block) {
Ok(()) => {
info!(
"Ledger successfully advanced to block {} ({})",
self.canon.latest_block_height(),
self.canon.latest_block_hash()
);
// Update the timestamp of the last block increment.
*self.last_block_update_timestamp.write().await = Instant::now();
// Set the terminator bit to `true` to ensure the miner updates state.
E::terminator().store(true, Ordering::SeqCst);
// On success, filter the unconfirmed blocks of this block, if it exists.
self.unconfirmed_blocks.write().await.remove(&unconfirmed_previous_block_hash);
// On success, filter the memory pool of its transactions, if they exist.
if let Err(error) = prover_router.send(ProverRequest::MemoryPoolClear(Some(unconfirmed_block))).await {
error!("[MemoryPoolClear]: {}", error);
}
return true;
}
Err(error) => warn!("{}", error),
},
}
} else {
// Add the block to the unconfirmed blocks.
if self
.unconfirmed_blocks
.write()
.await
.insert(unconfirmed_previous_block_hash, unconfirmed_block)
{
trace!("Added unconfirmed block {} to the pending queue", unconfirmed_block_height);
} else {
trace!(
"Pending queue already contains unconfirmed block {} ({})",
unconfirmed_block_height,
unconfirmed_block_hash
);
}
}
false
}
///
/// Reverts the ledger state back to height `block_height`, returning `true` on success.
///
async fn revert_to_block_height(&self, block_height: u32) -> bool {
// Acquire the lock for the canon chain.
let _canon_lock = self.canon_lock.lock().await;
match self.canon.revert_to_block_height(block_height) {
Ok(removed_blocks) => {
info!("Ledger successfully reverted to block {}", self.canon.latest_block_height());
// Update the last block update timestamp.
*self.last_block_update_timestamp.write().await = Instant::now();
// Set the terminator bit to `true` to ensure the miner resets state.
E::terminator().store(true, Ordering::SeqCst);
// Lock unconfirmed_blocks for further processing.
let mut unconfirmed_blocks = self.unconfirmed_blocks.write().await;
// Ensure the removed blocks are not in the unconfirmed blocks.
for removed_block in removed_blocks {
unconfirmed_blocks.remove(&removed_block.previous_block_hash());
}
true
}
Err(error) => {
warn!("{}", error);
// Set the terminator bit to `true` to ensure the miner resets state.
E::terminator().store(true, Ordering::SeqCst);
// Reset the unconfirmed blocks.
self.unconfirmed_blocks.write().await.clear();
false
}
}
}
///
/// Adds an entry for the given peer IP to every data structure in `State`.
///
async fn initialize_peer(&self, peer_ip: SocketAddr) {
// Since the peer state already existing is the most probable scenario,
// use a read() first to avoid using write() if possible.
let peer_state_exists = self.peers_state.read().await.contains_key(&peer_ip);
if !peer_state_exists {
self.peers_state.write().await.entry(peer_ip).or_insert(None);
self.block_requests.write().await.entry(peer_ip).or_insert_with(Default::default);
self.failures.write().await.entry(peer_ip).or_insert_with(Default::default);
}
}
///
/// Removes the entry for the given peer IP from every data structure in `State`.
///
async fn remove_peer(&self, peer_ip: &SocketAddr) {
self.peers_state.write().await.remove(peer_ip);
self.block_requests.write().await.remove(peer_ip);
self.failures.write().await.remove(peer_ip);
}
///
/// Updates the state of the given peer.
///
async fn update_peer(
&self,
peer_ip: SocketAddr,
node_type: NodeType,
status: State,
is_fork: Option<bool>,
block_locators: BlockLocators<N>,
) {
// Ensure the list of block locators is not empty.
if block_locators.is_empty() {
self.add_failure(peer_ip, "Received a sync response with no block locators".to_string())
.await;
} else {
// Ensure the peer provided well-formed block locators.
match self.canon.check_block_locators(&block_locators) {
Ok(is_valid) => {
if !is_valid {
warn!("Invalid block locators from {}", peer_ip);
self.add_failure(peer_ip, "Invalid block locators".to_string()).await;
return;
}
}
Err(error) => warn!("Error checking block locators: {}", error),
};
// Determine the common ancestor block height between this ledger and the peer.
let mut common_ancestor = 0;
// Determine the latest block height of the peer.
let mut latest_block_height_of_peer = 0;
// Verify the integrity of the block hashes sent by the peer.
for (block_height, (block_hash, _)) in block_locators.iter() {
// Ensure the block hash corresponds with the block height, if the block hash exists in this ledger.
if let Ok(expected_block_height) = self.canon.get_block_height(block_hash) {
if expected_block_height != *block_height {
let error = format!("Invalid block height {} for block hash {}", expected_block_height, block_hash);
trace!("{}", error);
self.add_failure(peer_ip, error).await;
return;
} else {
// Update the common ancestor, as this block hash exists in this ledger.
if expected_block_height > common_ancestor {
common_ancestor = expected_block_height
}
}
}
// Update the latest block height of the peer.
if *block_height > latest_block_height_of_peer {
latest_block_height_of_peer = *block_height;
}
}
// If the given fork status is None, check if it can be updated.
let is_fork = match is_fork {
Some(is_fork) => Some(is_fork),
None => match common_ancestor == latest_block_height_of_peer || common_ancestor == self.canon.latest_block_height() {
// If the common ancestor matches the latest block height of (the peer || this node),
// the peer (is clearly || is likely) on the same canonical chain as this node.
true => Some(false),
false => None,
},
};
let fork_status = match is_fork {
Some(boolean) => format!("{}", boolean),
None => "undecided".to_string(),
};
let cumulative_weight = match block_locators.get_cumulative_weight(latest_block_height_of_peer) {
Some(weight) => format!("{}", weight),
_ => "unknown".to_string(),
};
debug!(
"Peer {} is at block {} (type = {}, status = {}, is_fork = {}, cumulative_weight = {}, common_ancestor = {})",
peer_ip, latest_block_height_of_peer, node_type, status, fork_status, cumulative_weight, common_ancestor,
);
match self.peers_state.write().await.get_mut(&peer_ip) {
Some(peer_state) => *peer_state = Some((node_type, status, is_fork, latest_block_height_of_peer, block_locators)),
None => self.add_failure(peer_ip, format!("Missing ledger state for {}", peer_ip)).await,
};
}
}
///
/// Proceeds to send block requests to a connected peer, if the ledger is out of date.
///
async fn update_block_requests(&self) {
// Ensure the ledger is not awaiting responses from outstanding block requests.
if self.number_of_block_requests().await > 0 {
return;
}
// Retrieve the latest block height and cumulative weight of this ledger.
let latest_block_height = self.canon.latest_block_height();
let latest_cumulative_weight = self.canon.latest_cumulative_weight();
// Iterate through the peers to check if this node needs to catch up, and determine a peer to sync with.
// Prioritize the sync nodes before regular peers.
let mut maximum_block_height = latest_block_height;
let mut maximum_cumulative_weight = latest_cumulative_weight;
// Check if any of the peers are ahead and have a larger block height.
if let Some((peer_ip, maximal_peer_is_on_fork, maximum_block_locators)) = find_maximal_peer::<N, E>(
&*self.peers_state.read().await,
&mut maximum_block_height,
&mut maximum_cumulative_weight,
) {
// Case 1 - Ensure the peer has a heavier canonical chain than this ledger.
// Note: this check is duplicated in `handle_block_requests`, as it is fast
// and allows us to skip acquiring `_block_requests_lock`.
if latest_cumulative_weight >= maximum_cumulative_weight {
return;
}
// Acquire the lock for block requests.
let _block_requests_lock = self.block_requests_lock.lock().await;
// Determine the common ancestor block height between this ledger and the peer
// and the first locator (smallest height) that does not exist in this ledger.
let (maximum_common_ancestor, first_deviating_locator) = match find_common_ancestor(&self.canon, &maximum_block_locators) {
Ok(ret) => ret,
Err(error) => {
trace!("{}", error);
self.add_failure(peer_ip, error).await;
return;
}
};
// Case 2 - Prepare to send block requests, as the peer is ahead of this ledger.
let (start_block_height, end_block_height, ledger_is_on_fork) = match handle_block_requests::<N, E>(
latest_block_height,
latest_cumulative_weight,
peer_ip,
Some(maximal_peer_is_on_fork),
maximum_block_height,
maximum_cumulative_weight,
maximum_common_ancestor,
first_deviating_locator,
) {
// Abort from the block request update.
BlockRequestHandler::Abort(_) => return,
// Disconnect from the peer if it is misbehaving and proceed to abort.
BlockRequestHandler::AbortAndDisconnect(_, reason) => {
drop(_block_requests_lock);
self.disconnect(peer_ip, reason).await;
return;
}
// Proceed to send block requests to a connected peer, if the ledger is out of date.
BlockRequestHandler::Proceed(_, proceed) => {
(proceed.start_block_height, proceed.end_block_height, proceed.ledger_is_on_fork)
}
};
// Revert the ledger, if it is on a fork.
if ledger_is_on_fork {
// If the revert operation fails, abort.
if !self.revert_to_block_height(maximum_common_ancestor).await {
warn!("Ledger failed to revert to block {}", maximum_common_ancestor);
return;
}
}
// Send a `BlockRequest` message to the peer.
debug!("Requesting blocks {} to {} from {}", start_block_height, end_block_height, peer_ip);
let request = PeersRequest::MessageSend(peer_ip, Message::BlockRequest(start_block_height, end_block_height));
if let Err(error) = self.peers_router.send(request).await {
warn!("[BlockRequest] {}", error);
return;
}
// Filter out any pre-existing block requests for the peer.
let mut missing_block_requests = false;
let mut new_block_heights = Vec::new();
if let Some(block_requests) = self.block_requests.read().await.get(&peer_ip) {
for block_height in start_block_height..=end_block_height {
if !block_requests.contains_key(&block_height.into()) {
new_block_heights.push(block_height);
}
}
} else {
self.add_failure(peer_ip, format!("Missing block requests for {}", peer_ip)).await;
missing_block_requests = true;
}
if !missing_block_requests && !new_block_heights.is_empty() {
// Log each block request to ensure the peer responds with all requested blocks.
if let Some(locked_block_requests) = self.block_requests.write().await.get_mut(&peer_ip) {
for block_height in new_block_heights {
// If the ledger is on a fork and was reverted, include the expected new block hash for the fork.
match ledger_is_on_fork {
true => {
self.add_block_request(
peer_ip,
block_height,
maximum_block_locators.get_block_hash(block_height),
locked_block_requests,
)
.await
}
false => self.add_block_request(peer_ip, block_height, None, locked_block_requests).await,
};
}
}
}
}
}
///
/// Returns the number of outstanding block requests.
///
async fn number_of_block_requests(&self) -> usize {
self.block_requests.read().await.values().map(|r| r.len()).sum()
}
///
/// Adds a block request for the given block height to the specified peer.
///
async fn add_block_request(
&self,
peer_ip: SocketAddr,
block_height: u32,
block_hash: Option<N::BlockHash>,
locked_block_requests: &mut HashMap<BlockRequest<N>, i64>,
) {
match locked_block_requests.insert((block_height, block_hash).into(), OffsetDateTime::now_utc().unix_timestamp()) {
None => debug!("Requesting block {} from {}", block_height, peer_ip),
Some(_old_request) => self.add_failure(peer_ip, format!("Duplicate block request for {}", peer_ip)).await,
}
}
///
/// Returns `true` if the block request for the given block height to the specified peer exists.
///
async fn contains_block_request(&self, peer_ip: SocketAddr, block_height: u32) -> bool {
match self.block_requests.read().await.get(&peer_ip) {
Some(requests) => requests.contains_key(&block_height.into()),
None => false,
}
}
///
/// Removes a block request for the given block height to the specified peer.
/// On success, returns `true`, otherwise returns `false`.
///
async fn remove_block_request(&self, peer_ip: SocketAddr, block_height: u32) -> bool {
// Ensure the block height corresponds to a requested block.
if !self.contains_block_request(peer_ip, block_height).await {
self.add_failure(peer_ip, "Received an invalid block response".to_string()).await;
false
} else {
if let Some(requests) = self.block_requests.write().await.get_mut(&peer_ip) {
let is_success = requests.remove(&block_height.into()).is_some();
match is_success {
true => return true,
false => {
self.add_failure(peer_ip, format!("Non-existent block request from {}", peer_ip))
.await
}
}
}
false
}
}
///
/// Removes block requests that have expired.
///
async fn remove_expired_block_requests(&self) {
// Clear all block requests that have lived longer than `E::RADIO_SILENCE_IN_SECS`.
let now = OffsetDateTime::now_utc().unix_timestamp();
self.block_requests.write().await.iter_mut().for_each(|(_peer, block_requests)| {
block_requests.retain(|_, time_of_request| now.saturating_sub(*time_of_request) < E::RADIO_SILENCE_IN_SECS as i64)
});
}
///
/// Adds the given failure message to the specified peer IP.
///
async fn add_failure(&self, peer_ip: SocketAddr, failure: String) {
trace!("Adding failure for {}: {}", peer_ip, failure);
match self.failures.write().await.get_mut(&peer_ip) {
Some(failures) => failures.push((failure, OffsetDateTime::now_utc().unix_timestamp())),
None => error!("Missing failure entry for {}", peer_ip),
};
}
///
/// Removes failures that have expired.
///
async fn remove_expired_failures(&self) {
// Clear all failures that have lived longer than `E::FAILURE_EXPIRY_TIME_IN_SECS`.
let now = OffsetDateTime::now_utc().unix_timestamp();
self.failures.write().await.iter_mut().for_each(|(_, failures)| {
failures.retain(|(_, time_of_fail)| now.saturating_sub(*time_of_fail) < E::FAILURE_EXPIRY_TIME_IN_SECS as i64)
});
}
///
/// Disconnects from connected peers who exhibit frequent failures.
///
async fn disconnect_from_failing_peers(&self) {
let peers_to_disconnect = self
.failures
.read()
.await
.iter()
.filter(|(_, failures)| failures.len() > E::MAXIMUM_NUMBER_OF_FAILURES)
.map(|(peer_ip, _)| *peer_ip)
.collect::<Vec<_>>();
for peer_ip in peers_to_disconnect {
self.disconnect(peer_ip, DisconnectReason::TooManyFailures).await;
}
}
}