1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
//! Library files for the executor. We keep them separate from the binary so that they can be used in the tools crate.
use crate::io::Dispatcher;
use anyhow::Context as _;
use network::http;
pub use network::{gossip::attestation, RpcConfig};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network as network;
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
use zksync_protobuf::kB;
mod io;
#[cfg(test)]
mod tests;
/// Validator-related part of [`Executor`].
///
/// When present, [`Executor::run`] starts the bft actor with these components,
/// provided that the public counterpart of `key` belongs to the validator set
/// of the genesis.
#[derive(Debug)]
pub struct Validator {
/// Secret key of the validator. Its public key is checked against the
/// genesis validator set, and the key itself is handed to both the network
/// configuration and the bft configuration.
pub key: validator::SecretKey,
/// Store for replica state. Passed to the bft actor.
pub replica_store: Box<dyn ReplicaStore>,
/// Payload manager. Passed to the bft actor.
pub payload_manager: Box<dyn bft::PayloadManager>,
}
/// Config of the node executor.
///
/// The values are consumed by [`Executor`] when it assembles the network,
/// gossip and bft configurations.
#[derive(Debug)]
pub struct Config {
/// Label identifying the build version of the binary that this node is running.
/// Forwarded unchanged to the network configuration.
pub build_version: Option<semver::Version>,
/// IP:port to listen on, for incoming TCP connections.
/// Use `0.0.0.0:<port>` to listen on all network interfaces (i.e. on all IPs exposed by this VM).
pub server_addr: std::net::SocketAddr,
/// Public TCP address that other nodes are expected to connect to.
/// It is announced over gossip network.
pub public_addr: net::Host,
/// Maximal size of the block payload.
/// The network-level block size limit is this value plus `kB` (saturating);
/// the bft actor receives the value as-is.
pub max_payload_size: usize,
/// Maximal size of a batch, which includes `max_payload_size` per block in the batch,
/// plus the size of the Merkle proof of the commitment being included on L1.
/// The network-level batch size limit is this value plus `kB` (saturating).
pub max_batch_size: usize,
/// Key of this node. It uniquely identifies the node.
/// It should match the secret key provided in the `node_key` file.
pub node_key: node::SecretKey,
/// Limit on the number of inbound connections outside
/// of the `static_inbound` set.
pub gossip_dynamic_inbound_limit: usize,
/// Inbound connections that should be unconditionally accepted.
pub gossip_static_inbound: HashSet<node::PublicKey>,
/// Outbound connections that the node should actively try to
/// establish and maintain.
pub gossip_static_outbound: HashMap<node::PublicKey, net::Host>,
/// RPC rate limits config.
/// Use `RpcConfig::default()` for defaults.
pub rpc: RpcConfig,
/// HTTP debug page configuration.
/// If `None`, the debug page is disabled (no debug page server is spawned).
pub debug_page: Option<http::DebugPageConfig>,
// NOTE(review): this field is not read by any code visible in this file —
// confirm that it is still consumed elsewhere.
/// How often to poll the database looking for the batch commitment.
pub batch_poll_interval: time::Duration,
}
impl Config {
    /// Builds the gossip network configuration from the gossip-related fields
    /// of this config.
    ///
    /// Every value is cloned (or copied), so `self` is left untouched.
    pub(crate) fn gossip(&self) -> network::GossipConfig {
        // Borrow only the fields that take part in the gossip configuration.
        let Self {
            node_key,
            gossip_dynamic_inbound_limit,
            gossip_static_inbound,
            gossip_static_outbound,
            ..
        } = self;
        network::GossipConfig {
            key: node_key.clone(),
            dynamic_inbound_limit: *gossip_dynamic_inbound_limit,
            static_inbound: gossip_static_inbound.clone(),
            static_outbound: gossip_static_outbound.clone(),
        }
    }
}
/// Executor allowing to spin up all actors necessary for a consensus node.
///
/// Fill in the fields and call [`Executor::run`] to start the actors.
#[derive(Debug)]
pub struct Executor {
/// General-purpose executor configuration.
pub config: Config,
/// Block storage used by the node.
/// Shared with the network actor and, for active validators, with the bft actor;
/// the genesis used for the validator-set check is also read from it.
pub block_store: Arc<BlockStore>,
/// Batch storage used by the node. Shared with the network actor.
pub batch_store: Arc<BatchStore>,
/// Validator-specific node data.
/// If `None`, the node runs in non-validator mode and no bft actor is started.
pub validator: Option<Validator>,
/// Attestation controller. Caller should actively configure the batch
/// for which the attestation votes should be collected.
/// It is handed over to the network actor when the executor runs.
pub attestation: Arc<attestation::Controller>,
}
impl Executor {
    /// Assembles the configuration of the network actor from the executor state.
    ///
    /// The block and batch size limits are the configured sizes extended by `kB`
    /// (saturating). The validator key is set only if validator data is present.
    fn network_config(&self) -> network::Config {
        let cfg = &self.config;
        // `None` when the executor was not given any validator data.
        let validator_key = self.validator.as_ref().map(|val| val.key.clone());
        let tcp_accept_rate = limiter::Rate {
            burst: 10,
            refresh: time::Duration::milliseconds(100),
        };
        network::Config {
            build_version: cfg.build_version.clone(),
            server_addr: net::tcp::ListenerAddr::new(cfg.server_addr),
            public_addr: cfg.public_addr.clone(),
            gossip: cfg.gossip(),
            validator_key,
            ping_timeout: Some(time::Duration::seconds(10)),
            max_block_size: cfg.max_payload_size.saturating_add(kB),
            max_batch_size: cfg.max_batch_size.saturating_add(kB),
            max_block_queue_size: 20,
            tcp_accept_rate,
            rpc: cfg.rpc.clone(),
        }
    }

    /// Runs this executor to completion. This should be spawned on a separate task.
    ///
    /// Spawns the IO dispatcher, the network actor and, when a debug page is
    /// configured, the debug page server. The bft actor is run only if validator
    /// data is present and its public key belongs to the genesis validator set;
    /// otherwise the main task returns early and the spawned tasks keep running
    /// under the scope.
    ///
    /// # Errors
    ///
    /// Returns whatever error the scope reports; failures of the network actor,
    /// the debug page server and the bft actor are annotated with context.
    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
        // Computed before the scope, which moves fields out of `self`.
        let net_cfg = self.network_config();

        // One pipe per actor: one end goes to the actor, the other to the dispatcher.
        let (bft_actor_end, bft_dispatcher_end) = pipe::new();
        let (net_actor_end, net_dispatcher_end) = pipe::new();
        // The dispatcher sits on the dispatcher ends of both pipes.
        let io_dispatcher = Dispatcher::new(bft_dispatcher_end, net_dispatcher_end);

        tracing::debug!("Starting actors in separate threads.");
        scope::run!(ctx, |ctx, s| async {
            s.spawn(async {
                io_dispatcher.run(ctx).await;
                Ok(())
            });

            let (net, net_runner) = network::Network::new(
                net_cfg,
                self.block_store.clone(),
                self.batch_store.clone(),
                net_actor_end,
                self.attestation,
            );
            net.register_metrics();
            s.spawn(async { net_runner.run(ctx).await.context("Network stopped") });

            // The debug page is optional; it is served only when configured.
            if let Some(page_cfg) = self.config.debug_page {
                s.spawn(async {
                    http::DebugPageServer::new(page_cfg, net)
                        .run(ctx)
                        .await
                        .context("Http Server stopped")
                });
            }

            // Run the bft actor iff this node is an active validator.
            let validator = match self.validator {
                Some(validator) => validator,
                None => {
                    tracing::info!("Running the node in non-validator mode.");
                    return Ok(());
                }
            };
            let is_active = self
                .block_store
                .genesis()
                .validators
                .contains(&validator.key.public());
            if !is_active {
                tracing::warn!(
                    "This node is an inactive validator. It will NOT vote in consensus."
                );
                return Ok(());
            }

            bft::Config {
                secret_key: validator.key.clone(),
                block_store: self.block_store.clone(),
                replica_store: validator.replica_store,
                payload_manager: validator.payload_manager,
                max_payload_size: self.config.max_payload_size,
            }
            .run(ctx, bft_actor_end)
            .await
            .context("Consensus stopped")
        })
        .await
    }
}