use std::cmp::min;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use ethrex_blockchain::{
BatchBlockProcessingFailure, Blockchain,
error::{ChainError, InvalidBlockError},
};
use ethrex_common::{
H256,
types::{Block, BlockBody, BlockHeader, block_access_list::BlockAccessList},
};
use ethrex_storage::Store;
use tokio::sync::RwLock;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::peer_handler::{BlockRequestOrder, HeaderFetchOutcome, PeerHandler};
use crate::snap::constants::{
MAX_BLOCK_BODIES_TO_REQUEST, MAX_BODY_FETCH_ATTEMPTS, MAX_HEADER_FETCH_ATTEMPTS,
};
use super::{EXECUTE_BATCH_SIZE, SyncDiagnostics, SyncError};
/// Age threshold, in seconds: if the sync target's header timestamp is older than
/// this when a cycle starts behind, a warning about the consensus client possibly
/// still catching up is logged.
const STALE_FORKCHOICE_HEAD_SECS: u64 = 1800;
/// Number of blocks the sync target may lead the local head by before the cycle is
/// considered to have "started behind" (which enables the sync-target and
/// reached/paused log messages).
const FOLLOW_DISTANCE: u64 = 8;
/// Renders a duration in seconds as a short, coarse human-readable string.
///
/// Durations under a minute become `"< 1m"`. Otherwise only the two most
/// significant units are shown: `"{d}d {h}h"`, `"{h}h {m}m"` or `"{m}m"`.
fn humanize_secs(secs: u64) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    if secs < MINUTE {
        return "< 1m".to_string();
    }

    let days = secs / DAY;
    let hours = secs % DAY / HOUR;
    let minutes = secs % HOUR / MINUTE;

    match (days, hours) {
        (0, 0) => format!("{minutes}m"),
        (0, _) => format!("{hours}h {minutes}m"),
        _ => format!("{days}d {hours}h"),
    }
}
/// Requests the bodies for `headers` from peers, making up to
/// `MAX_BODY_FETCH_ATTEMPTS` attempts with a 2-second pause between them.
///
/// Returns `Ok(Some(bodies))` on the first successful response and `Ok(None)` when
/// every attempt produced no bodies. Errors from the peer handler are propagated
/// immediately without further retries.
async fn request_bodies_with_retry(
    peers: &mut PeerHandler,
    headers: &[BlockHeader],
) -> Result<Option<Vec<BlockBody>>, SyncError> {
    // Block range of this request, only used for the retry log line.
    let from = headers.first().map(|h| h.number).unwrap_or_default();
    let to = headers.last().map(|h| h.number).unwrap_or_default();

    for attempt in 1..=MAX_BODY_FETCH_ATTEMPTS {
        let response = peers.request_block_bodies(headers).await?;
        if response.is_some() {
            return Ok(response);
        }

        // No pause or log after the final attempt: fall through to `Ok(None)`.
        let is_last_attempt = attempt == MAX_BODY_FETCH_ATTEMPTS;
        if !is_last_attempt {
            let eth_capable_peers = peers.eth_capable_peer_count().await;
            warn!(
                eth_capable_peers,
                from,
                to,
                "Failed to fetch block bodies (attempt {attempt}/{MAX_BODY_FETCH_ATTEMPTS}), retrying in 2s"
            );
            tokio::time::sleep(Duration::from_secs(2)).await;
        }
    }
    Ok(None)
}
/// Reports whether full sync can resume from `header`.
///
/// That requires the block to be canonical according to the store's sync view, and
/// its post-state root to still be present. The state lookup is skipped when the
/// block is not canonical.
pub fn is_resume_point(store: &Store, header: &BlockHeader) -> Result<bool, SyncError> {
    if !store.is_canonical_sync(header.hash())? {
        return Ok(false);
    }
    Ok(store.has_state_root(header.state_root)?)
}
/// Returns the index of the first header in `block_headers` that is a resume point
/// (see [`is_resume_point`]), scanning from index 0.
///
/// The batch is expected newest-first; the last element is treated as the oldest
/// header. Returns `Ok(None)` without touching the store when the batch is empty,
/// or when even the oldest header is above `local_head`.
pub fn first_resume_point_in_batch(
    store: &Store,
    block_headers: &[BlockHeader],
    local_head: u64,
) -> Result<Option<usize>, SyncError> {
    let reaches_local_chain = block_headers
        .last()
        .is_some_and(|oldest| oldest.number <= local_head);
    if !reaches_local_chain {
        return Ok(None);
    }

    // `find_map` stops at the first hit or the first error; the two `transpose`
    // calls shuttle between `Result<Option<_>>` and `Option<Result<_>>`.
    block_headers
        .iter()
        .enumerate()
        .find_map(|(index, header)| {
            is_resume_point(store, header)
                .map(|hit| hit.then_some(index))
                .transpose()
        })
        .transpose()
}
pub async fn sync_cycle_full(
peers: &mut PeerHandler,
blockchain: Arc<Blockchain>,
cancel_token: CancellationToken,
mut sync_head: H256,
store: Store,
diagnostics: &Arc<RwLock<SyncDiagnostics>>,
) -> Result<(), SyncError> {
let local_head = store.get_latest_block_number().await?;
let eth_capable_peers = peers.eth_capable_peer_count().await;
info!(
local_head,
eth_capable_peers,
?sync_head,
"Starting full sync cycle"
);
let mut pending_blocks = vec![];
while let Some(block) = store.get_pending_block(sync_head).await? {
if store.is_canonical_sync(block.hash())? {
break;
}
sync_head = block.header.parent_hash;
pending_blocks.insert(0, block);
}
if !pending_blocks.is_empty() && store.is_canonical_sync(sync_head)? {
let parent_has_state = match store.get_block_header_by_hash(sync_head)? {
Some(parent) => store.has_state_root(parent.state_root)?,
None => false,
};
if parent_has_state {
info!(
"Executing {} pending blocks for full sync (gap fully covered by blocks from the consensus client, no peer download needed). First block hash: {:#?} Last block hash: {:#?}",
pending_blocks.len(),
pending_blocks.first().ok_or(SyncError::NoBlocks)?.hash(),
pending_blocks.last().ok_or(SyncError::NoBlocks)?.hash()
);
add_blocks_in_batch(
blockchain.clone(),
cancel_token.clone(),
pending_blocks,
true,
store.clone(),
peers,
)
.await?;
store.clear_fullsync_headers().await?;
return Ok(());
}
}
let fcu_head = pending_blocks
.last()
.map(|block| (block.header.number, block.header.timestamp));
let mut start_block_number;
let mut end_block_number = 0;
let mut headers = vec![];
let mut single_batch = true;
let mut attempts = 0;
let mut started_behind = false;
let mut sync_target_logged = false;
loop {
let outcome = peers
.request_block_headers_from_hash(sync_head, BlockRequestOrder::NewToOld)
.await?;
let mut block_headers = match outcome {
HeaderFetchOutcome::Headers(headers) => headers,
other => {
let reason = other.failure_reason();
let eth_capable_peers = peers.eth_capable_peer_count().await;
if attempts >= MAX_HEADER_FETCH_ATTEMPTS {
warn!(
eth_capable_peers,
reason,
?sync_head,
"Sync failed to find target block header after {attempts} attempts, aborting to wait for a newer sync head"
);
return Ok(());
}
attempts += 1;
warn!(
eth_capable_peers,
reason,
"Failed to fetch headers for sync head (attempt {attempts}/{MAX_HEADER_FETCH_ATTEMPTS}), retrying in 2s"
);
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
};
debug!("Sync Log 9: Received {} block headers", block_headers.len());
attempts = 0;
let first_header = block_headers.first().ok_or(SyncError::NoBlocks)?;
let last_header = block_headers.last().ok_or(SyncError::NoBlocks)?;
debug!(
"Received {} block headers | First Number: {} Last Number: {}",
block_headers.len(),
first_header.number,
last_header.number,
);
if !sync_target_logged {
sync_target_logged = true;
let (target, target_ts) =
fcu_head.unwrap_or((first_header.number, first_header.timestamp));
let local_head = store.get_latest_block_number().await?;
let behind = target.saturating_sub(local_head);
if behind > FOLLOW_DISTANCE {
started_behind = true;
info!(
"Sync target from consensus forkchoice: block {target} ({behind} blocks ahead of local head {local_head})"
);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let age = now.saturating_sub(target_ts);
if age > STALE_FORKCHOICE_HEAD_SECS {
warn!(
"Consensus forkchoice head (block {target}) is {} old. This can happen while the consensus client is still catching up to chain head; \
if so, execution will only advance as fast as it does. If sync seems slow, it may be worth checking the consensus client's sync status.",
humanize_secs(age)
);
}
}
}
end_block_number = end_block_number.max(first_header.number);
let batch_newest_number = first_header.number;
sync_head = last_header.parent_hash;
let parent_is_resume_point = match store.get_block_header_by_hash(sync_head)? {
Some(parent) => is_resume_point(&store, &parent)?,
None => false,
};
let batch_resume_index = first_resume_point_in_batch(&store, &block_headers, local_head)?;
if parent_is_resume_point || batch_resume_index.is_some() || sync_head.is_zero() {
let first_skippable = batch_resume_index.unwrap_or(block_headers.len());
block_headers.drain(first_skippable..block_headers.len());
match block_headers.last() {
Some(last_header) => start_block_number = last_header.number,
None => start_block_number = batch_newest_number.saturating_add(1),
}
let resume_parent_number = start_block_number.saturating_sub(1);
let resume_parent_has_state = match store.get_block_header(resume_parent_number)? {
Some(parent) => store.has_state_root(parent.state_root)?,
None => false,
};
if !resume_parent_has_state {
let local_head = store.get_latest_block_number().await?;
warn!(
resume_parent_number,
local_head,
"Full sync cannot resume: post-state for block {resume_parent_number} is absent \
(pruned from the layered store, or never executed). The consensus sync head does \
not reconcile to a block whose state we retain; pausing until a reconcilable \
forkchoice head arrives. If this persists with no state above genesis, the datadir \
needs a fresh resync (ethrex removedb)."
);
store.clear_fullsync_headers().await?;
return Ok(());
}
let canonical_head = store.get_latest_block_number().await?;
let state_head = start_block_number.saturating_sub(1);
diagnostics.write().await.executed_head = state_head;
if start_block_number <= canonical_head {
warn!(
state_head,
canonical_head,
gap = canonical_head
.saturating_add(1)
.saturating_sub(start_block_number),
"Full sync resuming below canonical head: re-executing canonical-but-stateless blocks (FCU canonicalized past executed state)"
);
}
if single_batch {
headers = block_headers.into_iter().rev().collect();
} else {
store.add_fullsync_batch(block_headers).await?;
}
break;
}
store.add_fullsync_batch(block_headers).await?;
single_batch = false;
}
end_block_number += 1;
start_block_number = start_block_number.max(1);
let (body_tx, mut body_rx) =
tokio::sync::mpsc::channel::<Result<(Vec<Block>, bool), SyncError>>(2);
let mut download_peers = peers.clone();
let download_store = store.clone();
let download_task = tokio::spawn(async move {
if single_batch {
let final_batch = true;
let mut batch_headers = headers;
let mut blocks = Vec::new();
while !batch_headers.is_empty() {
let end = min(MAX_BLOCK_BODIES_TO_REQUEST, batch_headers.len());
let header_batch = &batch_headers[..end];
match request_bodies_with_retry(&mut download_peers, header_batch).await {
Ok(Some(bodies)) => {
debug!("Obtained: {} block bodies", bodies.len());
let block_batch = batch_headers
.drain(..bodies.len())
.zip(bodies)
.map(|(header, body)| Block { header, body });
blocks.extend(block_batch);
}
Ok(None) => {
let eth_capable_peers = download_peers.eth_capable_peer_count().await;
warn!(
eth_capable_peers,
"Block bodies unavailable from peers after retries; pausing full sync until a new forkchoice head arrives"
);
return;
}
Err(e) => {
let _ = body_tx.send(Err(e)).await;
return;
}
}
}
if !blocks.is_empty() {
let _ = body_tx.send(Ok((blocks, final_batch))).await;
}
return;
}
for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) {
let batch_size = EXECUTE_BATCH_SIZE.min((end_block_number - start) as usize);
let final_batch = end_block_number == start + batch_size as u64;
let batch_headers = match download_store
.read_fullsync_batch(start, batch_size as u64)
.await
{
Ok(h) => h,
Err(e) => {
let _ = body_tx.send(Err(e.into())).await;
return;
}
};
let mut batch_headers: Vec<_> = match batch_headers
.into_iter()
.map(|opt| opt.ok_or(SyncError::MissingFullsyncBatch))
.collect()
{
Ok(h) => h,
Err(e) => {
let _ = body_tx.send(Err(e)).await;
return;
}
};
let mut blocks = Vec::new();
while !batch_headers.is_empty() {
let end = min(MAX_BLOCK_BODIES_TO_REQUEST, batch_headers.len());
let header_batch = &batch_headers[..end];
match request_bodies_with_retry(&mut download_peers, header_batch).await {
Ok(Some(bodies)) => {
debug!("Obtained: {} block bodies", bodies.len());
let block_batch = batch_headers
.drain(..bodies.len())
.zip(bodies)
.map(|(header, body)| Block { header, body });
blocks.extend(block_batch);
}
Ok(None) => {
let eth_capable_peers = download_peers.eth_capable_peer_count().await;
warn!(
eth_capable_peers,
"Block bodies unavailable from peers after retries; pausing full sync until a new forkchoice head arrives"
);
return;
}
Err(e) => {
let _ = body_tx.send(Err(e)).await;
return;
}
}
}
if !blocks.is_empty() && body_tx.send(Ok((blocks, final_batch))).await.is_err() {
return;
}
}
});
let mut reached_target = false;
while let Some(result) = body_rx.recv().await {
let (blocks, final_batch) = result?;
debug!(
"Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}",
blocks.len(),
blocks.first().ok_or(SyncError::NoBlocks)?.hash(),
blocks.last().ok_or(SyncError::NoBlocks)?.hash()
);
add_blocks_in_batch(
blockchain.clone(),
cancel_token.clone(),
blocks,
final_batch,
store.clone(),
peers,
)
.await?;
if final_batch {
reached_target = true;
}
}
download_task.await?;
if let Some(oldest_pending) = pending_blocks.first() {
let parent_has_state =
match store.get_block_header_by_hash(oldest_pending.header.parent_hash)? {
Some(parent) => store.has_state_root(parent.state_root)?,
None => false,
};
if !parent_has_state {
let local_head = store.get_latest_block_number().await?;
warn!(
local_head,
"Skipping {} pending block(s): the downloaded chain they build on was not fully executed (parent state absent); will retry on the next forkchoice update",
pending_blocks.len()
);
} else {
info!(
"Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}",
pending_blocks.len(),
pending_blocks.first().ok_or(SyncError::NoBlocks)?.hash(),
pending_blocks.last().ok_or(SyncError::NoBlocks)?.hash()
);
add_blocks_in_batch(
blockchain.clone(),
cancel_token.clone(),
pending_blocks,
true,
store.clone(),
peers,
)
.await?;
reached_target = true;
}
}
if started_behind {
let local_head = store.get_latest_block_number().await?;
if reached_target {
info!(
"Reached consensus-provided head at block {local_head}. Waiting for the next forkchoice update from the consensus client."
);
} else {
warn!(
local_head,
"Full sync paused before reaching the consensus-provided head (data unavailable from peers); will resume on the next forkchoice update"
);
}
}
store.clear_fullsync_headers().await?;
Ok(())
}
/// Executes `blocks` (oldest first) and, on success, makes them canonical with a
/// forkchoice update to the last block of the batch.
///
/// If any block in the batch is Amsterdam-activated, block access lists are requested
/// from peers first. On a failed or mismatched response, execution proceeds without
/// them. `final_batch` is forwarded to `add_blocks`, which uses it to choose between
/// batch and per-block pipeline execution.
///
/// # Errors
///
/// - Returns `SyncError::InvalidRangeReceived` for an empty batch.
/// - On an execution failure that carries batch-failure details, the failure is
///   logged. If the error is `ChainError::InvalidBlock`, the failed block and every
///   later block in the batch are recorded with the failure's last valid hash as
///   their latest valid ancestor.
/// - Store errors are propagated.
async fn add_blocks_in_batch(
    blockchain: Arc<Blockchain>,
    cancel_token: CancellationToken,
    blocks: Vec<Block>,
    final_batch: bool,
    store: Store,
    peers: &mut PeerHandler,
) -> Result<(), SyncError> {
    let execution_start = Instant::now();
    let blocks_len = blocks.len();
    let numbers_and_hashes = blocks
        .iter()
        .map(|b| (b.header.number, b.hash()))
        .collect::<Vec<_>>();
    let (last_block_number, last_block_hash) = numbers_and_hashes
        .last()
        .cloned()
        .ok_or(SyncError::InvalidRangeReceived)?;
    let (first_block_number, first_block_hash) = numbers_and_hashes
        .first()
        .cloned()
        .ok_or(SyncError::InvalidRangeReceived)?;
    // Reuse the hashes computed above rather than hashing every block a second time.
    let blocks_hashes: Vec<H256> = numbers_and_hashes.iter().map(|&(_, hash)| hash).collect();
    let chain_config = store.get_chain_config();
    let bals: Vec<Option<BlockAccessList>> = {
        let any_amsterdam = blocks
            .iter()
            .any(|b| chain_config.is_amsterdam_activated(b.header.timestamp));
        if any_amsterdam {
            match peers.request_block_access_lists(&blocks_hashes).await {
                Ok(Some(bals)) if bals.len() == blocks.len() => bals,
                _ => {
                    debug!("BAL fetch unavailable or failed, proceeding without BALs");
                    vec![None; blocks.len()]
                }
            }
        } else {
            vec![None; blocks.len()]
        }
    };
    if let Err((err, batch_failure)) =
        add_blocks(blockchain.clone(), blocks, bals, final_batch, cancel_token).await
    {
        if let Some(batch_failure) = batch_failure {
            warn!("Failed to add block during FullSync: {err}");
            if matches!(err, ChainError::InvalidBlock(_)) {
                // Every block from the failing one onward descends from an invalid block.
                if let Some(index) = blocks_hashes
                    .iter()
                    .position(|hash| hash == &batch_failure.failed_block_hash)
                {
                    for hash in &blocks_hashes[index..] {
                        store
                            .set_latest_valid_ancestor(*hash, batch_failure.last_valid_hash)
                            .await?;
                    }
                }
            }
        }
        return Err(err.into());
    }
    store
        .forkchoice_update(
            numbers_and_hashes,
            last_block_number,
            last_block_hash,
            None,
            None,
        )
        .await?;
    // `as_secs_f64` keeps sub-millisecond precision. Truncating to whole milliseconds
    // made very fast batches report 0 s and an infinite blocks/s rate.
    let execution_time: f64 = execution_start.elapsed().as_secs_f64();
    let blocks_per_second = blocks_len as f64 / execution_time;
    info!(
        "Executed and stored {} blocks in {:.3} seconds ({:.3} blocks/s). First block: {} ({}). Last block: {} ({}).",
        blocks_len,
        execution_time,
        blocks_per_second,
        first_block_number,
        first_block_hash,
        last_block_number,
        last_block_hash
    );
    Ok(())
}
async fn add_blocks(
blockchain: Arc<Blockchain>,
blocks: Vec<Block>,
bals: Vec<Option<BlockAccessList>>,
sync_head_found: bool,
cancel_token: CancellationToken,
) -> Result<(), (ChainError, Option<BatchBlockProcessingFailure>)> {
if sync_head_found {
return run_blocks_pipeline(blockchain, blocks, bals).await;
}
match blockchain
.add_blocks_in_batch(blocks.clone(), &bals, cancel_token)
.await
{
Ok(()) => Ok(()),
Err((ChainError::InvalidBlock(ref err), ref batch_failure))
if is_post_execution_error(err) =>
{
let failed_block_info = batch_failure
.as_ref()
.and_then(|f| {
blocks
.iter()
.find(|b| b.hash() == f.failed_block_hash)
.map(|b| format!("block {} ({})", b.header.number, f.failed_block_hash))
})
.unwrap_or_else(|| "unknown block".to_string());
warn!(
"Batch execution failed at {failed_block_info} with: {err}. \
Retrying batch with per-block pipeline execution."
);
run_blocks_pipeline(blockchain, blocks, bals).await
}
Err(e) => Err(e),
}
}
/// Returns `true` for invalid-block errors raised when a block's execution results
/// are compared against its header. These are the gas used, state root, receipts
/// root, requests hash, block access list hash and blob gas used mismatches.
/// `add_blocks` retries such batch failures through the per-block pipeline.
fn is_post_execution_error(err: &InvalidBlockError) -> bool {
    match err {
        InvalidBlockError::GasUsedMismatch(..)
        | InvalidBlockError::StateRootMismatch
        | InvalidBlockError::ReceiptsRootMismatch
        | InvalidBlockError::RequestsHashMismatch
        | InvalidBlockError::BlockAccessListHashMismatch
        | InvalidBlockError::BlobGasUsedMismatch => true,
        _ => false,
    }
}
/// Executes `blocks` one at a time through `Blockchain::add_block_pipeline` on a
/// blocking thread. Each block is paired with its optional block access list.
///
/// # Errors
///
/// - On the first failing block, returns its chain error with a
///   `BatchBlockProcessingFailure` naming that block and the most recent block known
///   to be valid.
/// - A panic or cancellation of the blocking task is mapped to `ChainError::Custom`,
///   without failure details.
async fn run_blocks_pipeline(
    blockchain: Arc<Blockchain>,
    blocks: Vec<Block>,
    bals: Vec<Option<BlockAccessList>>,
) -> Result<(), (ChainError, Option<BatchBlockProcessingFailure>)> {
    tokio::task::spawn_blocking(move || {
        // Seed with the first block's parent rather than the zero hash. Full sync only
        // executes a batch after its parent state exists, so if the very first block
        // fails, its latest valid ancestor is that parent.
        let mut last_valid_hash = blocks
            .first()
            .map(|block| block.header.parent_hash)
            .unwrap_or_default();
        for (block, bal) in blocks.into_iter().zip(bals) {
            let block_hash = block.hash();
            blockchain
                .add_block_pipeline(block, bal.as_ref())
                .map_err(|e| {
                    (
                        e,
                        Some(BatchBlockProcessingFailure {
                            last_valid_hash,
                            failed_block_hash: block_hash,
                        }),
                    )
                })?;
            last_valid_hash = block_hash;
        }
        Ok(())
    })
    .await
    .map_err(|e| (ChainError::Custom(e.to_string()), None))?
}