use zksync_dal::{Connection, Core, CoreDal};
use zksync_shared_metrics::{TxStage, APP_METRICS};
use zksync_state_keeper::io::{common::IoCursor, L1BatchParams, L2BlockParams};
use zksync_types::{
api::en::SyncBlock, block::L2BlockHasher, fee_model::BatchFeeInput, helpers::unix_timestamp_ms,
Address, L1BatchNumber, L2BlockNumber, ProtocolVersionId, H256,
};
use super::{
metrics::{L1BatchStage, FETCHER_METRICS},
sync_action::SyncAction,
};
/// Newtype wrapper around [`zksync_types::Transaction`] for transactions handled by the fetcher.
///
/// The inner transaction is private; it can be recovered through the `From` conversion
/// back into `zksync_types::Transaction`.
#[derive(Debug, Clone)]
pub struct FetchedTransaction(zksync_types::Transaction);
impl FetchedTransaction {
pub fn new(mut tx: zksync_types::Transaction) -> Self {
tx.received_timestamp_ms = unix_timestamp_ms();
Self(tx)
}
pub fn hash(&self) -> H256 {
self.0.hash()
}
}
impl From<FetchedTransaction> for zksync_types::Transaction {
fn from(tx: FetchedTransaction) -> Self {
tx.0
}
}
/// An L2 block as consumed by the fetcher.
///
/// Instances are produced from an API `SyncBlock` through the `TryFrom` conversion and are
/// turned into `SyncAction`s by `IoCursorExt::advance`.
#[derive(Debug)]
pub struct FetchedBlock {
/// Number of this L2 block.
pub number: L2BlockNumber,
/// Number of the L1 batch this block is reported to belong to.
pub l1_batch_number: L1BatchNumber,
/// Whether this is the last L2 block of its L1 batch. Only such a block may carry no transactions.
pub last_in_batch: bool,
/// Protocol version; used when finalizing the block hash and when opening a batch.
pub protocol_version: ProtocolVersionId,
/// Block timestamp; fed into the block hash and into the L2 block params.
pub timestamp: u64,
/// Hash reported by the source (`SyncBlock::hash`), if any. It is compared against the
/// locally computed hash; a mismatch is only logged.
pub reference_hash: Option<H256>,
/// L1 gas price; passed to `BatchFeeInput::for_protocol_version` when the block opens a batch.
pub l1_gas_price: u64,
/// Fair L2 gas price; passed to `BatchFeeInput::for_protocol_version` when the block opens a batch.
pub l2_fair_gas_price: u64,
/// Fair pubdata price, if provided; passed to `BatchFeeInput::for_protocol_version`.
pub fair_pubdata_price: Option<u64>,
/// Virtual block count forwarded in `L2BlockParams`; set to 0 when the source omits it.
pub virtual_blocks: u32,
/// Operator address forwarded in `L1BatchParams` when the block opens a batch.
pub operator_address: Address,
/// Transactions of the block, in the order in which their hashes enter the block hash.
pub transactions: Vec<FetchedTransaction>,
}
impl FetchedBlock {
fn compute_hash(&self, prev_l2_block_hash: H256) -> H256 {
let mut hasher = L2BlockHasher::new(self.number, self.timestamp, prev_l2_block_hash);
for tx in &self.transactions {
hasher.push_tx_hash(tx.hash());
}
hasher.finalize(self.protocol_version)
}
}
impl TryFrom<SyncBlock> for FetchedBlock {
type Error = anyhow::Error;
fn try_from(block: SyncBlock) -> anyhow::Result<Self> {
let Some(transactions) = block.transactions else {
return Err(anyhow::anyhow!("Transactions are always requested"));
};
if transactions.is_empty() && !block.last_in_batch {
return Err(anyhow::anyhow!(
"Only last L2 block of the batch can be empty"
));
}
Ok(Self {
number: block.number,
l1_batch_number: block.l1_batch_number,
last_in_batch: block.last_in_batch,
protocol_version: block.protocol_version,
timestamp: block.timestamp,
reference_hash: block.hash,
l1_gas_price: block.l1_gas_price,
l2_fair_gas_price: block.l2_fair_gas_price,
fair_pubdata_price: block.fair_pubdata_price,
virtual_blocks: block.virtual_blocks.unwrap_or(0),
operator_address: block.operator_address,
transactions: transactions
.into_iter()
.map(FetchedTransaction::new)
.collect(),
})
}
}
/// Fetcher-specific extension methods for a cursor type (implemented in this file for `IoCursor`).
#[async_trait::async_trait]
pub trait IoCursorExt: Sized {
/// Loads a cursor from `storage` in the state expected by the fetcher.
///
/// # Errors
///
/// Returns an error if the cursor cannot be loaded from `storage`.
async fn for_fetcher(storage: &mut Connection<'_, Core>) -> anyhow::Result<Self>;
/// Consumes the next fetched `block`, moves the cursor past it and returns the
/// `SyncAction`s that describe the block.
fn advance(&mut self, block: FetchedBlock) -> Vec<SyncAction>;
}
#[async_trait::async_trait]
impl IoCursorExt for IoCursor {
    /// Loads the cursor from storage and adjusts it for the fetcher.
    ///
    /// When storage reports no pending batch, `l1_batch` is stepped back by one.
    /// `advance` emits `OpenBatch` only for a block whose batch number is
    /// `self.l1_batch.next()`, so the step back lets the first block of a not-yet-opened
    /// batch trigger it.
    /// NOTE(review): presumably `IoCursor::new` yields the number of the batch to be
    /// processed next — confirm against its implementation.
    ///
    /// # Errors
    ///
    /// Propagates failures from `IoCursor::new` and from the pending-batch query.
    async fn for_fetcher(storage: &mut Connection<'_, Core>) -> anyhow::Result<Self> {
        let mut cursor = Self::new(storage).await?;
        let has_pending_batch = storage.blocks_dal().pending_batch_exists().await?;
        if !has_pending_batch {
            cursor.l1_batch -= 1;
        }
        Ok(cursor)
    }

    /// Translates `block` into sync actions and advances the cursor past it.
    ///
    /// The returned list consists of, in order:
    /// 1. `OpenBatch` if the block belongs to the batch after the cursor's current one,
    ///    otherwise `L2Block`;
    /// 2. one action per transaction of the block;
    /// 3. `SealBatch` if the block is the last in its batch, otherwise `SealL2Block`.
    ///
    /// A mismatch between the locally computed block hash and the block's reference hash
    /// is logged as a warning and does not abort processing. Fetcher and application
    /// metrics are updated along the way.
    ///
    /// # Panics
    ///
    /// - If `block.number` differs from the cursor's `next_l2_block`.
    /// - If the block's batch number is neither the cursor's current batch nor the next one.
    fn advance(&mut self, block: FetchedBlock) -> Vec<SyncAction> {
        assert_eq!(block.number, self.next_l2_block);

        let local_block_hash = block.compute_hash(self.prev_l2_block_hash);
        match block.reference_hash {
            Some(reference_hash) if local_block_hash != reference_hash => {
                tracing::warn!(
                    "Mismatch between the locally computed and received L2 block hash for {block:?}; \
                     local_block_hash = {local_block_hash:?}, prev_l2_block_hash = {:?}",
                    self.prev_l2_block_hash
                );
            }
            _ => {}
        }

        // Both possible opening actions carry the same L2 block parameters.
        let l2_block_params = L2BlockParams {
            timestamp: block.timestamp,
            virtual_blocks: block.virtual_blocks,
        };

        let starts_new_batch = block.l1_batch_number != self.l1_batch;
        let opening_action = if starts_new_batch {
            assert_eq!(
                block.l1_batch_number,
                self.l1_batch.next(),
                "Unexpected batch number in the next received L2 block"
            );
            tracing::info!(
                "New L1 batch: {}. Timestamp: {}",
                block.l1_batch_number,
                block.timestamp
            );

            let fee_input = BatchFeeInput::for_protocol_version(
                block.protocol_version,
                block.l2_fair_gas_price,
                block.fair_pubdata_price,
                block.l1_gas_price,
            );
            let batch_params = L1BatchParams {
                protocol_version: block.protocol_version,
                validation_computational_gas_limit: super::VALIDATION_COMPUTATIONAL_GAS_LIMIT,
                operator_address: block.operator_address,
                fee_input,
                first_l2_block: l2_block_params,
            };
            let open_batch = SyncAction::OpenBatch {
                params: batch_params,
                number: block.l1_batch_number,
                first_l2_block_number: block.number,
            };

            FETCHER_METRICS.l1_batch[&L1BatchStage::Open].set(block.l1_batch_number.0.into());
            self.l1_batch += 1;
            open_batch
        } else {
            // An `OpenBatch` action already covers its first L2 block, so a separate
            // `L2Block` action is emitted only inside an already opened batch.
            let open_block = SyncAction::L2Block {
                params: l2_block_params,
                number: block.number,
            };
            FETCHER_METRICS.miniblock.set(block.number.0.into());
            open_block
        };

        let tx_count = block.transactions.len();
        APP_METRICS.processed_txs[&TxStage::added_to_mempool()].inc_by(tx_count as u64);

        let closing_action = if block.last_in_batch {
            SyncAction::SealBatch
        } else {
            SyncAction::SealL2Block
        };

        let mut new_actions = vec![opening_action];
        new_actions.extend(block.transactions.into_iter().map(Into::into));
        new_actions.push(closing_action);

        // The locally computed hash (not the reference one) seeds the next block's hash.
        self.prev_l2_block_hash = local_block_hash;
        self.next_l2_block += 1;

        new_actions
    }
}