1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
//! Library files for the executor. We have it separate from the binary so that we can use these files in the tools crate.
use crate::io::Dispatcher;
use anyhow::Context as _;
use network::http;
pub use network::{gossip::attestation, RpcConfig};
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
};
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network as network;
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
use zksync_protobuf::kB;

// Internal plumbing between the network and consensus actor pipes (see `io::Dispatcher`).
mod io;
// Unit tests, compiled only for `cargo test`.
#[cfg(test)]
mod tests;

/// Validator-related part of [`Executor`].
#[derive(Debug)]
pub struct Validator {
    /// Secret key of this validator. It is passed to the network as the
    /// validator identity and used by the bft actor to participate in consensus.
    pub key: validator::SecretKey,
    /// Store for replica state, handed to the bft actor.
    pub replica_store: Box<dyn ReplicaStore>,
    /// Payload manager, handed to the bft actor.
    pub payload_manager: Box<dyn bft::PayloadManager>,
}

/// Config of the node executor.
#[derive(Debug)]
pub struct Config {
    /// Label identifying the build version of the binary that this node is running.
    pub build_version: Option<semver::Version>,
    /// IP:port to listen on, for incoming TCP connections.
    /// Use `0.0.0.0:<port>` to listen on all network interfaces (i.e. on all IPs exposed by this VM).
    pub server_addr: std::net::SocketAddr,
    /// Public TCP address that other nodes are expected to connect to.
    /// It is announced over gossip network.
    pub public_addr: net::Host,
    /// Maximal size of the block payload.
    pub max_payload_size: usize,
    /// Maximal size of a batch, which includes `max_payload_size` per block in the batch,
    /// plus the size of the Merkle proof of the commitment being included on L1.
    pub max_batch_size: usize,
    /// Key of this node. It uniquely identifies the node.
    /// It should match the secret key provided in the `node_key` file.
    pub node_key: node::SecretKey,
    /// Limit on the number of inbound connections outside
    /// of the `static_inbound` set.
    pub gossip_dynamic_inbound_limit: usize,
    /// Inbound connections that should be unconditionally accepted.
    pub gossip_static_inbound: HashSet<node::PublicKey>,
    /// Outbound connections that the node should actively try to
    /// establish and maintain.
    pub gossip_static_outbound: HashMap<node::PublicKey, net::Host>,
    /// RPC rate limits config.
    /// Use `RpcConfig::default()` for defaults.
    pub rpc: RpcConfig,

    /// Http debug page configuration.
    /// If `None`, the debug page is disabled.
    pub debug_page: Option<http::DebugPageConfig>,

    /// How often to poll the database looking for the batch commitment.
    /// NOTE(review): not read anywhere in this file — presumably consumed by
    /// the batch/attestation machinery elsewhere; confirm against callers.
    pub batch_poll_interval: time::Duration,
}

impl Config {
    /// Derives the gossip-layer network configuration from this config.
    pub(crate) fn gossip(&self) -> network::GossipConfig {
        // Destructure once instead of repeating `self.` per field; `..` skips
        // the fields the gossip layer does not care about.
        let Self {
            node_key,
            gossip_dynamic_inbound_limit,
            gossip_static_inbound,
            gossip_static_outbound,
            ..
        } = self;
        network::GossipConfig {
            key: node_key.clone(),
            dynamic_inbound_limit: *gossip_dynamic_inbound_limit,
            static_inbound: gossip_static_inbound.clone(),
            static_outbound: gossip_static_outbound.clone(),
        }
    }
}

/// Executor allowing to spin up all actors necessary for a consensus node.
#[derive(Debug)]
pub struct Executor {
    /// General-purpose executor configuration.
    pub config: Config,
    /// Block storage used by the node. Shared with the network and bft actors.
    pub block_store: Arc<BlockStore>,
    /// Batch storage used by the node. Shared with the network actor.
    pub batch_store: Arc<BatchStore>,
    /// Validator-specific node data.
    /// If `None`, the node runs in non-validator mode (no bft actor).
    pub validator: Option<Validator>,
    /// Attestation controller. Caller should actively configure the batch
    /// for which the attestation votes should be collected.
    pub attestation: Arc<attestation::Controller>,
}

impl Executor {
    /// Extracts a network crate config.
    fn network_config(&self) -> network::Config {
        network::Config {
            build_version: self.config.build_version.clone(),
            server_addr: net::tcp::ListenerAddr::new(self.config.server_addr),
            public_addr: self.config.public_addr.clone(),
            gossip: self.config.gossip(),
            validator_key: self.validator.as_ref().map(|v| v.key.clone()),
            ping_timeout: Some(time::Duration::seconds(10)),
            max_block_size: self.config.max_payload_size.saturating_add(kB),
            max_batch_size: self.config.max_batch_size.saturating_add(kB),
            max_block_queue_size: 20,
            tcp_accept_rate: limiter::Rate {
                burst: 10,
                refresh: time::Duration::milliseconds(100),
            },
            rpc: self.config.rpc.clone(),
        }
    }

    /// Runs this executor to completion. This should be spawned on a separate task.
    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
        let network_config = self.network_config();

        // Generate the communication pipes. We have one for each actor.
        let (consensus_actor_pipe, consensus_dispatcher_pipe) = pipe::new();
        let (network_actor_pipe, network_dispatcher_pipe) = pipe::new();
        // Create the IO dispatcher.
        let dispatcher = Dispatcher::new(consensus_dispatcher_pipe, network_dispatcher_pipe);

        tracing::debug!("Starting actors in separate threads.");
        scope::run!(ctx, |ctx, s| async {
            s.spawn(async {
                dispatcher.run(ctx).await;
                Ok(())
            });
            let (net, runner) = network::Network::new(
                network_config,
                self.block_store.clone(),
                self.batch_store.clone(),
                network_actor_pipe,
                self.attestation,
            );
            net.register_metrics();
            s.spawn(async { runner.run(ctx).await.context("Network stopped") });

            if let Some(debug_config) = self.config.debug_page {
                s.spawn(async {
                    http::DebugPageServer::new(debug_config, net)
                        .run(ctx)
                        .await
                        .context("Http Server stopped")
                });
            }

            // Run the bft actor iff this node is an active validator.
            let Some(validator) = self.validator else {
                tracing::info!("Running the node in non-validator mode.");
                return Ok(());
            };
            if !self
                .block_store
                .genesis()
                .validators
                .contains(&validator.key.public())
            {
                tracing::warn!(
                    "This node is an inactive validator. It will NOT vote in consensus."
                );
                return Ok(());
            }
            bft::Config {
                secret_key: validator.key.clone(),
                block_store: self.block_store.clone(),
                replica_store: validator.replica_store,
                payload_manager: validator.payload_manager,
                max_payload_size: self.config.max_payload_size,
            }
            .run(ctx, consensus_actor_pipe)
            .await
            .context("Consensus stopped")
        })
        .await
    }
}