use crate::rlpx::initiator::RLPxInitiator;
use crate::{
metrics::{CurrentStepValue, METRICS},
peer_table::{
PeerData, PeerDiagnostics, PeerTable, PeerTableServerProtocol as _, RequestPermit,
},
rlpx::{
connection::server::PeerConnection,
error::PeerConnectionError,
eth::{
block_access_lists::{BlockAccessLists, GetBlockAccessLists},
blocks::{
BLOCK_HEADER_LIMIT, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
HashOrNumber,
},
},
message::Message as RLPxMessage,
p2p::{Capability, SUPPORTED_ETH_CAPABILITIES},
},
};
use ethrex_common::{
H256,
types::{BlockBody, BlockHeader, block_access_list::BlockAccessList, validate_block_body},
};
use ethrex_crypto::NativeCrypto;
use spawned_concurrency::{error::ActorError, tasks::ActorRef};
use std::{
collections::{HashSet, VecDeque},
sync::atomic::Ordering,
time::{Duration, SystemTime},
};
use tracing::{debug, error, trace, warn};
pub use crate::snap::constants::{
HASH_MAX, MAX_BLOCK_BODIES_TO_REQUEST, MAX_HEADER_CHUNK, MAX_RESPONSE_BYTES,
PEER_REPLY_TIMEOUT, PEER_SELECT_RETRY_ATTEMPTS, RANGE_FILE_CHUNK_SIZE, REQUEST_RETRY_ATTEMPTS,
SNAP_LIMIT,
};
pub use crate::snap::{DumpError, RequestMetadata, RequestStorageTrieNodesError, SnapError};
/// Handle used to send eth requests (headers, bodies, block access lists) to peers.
///
/// It pairs the peer table, which the methods in this file use to select peers and
/// to record request successes and failures, with a reference to the RLPx initiator
/// actor.
#[derive(Debug, Clone)]
pub struct PeerHandler {
/// Peer table queried for peers and updated with per-peer success/failure results.
pub peer_table: PeerTable,
/// Reference to the RLPx initiator actor. Not referenced by the code visible in this
/// part of the file.
pub initiator: ActorRef<RLPxInitiator>,
}
/// Direction in which a run of block headers is requested from a peer.
///
/// The common traits are derived so the value can be logged, copied and compared;
/// existing callers that pass it by value or by reference are unaffected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockRequestOrder {
    /// Ascending order: every header is the parent of the header that follows it.
    OldToNew,
    /// Descending order: every header is the child of the header that follows it
    /// (sent on the wire as a `reverse` header request).
    NewToOld,
}
/// Result of a single header request issued by
/// `PeerHandler::request_block_headers_from_hash`.
#[derive(Debug)]
pub enum HeaderFetchOutcome {
/// The peer answered with a non-empty, correctly chained list of headers whose
/// first header has the requested hash.
Headers(Vec<BlockHeader>),
/// No peer could be selected for the request.
NoPeerAvailable,
/// A peer was queried but its answer was missing, empty, unchained, or did not
/// start at the requested hash. The peer has been penalized.
PeerFailed,
}
impl HeaderFetchOutcome {
pub fn failure_reason(&self) -> &'static str {
match self {
HeaderFetchOutcome::Headers(_) => "headers received",
HeaderFetchOutcome::NoPeerAvailable => {
"no eth-capable peer with a live connection to query (peers may be connecting or recently dropped)"
}
HeaderFetchOutcome::PeerFailed => "peer(s) queried but did not serve headers",
}
}
}
/// Asks one peer for the block number of the header whose hash is `sync_head`.
///
/// A single-header `GetBlockHeaders` request is sent over `connection`. `_permit`
/// is only held for the duration of the call; `retries` is used for logging.
///
/// # Errors
///
/// * `EmptyResponseFromPeer` – the peer answered with no headers.
/// * `UnexpectedResponseFromPeer` – the peer answered with another message type.
/// * `ReceiveMessageFromPeerTimeout` – the request timed out.
/// * `ReceiveMessageFromPeer` – any other connection error.
async fn ask_peer_head_number(
    peer_id: H256,
    connection: &mut PeerConnection,
    _permit: RequestPermit,
    sync_head: H256,
    retries: i32,
) -> Result<u64, PeerHandlerError> {
    trace!("Sync Log 11: Requesting sync head block number from peer {peer_id}");
    let head_request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
        id: rand::random(),
        startblock: HashOrNumber::Hash(sync_head),
        limit: 1,
        skip: 0,
        reverse: false,
    });
    debug!("(Retry {retries}) Requesting sync head {sync_head:?} to peer {peer_id}");

    let reply = connection
        .outgoing_request(head_request, PEER_REPLY_TIMEOUT)
        .await;

    // Reduce the reply to the header list, turning every other case into its error.
    let received_headers = match reply {
        Ok(RLPxMessage::BlockHeaders(BlockHeaders { block_headers, .. })) => block_headers,
        Ok(_) => return Err(PeerHandlerError::UnexpectedResponseFromPeer(peer_id)),
        Err(PeerConnectionError::Timeout) => {
            return Err(PeerHandlerError::ReceiveMessageFromPeerTimeout(peer_id));
        }
        Err(_) => return Err(PeerHandlerError::ReceiveMessageFromPeer(peer_id)),
    };

    let Some(head_header) = received_headers.last() else {
        return Err(PeerHandlerError::EmptyResponseFromPeer(peer_id));
    };
    let sync_head_number = head_header.number;
    trace!(
        "Sync Log 12: Received sync head block headers from peer {peer_id}, sync head number {sync_head_number}"
    );
    Ok(sync_head_number)
}
impl PeerHandler {
pub fn new(peer_table: PeerTable, initiator: ActorRef<RLPxInitiator>) -> PeerHandler {
Self {
peer_table,
initiator,
}
}
/// Asks the peer table for a random peer supporting `capabilities`.
///
/// Returns `Ok(None)` when the peer table has no peer to offer; peer table errors
/// are converted into `PeerHandlerError`.
async fn get_random_peer(
    &mut self,
    capabilities: &[Capability],
) -> Result<Option<(H256, PeerConnection, RequestPermit)>, PeerHandlerError> {
    let wanted = capabilities.to_vec();
    let selection = self.peer_table.get_random_peer(wanted).await?;
    Ok(selection)
}
/// Returns how many peers the peer table reports for the supported eth
/// capabilities, or `0` if the peer table query fails.
pub async fn eth_capable_peer_count(&self) -> usize {
    let eth_capabilities = SUPPORTED_ETH_CAPABILITIES.to_vec();
    let count = self
        .peer_table
        .peer_count_by_capabilities(eth_capabilities)
        .await;
    count.unwrap_or_default()
}
/// Downloads block headers from block number `start` up to the block whose hash is
/// `sync_head`, using eth-capable peers in parallel.
///
/// The work happens in two phases:
/// 1. Peers are asked for the block number of `sync_head`. Up to `MAX_RETRIES`
///    rounds are attempted, each querying the peers returned by `get_best_n_peers`.
/// 2. The range (capped at `start + MAX_HEADER_CHUNK`) is split into chunks that
///    are downloaded by spawned tasks; failed or partial chunks are re-queued.
///
/// Returns `Ok(None)` if no peer reported a non-zero sync head number within the
/// retry budget, otherwise `Ok(Some(headers))` sorted by ascending block number.
///
/// # Errors
///
/// Propagates peer table errors as `PeerHandlerError`.
pub async fn request_block_headers(
&mut self,
start: u64,
sync_head: H256,
) -> Result<Option<Vec<BlockHeader>>, PeerHandlerError> {
let start_time = SystemTime::now();
METRICS
.current_step
.set(CurrentStepValue::DownloadingHeaders);
let mut ret = Vec::<BlockHeader>::new();
// 0 doubles as the "not resolved yet" sentinel, so a peer reporting block number 0
// is treated the same as an unknown sync head.
let mut sync_head_number = 0_u64;
let sync_head_number_retrieval_start = SystemTime::now();
debug!("Retrieving sync head block number from peers");
let mut retries = 1;
const MAX_PEERS_TO_ASK: usize = 5;
const MAX_RETRIES: i32 = 3;
// Phase 1: resolve the sync head hash into a block number.
while sync_head_number == 0 {
if retries > MAX_RETRIES {
return Ok(None);
}
let peers = self
.peer_table
.get_best_n_peers(SUPPORTED_ETH_CAPABILITIES.to_vec(), MAX_PEERS_TO_ASK)
.await?;
let selected_peers: Vec<_> = peers.iter().map(|(id, _, _)| *id).collect();
debug!(
retry = retries,
peers_selected = ?selected_peers,
"request_block_headers: resolving sync head with peers"
);
// Peers are asked one after another; the first non-zero answer wins.
for (peer_id, mut connection, permit) in peers {
match ask_peer_head_number(peer_id, &mut connection, permit, sync_head, retries)
.await
{
Ok(number) => {
sync_head_number = number;
if number != 0 {
#[cfg(feature = "metrics")]
ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("found");
break;
}
#[cfg(feature = "metrics")]
ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("unknown");
}
Err(err) => {
// Every error kind (not only timeouts) is counted under the "timeout" label.
#[cfg(feature = "metrics")]
ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("timeout");
debug!(
"Sync Log 13: Failed to retrieve sync head block number from peer {peer_id}: {err}"
);
}
}
}
retries += 1;
}
// The metric records the uncapped sync head number; the cap is applied afterwards.
METRICS
.sync_head_block
.store(sync_head_number, Ordering::Relaxed);
sync_head_number = sync_head_number.min(start + MAX_HEADER_CHUNK);
let sync_head_number_retrieval_elapsed = sync_head_number_retrieval_start
.elapsed()
.unwrap_or_default();
debug!("Sync head block number retrieved");
*METRICS.time_to_retrieve_sync_head_block.lock().await =
Some(sync_head_number_retrieval_elapsed);
*METRICS.sync_head_hash.lock().await = sync_head;
// Phase 2: split the inclusive range [start, sync_head_number] into download tasks.
// NOTE(review): this subtraction underflows if `start > sync_head_number + 1` —
// confirm that callers guarantee `start` is not past the sync head.
let block_count = sync_head_number + 1 - start;
// Small ranges are fetched as a single chunk; larger ones as 800 equal chunks plus
// one extra task for the remainder.
let chunk_count = if block_count < 800_u64 { 1 } else { 800_u64 };
let chunk_limit = block_count / chunk_count;
// Each task is a (first block number, number of headers) pair.
let mut tasks_queue_not_started = VecDeque::<(u64, u64)>::new();
for i in 0..chunk_count {
tasks_queue_not_started.push_back((i * chunk_limit + start, chunk_limit));
}
if !block_count.is_multiple_of(chunk_count) {
tasks_queue_not_started
.push_back((chunk_count * chunk_limit + start, block_count % chunk_count));
}
let mut downloaded_count = 0_u64;
// Spawned download tasks report (headers, peer, connection, start, limit) here.
let (task_sender, mut task_receiver) =
tokio::sync::mpsc::channel::<(Vec<BlockHeader>, H256, PeerConnection, u64, u64)>(1000);
let mut current_show = 0;
debug!("Starting to download block headers from peers");
*METRICS.headers_download_start_time.lock().await = Some(SystemTime::now());
let mut logged_no_free_peers_count = 0;
loop {
// Step 1: consume at most one finished download per iteration, without blocking.
if let Ok((headers, peer_id, _connection, startblock, previous_chunk_limit)) =
task_receiver.try_recv()
{
trace!("We received a download chunk from peer");
// An empty result covers both an empty reply and a failed download (the task
// maps errors to an empty vector): penalize the peer and retry the same chunk.
if headers.is_empty() {
self.peer_table.record_failure(peer_id)?;
debug!("Failed to download chunk from peer. Downloader {peer_id} freed");
tasks_queue_not_started.push_back((startblock, previous_chunk_limit));
continue; }
downloaded_count += headers.len() as u64;
METRICS.downloaded_headers.inc_by(headers.len() as u64);
// Progress is logged at most once per 10_000 downloaded headers.
let batch_show = downloaded_count / 10_000;
if current_show < batch_show {
debug!(
"Downloaded {} headers from peer {} (current count: {downloaded_count})",
headers.len(),
peer_id
);
current_show += 1;
}
ret.extend_from_slice(&headers);
let downloaded_headers = headers.len() as u64;
// Partial answer: queue a follow-up task for the headers that are still missing.
if downloaded_headers < previous_chunk_limit {
let new_start = startblock + headers.len() as u64;
let new_chunk_limit = previous_chunk_limit - headers.len() as u64;
debug!(
"Task for ({startblock}, {new_chunk_limit}) was not completed, re-adding to the queue, {new_chunk_limit} remaining headers"
);
tasks_queue_not_started.push_back((new_start, new_chunk_limit));
}
self.peer_table.record_success(peer_id)?;
debug!("Downloader {peer_id} freed");
}
// Step 2: acquire a peer before looking at the task queue. Without a peer the loop
// sleeps 10 ms and retries; the trace message is emitted once per 1000 misses.
// NOTE(review): the completion check below is only reached once a peer has been
// acquired — confirm this cannot stall completion when no peer is available.
let Some((peer_id, mut connection, permit)) = self
.peer_table
.get_best_peer(SUPPORTED_ETH_CAPABILITIES.to_vec())
.await?
else {
if logged_no_free_peers_count == 0 {
trace!("We are missing peers in request_block_headers");
logged_no_free_peers_count = 1000;
}
logged_no_free_peers_count -= 1;
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
};
// Step 3: take the next task. An empty queue means either everything has been
// downloaded (finish) or tasks are still in flight (yield and poll again).
let Some((startblock, chunk_limit)) = tasks_queue_not_started.pop_front() else {
if downloaded_count >= block_count {
debug!("All headers downloaded successfully");
break;
}
let batch_show = downloaded_count / 10_000;
if current_show < batch_show {
current_show += 1;
}
tokio::task::yield_now().await;
continue;
};
let tx = task_sender.clone();
debug!("Downloader {peer_id} is now busy");
// Step 4: run the download in its own task and report the result through the channel.
tokio::spawn(async move {
trace!(
"Sync Log 5: Requesting block headers from peer {peer_id}, chunk_limit: {chunk_limit}"
);
// Any download error is logged and turned into an empty header list.
let headers = Self::download_chunk_from_peer(
peer_id,
&mut connection,
permit,
startblock,
chunk_limit,
)
.await
.inspect_err(|err| trace!("Sync Log 6: {peer_id} failed to download chunk: {err}"))
.unwrap_or_default();
tx.send((headers, peer_id, connection, startblock, chunk_limit))
.await
.inspect_err(|err| {
error!("Failed to send headers result through channel. Error: {err}")
})
});
}
let elapsed = start_time.elapsed().unwrap_or_default();
// NOTE(review): `format_duration` already renders "HHh MMm SSs", so the trailing
// "seconds" in this message looks redundant.
debug!(
"Downloaded all headers ({}) in {} seconds",
ret.len(),
format_duration(elapsed)
);
// Diagnostics only: report how many of the downloaded headers are duplicates.
{
let downloaded_headers = ret.len();
let unique_headers = ret.iter().map(|h| h.hash()).collect::<HashSet<_>>();
debug!(
"Downloaded {} headers, unique: {}, duplicates: {}",
downloaded_headers,
unique_headers.len(),
downloaded_headers - unique_headers.len()
);
match downloaded_headers.cmp(&unique_headers.len()) {
std::cmp::Ordering::Equal => {
debug!("All downloaded headers are unique");
}
std::cmp::Ordering::Greater => {
debug!(
"Downloaded headers contain duplicates, {} duplicates found",
downloaded_headers - unique_headers.len()
);
}
std::cmp::Ordering::Less => {
debug!(
"Downloaded headers are less than unique headers, this should not happen"
);
}
}
}
// Chunks complete in arbitrary order, so the result is sorted by block number.
ret.sort_by(|x, y| x.number.cmp(&y.number));
Ok(Some(ret))
}
/// Requests up to `BLOCK_HEADER_LIMIT` headers starting at the block with hash
/// `start`, in the given `order`, from one randomly selected eth-capable peer.
///
/// The peer is penalized and `HeaderFetchOutcome::PeerFailed` is returned when the
/// reply is not a header message, is empty, is not chained according to `order`, or
/// does not begin with the requested hash. `HeaderFetchOutcome::NoPeerAvailable` is
/// returned when no peer could be selected.
///
/// # Errors
///
/// Propagates peer table errors as `PeerHandlerError`.
pub async fn request_block_headers_from_hash(
    &mut self,
    start: H256,
    order: BlockRequestOrder,
) -> Result<HeaderFetchOutcome, PeerHandlerError> {
    let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
        id: rand::random(),
        startblock: start.into(),
        limit: BLOCK_HEADER_LIMIT,
        skip: 0,
        reverse: matches!(order, BlockRequestOrder::NewToOld),
    });

    let Some((peer_id, mut connection, permit)) =
        self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await?
    else {
        return Ok(HeaderFetchOutcome::NoPeerAvailable);
    };

    let response = connection
        .outgoing_request(request, PEER_REPLY_TIMEOUT)
        .await;
    // The permit only needs to cover the in-flight request.
    drop(permit);

    let Ok(RLPxMessage::BlockHeaders(BlockHeaders { block_headers, .. })) = response else {
        debug!("Didn't receive block headers from peer, penalizing peer {peer_id}");
        self.peer_table.record_failure(peer_id)?;
        return Ok(HeaderFetchOutcome::PeerFailed);
    };

    if block_headers.is_empty() {
        debug!("[SYNCING] Received empty headers from peer {peer_id}, trying another");
        self.peer_table.record_failure(peer_id)?;
        return Ok(HeaderFetchOutcome::PeerFailed);
    }

    if !are_block_headers_chained(&block_headers, &order) {
        debug!("Received invalid (unchained) headers from peer, penalizing peer {peer_id}");
        self.peer_table.record_failure(peer_id)?;
        return Ok(HeaderFetchOutcome::PeerFailed);
    }

    // The list is non-empty here, so the first header always exists.
    let starts_elsewhere = block_headers
        .first()
        .is_some_and(|first| first.hash() != start);
    if starts_elsewhere {
        warn!(
            "[SYNCING] Peer {peer_id} returned headers not starting at the requested hash {start:#x}, penalizing peer"
        );
        self.peer_table.record_failure(peer_id)?;
        return Ok(HeaderFetchOutcome::PeerFailed);
    }

    self.peer_table.record_success(peer_id)?;
    Ok(HeaderFetchOutcome::Headers(block_headers))
}
/// Requests `chunk_limit` consecutive headers, starting at block number
/// `startblock`, from a single peer.
///
/// `permit` is released as soon as the reply (or error) arrives. The reply is
/// accepted only if it is consistent with the request:
/// * it contains at most `chunk_limit` headers,
/// * its first header (if any) has number `startblock`,
/// * every header is the parent of the next one.
///
/// An empty reply is returned as `Ok(vec![])`; the caller decides how to treat it.
///
/// # Errors
///
/// * `PeerHandlerError::BlockHeaders` – no reply, or a reply of another message type.
/// * `PeerHandlerError::InvalidHeaders` – the reply violates one of the rules above.
async fn download_chunk_from_peer(
    peer_id: H256,
    connection: &mut PeerConnection,
    permit: RequestPermit,
    startblock: u64,
    chunk_limit: u64,
) -> Result<Vec<BlockHeader>, PeerHandlerError> {
    debug!("Requesting block headers from peer {peer_id}");
    let request_id = rand::random();
    let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
        id: request_id,
        startblock: HashOrNumber::Number(startblock),
        limit: chunk_limit,
        skip: 0,
        reverse: false,
    });
    let response = connection
        .outgoing_request(request, PEER_REPLY_TIMEOUT)
        .await;
    drop(permit);

    let Ok(RLPxMessage::BlockHeaders(BlockHeaders {
        id: _,
        block_headers,
    })) = response
    else {
        return Err(PeerHandlerError::BlockHeaders);
    };

    // The caller counts every returned header towards its progress, so a reply that
    // is longer than requested or that covers a different range must be rejected
    // here instead of being accepted just because it is internally chained.
    let within_limit =
        u64::try_from(block_headers.len()).is_ok_and(|received| received <= chunk_limit);
    let starts_at_requested_block = block_headers
        .first()
        .is_none_or(|first| first.number == startblock);

    if within_limit
        && starts_at_requested_block
        && are_block_headers_chained(&block_headers, &BlockRequestOrder::OldToNew)
    {
        Ok(block_headers)
    } else {
        debug!("Received invalid headers from peer: {peer_id}");
        Err(PeerHandlerError::InvalidHeaders)
    }
}
/// Sends one `GetBlockBodies` request for `block_hashes` to a randomly selected
/// eth-capable peer.
///
/// Returns the bodies together with the id of the peer that served them when the
/// reply is non-empty and not longer than the request. Returns `Ok(None)` when no
/// peer is available or when the reply is unusable; in the latter case the peer is
/// penalized and flagged as disposable.
///
/// # Errors
///
/// Propagates peer table errors as `PeerHandlerError`.
async fn request_block_bodies_inner(
    &mut self,
    block_hashes: &[H256],
) -> Result<Option<(Vec<BlockBody>, H256)>, PeerHandlerError> {
    let requested = block_hashes.len();
    let request = RLPxMessage::GetBlockBodies(GetBlockBodies {
        id: rand::random(),
        block_hashes: block_hashes.to_vec(),
    });

    let Some((peer_id, mut connection, permit)) =
        self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await?
    else {
        return Ok(None);
    };

    let response = connection
        .outgoing_request(request, PEER_REPLY_TIMEOUT)
        .await;
    drop(permit);

    match response {
        Ok(RLPxMessage::BlockBodies(BlockBodies { block_bodies, .. }))
            if !block_bodies.is_empty() && block_bodies.len() <= requested =>
        {
            self.peer_table.record_success(peer_id)?;
            Ok(Some((block_bodies, peer_id)))
        }
        _ => {
            debug!("Didn't receive block bodies from peer, penalizing peer {peer_id}");
            self.peer_table.record_failure(peer_id)?;
            // Best effort: a failure to flag the peer is deliberately ignored.
            let _ = self.peer_table.set_disposable(peer_id);
            Ok(None)
        }
    }
}
/// Requests the bodies for `block_headers` and validates each body against its
/// header.
///
/// Up to `REQUEST_RETRY_ATTEMPTS` attempts are made. An attempt that yields no
/// bodies, or whose bodies fail validation, is abandoned; in the validation case the
/// serving peer gets a critical failure recorded. A peer may return fewer bodies
/// than requested, so the result can be shorter than `block_headers`.
///
/// Returns `Ok(None)` when every attempt failed.
///
/// # Errors
///
/// Propagates peer table errors as `PeerHandlerError`.
pub async fn request_block_bodies(
    &mut self,
    block_headers: &[BlockHeader],
) -> Result<Option<Vec<BlockBody>>, PeerHandlerError> {
    let block_hashes: Vec<H256> = block_headers.iter().map(|header| header.hash()).collect();

    for _ in 0..REQUEST_RETRY_ATTEMPTS {
        let Some((block_bodies, peer_id)) =
            self.request_block_bodies_inner(&block_hashes).await?
        else {
            continue;
        };

        // Bodies are matched positionally with the headers they were requested for.
        let matching_headers = &block_headers[..block_bodies.len()];
        let mut validated = Vec::new();
        let mut first_error = None;
        for (header, body) in matching_headers.iter().zip(block_bodies) {
            match validate_block_body(header, &body, &NativeCrypto) {
                Ok(_) => validated.push(body),
                Err(e) => {
                    first_error = Some(e);
                    break;
                }
            }
        }

        match first_error {
            None => return Ok(Some(validated)),
            Some(e) => {
                debug!("Invalid block body error {e}, discarding peer {peer_id} and retrying");
                self.peer_table.record_critical_failure(peer_id)?;
            }
        }
    }

    Ok(None)
}
/// Requests the block access lists for `block_hashes` from a randomly selected
/// peer that supports `eth/71`.
///
/// Returns `Ok(None)` when no such peer is available, or when the reply is not a
/// `BlockAccessLists` message carrying the id of this request (the peer is then
/// penalized).
///
/// # Errors
///
/// Propagates peer table errors as `PeerHandlerError`.
pub async fn request_block_access_lists(
    &mut self,
    block_hashes: &[H256],
) -> Result<Option<Vec<Option<BlockAccessList>>>, PeerHandlerError> {
    let request_id = rand::random();
    let request = RLPxMessage::GetBlockAccessLists(GetBlockAccessLists {
        id: request_id,
        block_hashes: block_hashes.to_vec(),
    });

    let Some((peer_id, mut connection, permit)) =
        self.get_random_peer(&[Capability::eth(71)]).await?
    else {
        return Ok(None);
    };

    let response = connection
        .outgoing_request(request, PEER_REPLY_TIMEOUT)
        .await;
    drop(permit);

    // Keep the payload only if the reply has the right type and echoes our id.
    let accepted = match response {
        Ok(RLPxMessage::BlockAccessLists(BlockAccessLists {
            id,
            block_access_lists,
        })) if id == request_id => Some(block_access_lists),
        _ => None,
    };

    if accepted.is_some() {
        self.peer_table.record_success(peer_id)?;
    } else {
        debug!("Didn't receive block access lists from peer {peer_id}");
        self.peer_table.record_failure(peer_id)?;
    }
    Ok(accepted)
}
/// Returns the diagnostics reported by the peer table, or an empty list if the
/// peer table query fails.
pub async fn read_peer_diagnostics(&self) -> Vec<PeerDiagnostics> {
    let diagnostics = self.peer_table.get_peer_diagnostics().await;
    diagnostics.unwrap_or_default()
}
/// Returns the peer data reported by the peer table, or an empty list if the peer
/// table query fails.
pub async fn read_connected_peers(&mut self) -> Vec<PeerData> {
    let peers = self.peer_table.get_peers_data().await;
    peers.unwrap_or_default()
}
/// Returns the peer count reported by the peer table.
///
/// # Errors
///
/// Propagates peer table errors as `PeerHandlerError`.
pub async fn count_total_peers(&mut self) -> Result<usize, PeerHandlerError> {
    let total = self.peer_table.peer_count().await?;
    Ok(total)
}
/// Requests the header of block `block_number` over the given `connection`.
///
/// `_permit` is only held for the duration of the call. Every failure mode
/// (unexpected message, timeout, other connection error, empty reply) is logged
/// where applicable and reported as `Ok(None)`; as written, this method never
/// returns `Err`.
pub async fn get_block_header(
    &mut self,
    connection: &mut PeerConnection,
    _permit: RequestPermit,
    block_number: u64,
) -> Result<Option<BlockHeader>, PeerHandlerError> {
    let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
        id: rand::random(),
        startblock: HashOrNumber::Number(block_number),
        limit: 1,
        skip: 0,
        reverse: false,
    });
    debug!("get_block_header: requesting header with number {block_number}");

    let response = connection
        .outgoing_request(request, PEER_REPLY_TIMEOUT)
        .await;

    let header = match response {
        // An empty list yields `None`, just like the failure cases below.
        Ok(RLPxMessage::BlockHeaders(BlockHeaders { block_headers, .. })) => {
            block_headers.last().cloned()
        }
        Ok(_) => {
            debug!("Received unexpected message from peer");
            None
        }
        Err(PeerConnectionError::Timeout) => {
            debug!("Timeout while waiting for sync head from peer");
            None
        }
        Err(_) => {
            debug!("Peer connection closed while waiting for response");
            None
        }
    };
    Ok(header)
}
}
/// Checks that every pair of adjacent headers is linked through `parent_hash` in
/// the direction given by `order`.
///
/// Lists with fewer than two headers have no adjacent pairs and are therefore
/// always considered chained.
fn are_block_headers_chained(block_headers: &[BlockHeader], order: &BlockRequestOrder) -> bool {
    block_headers.windows(2).all(|pair| {
        // Work out which header of the pair is expected to be the parent.
        let (parent, child) = match order {
            BlockRequestOrder::OldToNew => (&pair[0], &pair[1]),
            BlockRequestOrder::NewToOld => (&pair[1], &pair[0]),
        };
        child.parent_hash == parent.hash()
    })
}
/// Renders a duration as `HHh MMm SSs`, truncated to whole seconds.
///
/// Each field is zero-padded to at least two digits; hours are not wrapped, so
/// long durations simply produce a wider hour field.
fn format_duration(duration: Duration) -> String {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE;

    let whole_secs = duration.as_secs();
    let (hours, within_hour) = (whole_secs / SECS_PER_HOUR, whole_secs % SECS_PER_HOUR);
    let (minutes, seconds) = (within_hour / SECS_PER_MINUTE, within_hour % SECS_PER_MINUTE);
    format!("{hours:02}h {minutes:02}m {seconds:02}s")
}
/// Errors produced while requesting data from peers.
///
/// `PeerHandlerError::is_recoverable` classifies each variant as recoverable or not.
#[derive(thiserror::Error, Debug)]
pub enum PeerHandlerError {
/// A message could not be sent to a peer. Not constructed by the code visible in
/// this part of the file.
#[error("Failed to send message to peer: {0}")]
SendMessageToPeer(String),
/// A header request did not yield a `BlockHeaders` reply.
#[error("Failed to receive block headers")]
BlockHeaders,
/// The peer replied with a message type other than the expected one.
#[error("Received unexpected response from peer {0}")]
UnexpectedResponseFromPeer(H256),
/// The peer replied with the expected message type but no content.
#[error("Received an empty response from peer {0}")]
EmptyResponseFromPeer(H256),
/// Waiting for the peer's reply failed for a reason other than a timeout.
#[error("Failed to receive message from peer {0}")]
ReceiveMessageFromPeer(H256),
/// The peer did not reply before the request timed out.
#[error("Timeout while waiting for message from peer {0}")]
ReceiveMessageFromPeerTimeout(H256),
/// The peer returned headers that failed validation (e.g. not chained).
#[error("Received invalid headers")]
InvalidHeaders,
/// Not constructed by the code visible in this part of the file.
#[error("Storage Full")]
StorageFull,
/// Not constructed by the code visible in this part of the file.
#[error("No response from peer")]
NoResponseFromPeer,
/// A peer table call failed; converted automatically from `ActorError`.
#[error("Error in Peer Table: {0}")]
PeerTableError(#[from] ActorError),
/// Wraps a `SnapError`; converted automatically via `From`.
#[error("Snap error: {0}")]
Snap(#[from] SnapError),
}
impl PeerHandlerError {
    /// Classifies this error as recoverable (`true`) or not (`false`).
    ///
    /// Errors about a single peer's behaviour or reply, and a peer table request
    /// timeout, are recoverable. A stopped peer table actor, a full storage and snap
    /// errors are not.
    pub fn is_recoverable(&self) -> bool {
        match self {
            Self::StorageFull | Self::Snap(_) => false,
            // Recoverability of a peer table failure depends on the actor error.
            Self::PeerTableError(actor_error) => match actor_error {
                ActorError::RequestTimeout => true,
                ActorError::ActorStopped => false,
            },
            Self::SendMessageToPeer(_)
            | Self::BlockHeaders
            | Self::UnexpectedResponseFromPeer(_)
            | Self::EmptyResponseFromPeer(_)
            | Self::ReceiveMessageFromPeer(_)
            | Self::ReceiveMessageFromPeerTimeout(_)
            | Self::InvalidHeaders
            | Self::NoResponseFromPeer => true,
        }
    }
}