use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, time};
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_storage::{self as storage, BatchStoreState};
use zksync_dal::{consensus_dal::Payload, Core, CoreDal, DalError};
use zksync_l1_contract_interface::i_executor::structures::StoredBatchInfo;
use zksync_node_sync::{fetcher::IoCursorExt as _, ActionQueueSender, SyncState};
use zksync_state_keeper::io::common::IoCursor;
use zksync_types::{commitment::L1BatchWithMetadata, L1BatchNumber};
use super::{InsertCertificateError, PayloadQueue};
use crate::config;
/// Context-aware wrapper around `zksync_dal::ConnectionPool<Core>`.
/// Its methods take a `ctx::Ctx` and return `ctx::Result`.
#[derive(Debug, Clone)]
pub(crate) struct ConnectionPool(pub(crate) zksync_dal::ConnectionPool<Core>);
impl ConnectionPool {
/// Wrapper for `connection_tagged()`.
///
/// Requests a connection tagged `"consensus"` from the wrapped pool, awaiting it
/// under `ctx`. DAL errors are generalized before being returned.
pub(crate) async fn connection<'a>(&'a self, ctx: &ctx::Ctx) -> ctx::Result<Connection<'a>> {
    let acquired = ctx.wait(self.0.connection_tagged("consensus")).await?;
    let conn = acquired.map_err(DalError::generalize)?;
    Ok(Connection(conn))
}
/// Polls storage until the payload of block `number` is available and returns it.
///
/// Every iteration opens a fresh connection, queries the payload and, if it is
/// not there yet, sleeps for `POLL_INTERVAL` before retrying.
pub async fn wait_for_payload(
    &self,
    ctx: &ctx::Ctx,
    number: validator::BlockNumber,
) -> ctx::Result<Payload> {
    const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50);
    loop {
        let mut conn = self.connection(ctx).await.wrap("connection()")?;
        let found = conn
            .payload(ctx, number)
            .await
            .with_wrap(|| format!("payload({number})"))?;
        // Release the connection before sleeping so it is not held between polls.
        drop(conn);
        match found {
            Some(payload) => return Ok(payload),
            None => ctx.sleep(POLL_INTERVAL).await?,
        }
    }
}
}
/// Context-aware wrapper around `zksync_dal::Connection<'a, Core>`.
/// Its methods take a `ctx::Ctx` and await the underlying DAL calls under it.
pub(crate) struct Connection<'a>(pub(crate) zksync_dal::Connection<'a, Core>);
impl<'a> Connection<'a> {
/// Wrapper for `start_transaction()`.
///
/// Returns a new `Connection` wrapping the started transaction; it borrows
/// from `self` for its whole lifetime.
pub async fn start_transaction<'b, 'c: 'b>(
    &'c mut self,
    ctx: &ctx::Ctx,
) -> ctx::Result<Connection<'b>> {
    let started = ctx.wait(self.0.start_transaction()).await?;
    let txn = started.context("sqlx")?;
    Ok(Connection(txn))
}
/// Wrapper for `commit()`. Consumes the connection.
pub async fn commit(self, ctx: &ctx::Ctx) -> ctx::Result<()> {
    let outcome = ctx.wait(self.0.commit()).await?;
    outcome.context("sqlx")?;
    Ok(())
}
/// Wrapper for `consensus_dal().block_payload()`.
///
/// DAL errors are generalized before being returned.
pub async fn payload(
    &mut self,
    ctx: &ctx::Ctx,
    number: validator::BlockNumber,
) -> ctx::Result<Option<Payload>> {
    let fetched = ctx
        .wait(self.0.consensus_dal().block_payload(number))
        .await?;
    let payload = fetched.map_err(DalError::generalize)?;
    Ok(payload)
}
/// Wrapper for `consensus_dal().block_payloads()`.
///
/// DAL errors are generalized before being returned.
pub async fn payloads(
    &mut self,
    ctx: &ctx::Ctx,
    numbers: std::ops::Range<validator::BlockNumber>,
) -> ctx::Result<Vec<Payload>> {
    let fetched = ctx
        .wait(self.0.consensus_dal().block_payloads(numbers))
        .await?;
    let payloads = fetched.map_err(DalError::generalize)?;
    Ok(payloads)
}
/// Wrapper for `consensus_dal().block_certificate()`.
pub async fn block_certificate(
    &mut self,
    ctx: &ctx::Ctx,
    number: validator::BlockNumber,
) -> ctx::Result<Option<validator::CommitQC>> {
    let cert = ctx
        .wait(self.0.consensus_dal().block_certificate(number))
        .await??;
    Ok(cert)
}
/// Wrapper for `consensus_dal().insert_block_certificate()`.
pub async fn insert_block_certificate(
    &mut self,
    ctx: &ctx::Ctx,
    cert: &validator::CommitQC,
) -> Result<(), InsertCertificateError> {
    ctx.wait(self.0.consensus_dal().insert_block_certificate(cert))
        .await??;
    Ok(())
}
/// Wrapper for `consensus_dal().insert_batch_certificate()`, which additionally
/// verifies that the batch hash in the certificate matches the stored batch.
///
/// # Errors
/// * `MissingPayload` if the L1 batch referenced by the certificate has no
///   metadata in storage (including when its number does not fit into `u32`).
/// * `PayloadMismatch` if the hash of the stored batch info differs from the
///   hash in the certificate message.
/// * A DAL error, or cancellation of `ctx`, converted into `InsertCertificateError`.
pub async fn insert_batch_certificate(
    &mut self,
    ctx: &ctx::Ctx,
    cert: &attester::BatchQC,
) -> Result<(), InsertCertificateError> {
    use crate::storage::consensus_dal::InsertCertificateError as E;
    // L1 batch numbers are `u32`. A certificate number outside that range cannot
    // refer to any stored batch, so report it as missing instead of truncating
    // it (which could alias a different, existing batch).
    let Ok(raw_number) = u32::try_from(cert.message.number.0) else {
        return Err(E::MissingPayload.into());
    };
    let l1_batch_number = L1BatchNumber(raw_number);
    // Await the lookup under `ctx`, like the other DAL calls in this type.
    let Some(l1_batch) = ctx
        .wait(self.0.blocks_dal().get_l1_batch_metadata(l1_batch_number))
        .await?
        .map_err(E::Dal)?
    else {
        return Err(E::MissingPayload.into());
    };
    // The certificate must sign exactly the batch we have stored.
    let l1_batch_info = StoredBatchInfo::from(&l1_batch);
    if l1_batch_info.hash().0 != *cert.message.hash.0.as_bytes() {
        return Err(E::PayloadMismatch.into());
    }
    Ok(ctx
        .wait(self.0.consensus_dal().insert_batch_certificate(cert))
        .await??)
}
/// Wrapper for `consensus_dal().replica_state()`.
///
/// DAL errors are generalized before being returned.
pub async fn replica_state(&mut self, ctx: &ctx::Ctx) -> ctx::Result<storage::ReplicaState> {
    let fetched = ctx.wait(self.0.consensus_dal().replica_state()).await?;
    let state = fetched.map_err(DalError::generalize)?;
    Ok(state)
}
/// Wrapper for `consensus_dal().set_replica_state()`.
pub async fn set_replica_state(
    &mut self,
    ctx: &ctx::Ctx,
    state: &storage::ReplicaState,
) -> ctx::Result<()> {
    let outcome = ctx
        .wait(self.0.consensus_dal().set_replica_state(state))
        .await?;
    Ok(outcome.context("sqlx")?)
}
/// Wrapper for `blocks_dal().get_l1_batch_metadata()`.
pub async fn batch(
    &mut self,
    ctx: &ctx::Ctx,
    number: L1BatchNumber,
) -> ctx::Result<Option<L1BatchWithMetadata>> {
    let fetched = ctx
        .wait(self.0.blocks_dal().get_l1_batch_metadata(number))
        .await?;
    let metadata = fetched.context("get_l1_batch_metadata()")?;
    Ok(metadata)
}
/// Wrapper for `IoCursor::for_fetcher()`.
///
/// Builds a `PayloadQueue` from the cursor obtained from storage together with
/// the supplied `actions` sender and `sync_state`.
pub async fn new_payload_queue(
    &mut self,
    ctx: &ctx::Ctx,
    actions: ActionQueueSender,
    sync_state: SyncState,
) -> ctx::Result<PayloadQueue> {
    let inner = ctx.wait(IoCursor::for_fetcher(&mut self.0)).await??;
    Ok(PayloadQueue {
        inner,
        actions,
        sync_state,
    })
}
/// Wrapper for `consensus_dal().genesis()`.
///
/// DAL errors are generalized before being returned.
pub async fn genesis(&mut self, ctx: &ctx::Ctx) -> ctx::Result<Option<validator::Genesis>> {
    let fetched = ctx.wait(self.0.consensus_dal().genesis()).await?;
    let genesis = fetched.map_err(DalError::generalize)?;
    Ok(genesis)
}
/// Wrapper for `consensus_dal().try_update_genesis()`.
pub async fn try_update_genesis(
    &mut self,
    ctx: &ctx::Ctx,
    genesis: &validator::Genesis,
) -> ctx::Result<()> {
    ctx.wait(self.0.consensus_dal().try_update_genesis(genesis))
        .await??;
    Ok(())
}
/// Wrapper for `consensus_dal().next_block()`.
async fn next_block(&mut self, ctx: &ctx::Ctx) -> ctx::Result<validator::BlockNumber> {
    let next = ctx.wait(self.0.consensus_dal().next_block()).await??;
    Ok(next)
}
/// Wrapper for `consensus_dal().block_certificates_range()`.
pub(crate) async fn block_certificates_range(
    &mut self,
    ctx: &ctx::Ctx,
) -> ctx::Result<storage::BlockStoreState> {
    let range = ctx
        .wait(self.0.consensus_dal().block_certificates_range())
        .await??;
    Ok(range)
}
/// (Re)initializes the consensus genesis according to `spec`.
///
/// Runs inside a single transaction:
/// * if a genesis is already stored and `GenesisSpec::from_genesis()` of it
///   equals `spec`, this is a no-op;
/// * otherwise a new genesis is built from `spec`, with `first_block` set to
///   the value returned by `next_block()` and `fork_number` set to the
///   successor of the stored one (or `0` when there is no stored genesis),
///   and is written via `try_update_genesis()`.
pub(crate) async fn adjust_genesis(
    &mut self,
    ctx: &ctx::Ctx,
    spec: &config::GenesisSpec,
) -> ctx::Result<()> {
    let mut txn = self
        .start_transaction(ctx)
        .await
        .wrap("start_transaction()")?;
    let old = txn.genesis(ctx).await.wrap("genesis()")?;
    if let Some(old) = &old {
        if &config::GenesisSpec::from_genesis(old) == spec {
            // Stored genesis already matches the requested spec.
            return Ok(());
        }
    }
    tracing::info!("Performing a hard fork of consensus.");
    let genesis = validator::GenesisRaw {
        chain_id: spec.chain_id,
        fork_number: old
            .as_ref()
            .map_or(validator::ForkNumber(0), |old| old.fork_number.next()),
        // `next_block()` returns a `ctx::Result`, so use `wrap()` rather than
        // `context()`: the latter would turn cancellation into an internal error.
        first_block: txn.next_block(ctx).await.wrap("next_block()")?,
        protocol_version: spec.protocol_version,
        validators: spec.validators.clone(),
        attesters: spec.attesters.clone(),
        leader_selection: spec.leader_selection.clone(),
    }
    .with_hash();
    txn.try_update_genesis(ctx, &genesis)
        .await
        .wrap("try_update_genesis()")?;
    txn.commit(ctx).await.wrap("commit()")?;
    Ok(())
}
/// Fetches the finalized block `number`: its certificate plus its encoded payload.
///
/// Returns `Ok(None)` when no certificate is stored for `number`. If the
/// certificate exists but the payload does not, an error is returned.
pub(crate) async fn block(
    &mut self,
    ctx: &ctx::Ctx,
    number: validator::BlockNumber,
) -> ctx::Result<Option<validator::FinalBlock>> {
    let cert = self
        .block_certificate(ctx, number)
        .await
        .wrap("block_certificate()")?;
    let justification = match cert {
        Some(qc) => qc,
        None => return Ok(None),
    };
    let stored = self.payload(ctx, number).await.wrap("payload()")?;
    let payload = stored.context("L2 block disappeared from storage")?;
    let block = validator::FinalBlock {
        payload: payload.encode(),
        justification,
    };
    Ok(Some(block))
}
/// Wrapper for `blocks_dal().get_sealed_l1_batch_number()`.
///
/// The result is converted into an `attester::BatchNumber`.
pub async fn get_last_batch_number(
    &mut self,
    ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
    let sealed = ctx
        .wait(self.0.blocks_dal().get_sealed_l1_batch_number())
        .await?
        .context("get_sealed_l1_batch_number()")?;
    Ok(sealed.map(|nr| attester::BatchNumber(u64::from(nr.0))))
}
/// Wrapper for `consensus_dal().get_last_batch_certificate_number()`.
pub async fn get_last_batch_certificate_number(
    &mut self,
    ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
    let fetched = ctx
        .wait(self.0.consensus_dal().get_last_batch_certificate_number())
        .await?;
    let number = fetched.context("get_last_batch_certificate_number()")?;
    Ok(number)
}
/// Wrapper for `consensus_dal().batch_certificate()`.
pub async fn batch_certificate(
    &mut self,
    ctx: &ctx::Ctx,
    number: attester::BatchNumber,
) -> ctx::Result<Option<attester::BatchQC>> {
    let fetched = ctx
        .wait(self.0.consensus_dal().batch_certificate(number))
        .await?;
    let cert = fetched.context("batch_certificate()")?;
    Ok(cert)
}
/// Wrapper for `blocks_dal().get_l2_block_range_of_l1_batch()`.
///
/// Converts the attester batch number into an `L1BatchNumber` (failing if it
/// does not fit) and the returned pair into `validator::BlockNumber`s.
pub async fn get_l2_block_range_of_l1_batch(
    &mut self,
    ctx: &ctx::Ctx,
    number: attester::BatchNumber,
) -> ctx::Result<Option<(validator::BlockNumber, validator::BlockNumber)>> {
    let batch = L1BatchNumber(number.0.try_into().context("number")?);
    let range = ctx
        .wait(self.0.blocks_dal().get_l2_block_range_of_l1_batch(batch))
        .await?
        .context("get_l2_block_range_of_l1_batch()")?;
    let Some((first, last)) = range else {
        return Ok(None);
    };
    Ok(Some((
        validator::BlockNumber(first.0 as u64),
        validator::BlockNumber(last.0 as u64),
    )))
}
/// Constructs the `attester::SyncBatch` for L1 batch `number` from storage.
///
/// Returns `Ok(None)` when no L2 block range is known for the batch. The
/// `proof` field of the returned batch is left empty.
pub async fn get_batch(
    &mut self,
    ctx: &ctx::Ctx,
    number: attester::BatchNumber,
) -> ctx::Result<Option<attester::SyncBatch>> {
    // The callee returns a `ctx::Result`, so use `wrap()` rather than
    // `context()`: the latter would turn cancellation into an internal error.
    let Some((min, max)) = self
        .get_l2_block_range_of_l1_batch(ctx, number)
        .await
        .wrap("get_l2_block_range_of_l1_batch()")?
    else {
        return Ok(None);
    };
    // NOTE(review): `min..max` is half-open. If the DAL returns an inclusive
    // (first, last) pair, the last L2 block of the batch is omitted here;
    // confirm against `get_l2_block_range_of_l1_batch()`.
    let payloads = self.payloads(ctx, min..max).await.wrap("payloads()")?;
    let payloads = payloads.into_iter().map(|p| p.encode()).collect();
    let batch = attester::SyncBatch {
        number,
        payloads,
        proof: Vec::new(),
    };
    Ok(Some(batch))
}
/// Builds the `BatchStoreState` describing the batches available in storage.
///
/// * `first` is the earliest L1 batch number in storage; if there is none, the
///   batch number of the applied snapshot status; if that is absent too, `0`.
/// * `last` is the `SyncBatch` for the number returned by
///   `get_last_batch_number()`, or `None` if that returns `None`.
///
/// # Errors
/// Fails if a last batch number is reported but `get_batch()` cannot
/// construct that batch.
pub async fn batches_range(&mut self, ctx: &ctx::Ctx) -> ctx::Result<storage::BatchStoreState> {
    // Await the DAL calls under `ctx`, like the other methods of this type.
    let earliest = ctx
        .wait(self.0.blocks_dal().get_earliest_l1_batch_number())
        .await?
        .context("get_earliest_l1_batch_number()")?;
    let first = match earliest {
        Some(number) => Some(number),
        // No batches in storage: fall back to the applied snapshot status, if any.
        None => ctx
            .wait(self.0.snapshot_recovery_dal().get_applied_snapshot_status())
            .await?
            .context("get_applied_snapshot_status()")?
            .map(|s| s.l1_batch_number),
    };
    // The calls below return `ctx::Result`, so use `wrap()` rather than
    // `context()`: the latter would turn cancellation into an internal error.
    let last = self
        .get_last_batch_number(ctx)
        .await
        .wrap("get_last_batch_number()")?;
    let last = match last {
        Some(last) => Some(
            self.get_batch(ctx, last)
                .await
                .wrap("get_batch()")?
                .context("last batch not available")?,
        ),
        None => None,
    };
    Ok(BatchStoreState {
        first: first
            .map(|n| attester::BatchNumber(u64::from(n.0)))
            .unwrap_or(attester::BatchNumber(0)),
        last,
    })
}
}