#![warn(
missing_debug_implementations,
missing_docs,
unsafe_code,
bare_trait_objects
)]
#![warn(clippy::pedantic, clippy::nursery)]
#![allow(
clippy::cast_possible_wrap, clippy::cast_possible_truncation, clippy::cast_sign_loss,
clippy::module_name_repetitions, clippy::similar_names, clippy::must_use_candidate,
clippy::pub_enum_variant_names,
clippy::indexing_slicing,
clippy::missing_errors_doc, clippy::missing_const_for_fn
)]
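//! Standalone Exonum node implementation together with the configuration types and the
//! [`NodeBuilder`] used to assemble and run a node.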
pub use crate::{
connect_list::{ConnectInfo, ConnectListConfig},
plugin::{NodePlugin, PluginApiContext, SharedNodeState},
};
use actix_rt::System;
use anyhow::{ensure, format_err};
use exonum::{
blockchain::{
config::GenesisConfig, ApiSender, Blockchain, BlockchainBuilder, BlockchainMut,
ConsensusConfig, Schema, SendError,
},
crypto::{self, Hash, PublicKey},
helpers::{user_agent, Height, Milliseconds, Round, ValidateInput, ValidatorId},
keys::Keys,
merkledb::{Database, ObjectHash},
messages::{AnyTx, IntoMessage, SignedMessage, Verified},
runtime::RuntimeInstance,
};
use exonum_api::{
AllowOrigin, ApiAccess, ApiAggregator, ApiManager, ApiManagerConfig, UpdateEndpoints,
WebServerConfig,
};
use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
use log::{info, trace};
use serde_derive::{Deserialize, Serialize};
use tokio::time::delay_for;
use std::{
collections::{HashMap, HashSet},
convert::TryFrom,
fmt, io,
net::SocketAddr,
sync::Arc,
thread,
time::{Duration, SystemTime},
};
use crate::{
connect_list::ConnectList,
events::{
noise::HandshakeParams, HandlerPart, InternalEvent, InternalPart, InternalRequest,
NetworkEvent, NetworkPart, NetworkRequest, SyncSender, TimeoutRequest,
},
messages::Connect,
proposer::{ProposeBlock, StandardProposer},
schema::NodeSchema,
state::{RequestData, State},
};
mod basic;
mod connect_list;
mod consensus;
mod events;
mod events_impl;
pub mod helpers;
mod messages;
mod plugin;
pub mod proposer;
mod proto;
mod requests;
#[cfg(test)]
mod sandbox;
mod schema;
mod state;
#[doc(hidden)]
pub mod _bench_types {
pub use crate::{
events::{
Event, EventHandler, EventOutcome, HandlerPart, InternalPart, InternalRequest,
NetworkEvent,
},
messages::Message as PeerMessage,
};
}
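/// Messages that can be sent to a running node from the outside (for example, by node plugins
/// or the private API): adding a peer, enabling or disabling the node, and shutting it down.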
#[derive(Debug)]
#[non_exhaustive]
pub enum ExternalMessage {
PeerAdd(ConnectInfo),
Enable(bool),
Shutdown,
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum NodeTimeout {
Status(Height),
Round(Height, Round),
Request(RequestData, Option<PublicKey>),
Propose(Height, Round),
UpdateApiState,
PeerExchange,
FlushPool,
}
pub(crate) trait SystemStateProvider: fmt::Debug + Send + 'static {
fn listen_address(&self) -> SocketAddr;
fn current_time(&self) -> SystemTime;
}
pub(crate) struct NodeHandler {
pub api_state: SharedNodeState,
pub blockchain: BlockchainMut,
plugins: Vec<Box<dyn NodePlugin>>,
state: State,
system_state: Box<dyn SystemStateProvider>,
channel: NodeSender,
peer_discovery: Vec<String>,
is_enabled: bool,
node_role: NodeRole,
config_manager: Option<Box<dyn ConfigManager>>,
allow_expedited_propose: bool,
block_proposer: Box<dyn ProposeBlock>,
}
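/// HTTP API configuration of a node: the API state update timeout, listen addresses and
/// allowed CORS origins for the public and private APIs, and the server restart policy.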
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct NodeApiConfig {
pub state_update_timeout: usize,
pub public_api_address: Option<SocketAddr>,
pub private_api_address: Option<SocketAddr>,
pub public_allow_origin: Option<AllowOrigin>,
pub private_allow_origin: Option<AllowOrigin>,
#[serde(default)]
pub server_restart: ServerRestartPolicy,
}
impl Default for NodeApiConfig {
fn default() -> Self {
Self {
state_update_timeout: 10_000,
public_api_address: None,
private_api_address: None,
public_allow_origin: None,
private_allow_origin: None,
server_restart: ServerRestartPolicy::default(),
}
}
}
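/// Policy for restarting HTTP API servers after a failed start: the maximum number of
/// attempts and the timeout between them.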
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
pub struct ServerRestartPolicy {
pub max_retries: u16,
pub retry_timeout: u64,
}
impl Default for ServerRestartPolicy {
fn default() -> Self {
Self {
max_retries: 20,
retry_timeout: 500,
}
}
}
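/// P2P network configuration of a node: limits on incoming / outgoing connections and
/// low-level TCP options (keep-alive, `TCP_NODELAY`, connection retries).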
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct NetworkConfiguration {
pub max_incoming_connections: usize,
pub max_outgoing_connections: usize,
pub tcp_nodelay: bool,
pub tcp_keep_alive: Option<u64>,
pub tcp_connect_retry_timeout: Milliseconds,
pub tcp_connect_max_retries: u64,
}
impl Default for NetworkConfiguration {
fn default() -> Self {
Self {
max_incoming_connections: 128,
max_outgoing_connections: 128,
tcp_keep_alive: None,
tcp_nodelay: true,
tcp_connect_retry_timeout: 15_000,
tcp_connect_max_retries: 10,
}
}
}
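/// Buffer sizes for the event channels created in a [`NodeChannel`].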
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct EventsPoolCapacity {
network_requests_capacity: usize,
network_events_capacity: usize,
internal_events_capacity: usize,
api_requests_capacity: usize,
}
impl Default for EventsPoolCapacity {
fn default() -> Self {
Self {
network_requests_capacity: 512,
network_events_capacity: 512,
internal_events_capacity: 128,
api_requests_capacity: 1024,
}
}
}
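/// Memory pool configuration: event channel capacities and the strategy used to flush
/// the node's transaction cache.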
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct MemoryPoolConfig {
pub events_pool_capacity: EventsPoolCapacity,
#[serde(default)]
pub flush_pool_strategy: FlushPoolStrategy,
}
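/// Strategy for flushing transactions from the node's in-memory cache into the persistent
/// pool: never, on a timer with the given interval in milliseconds, or immediately
/// (as soon as new transactions appear).
///
/// With the internally tagged serde representation used here, the default timer strategy
/// corresponds to the following fragment of a [`MemoryPoolConfig`] in TOML:
///
/// ```toml
/// [flush_pool_strategy]
/// type = "timeout"
/// timeout = 20
/// ```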
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum FlushPoolStrategy {
Never,
Timeout {
timeout: Milliseconds,
},
Immediate,
}
impl Default for FlushPoolStrategy {
fn default() -> Self {
Self::Timeout { timeout: 20 }
}
}
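/// Top-level node configuration: consensus parameters, network addresses and settings,
/// API options, memory pool settings and the connect list.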
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct NodeConfig {
pub consensus: ConsensusConfig,
pub listen_address: SocketAddr,
pub external_address: String,
pub network: NetworkConfiguration,
pub api: NodeApiConfig,
pub mempool: MemoryPoolConfig,
pub connect_list: ConnectListConfig,
pub thread_pool_size: Option<u8>,
}
impl ValidateInput for NodeConfig {
type Error = anyhow::Error;
fn validate(&self) -> Result<(), Self::Error> {
let capacity = &self.mempool.events_pool_capacity;
ensure!(
capacity.internal_events_capacity > 3,
"internal_events_capacity({}) must be strictly larger than 2",
capacity.internal_events_capacity
);
ensure!(
capacity.network_requests_capacity > 0,
"network_requests_capacity({}) must be strictly larger than 0",
capacity.network_requests_capacity
);
let restart_policy = &self.api.server_restart;
ensure!(
restart_policy.max_retries > 0,
"`server_restart.max_retries` must be strictly larger than 0"
);
ensure!(
restart_policy.retry_timeout > 0,
"`server_restart.retry_timeout` must be strictly larger than 0"
);
let sanity_max = 2_usize.pow(16);
ensure!(
capacity.internal_events_capacity < sanity_max,
"internal_events_capacity({}) must be smaller than {}",
capacity.internal_events_capacity,
sanity_max,
);
ensure!(
capacity.network_requests_capacity < sanity_max,
"network_requests_capacity({}) must be smaller than {}",
capacity.network_requests_capacity,
sanity_max,
);
self.consensus.validate()
}
}
#[derive(Debug, Clone)]
pub(crate) struct Configuration {
pub connect_list: ConnectList,
pub network: NetworkConfiguration,
pub peer_discovery: Vec<String>,
pub mempool: MemoryPoolConfig,
pub keys: Keys,
}
#[derive(Debug)]
pub(crate) struct NodeSender {
pub internal_requests: SyncSender<InternalRequest>,
pub network_requests: SyncSender<NetworkRequest>,
pub transactions: SyncSender<Verified<AnyTx>>,
pub api_requests: SyncSender<ExternalMessage>,
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum NodeRole {
Validator(ValidatorId),
Auditor,
}
impl Default for NodeRole {
fn default() -> Self {
Self::Auditor
}
}
impl NodeRole {
pub fn new(validator_id: Option<ValidatorId>) -> Self {
match validator_id {
Some(validator_id) => Self::Validator(validator_id),
None => Self::Auditor,
}
}
    pub fn is_validator(self) -> bool {
        matches!(self, Self::Validator(_))
    }
}
impl NodeHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
blockchain: BlockchainMut,
external_address: &str,
sender: NodeSender,
system_state: Box<dyn SystemStateProvider>,
config: Configuration,
api_state: SharedNodeState,
config_manager: Option<Box<dyn ConfigManager>>,
block_proposer: Box<dyn ProposeBlock>,
) -> Self {
let snapshot = blockchain.snapshot();
let schema = Schema::new(&snapshot);
let last_block = schema.last_block();
let last_block_skip = schema.block_skip();
let consensus_config = schema.consensus_config();
info!("Creating a node with config: {:#?}", consensus_config);
let connect = Connect::new(
external_address,
system_state.current_time().into(),
&user_agent(),
);
let peers = NodeSchema::new(&blockchain.snapshot())
.peers_cache()
.iter()
.collect();
let peer_discovery = config.peer_discovery.clone();
let state = State::new(
config,
consensus_config,
connect,
peers,
&last_block,
last_block_skip.as_ref(),
system_state.current_time(),
);
let validator_id = state.validator_id();
let node_role = NodeRole::new(validator_id);
let is_enabled = api_state.is_enabled();
api_state.set_node_role(node_role);
Self {
blockchain,
api_state,
plugins: vec![],
system_state,
state,
channel: sender,
peer_discovery,
is_enabled,
node_role,
config_manager,
allow_expedited_propose: true,
block_proposer,
}
}
fn sign_message<T>(&self, message: T) -> Verified<T>
where
T: TryFrom<SignedMessage> + IntoMessage,
{
Verified::from_value(
message,
self.state.keys().consensus_pk(),
self.state.keys().consensus_sk(),
)
}
fn api_state(&self) -> &SharedNodeState {
&self.api_state
}
fn first_round_timeout(&self) -> Milliseconds {
self.state().consensus_config().first_round_timeout
}
fn round_timeout_increase(&self) -> Milliseconds {
(self.state().consensus_config().first_round_timeout
* ConsensusConfig::TIMEOUT_LINEAR_INCREASE_PERCENT)
/ 100
}
fn status_timeout(&self) -> Milliseconds {
self.state().consensus_config().status_timeout
}
fn peers_timeout(&self) -> Milliseconds {
self.state().consensus_config().peers_timeout
}
fn min_propose_timeout(&self) -> Milliseconds {
self.state().consensus_config().min_propose_timeout
}
fn max_propose_timeout(&self) -> Milliseconds {
self.state().consensus_config().max_propose_timeout
}
fn propose_timeout_threshold(&self) -> u32 {
self.state().consensus_config().propose_timeout_threshold
}
pub(crate) fn state(&self) -> &State {
&self.state
}
#[cfg(test)]
pub(crate) fn state_mut(&mut self) -> &mut State {
&mut self.state
}
pub fn initialize(&mut self) {
let listen_address = self.system_state.listen_address();
info!("Start listening address={}", listen_address);
let peers: HashSet<_> = {
let it = self.state.peers().values().map(Verified::author);
let it = it.chain(
self.state()
.connect_list()
.peers()
.into_iter()
.map(|i| i.public_key),
);
            let it = it.filter(|key| key != &self.state.our_connect_message().author());
it.collect()
};
for key in peers {
self.connect(key);
info!("Trying to connect with peer {}", key);
}
self.state
.set_epoch_start_time(self.system_state.current_time());
let snapshot = self.blockchain.snapshot();
let schema = NodeSchema::new(&snapshot);
let round = schema.consensus_round();
self.state.jump_round(round);
info!("Jump to round {}", round);
self.add_timeouts();
if self.state.is_leader() && round == Round(1) {
self.add_propose_timeout();
}
let messages = schema.consensus_messages_cache();
for msg in messages.iter() {
self.handle_message(msg);
}
}
fn add_timeouts(&mut self) {
self.add_round_timeout();
self.add_status_timeout();
self.add_peer_exchange_timeout();
self.add_update_api_state_timeout();
self.maybe_add_flush_pool_timeout();
}
fn send_to_peer<T: Into<SignedMessage>>(&mut self, public_key: PublicKey, message: T) {
let message = message.into();
let request = NetworkRequest::SendMessage(public_key, message);
self.channel.network_requests.send(request);
}
fn broadcast<M: Into<SignedMessage>>(&mut self, message: M) {
let peers: Vec<PublicKey> = self
.state
.peers()
.iter()
.filter_map(|(pubkey, _)| {
if self.state.connect_list().is_peer_allowed(pubkey) {
Some(*pubkey)
} else {
None
}
})
.collect();
let message = message.into();
        for public_key in peers {
            self.send_to_peer(public_key, message.clone());
}
}
fn connect(&mut self, key: PublicKey) {
let connect = self.state.our_connect_message().clone();
self.send_to_peer(key, connect);
}
fn add_timeout(&mut self, timeout: NodeTimeout, time: SystemTime) {
let request = TimeoutRequest(time, timeout);
self.channel.internal_requests.send(request.into());
}
fn request(&mut self, data: RequestData, peer: PublicKey) {
let is_new = self.state.request(data.clone(), peer);
if is_new {
self.add_request_timeout(data, None);
}
}
fn add_round_timeout(&mut self) {
let time = self.round_start_time(self.state.round().next());
trace!(
"ADD ROUND TIMEOUT: time={:?}, height={}, round={}",
time,
self.state.epoch(),
self.state.round()
);
let timeout = NodeTimeout::Round(self.state.epoch(), self.state.round());
self.add_timeout(timeout, time);
}
fn add_propose_timeout(&mut self) {
let timeout = if self.need_faster_propose() {
self.min_propose_timeout()
} else {
self.max_propose_timeout()
};
let time = self.round_start_time(self.state.round()) + Duration::from_millis(timeout);
trace!(
"ADD PROPOSE TIMEOUT: time={:?}, height={}, round={}",
time,
self.state.epoch(),
self.state.round()
);
let timeout = NodeTimeout::Propose(self.state.epoch(), self.state.round());
self.add_timeout(timeout, time);
}
fn maybe_add_propose_timeout(&mut self) {
if self.allow_expedited_propose && self.need_faster_propose() {
info!("Add expedited propose timeout");
self.add_propose_timeout();
self.allow_expedited_propose = false;
}
}
fn need_faster_propose(&self) -> bool {
let snapshot = self.blockchain.snapshot();
let pending_tx_count =
Schema::new(&snapshot).transactions_pool_len() + self.state.tx_cache_len() as u64;
pending_tx_count >= u64::from(self.propose_timeout_threshold())
}
fn add_status_timeout(&mut self) {
let time = self.system_state.current_time() + Duration::from_millis(self.status_timeout());
let height = self.state.epoch();
self.add_timeout(NodeTimeout::Status(height), time);
}
fn add_request_timeout(&mut self, data: RequestData, peer: Option<PublicKey>) {
trace!("ADD REQUEST TIMEOUT");
let time = self.system_state.current_time() + data.timeout();
self.add_timeout(NodeTimeout::Request(data, peer), time);
}
fn add_peer_exchange_timeout(&mut self) {
trace!("ADD PEER EXCHANGE TIMEOUT");
let time = self.system_state.current_time() + Duration::from_millis(self.peers_timeout());
self.add_timeout(NodeTimeout::PeerExchange, time);
}
fn add_update_api_state_timeout(&mut self) {
let time = self.system_state.current_time()
+ Duration::from_millis(self.api_state().state_update_timeout());
self.add_timeout(NodeTimeout::UpdateApiState, time);
}
fn maybe_add_flush_pool_timeout(&mut self) {
if let Some(timeout) = self.state().flush_pool_timeout() {
let time = self.system_state.current_time() + timeout;
self.add_timeout(NodeTimeout::FlushPool, time);
}
}
fn last_block_hash(&self) -> Hash {
self.blockchain.as_ref().last_block().object_hash()
}
fn uncommitted_txs_count(&self) -> u64 {
self.blockchain.as_ref().pool_size() + self.state.tx_cache_len() as u64
}
fn round_start_time(&self, round: Round) -> SystemTime {
let previous_round: u64 = round.previous().into();
let ms = previous_round * self.first_round_timeout()
+ (previous_round * previous_round.saturating_sub(1)) / 2
* self.round_timeout_increase();
self.state.epoch_start_time() + Duration::from_millis(ms)
}
}
impl fmt::Debug for NodeHandler {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_struct("NodeHandler")
.field("channel", &self.channel)
.field("blockchain", &self.blockchain)
.field("peer_discovery", &self.peer_discovery)
.finish()
}
}
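/// Handle allowing to shut down a running node from outside its event loop.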
#[derive(Debug, Clone)]
pub struct ShutdownHandle {
inner: ApiSender<ExternalMessage>,
}
impl ShutdownHandle {
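    /// Sends a shutdown signal to the node. An error is returned if the node has already
    /// been shut down.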
pub async fn shutdown(mut self) -> Result<(), SendError> {
self.inner.send_message(ExternalMessage::Shutdown).await
}
}
#[derive(Debug)]
struct DefaultSystemState(SocketAddr);
impl SystemStateProvider for DefaultSystemState {
fn listen_address(&self) -> SocketAddr {
self.0
}
fn current_time(&self) -> SystemTime {
SystemTime::now()
}
}
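/// Channels connecting the node's event loop to the network, internal timeout and API
/// event producers.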
#[derive(Debug)]
pub struct NodeChannel {
pub(crate) network_requests: (mpsc::Sender<NetworkRequest>, mpsc::Receiver<NetworkRequest>),
#[doc(hidden)]
pub internal_requests: (
mpsc::Sender<InternalRequest>,
mpsc::Receiver<InternalRequest>,
),
endpoints: (
mpsc::Sender<UpdateEndpoints>,
mpsc::Receiver<UpdateEndpoints>,
),
#[doc(hidden)]
pub transactions: (
mpsc::Sender<Verified<AnyTx>>,
mpsc::Receiver<Verified<AnyTx>>,
),
#[doc(hidden)]
pub api_requests: (
mpsc::Sender<ExternalMessage>,
mpsc::Receiver<ExternalMessage>,
),
#[doc(hidden)]
pub network_events: (mpsc::Sender<NetworkEvent>, mpsc::Receiver<NetworkEvent>),
#[doc(hidden)]
pub internal_events: (mpsc::Sender<InternalEvent>, mpsc::Receiver<InternalEvent>),
}
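/// Interface of a manager responsible for persisting updates to the node configuration.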
pub trait ConfigManager: Send {
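    /// Stores the updated connect list in the configuration.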
fn store_connect_list(&mut self, connect_list: ConnectListConfig);
}
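/// Blockchain node that processes consensus and network events and serves HTTP API requests.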
#[derive(Debug)]
pub struct Node {
api_manager_config: ApiManagerConfig,
api_options: NodeApiConfig,
network_config: NetworkConfiguration,
handler: NodeHandler,
channel: NodeChannel,
max_message_len: u32,
thread_pool_size: Option<u8>,
disable_signals: bool,
}
impl Default for NodeChannel {
fn default() -> Self {
Self::new(&EventsPoolCapacity::default())
}
}
impl NodeChannel {
pub fn new(buffer_sizes: &EventsPoolCapacity) -> Self {
Self {
network_requests: mpsc::channel(buffer_sizes.network_requests_capacity),
internal_requests: mpsc::channel(buffer_sizes.internal_events_capacity),
endpoints: mpsc::channel(buffer_sizes.internal_events_capacity),
transactions: mpsc::channel(buffer_sizes.api_requests_capacity),
api_requests: mpsc::channel(buffer_sizes.api_requests_capacity),
network_events: mpsc::channel(buffer_sizes.network_events_capacity),
internal_events: mpsc::channel(buffer_sizes.internal_events_capacity),
}
}
pub fn api_sender(&self) -> ApiSender {
ApiSender::new(self.transactions.0.clone())
}
pub fn endpoints_sender(&self) -> mpsc::Sender<UpdateEndpoints> {
self.endpoints.0.clone()
}
fn node_sender(&self) -> NodeSender {
NodeSender {
internal_requests: SyncSender::new(self.internal_requests.0.clone()),
network_requests: SyncSender::new(self.network_requests.0.clone()),
transactions: SyncSender::new(self.transactions.0.clone()),
api_requests: SyncSender::new(self.api_requests.0.clone()),
}
}
}
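/// Builder for a [`Node`].
///
/// The sketch below shows the general shape of the API rather than a complete setup: a real
/// node additionally needs a genesis config and at least one runtime, added via
/// [`with_genesis_config()`](Self::with_genesis_config) and [`with_runtime()`](Self::with_runtime).
///
/// ```ignore
/// use exonum::merkledb::TemporaryDB;
///
/// // `node_cfg` / `node_keys` are assumed to be a valid `NodeConfig` and `Keys`
/// // (for local experiments they can be produced by the `generate_testnet_config` helper).
/// let node = NodeBuilder::new(TemporaryDB::new(), node_cfg, node_keys)
///     // .with_genesis_config(..) and .with_runtime(..) would be chained here.
///     .build();
/// // `node.run()` returns a future that drives the node until it is shut down.
/// ```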
pub struct NodeBuilder {
channel: NodeChannel,
blockchain_builder: BlockchainBuilder,
node_config: NodeConfig,
node_keys: Keys,
config_manager: Option<Box<dyn ConfigManager>>,
block_proposer: Box<dyn ProposeBlock>,
plugins: Vec<Box<dyn NodePlugin>>,
disable_signals: bool,
}
impl fmt::Debug for NodeBuilder {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_struct("NodeBuilder")
.field("channel", &self.channel)
.field("blockchain_builder", &self.blockchain_builder)
.field("node_config", &self.node_config)
.finish()
}
}
impl NodeBuilder {
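    /// Creates a builder from the database, node configuration and node keys.
    ///
    /// # Panics
    ///
    /// Panics if the node configuration does not pass validation.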
pub fn new(
database: impl Into<Arc<dyn Database>>,
node_config: NodeConfig,
node_keys: Keys,
) -> Self {
node_config
.validate()
.expect("Node configuration is inconsistent");
let channel = NodeChannel::new(&node_config.mempool.events_pool_capacity);
let blockchain = Blockchain::new(database, node_keys.service.clone(), channel.api_sender());
let blockchain_builder = BlockchainBuilder::new(blockchain);
Self {
channel,
blockchain_builder,
node_config,
node_keys,
config_manager: None,
plugins: vec![],
block_proposer: Box::new(StandardProposer),
disable_signals: false,
}
}
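    /// Adds the genesis configuration used to initialize the blockchain.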
pub fn with_genesis_config(mut self, genesis_config: GenesisConfig) -> Self {
self.blockchain_builder = self.blockchain_builder.with_genesis_config(genesis_config);
self
}
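    /// Adds a runtime to the blockchain.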
pub fn with_runtime<T>(mut self, runtime: T) -> Self
where
T: Into<RuntimeInstance>,
{
self.blockchain_builder = self.blockchain_builder.with_runtime(runtime);
self
}
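    /// Adds a runtime produced by the provided closure, which gets access to the
    /// [`NodeChannel`] (e.g., to obtain an API sender).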
pub fn with_runtime_fn<T, F>(mut self, runtime_fn: F) -> Self
where
T: Into<RuntimeInstance>,
F: FnOnce(&NodeChannel) -> T,
{
let runtime = runtime_fn(&self.channel);
self.blockchain_builder = self.blockchain_builder.with_runtime(runtime);
self
}
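    /// Sets the manager used to persist updates to the node configuration.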
pub fn with_config_manager<T: ConfigManager + 'static>(mut self, manager: T) -> Self {
self.config_manager = Some(Box::new(manager));
self
}
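    /// Replaces the default block proposer ([`StandardProposer`]) with a custom one.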
pub fn with_block_proposer<T: ProposeBlock + 'static>(mut self, proposer: T) -> Self {
self.block_proposer = Box::new(proposer);
self
}
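    /// Adds a plugin to the node.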
pub fn with_plugin<T: NodePlugin + 'static>(mut self, plugin: T) -> Self {
self.plugins.push(Box::new(plugin));
self
}
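    /// Switches off the default handling of OS signals (e.g., SIGINT / SIGTERM); the caller
    /// becomes responsible for shutting the node down, for example via a [`ShutdownHandle`].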
pub fn disable_signals(mut self) -> Self {
self.disable_signals = true;
self
}
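    /// Finishes the builder, creating a configured [`Node`].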
pub fn build(self) -> Node {
let blockchain = self.blockchain_builder.build();
let mut node = Node::with_blockchain(
blockchain,
self.channel,
self.node_config,
self.node_keys,
self.config_manager,
self.plugins,
self.block_proposer,
);
node.disable_signals = self.disable_signals;
node
}
}
impl Node {
fn with_blockchain(
blockchain: BlockchainMut,
channel: NodeChannel,
node_cfg: NodeConfig,
node_keys: Keys,
config_manager: Option<Box<dyn ConfigManager>>,
plugins: Vec<Box<dyn NodePlugin>>,
block_proposer: Box<dyn ProposeBlock>,
) -> Self {
crypto::init();
let peers = node_cfg.connect_list.addresses();
let config = Configuration {
connect_list: ConnectList::from_config(node_cfg.connect_list),
mempool: node_cfg.mempool,
network: node_cfg.network,
peer_discovery: peers,
keys: node_keys,
};
let api_state = SharedNodeState::new(node_cfg.api.state_update_timeout as u64);
let mut api_aggregator = ApiAggregator::new();
let plugin_api_context = PluginApiContext::new(
blockchain.as_ref(),
&api_state,
ApiSender::new(channel.api_requests.0.clone()),
);
for plugin in &plugins {
let endpoints = plugin.wire_api(plugin_api_context.clone());
api_aggregator.extend(endpoints);
}
let system_state = Box::new(DefaultSystemState(node_cfg.listen_address));
let network_config = config.network;
let api_cfg = node_cfg.api.clone();
let mut servers = HashMap::new();
if let Some(listen_address) = api_cfg.public_api_address {
let mut server_config = WebServerConfig::new(listen_address);
server_config.allow_origin = api_cfg.public_allow_origin.clone();
servers.insert(ApiAccess::Public, server_config);
}
if let Some(listen_address) = api_cfg.private_api_address {
let mut server_config = WebServerConfig::new(listen_address);
server_config.allow_origin = api_cfg.private_allow_origin.clone();
servers.insert(ApiAccess::Private, server_config);
}
let restart_policy = node_cfg.api.server_restart;
let api_runtime_config = ApiManagerConfig::new(servers, api_aggregator)
.with_retries(restart_policy.retry_timeout, restart_policy.max_retries);
let mut handler = NodeHandler::new(
blockchain,
&node_cfg.external_address,
channel.node_sender(),
system_state,
config,
api_state,
config_manager,
block_proposer,
);
handler.plugins = plugins;
Self {
api_options: api_cfg,
handler,
channel,
network_config,
max_message_len: node_cfg.consensus.max_message_len,
thread_pool_size: node_cfg.thread_pool_size,
api_manager_config: api_runtime_config,
disable_signals: false,
}
}
async fn run_handler(mut self, handshake_params: HandshakeParams) -> anyhow::Result<()> {
const STOP_TIMEOUT: Duration = Duration::from_millis(50);
self.handler.initialize();
let res = Reactor::new(self).run(handshake_params).await;
delay_for(STOP_TIMEOUT).await;
res
}
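    /// Launches the node and runs it until it is shut down (e.g., via a [`ShutdownHandle`]
    /// or an OS signal).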
pub async fn run(self) -> anyhow::Result<()> {
trace!("Running node.");
let handshake_params = HandshakeParams::new(
&self.state().keys().consensus,
self.state().connect_list(),
self.state().our_connect_message().clone(),
self.max_message_len,
);
self.run_handler(handshake_params).await
}
fn state(&self) -> &State {
self.handler.state()
}
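    /// Returns the blockchain of this node.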
pub fn blockchain(&self) -> &Blockchain {
self.handler.blockchain.as_ref()
}
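    /// Returns a handle allowing to shut down the node.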
pub fn shutdown_handle(&self) -> ShutdownHandle {
ShutdownHandle {
inner: ApiSender::new(self.channel.api_requests.0.clone()),
}
}
}
struct Reactor {
handler_part: HandlerPart<NodeHandler>,
network_part: NetworkPart,
internal_part: InternalPart,
api_part: oneshot::Receiver<io::Result<()>>,
shutdown_handle: ShutdownHandle,
needs_signal_handler: bool,
}
impl Reactor {
fn new(node: Node) -> Self {
let connect_message = node.state().our_connect_message().clone();
let connect_list = node.state().connect_list();
let shutdown_handle = node.shutdown_handle();
let mut api_config = node.api_manager_config;
api_config.disable_signals = node.disable_signals;
let needs_signal_handler = !node.disable_signals && api_config.servers.is_empty();
let api_manager = ApiManager::new(api_config);
let endpoints = node.channel.endpoints.1;
let api_task = api_manager.run(endpoints);
let (api_part_tx, api_part) = oneshot::channel();
thread::spawn(|| {
let res = System::new("exonum-node").block_on(api_task);
if let Err(ref err) = res {
log::error!("Error in actix thread: {}", err);
}
api_part_tx.send(res).ok();
});
let (network_tx, network_rx) = node.channel.network_events;
let internal_requests_rx = node.channel.internal_requests.1;
let network_part = NetworkPart {
our_connect_message: connect_message,
listen_address: node.handler.system_state.listen_address(),
network_requests: node.channel.network_requests.1,
network_tx,
network_config: node.network_config,
max_message_len: node.max_message_len,
connect_list,
};
let (internal_tx, internal_rx) = node.channel.internal_events;
let handler_part = HandlerPart {
handler: node.handler,
internal_rx,
network_rx,
transactions_rx: node.channel.transactions.1,
api_rx: node.channel.api_requests.1,
};
let internal_part = InternalPart {
internal_tx,
internal_requests_rx,
};
Self {
handler_part,
network_part,
internal_part,
api_part,
shutdown_handle,
needs_signal_handler,
}
}
#[cfg(unix)]
#[allow(clippy::mut_mut)]
async fn listen_to_signals() {
use futures::StreamExt;
use tokio::signal::unix::{signal, SignalKind};
let int_listener = tokio::signal::ctrl_c().fuse();
futures::pin_mut!(int_listener);
let mut term_listener = signal(SignalKind::terminate())
.map_or_else(|_| stream::pending().right_stream(), StreamExt::left_stream)
.fuse();
let mut quit_listener = signal(SignalKind::quit())
.map_or_else(|_| stream::pending().right_stream(), StreamExt::left_stream)
.fuse();
if let Ok(mut hangup_listener) = signal(SignalKind::hangup()) {
tokio::spawn(async move {
while let Some(()) = hangup_listener.next().await {
log::info!("Received SIGHUP; ignoring");
}
});
}
futures::select! {
_ = int_listener => (),
_ = term_listener.next() => (),
_ = quit_listener.next() => (),
}
}
#[cfg(not(unix))]
async fn listen_to_signals() {
tokio::signal::ctrl_c().await.ok();
}
#[allow(clippy::mut_mut)]
async fn run(self, handshake_params: HandshakeParams) -> anyhow::Result<()> {
let internal_task = self.internal_part.run().fuse();
futures::pin_mut!(internal_task);
let network_task = self.network_part.run(handshake_params).fuse();
futures::pin_mut!(network_task);
let handler_task = self.handler_part.run().fuse();
futures::pin_mut!(handler_task);
let mut api_task = self.api_part.fuse();
if self.needs_signal_handler {
let shutdown_handle = self.shutdown_handle.clone();
let signal_rx = Self::listen_to_signals();
tokio::spawn(async {
signal_rx.await;
shutdown_handle.shutdown().await.ok();
});
};
let (res, should_clean_up) = futures::select! {
() = internal_task => (Ok(()), true),
() = network_task => (Ok(()), true),
() = handler_task => (Ok(()), false),
res = api_task => {
let res = match res {
Err(_) => Err(format_err!("Actix thread panicked")),
Ok(res) => res.map_err(From::from),
};
(res, true)
}
};
if should_clean_up {
self.shutdown_handle.shutdown().await.ok();
handler_task.await;
}
log::info!("Node terminated with status {:?}", res);
res
}
}
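/// Generates node configurations and keys for a testnet of `count` nodes listening on
/// `127.0.0.1` ports starting from `start_port`.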
#[doc(hidden)]
pub fn generate_testnet_config(count: u16, start_port: u16) -> Vec<(NodeConfig, Keys)> {
use exonum::blockchain::ValidatorKeys;
let keys: Vec<_> = (0..count as usize).map(|_| Keys::random()).collect();
let validator_keys = keys
.iter()
.map(|keys| ValidatorKeys::new(keys.consensus_pk(), keys.service_pk()))
.collect();
let consensus = ConsensusConfig::default().with_validator_keys(validator_keys);
let peers = (0..keys.len())
.map(|x| format!("127.0.0.1:{}", start_port + x as u16))
.collect::<Vec<_>>();
keys.into_iter()
.enumerate()
.map(|(idx, keys)| {
let config = NodeConfig {
listen_address: peers[idx].parse().unwrap(),
external_address: peers[idx].clone(),
network: NetworkConfiguration::default(),
consensus: consensus.clone(),
connect_list: ConnectListConfig::from_validator_keys(
&consensus.validator_keys,
&peers,
),
api: NodeApiConfig::default(),
mempool: MemoryPoolConfig::default(),
thread_pool_size: None,
};
(config, keys)
})
.collect::<Vec<_>>()
}
#[cfg(test)]
mod tests {
use exonum::merkledb::TemporaryDB;
use super::*;
#[test]
fn test_good_internal_events_config() {
let db = TemporaryDB::new();
let (node_cfg, node_keys) = generate_testnet_config(1, 16_500).pop().unwrap();
NodeBuilder::new(db, node_cfg, node_keys);
}
#[test]
#[should_panic(expected = "internal_events_capacity(0) must be strictly larger than 2")]
fn test_bad_internal_events_capacity_too_small() {
let db = TemporaryDB::new();
let (mut node_cfg, node_keys) = generate_testnet_config(1, 16_500).pop().unwrap();
node_cfg
.mempool
.events_pool_capacity
.internal_events_capacity = 0;
NodeBuilder::new(db, node_cfg, node_keys);
}
#[test]
#[should_panic(expected = "network_requests_capacity(0) must be strictly larger than 0")]
fn test_bad_network_requests_capacity_too_small() {
let db = TemporaryDB::new();
let (mut node_cfg, node_keys) = generate_testnet_config(1, 16_500)[0].clone();
node_cfg
.mempool
.events_pool_capacity
.network_requests_capacity = 0;
NodeBuilder::new(db, node_cfg, node_keys);
}
#[test]
#[should_panic(expected = "must be smaller than 65536")]
fn test_bad_internal_events_capacity_too_large() {
let accidental_large_value = usize::max_value();
let db = TemporaryDB::new();
let (mut node_cfg, node_keys) = generate_testnet_config(1, 16_500).pop().unwrap();
node_cfg
.mempool
.events_pool_capacity
.internal_events_capacity = accidental_large_value;
NodeBuilder::new(db, node_cfg, node_keys);
}
#[test]
#[should_panic(expected = "must be smaller than 65536")]
fn test_bad_network_requests_capacity_too_large() {
let accidental_large_value = usize::max_value();
let db = TemporaryDB::new();
let (mut node_cfg, node_keys) = generate_testnet_config(1, 16_500)[0].clone();
node_cfg
.mempool
.events_pool_capacity
.network_requests_capacity = accidental_large_value;
NodeBuilder::new(db, node_cfg, node_keys);
}
#[test]
fn flush_pool_strategy_is_serializable() {
let mut mempool_config = MemoryPoolConfig::default();
let s = toml::to_string(&mempool_config).unwrap();
let restored: MemoryPoolConfig = toml::from_str(&s).unwrap();
assert_eq!(restored, mempool_config);
mempool_config.flush_pool_strategy = FlushPoolStrategy::Never;
let s = toml::to_string(&mempool_config).unwrap();
let restored: MemoryPoolConfig = toml::from_str(&s).unwrap();
assert_eq!(restored, mempool_config);
let config_without_strategy = r#"
[events_pool_capacity]
network_requests_capacity = 512
network_events_capacity = 512
internal_events_capacity = 128
api_requests_capacity = 1024
"#;
let restored: MemoryPoolConfig = toml::from_str(config_without_strategy).unwrap();
assert_eq!(restored, MemoryPoolConfig::default());
}
}