// zksync_node_consensus 0.1.0
//
// Consensus integration for ZKsync node
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
use zksync_consensus_executor::{self as executor, Attester};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};

use crate::{
    config,
    storage::{ConnectionPool, Store},
};

/// Task running a consensus validator for the main node.
/// Main node is currently the only leader of the consensus - i.e. it proposes all the
/// L2 blocks (generated by `Statekeeper`).
///
/// The function performs the following steps, in order:
/// 1. Reads the validator key (mandatory) and the attester key (optional) from `secrets`.
/// 2. If `cfg.genesis_spec` is set, parses it and calls `adjust_genesis` on a connection
///    obtained from `pool`.
/// 3. Creates the `Store` and the `BlockStore`, spawning the runner of each as a
///    background task of the scope.
/// 4. Checks that the genesis held by the block store uses `Sticky` leader selection
///    with this node's validator public key.
/// 5. Creates the `BatchStore` (runner spawned in the background as well), assembles
///    the executor and runs it.
///
/// # Errors
///
/// Returns an error if:
/// * the validator key cannot be read or is missing, or the attester key cannot be read;
/// * the genesis spec fails to parse, or `adjust_genesis` fails;
/// * any of the stores cannot be constructed;
/// * the genesis leader selection mode is not `Sticky` with this node's validator key;
/// * the executor config cannot be built, or the executor itself returns an error.
pub async fn run_main_node(
    ctx: &ctx::Ctx,
    cfg: ConsensusConfig,
    secrets: ConsensusSecrets,
    pool: ConnectionPool,
) -> anyhow::Result<()> {
    // The first `context` labels a failure to read the key, the second one converts
    // an absent key (`None`) into an error: the main node cannot run without it.
    let validator_key = config::validator_key(&secrets)
        .context("validator_key")?
        .context("missing validator_key")?;

    // The attester key is optional; the attester is only configured when it is present.
    let attester_key_opt = config::attester_key(&secrets).context("attester_key")?;

    scope::run!(&ctx, |ctx, s| async {
        if let Some(spec) = &cfg.genesis_spec {
            let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?;

            pool.connection(ctx)
                .await
                .wrap("connection()")?
                .adjust_genesis(ctx, &spec)
                .await
                .wrap("adjust_genesis()")?;
        }

        // The main node doesn't have a payload queue as it produces all the L2 blocks itself.
        let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?;
        s.spawn_bg(runner.run(ctx));

        let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone()))
            .await
            .wrap("BlockStore::new()")?;
        s.spawn_bg(runner.run(ctx));

        // Refuse to start unless the genesis pins this node as the sticky leader.
        anyhow::ensure!(
            block_store.genesis().leader_selection
                == validator::LeaderSelectionMode::Sticky(validator_key.public()),
            "unsupported leader selection mode - main node has to be the leader"
        );

        let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone()))
            .await
            .wrap("BatchStore::new()")?;
        s.spawn_bg(runner.run(ctx));

        let executor = executor::Executor {
            // Label the failure like every other fallible step in this function,
            // so that a bad executor config is attributable from the error chain.
            config: config::executor(&cfg, &secrets).context("config::executor()")?,
            block_store,
            batch_store,
            validator: Some(executor::Validator {
                key: validator_key,
                replica_store: Box::new(store.clone()),
                payload_manager: Box::new(store.clone()),
            }),
            attester: attester_key_opt.map(|key| Attester { key }),
        };
        executor.run(ctx).await
    })
    .await
}