use crate::{
RuntimeConfig,
utils::{BlockLoader, EthereumBlockLoader},
};
use alloy::{
providers::RootProvider,
rpc::types::eth::Header,
transports::{RpcError as AlloyRpcError, TransportErrorKind},
};
use anyhow::{Context as _, anyhow};
use ethexe_common::{
self, BlockData, BlockHeader, CodeBlobInfo, SimpleBlockData,
db::{GlobalsStorageRO, GlobalsStorageRW, OnChainStorageRO, OnChainStorageRW},
events::{BlockEvent, RouterEvent, router::CodeValidationRequestedEvent},
};
use ethexe_db::Database;
use ethexe_ethereum::{
middleware::{ElectionProvider, MiddlewareQuery},
router::RouterQuery,
};
use gprimitives::H256;
use std::collections::HashMap;
/// Error returned by chain synchronization, split by whether the failure came from RPC.
///
/// The `From<anyhow::Error>` impl in this file picks the variant by inspecting the
/// error's cause chain.
#[derive(Debug, thiserror::Error)]
pub enum SyncError {
/// The cause chain contains an alloy RPC error or a contract transport error.
#[error("RPC error during sync: {0:?}")]
RpcError(anyhow::Error),
/// Any other failure; `transparent` forwards `Display` and `source` to the wrapped error.
#[error(transparent)]
Fatal(anyhow::Error),
}
/// Result of a sync operation, with [`SyncError`] as the error type.
pub type SyncResult<T> = std::result::Result<T, SyncError>;
// Module-local `Result` shorthand whose error type defaults to `anyhow::Error`.
type Result<T, E = anyhow::Error> = std::result::Result<T, E>;
impl From<anyhow::Error> for SyncError {
fn from(err: anyhow::Error) -> Self {
let is_rpc = err.chain().any(|e| {
e.downcast_ref::<AlloyRpcError<TransportErrorKind>>()
.is_some()
|| matches!(
e.downcast_ref::<alloy::contract::Error>(),
Some(alloy::contract::Error::TransportError(_))
)
});
if is_rpc {
Self::RpcError(err)
} else {
Self::Fatal(err)
}
}
}
/// Bundles the database, configuration and Ethereum query helpers used by
/// `ChainSync::sync` to bring the local database up to a given chain head.
#[derive(Clone)]
pub(crate) struct ChainSync {
/// Database where block headers, events, code blob infos, validators and sync markers
/// are read and written.
pub db: Database,
/// Runtime configuration; sync reads contract addresses, depth limits and timelines from it.
pub config: RuntimeConfig,
/// Router query client; used to fetch the validator set at a given block hash.
pub router_query: RouterQuery,
/// Middleware query client; used to compute the next era's validators via an election.
pub middleware_query: MiddlewareQuery,
/// Loader for block data, either one block by hash or a range of block numbers.
pub block_loader: EthereumBlockLoader,
}
impl ChainSync {
/// Creates a `ChainSync` whose router query, middleware query and block loader all
/// share `provider` and take their contract addresses from `config`.
pub fn new(db: Database, config: RuntimeConfig, provider: RootProvider) -> Self {
    // Fields are evaluated in written order, so `config` is read before it is moved in.
    Self {
        router_query: RouterQuery::from_provider(config.router_address, provider.clone()),
        middleware_query: MiddlewareQuery::from_provider(
            config.middleware_address,
            provider.clone(),
        ),
        block_loader: EthereumBlockLoader::new(provider, config.router_address),
        db,
        config,
    }
}
/// Syncs the database up to `chain_head` and returns the hash of that head block.
///
/// Steps: optionally batch-preload block data, walk back from the head storing every
/// block that is not yet synced, make sure the required validator sets are stored, and
/// finally mark the loaded blocks as synced from oldest to newest.
///
/// # Errors
///
/// Returns a [`SyncError`] if the head block number does not fit into `u32`, or if
/// preloading, chain loading or validator fetching fails.
pub async fn sync(self, chain_head: Header) -> SyncResult<H256> {
    // A plain `as u32` would silently wrap an out-of-range number and store a bogus height.
    let height = u32::try_from(chain_head.number)
        .context("chain head block number does not fit into u32")?;
    let block = SimpleBlockData {
        hash: H256(chain_head.hash.0),
        header: BlockHeader {
            height,
            timestamp: chain_head.timestamp,
            parent_hash: H256(chain_head.parent_hash.0),
        },
    };
    let blocks_data = self.pre_load_data(&block.header).await?;
    let chain = self.load_chain(&block, blocks_data).await?;
    self.ensure_validators(block).await?;
    // `load_chain` returns the blocks newest-first; reverse so older blocks are marked first.
    self.mark_chain_as_synced(chain.into_iter().rev());
    Ok(block.hash)
}
/// Walks back from `block` through parent hashes until an already-synced block is
/// reached, storing the header, the events and any requested-code blob info of every
/// block visited on the way.
///
/// Block data is taken from the preloaded `blocks_data` when present and fetched through
/// the block loader otherwise.
///
/// Returns the visited blocks ordered from newest (`block`) to oldest.
///
/// # Errors
///
/// Propagates any error returned by the block loader.
///
/// # Panics
///
/// Panics if the obtained block data carries a hash different from the requested one.
async fn load_chain(
    &self,
    block: &SimpleBlockData,
    mut blocks_data: HashMap<H256, BlockData>,
) -> Result<Vec<SimpleBlockData>> {
    let mut chain = Vec::new();
    let mut current_block_hash = block.hash;
    while !self.db.block_synced(current_block_hash) {
        let block_data = match blocks_data.remove(&current_block_hash) {
            Some(data) => data,
            None => {
                // Only the head block's header is known up front, so it is the only one
                // passed to the loader (presumably to spare a header request — confirm
                // against `BlockLoader::load`).
                self.block_loader
                    .load(
                        current_block_hash,
                        (current_block_hash == block.hash).then_some(block.header),
                    )
                    .await?
            }
        };
        if current_block_hash != block_data.hash {
            unreachable!(
                "Expected data for block hash {current_block_hash}, got for {}",
                block_data.hash
            );
        }
        // Record blob info for every code validation request found in this block.
        for event in block_data.events.iter() {
            if let &BlockEvent::Router(RouterEvent::CodeValidationRequested(
                CodeValidationRequestedEvent {
                    code_id,
                    timestamp,
                    tx_hash,
                },
            )) = event
            {
                self.db
                    .set_code_blob_info(code_id, CodeBlobInfo { timestamp, tx_hash });
            }
        }
        self.db
            .set_block_header(current_block_hash, block_data.header);
        self.db
            .set_block_events(current_block_hash, &block_data.events);
        chain.push(SimpleBlockData {
            hash: current_block_hash,
            header: block_data.header,
        });
        // Step to the parent; the loop ends once a synced ancestor is found.
        current_block_hash = block_data.header.parent_hash;
    }
    Ok(chain)
}
/// Batch-loads block data between the latest synced block and `header` when the gap
/// is large enough to be worth it.
///
/// Returns an empty map when `header` is not above the latest synced height (a warning
/// is logged) or when the gap is below `batched_sync_depth`. Otherwise returns whatever
/// the block loader yields for the inclusive height range.
///
/// # Errors
///
/// Fails when the gap reaches `max_sync_depth`, or when the block loader fails.
async fn pre_load_data(&self, header: &BlockHeader) -> Result<HashMap<H256, BlockData>> {
    let synced_height = self.db.globals().latest_synced_eb.header.height;
    let head_height = header.height;

    if head_height <= synced_height {
        tracing::warn!(
            "Got a block with number {} <= latest synced block number: {}, maybe a reorg",
            head_height,
            synced_height
        );
        return Ok(HashMap::new());
    }

    // Cannot underflow: the guard above ensures `head_height > synced_height`.
    let depth = head_height - synced_height;

    if depth >= self.config.max_sync_depth {
        return Err(anyhow!(
            "Too much to sync: current block number: {}, Latest synced block number: {}, Max depth: {}",
            head_height,
            synced_height,
            self.config.max_sync_depth
        ));
    }

    if depth < self.config.batched_sync_depth {
        return Ok(HashMap::new());
    }

    let heights = u64::from(synced_height)..=u64::from(head_height);
    self.block_loader.load_many(heights).await
}
async fn ensure_validators(&self, block_data: SimpleBlockData) -> Result<()> {
let chain_head_era = self
.config
.timelines
.era_from_ts(block_data.header.timestamp)
.context("failed to calculate era from timestamp")?;
if self.db.validators(chain_head_era).is_none() {
let validators = self.router_query.validators_at(block_data.hash).await?;
self.db.set_validators(chain_head_era, validators);
}
if let Some(election_ts) = self.election_timestamp_finalized(block_data.header.timestamp)
&& self.db.validators(chain_head_era + 1).is_none()
{
let next_era_validators = self
.middleware_query
.make_election_at(election_ts, 10)
.await?;
self.db
.set_validators(chain_head_era + 1, next_era_validators);
}
Ok(())
}
/// Marks every block yielded by `chain` as synced, in iteration order, and records each
/// one in turn as the latest synced block in the database globals.
fn mark_chain_as_synced(&self, chain: impl Iterator<Item = SimpleBlockData>) {
    chain.for_each(|SimpleBlockData { hash, header }| {
        self.db.set_block_synced(hash);
        log::trace!(
            "✅ block {hash} synced, events: {:?}",
            self.db.block_events(hash)
        );
        // After the last iteration the globals point at the final block of `chain`.
        self.db.globals_mutate(|globals| {
            globals.latest_synced_eb = SimpleBlockData { hash, header };
        });
    });
}
/// Returns the election start timestamp of the era containing `timestamp`, provided
/// that more than `slot * finalization_period_blocks` has elapsed since it.
///
/// Returns `None` when the era or its election start cannot be computed, or when the
/// elapsed time has not yet exceeded that window.
fn election_timestamp_finalized(&self, timestamp: u64) -> Option<u64> {
    let timelines = &self.config.timelines;
    let era = timelines.era_from_ts(timestamp)?;
    let election_ts = timelines.era_election_start_ts(era)?;

    // Saturating: a timestamp before the election start counts as zero elapsed time.
    let elapsed = timestamp.saturating_sub(election_ts);
    let finalization_window = timelines.slot.get() * self.config.finalization_period_blocks;

    (elapsed > finalization_window).then_some(election_ts)
}
}