use std::{fmt, time::Duration};
use anyhow::Context as _;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::Serialize;
#[cfg(test)]
use tokio::sync::mpsc;
use tokio::sync::watch;
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
use zksync_shared_metrics::EN_METRICS;
use zksync_types::{
aggregated_operations::AggregatedActionType, api, L1BatchNumber, L2BlockNumber, H256,
};
use zksync_web3_decl::{
client::{DynClient, L2},
error::{ClientRpcContext, EnrichedClientError, EnrichedClientResult},
namespaces::ZksNamespaceClient,
};
use super::metrics::{FetchStage, FETCHER_METRICS};
#[cfg(test)]
mod tests;
fn l1_batch_stage_to_action_str(stage: AggregatedActionType) -> &'static str {
match stage {
AggregatedActionType::Commit => "committed",
AggregatedActionType::PublishProofOnchain => "proven",
AggregatedActionType::Execute => "executed",
}
}
/// A single L1 status transition (commit, prove or execute) reported by the main node
/// for one L1 batch.
#[derive(Debug)]
struct BatchStatusChange {
    /// L1 batch whose status changed.
    number: L1BatchNumber,
    /// Hash of the L1 transaction the main node reports for this stage.
    l1_tx_hash: H256,
    /// Timestamp the main node reports for this stage.
    happened_at: DateTime<Utc>,
}
/// Status changes collected during one polling iteration, grouped by stage.
///
/// Within each list batch numbers are consecutive and increasing: a change is only recorded
/// for the batch immediately following the cursor position of that stage, after which the
/// position is advanced by one.
#[derive(Debug, Default)]
struct StatusChanges {
    /// Batches newly observed as committed.
    commit: Vec<BatchStatusChange>,
    /// Batches newly observed as proven.
    prove: Vec<BatchStatusChange>,
    /// Batches newly observed as executed.
    execute: Vec<BatchStatusChange>,
}
impl StatusChanges {
    /// Returns `true` when none of the stages has a pending change.
    fn is_empty(&self) -> bool {
        [&self.commit, &self.prove, &self.execute]
            .iter()
            .all(|stage_changes| stage_changes.is_empty())
    }
}
/// Errors produced while fetching status changes from the main node.
#[derive(Debug, thiserror::Error)]
enum UpdaterError {
    /// JSON-RPC failure while talking to the main node. The updater loop logs it and
    /// continues with the next iteration.
    #[error("JSON-RPC error communicating with main node")]
    Web3(#[from] EnrichedClientError),
    /// Storage failure or an inconsistent / malformed main node response. The updater
    /// loop returns this error, terminating the updater.
    #[error("Internal error")]
    Internal(#[from] anyhow::Error),
}
impl From<zksync_dal::DalError> for UpdaterError {
fn from(err: zksync_dal::DalError) -> Self {
Self::Internal(err.into())
}
}
/// Subset of the main node API used by the batch status updater.
///
/// Abstracted as a trait so that the updater can be built from any implementation
/// (see `BatchStatusUpdater::from_parts`); presumably tests supply a mock — TODO confirm.
#[async_trait]
trait MainNodeClient: fmt::Debug + Send + Sync {
    /// Returns the first L2 block of the given L1 batch, or `None` if the main node reports
    /// no L2 block range for it.
    async fn resolve_l1_batch_to_l2_block(
        &self,
        number: L1BatchNumber,
    ) -> EnrichedClientResult<Option<L2BlockNumber>>;

    /// Returns details of the given L2 block (including the L1 statuses of its L1 batch),
    /// or `None` if the main node has no information about it.
    async fn block_details(
        &self,
        number: L2BlockNumber,
    ) -> EnrichedClientResult<Option<api::BlockDetails>>;
}
/// JSON-RPC implementation backed by the main node client; successful requests record
/// their latency in `FETCHER_METRICS`.
#[async_trait]
impl MainNodeClient for Box<DynClient<L2>> {
    async fn resolve_l1_batch_to_l2_block(
        &self,
        number: L1BatchNumber,
    ) -> EnrichedClientResult<Option<L2BlockNumber>> {
        let latency = FETCHER_METRICS.requests[&FetchStage::GetL2BlockRange].start();
        let l2_block_range = self
            .get_l2_block_range(number)
            .rpc_context("resolve_l1_batch_to_l2_block")
            .with_arg("number", &number)
            .await?;
        // Only the start of the `(first, last)` range is needed.
        let first_l2_block =
            l2_block_range.map(|(first, _last)| L2BlockNumber(first.as_u32()));
        latency.observe();
        Ok(first_l2_block)
    }

    async fn block_details(
        &self,
        number: L2BlockNumber,
    ) -> EnrichedClientResult<Option<api::BlockDetails>> {
        let latency = FETCHER_METRICS.requests[&FetchStage::GetBlockDetails].start();
        let response = self
            .get_block_details(number)
            .rpc_context("block_details")
            .with_arg("number", &number)
            .await;
        let details = response?;
        latency.observe();
        Ok(details)
    }
}
/// Last L1 batch numbers whose execution, proof and commitment on L1 are recorded.
///
/// The cursor is `Copy` and is serialized into the updater's health check details.
#[derive(Debug, Clone, Copy, PartialEq, Serialize)]
struct UpdaterCursor {
    /// Last batch recorded as executed on L1.
    last_executed_l1_batch: L1BatchNumber,
    /// Last batch recorded as proven on L1.
    last_proven_l1_batch: L1BatchNumber,
    /// Last batch recorded as committed on L1.
    last_committed_l1_batch: L1BatchNumber,
}
impl UpdaterCursor {
    /// Builds the cursor from the last batches that storage records as executed, proven and
    /// committed on L1.
    ///
    /// A stage with nothing recorded starts right before the first L1 batch returned by
    /// `projected_first_l1_batch` (saturating at 0).
    async fn new(storage: &mut Connection<'_, Core>) -> anyhow::Result<Self> {
        let first_l1_batch = projected_first_l1_batch(storage).await?;
        let fallback = L1BatchNumber(first_l1_batch.saturating_sub(1));

        let last_executed_l1_batch = storage
            .blocks_dal()
            .get_number_of_last_l1_batch_executed_on_eth()
            .await?
            .unwrap_or(fallback);
        let last_proven_l1_batch = storage
            .blocks_dal()
            .get_number_of_last_l1_batch_proven_on_eth()
            .await?
            .unwrap_or(fallback);
        let last_committed_l1_batch = storage
            .blocks_dal()
            .get_number_of_last_l1_batch_committed_on_eth()
            .await?
            .unwrap_or(fallback);

        Ok(Self {
            last_executed_l1_batch,
            last_proven_l1_batch,
            last_committed_l1_batch,
        })
    }

    /// Picks the L1 transaction hash and timestamp relevant to `stage` from the block details.
    fn extract_tx_hash_and_timestamp(
        batch_info: &api::BlockDetails,
        stage: AggregatedActionType,
    ) -> (Option<H256>, Option<DateTime<Utc>>) {
        let base = &batch_info.base;
        match stage {
            AggregatedActionType::Commit => (base.commit_tx_hash, base.committed_at),
            AggregatedActionType::PublishProofOnchain => (base.prove_tx_hash, base.proven_at),
            AggregatedActionType::Execute => (base.execute_tx_hash, base.executed_at),
        }
    }

    /// Tries to advance every stage (commit, then prove, then execute) using `batch_info`,
    /// recording accepted changes in `status_changes`.
    ///
    /// # Errors
    ///
    /// Stops at the first stage reporting a malformed response.
    fn update(
        &mut self,
        status_changes: &mut StatusChanges,
        batch_info: &api::BlockDetails,
    ) -> anyhow::Result<()> {
        [
            AggregatedActionType::Commit,
            AggregatedActionType::PublishProofOnchain,
            AggregatedActionType::Execute,
        ]
        .iter()
        .try_for_each(|&stage| self.update_stage(status_changes, batch_info, stage))
    }

    /// Advances a single stage if `batch_info` reports the batch right after the cursor
    /// position of that stage as having reached it.
    ///
    /// The update is skipped (returning `Ok`) when the stage has no L1 transaction yet or the
    /// batch does not directly follow the cursor.
    ///
    /// # Errors
    ///
    /// Fails if the stage has an L1 transaction hash but no matching timestamp.
    fn update_stage(
        &mut self,
        status_changes: &mut StatusChanges,
        batch_info: &api::BlockDetails,
        stage: AggregatedActionType,
    ) -> anyhow::Result<()> {
        let (maybe_tx_hash, maybe_timestamp) =
            Self::extract_tx_hash_and_timestamp(batch_info, stage);
        // The main node hasn't reported an L1 transaction for this stage yet.
        let Some(l1_tx_hash) = maybe_tx_hash else {
            return Ok(());
        };

        let (stage_cursor, stage_changes) = match stage {
            AggregatedActionType::Commit => {
                (&mut self.last_committed_l1_batch, &mut status_changes.commit)
            }
            AggregatedActionType::PublishProofOnchain => {
                (&mut self.last_proven_l1_batch, &mut status_changes.prove)
            }
            AggregatedActionType::Execute => {
                (&mut self.last_executed_l1_batch, &mut status_changes.execute)
            }
        };
        let batch_number = batch_info.l1_batch_number;
        // Only the immediate successor is accepted, keeping recorded changes contiguous.
        if batch_number != stage_cursor.next() {
            return Ok(());
        }

        let action_str = l1_batch_stage_to_action_str(stage);
        let happened_at = maybe_timestamp.with_context(|| {
            format!("Malformed API response: batch is {action_str}, but has no relevant timestamp")
        })?;
        stage_changes.push(BatchStatusChange {
            number: batch_number,
            l1_tx_hash,
            happened_at,
        });
        tracing::info!("Batch {}: {action_str}", batch_number);
        FETCHER_METRICS.l1_batch[&stage.into()].set(batch_number.0.into());
        *stage_cursor += 1;
        Ok(())
    }
}
/// Keeps the L1 commit / prove / execute statuses of locally stored L1 batches in sync with
/// the main node.
///
/// The main node is polled through a [`MainNodeClient`]; observed status changes are written
/// to Postgres via `eth_sender_dal().insert_bogus_confirmed_eth_tx(..)`.
#[derive(Debug)]
pub struct BatchStatusUpdater {
    /// Client used to query L1 batch statuses from the main node.
    client: Box<dyn MainNodeClient>,
    /// Pool of connections to the local Postgres database.
    pool: ConnectionPool<Core>,
    /// Reports the updater's health, with the current cursor as details.
    health_updater: HealthUpdater,
    /// Delay between polls when an iteration yields no status changes.
    sleep_interval: Duration,
    /// Test-only sink receiving every set of applied status changes.
    #[cfg(test)]
    changes_sender: mpsc::UnboundedSender<StatusChanges>,
}
impl BatchStatusUpdater {
    /// Delay between main node polls when an iteration finds no status changes.
    const DEFAULT_SLEEP_INTERVAL: Duration = Duration::from_secs(5);

    /// Creates an updater that polls the main node through `client` and persists changes into
    /// `pool`, using [`Self::DEFAULT_SLEEP_INTERVAL`].
    ///
    /// The client is tagged with the `batch_status_updater` component name.
    pub fn new(client: Box<DynClient<L2>>, pool: ConnectionPool<Core>) -> Self {
        Self::from_parts(
            Box::new(client.for_component("batch_status_updater")),
            pool,
            Self::DEFAULT_SLEEP_INTERVAL,
        )
    }

    /// Assembles an updater from an arbitrary [`MainNodeClient`] implementation and sleep interval.
    fn from_parts(
        client: Box<dyn MainNodeClient>,
        pool: ConnectionPool<Core>,
        sleep_interval: Duration,
    ) -> Self {
        Self {
            client,
            pool,
            // Only the updater half is kept; checks are obtained via `health_check()`.
            health_updater: ReactiveHealthCheck::new("batch_status_updater").1,
            sleep_interval,
            // The receiver half is dropped right away, so sends to this default sender fail and
            // are ignored. NOTE(review): tests presumably substitute a connected sender — confirm.
            #[cfg(test)]
            changes_sender: mpsc::unbounded_channel().0,
        }
    }

    /// Returns a health check that reflects this updater's state.
    pub fn health_check(&self) -> ReactiveHealthCheck {
        self.health_updater.subscribe()
    }

    /// Runs the updater until a stop signal is received.
    ///
    /// Each iteration fetches status changes from the main node and persists them. The loop
    /// only sleeps (for `sleep_interval`) after an iteration that produced no changes.
    ///
    /// # Errors
    ///
    /// Returns an error on storage failures or inconsistent / malformed main node responses.
    /// JSON-RPC errors are logged and do not stop the loop.
    pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
        let mut storage = self.pool.connection_tagged("sync_layer").await?;
        let mut cursor = UpdaterCursor::new(&mut storage).await?;
        // Release the connection; later steps acquire their own connections from the pool.
        drop(storage);
        tracing::info!("Initialized batch status updater cursor: {cursor:?}");
        self.health_updater
            .update(Health::from(HealthStatus::Ready).with_details(cursor));

        while !*stop_receiver.borrow_and_update() {
            // The buffer is owned here so that changes fetched before a JSON-RPC error
            // are still applied below.
            let mut status_changes = StatusChanges::default();
            // `cursor` is `Copy` and passed by value; it only advances in `apply_status_changes`.
            match self.get_status_changes(&mut status_changes, cursor).await {
                Ok(()) => { /* all available changes were fetched */ }
                Err(UpdaterError::Web3(err)) => {
                    tracing::warn!("Failed to get status changes from the main node: {err}");
                }
                Err(UpdaterError::Internal(err)) => return Err(err),
            }

            if status_changes.is_empty() {
                // Wait for `sleep_interval`; a change notification on `stop_receiver` (or its
                // sender being dropped) completes the wait early and ends the loop.
                if tokio::time::timeout(self.sleep_interval, stop_receiver.changed())
                    .await
                    .is_ok()
                {
                    break;
                }
            } else {
                self.apply_status_changes(&mut cursor, status_changes)
                    .await?;
                self.health_updater
                    .update(Health::from(HealthStatus::Ready).with_details(cursor));
            }
        }

        tracing::info!("Stop signal received, exiting the batch status updater routine");
        Ok(())
    }

    /// Queries the main node for status changes of batches after `cursor`, appending them to
    /// `status_changes`.
    ///
    /// `cursor` is a local copy advanced as changes are collected. Only batches up to the last
    /// L1 batch sealed in local storage are checked.
    ///
    /// # Errors
    ///
    /// [`UpdaterError::Web3`] on JSON-RPC failures; [`UpdaterError::Internal`] on storage errors
    /// or inconsistent / malformed main node responses. Changes collected before an error stay
    /// in `status_changes`.
    async fn get_status_changes(
        &self,
        status_changes: &mut StatusChanges,
        mut cursor: UpdaterCursor,
    ) -> Result<(), UpdaterError> {
        let total_latency = EN_METRICS.update_batch_statuses.start();
        let Some(last_sealed_batch) = self
            .pool
            .connection_tagged("sync_layer")
            .await?
            .blocks_dal()
            .get_sealed_l1_batch_number()
            .await?
        else {
            // No sealed L1 batches in local storage yet: nothing to update.
            return Ok(());
        };

        // Start from the least advanced stage. A single `block_details` response may advance
        // all three stages at once; the branches at the end of the loop skip ranges that cannot
        // progress.
        let mut batch = cursor.last_executed_l1_batch.next();
        while batch <= last_sealed_batch {
            let l2_block_number = self.client.resolve_l1_batch_to_l2_block(batch).await?;
            // The main node reports no L2 blocks for this batch; stop for this iteration.
            // NOTE(review): this return skips `total_latency.observe()` below.
            let Some(l2_block_number) = l2_block_number else {
                return Ok(());
            };

            let Some(batch_info) = self.client.block_details(l2_block_number).await? else {
                // The main node contradicts itself; this is treated as a fatal internal error.
                let err = anyhow::anyhow!(
                    "Node API is inconsistent: L2 block {l2_block_number} was reported to be a part of {batch} L1 batch, \
                    but API has no information about this L2 block",
                );
                return Err(err.into());
            };

            cursor.update(status_changes, &batch_info)?;

            if batch_info.base.commit_tx_hash.is_none() {
                // This batch isn't committed; later batches are presumably not committed either.
                break;
            } else if batch_info.base.prove_tx_hash.is_none()
                && batch < cursor.last_committed_l1_batch
            {
                // Committed but not proven: jump past the last committed batch, presumably
                // because the batches in between aren't proven either.
                batch = cursor.last_committed_l1_batch.next();
            } else if batch_info.base.executed_at.is_none() && batch < cursor.last_proven_l1_batch {
                // Proven but not executed: jump past the last proven batch.
                batch = cursor.last_proven_l1_batch.next();
            } else {
                batch += 1;
            }
        }

        total_latency.observe();
        Ok(())
    }

    /// Persists `changes` in a single database transaction and advances `cursor` to match.
    ///
    /// Every change is stored through `insert_bogus_confirmed_eth_tx`. Stages are applied in
    /// commit, prove, execute order, which the consistency checks rely on:
    /// - a committed batch must already be sealed locally;
    /// - a proven batch must be committed according to `cursor`;
    /// - an executed batch must be proven according to `cursor`.
    ///
    /// Note that `cursor` is mutated before the transaction commits, so on error it may be
    /// partially advanced (`run` exits on error in that case).
    ///
    /// # Errors
    ///
    /// Fails on storage errors or when a consistency check is violated; the transaction is
    /// then not committed.
    async fn apply_status_changes(
        &self,
        cursor: &mut UpdaterCursor,
        changes: StatusChanges,
    ) -> anyhow::Result<()> {
        let total_latency = EN_METRICS.batch_status_updater_loop_iteration.start();
        let mut connection = self.pool.connection_tagged("sync_layer").await?;
        let mut transaction = connection.start_transaction().await?;
        let last_sealed_batch = transaction
            .blocks_dal()
            .get_sealed_l1_batch_number()
            .await?
            .context("L1 batches disappeared from Postgres")?;

        for change in &changes.commit {
            tracing::info!(
                "Commit status change: number {}, hash {}, happened at {}",
                change.number,
                change.l1_tx_hash,
                change.happened_at
            );
            anyhow::ensure!(
                change.number <= last_sealed_batch,
                "Incorrect update state: unknown batch marked as committed"
            );
            transaction
                .eth_sender_dal()
                .insert_bogus_confirmed_eth_tx(
                    change.number,
                    AggregatedActionType::Commit,
                    change.l1_tx_hash,
                    change.happened_at,
                )
                .await?;
            cursor.last_committed_l1_batch = change.number;
        }

        for change in &changes.prove {
            tracing::info!(
                "Prove status change: number {}, hash {}, happened at {}",
                change.number,
                change.l1_tx_hash,
                change.happened_at
            );
            // Relies on the commit loop above having already advanced the cursor.
            anyhow::ensure!(
                change.number <= cursor.last_committed_l1_batch,
                "Incorrect update state: proven batch must be committed"
            );
            transaction
                .eth_sender_dal()
                .insert_bogus_confirmed_eth_tx(
                    change.number,
                    AggregatedActionType::PublishProofOnchain,
                    change.l1_tx_hash,
                    change.happened_at,
                )
                .await?;
            cursor.last_proven_l1_batch = change.number;
        }

        for change in &changes.execute {
            tracing::info!(
                "Execute status change: number {}, hash {}, happened at {}",
                change.number,
                change.l1_tx_hash,
                change.happened_at
            );
            // Relies on the prove loop above having already advanced the cursor.
            anyhow::ensure!(
                change.number <= cursor.last_proven_l1_batch,
                "Incorrect update state: executed batch must be proven"
            );
            transaction
                .eth_sender_dal()
                .insert_bogus_confirmed_eth_tx(
                    change.number,
                    AggregatedActionType::Execute,
                    change.l1_tx_hash,
                    change.happened_at,
                )
                .await?;
            cursor.last_executed_l1_batch = change.number;
        }

        transaction.commit().await?;
        total_latency.observe();

        // In tests, forward the applied changes; a send failure (no receiver) is ignored.
        #[cfg(test)]
        self.changes_sender.send(changes).ok();
        Ok(())
    }
}
/// Returns the first L1 batch the updater should consider.
///
/// This is the batch following the applied snapshot if the node was recovered from a
/// snapshot, and batch 0 otherwise.
async fn projected_first_l1_batch(
    storage: &mut Connection<'_, Core>,
) -> anyhow::Result<L1BatchNumber> {
    let applied_snapshot = storage
        .snapshot_recovery_dal()
        .get_applied_snapshot_status()
        .await?;
    let first_l1_batch = match applied_snapshot {
        Some(status) => status.l1_batch_number + 1,
        None => L1BatchNumber(0),
    };
    Ok(first_l1_batch)
}