use std::fmt;
use anyhow::Context;
use async_trait::async_trait;
use zksync_eth_client::EthInterface;
use zksync_types::{block::L2BlockHeader, web3, Address, L1BatchNumber, H256, U256, U64};
use zksync_web3_decl::{
client::{DynClient, L1, L2},
error::{ClientRpcContext, EnrichedClientError, EnrichedClientResult},
jsonrpsee::core::ClientError,
namespaces::ZksNamespaceClient,
};
use super::{
metrics::{ProcessingStage, TreeDataProviderSource, METRICS},
TreeDataFetcherResult,
};
#[cfg(test)]
mod tests;
#[derive(Debug, thiserror::Error)]
pub(super) enum MissingData {
#[error("no requested L1 batch")]
Batch,
#[error("no root hash for L1 batch")]
RootHash,
#[error("possible chain reorg detected")]
PossibleReorg,
}
pub(super) type TreeDataProviderResult = TreeDataFetcherResult<Result<H256, MissingData>>;
#[async_trait]
pub(super) trait TreeDataProvider: fmt::Debug + Send + Sync + 'static {
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult;
}
#[async_trait]
impl TreeDataProvider for Box<DynClient<L2>> {
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult {
let Some(batch_details) = self
.get_l1_batch_details(number)
.rpc_context("get_l1_batch_details")
.with_arg("number", &number)
.await?
else {
return Ok(Err(MissingData::Batch));
};
let remote_l2_block_hash = self
.get_block_details(last_l2_block.number)
.rpc_context("get_block_details")
.with_arg("number", &last_l2_block.number)
.await?
.and_then(|block| block.base.root_hash);
if remote_l2_block_hash != Some(last_l2_block.hash) {
let last_l2_block_number = last_l2_block.number;
let last_l2_block_hash = last_l2_block.hash;
tracing::info!(
"Fetched hash of the last L2 block #{last_l2_block_number} in L1 batch #{number} ({remote_l2_block_hash:?}) \
does not match the local one ({last_l2_block_hash:?}); this can be caused by a chain reorg"
);
return Ok(Err(MissingData::PossibleReorg));
}
Ok(batch_details.base.root_hash.ok_or(MissingData::RootHash))
}
}
#[derive(Debug, Clone, Copy)]
struct PastL1BatchInfo {
number: L1BatchNumber,
l1_commit_block_number: U64,
l1_commit_block_timestamp: U256,
}
#[derive(Debug)]
pub(super) struct L1DataProvider {
eth_client: Box<DynClient<L1>>,
diamond_proxy_address: Address,
block_commit_signature: H256,
past_l1_batch: Option<PastL1BatchInfo>,
}
impl L1DataProvider {
const L1_BLOCK_ACCURACY: U64 = U64([1_000]);
const L1_BLOCK_RANGE: U64 = U64([20_000]);
pub fn new(
eth_client: Box<DynClient<L1>>,
diamond_proxy_address: Address,
) -> anyhow::Result<Self> {
let block_commit_signature = zksync_contracts::hyperchain_contract()
.event("BlockCommit")
.context("missing `BlockCommit` event")?
.signature();
Ok(Self {
eth_client,
diamond_proxy_address,
block_commit_signature,
past_l1_batch: None,
})
}
async fn guess_l1_commit_block_number(
eth_client: &DynClient<L1>,
l1_batch_seal_timestamp: u64,
) -> EnrichedClientResult<(U64, usize)> {
let l1_batch_seal_timestamp = U256::from(l1_batch_seal_timestamp);
let (latest_number, latest_timestamp) =
Self::get_block(eth_client, web3::BlockNumber::Latest).await?;
if latest_timestamp < l1_batch_seal_timestamp {
return Ok((latest_number, 0)); }
let (earliest_number, earliest_timestamp) =
Self::get_block(eth_client, web3::BlockNumber::Earliest).await?;
if earliest_timestamp > l1_batch_seal_timestamp {
return Ok((earliest_number, 0)); }
let mut steps = 0;
let mut left = earliest_number;
let mut right = latest_number;
while left + Self::L1_BLOCK_ACCURACY < right {
let middle = (left + right) / 2;
let (_, middle_timestamp) =
Self::get_block(eth_client, web3::BlockNumber::Number(middle)).await?;
if middle_timestamp <= l1_batch_seal_timestamp {
left = middle;
} else {
right = middle;
}
steps += 1;
}
Ok((left, steps))
}
async fn get_block(
eth_client: &DynClient<L1>,
number: web3::BlockNumber,
) -> EnrichedClientResult<(U64, U256)> {
let block = eth_client.block(number.into()).await?.ok_or_else(|| {
let err = "block is missing on L1 RPC provider";
EnrichedClientError::new(ClientError::Custom(err.into()), "get_block")
.with_arg("number", &number)
})?;
let number = block.number.ok_or_else(|| {
let err = "block is missing a number";
EnrichedClientError::new(ClientError::Custom(err.into()), "get_block")
.with_arg("number", &number)
})?;
Ok((number, block.timestamp))
}
}
#[async_trait]
impl TreeDataProvider for L1DataProvider {
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult {
let l1_batch_seal_timestamp = last_l2_block.timestamp;
let from_block = self.past_l1_batch.and_then(|info| {
assert!(
info.number < number,
"`batch_details()` must be called with monotonically increasing numbers"
);
let threshold_timestamp = info.l1_commit_block_timestamp + Self::L1_BLOCK_RANGE.as_u64() / 2;
if U256::from(l1_batch_seal_timestamp) > threshold_timestamp {
tracing::debug!(
number = number.0,
"L1 batch #{number} seal timestamp ({l1_batch_seal_timestamp}) is too far ahead \
of the previous processed L1 batch ({info:?}); not using L1 batch info"
);
None
} else {
Some(info.l1_commit_block_number)
}
});
let from_block = match from_block {
Some(number) => number,
None => {
let (approximate_block, steps) = Self::guess_l1_commit_block_number(
self.eth_client.as_ref(),
l1_batch_seal_timestamp,
)
.await?;
tracing::debug!(
number = number.0,
"Guessed L1 block number for L1 batch #{number} commit in {steps} binary search steps: {approximate_block}"
);
METRICS
.l1_commit_block_number_binary_search_steps
.observe(steps);
approximate_block.saturating_sub(Self::L1_BLOCK_ACCURACY)
}
};
let number_topic = H256::from_low_u64_be(number.0.into());
let filter = web3::FilterBuilder::default()
.address(vec![self.diamond_proxy_address])
.from_block(web3::BlockNumber::Number(from_block))
.to_block(web3::BlockNumber::Number(from_block + Self::L1_BLOCK_RANGE))
.topics(
Some(vec![self.block_commit_signature]),
Some(vec![number_topic]),
None,
None,
)
.build();
let mut logs = self.eth_client.logs(&filter).await?;
logs.retain(|log| !log.is_removed() && log.block_number.is_some());
match logs.as_slice() {
[] => Ok(Err(MissingData::Batch)),
[log] => {
let root_hash = log.topics.get(2).copied().ok_or_else(|| {
let err = "Bogus `BlockCommit` event, does not have the root hash topic";
EnrichedClientError::new(ClientError::Custom(err.into()), "batch_details")
.with_arg("filter", &filter)
.with_arg("log", &log)
})?;
let l1_commit_block_number = log.block_number.unwrap();
let diff = l1_commit_block_number.saturating_sub(from_block).as_u64();
METRICS.l1_commit_block_number_from_diff.observe(diff);
tracing::debug!(
"`BlockCommit` event for L1 batch #{number} is at block #{l1_commit_block_number}, \
{diff} block(s) after the `from` block from the filter"
);
let l1_commit_block = self.eth_client.block(l1_commit_block_number.into()).await?;
let l1_commit_block = l1_commit_block.ok_or_else(|| {
let err = "Block disappeared from L1 RPC provider";
EnrichedClientError::new(ClientError::Custom(err.into()), "batch_details")
.with_arg("number", &l1_commit_block_number)
})?;
self.past_l1_batch = Some(PastL1BatchInfo {
number,
l1_commit_block_number,
l1_commit_block_timestamp: l1_commit_block.timestamp,
});
Ok(Ok(root_hash))
}
_ => {
tracing::warn!(
"Non-unique `BlockCommit` event for L1 batch #{number} queried using {filter:?}, potentially as a result \
of a chain reorg: {logs:?}"
);
Ok(Err(MissingData::PossibleReorg))
}
}
}
}
#[derive(Debug)]
pub(super) struct CombinedDataProvider {
l1: Option<L1DataProvider>,
rpc: Box<dyn TreeDataProvider>,
}
impl CombinedDataProvider {
pub fn new(fallback: impl TreeDataProvider) -> Self {
Self {
l1: None,
rpc: Box::new(fallback),
}
}
pub fn set_l1(&mut self, l1: L1DataProvider) {
self.l1 = Some(l1);
}
}
#[async_trait]
impl TreeDataProvider for CombinedDataProvider {
#[tracing::instrument(skip(self, last_l2_block))]
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult {
if let Some(l1) = &mut self.l1 {
let stage_latency = METRICS.stage_latency[&ProcessingStage::FetchL1CommitEvent].start();
let l1_result = l1.batch_details(number, last_l2_block).await;
stage_latency.observe();
match l1_result {
Err(err) => {
if err.is_transient() {
tracing::info!(
"Transient error calling L1 data provider: {:#}",
anyhow::Error::from(err)
);
} else {
tracing::warn!(
"Fatal error calling L1 data provider: {:#}",
anyhow::Error::from(err)
);
self.l1 = None;
}
}
Ok(Ok(root_hash)) => {
METRICS.root_hash_sources[&TreeDataProviderSource::L1CommitEvent].inc();
return Ok(Ok(root_hash));
}
Ok(Err(missing_data)) => {
tracing::info!("L1 data provider misses batch data: {missing_data}");
self.l1 = None;
}
}
}
let stage_latency = METRICS.stage_latency[&ProcessingStage::FetchBatchDetailsRpc].start();
let rpc_result = self.rpc.batch_details(number, last_l2_block).await;
stage_latency.observe();
if matches!(rpc_result, Ok(Ok(_))) {
METRICS.root_hash_sources[&TreeDataProviderSource::BatchDetailsRpc].inc();
}
rpc_result
}
}